improve message counter cache line use

This commit is contained in:
Gvozden Neskovic 2021-05-06 11:08:28 +02:00 committed by Alexey Rybalchenko
parent f7ba3052aa
commit ef5b3c782e

View File

@ -73,7 +73,6 @@ class Manager
, fShmSegments(nullptr) , fShmSegments(nullptr)
, fShmRegions(nullptr) , fShmRegions(nullptr)
, fInterrupted(false) , fInterrupted(false)
, fMsgCounter(0)
#ifdef FAIRMQ_DEBUG_MODE #ifdef FAIRMQ_DEBUG_MODE
, fMsgDebug(nullptr) , fMsgDebug(nullptr)
, fShmMsgCounters(nullptr) , fShmMsgCounters(nullptr)
@ -82,6 +81,8 @@ class Manager
, fSendHeartbeats(true) , fSendHeartbeats(true)
, fThrowOnBadAlloc(config ? config->GetProperty<bool>("shm-throw-bad-alloc", true) : true) , fThrowOnBadAlloc(config ? config->GetProperty<bool>("shm-throw-bad-alloc", true) : true)
, fNoCleanup(config ? config->GetProperty<bool>("shm-no-cleanup", false) : false) , fNoCleanup(config ? config->GetProperty<bool>("shm-no-cleanup", false) : false)
, fMsgCounterNew(0)
, fMsgCounterDelete(0)
{ {
using namespace boost::interprocess; using namespace boost::interprocess;
@ -260,9 +261,10 @@ class Manager
void Resume() { fInterrupted.store(false); } void Resume() { fInterrupted.store(false); }
void Reset() void Reset()
{ {
if (fMsgCounter.load() != 0) { auto diff = fMsgCounterNew.load() - fMsgCounterDelete.load();
LOG(error) << "Message counter during Reset expected to be 0, found: " << fMsgCounter.load(); if (diff != 0) {
throw MessageError(tools::ToString("Message counter during Reset expected to be 0, found: ", fMsgCounter.load())); LOG(error) << "Message counter during Reset expected to be 0, found: " << diff;
throw MessageError(tools::ToString("Message counter during Reset expected to be 0, found: ", diff));
} }
} }
bool Interrupted() { return fInterrupted.load(); } bool Interrupted() { return fInterrupted.load(); }
@ -496,8 +498,8 @@ class Manager
} }
} }
void IncrementMsgCounter() { fMsgCounter.fetch_add(1, std::memory_order_relaxed); } void IncrementMsgCounter() { fMsgCounterNew.fetch_add(1, std::memory_order_relaxed); }
void DecrementMsgCounter() { fMsgCounter.fetch_sub(1, std::memory_order_relaxed); } void DecrementMsgCounter() { fMsgCounterDelete.fetch_add(1, std::memory_order_relaxed); }
#ifdef FAIRMQ_DEBUG_MODE #ifdef FAIRMQ_DEBUG_MODE
void IncrementShmMsgCounter(uint16_t segmentId) { ++((*fShmMsgCounters)[segmentId].fCount); } void IncrementShmMsgCounter(uint16_t segmentId) { ++((*fShmMsgCounters)[segmentId].fCount); }
@ -696,7 +698,6 @@ class Manager
} fTlRegionCache; } fTlRegionCache;
std::atomic<bool> fInterrupted; std::atomic<bool> fInterrupted;
std::atomic<int32_t> fMsgCounter; // TODO: find a better lifetime solution instead of the counter
#ifdef FAIRMQ_DEBUG_MODE #ifdef FAIRMQ_DEBUG_MODE
Uint16MsgDebugMapHashMap* fMsgDebug; Uint16MsgDebugMapHashMap* fMsgDebug;
Uint16MsgCounterHashMap* fShmMsgCounters; Uint16MsgCounterHashMap* fShmMsgCounters;
@ -709,6 +710,10 @@ class Manager
bool fThrowOnBadAlloc; bool fThrowOnBadAlloc;
bool fNoCleanup; bool fNoCleanup;
// make sure the counters are not thrashing the cache line between threads doing creation and deallocation
alignas(128) std::atomic_uint64_t fMsgCounterNew; // TODO: find a better lifetime solution instead of the counter
alignas(128) std::atomic_uint64_t fMsgCounterDelete;
}; };
} // namespace fair::mq::shmem } // namespace fair::mq::shmem