shm: reduce shm contention when dealing with ack queues

This commit is contained in:
Alexey Rybalchenko 2021-04-22 18:32:39 +02:00
parent 4e466514d2
commit c85d6e079c
4 changed files with 88 additions and 44 deletions

View File

@ -67,7 +67,7 @@ class Manager
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600) , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600)
, fShmVoidAlloc(fManagementSegment.get_segment_manager()) , fShmVoidAlloc(fManagementSegment.get_segment_manager())
, fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str()) , fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str())
, fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str()) , fRegionEventsShmCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str())
, fRegionEventsSubscriptionActive(false) , fRegionEventsSubscriptionActive(false)
, fNumObservedEvents(0) , fNumObservedEvents(0)
, fDeviceCounter(nullptr) , fDeviceCounter(nullptr)
@ -78,13 +78,13 @@ class Manager
#ifdef FAIRMQ_DEBUG_MODE #ifdef FAIRMQ_DEBUG_MODE
, fMsgDebug(nullptr) , fMsgDebug(nullptr)
, fShmMsgCounters(nullptr) , fShmMsgCounters(nullptr)
, fMsgCounterNew(0)
, fMsgCounterDelete(0)
#endif #endif
, fHeartbeatThread() , fHeartbeatThread()
, fSendHeartbeats(true) , fSendHeartbeats(true)
, fThrowOnBadAlloc(config ? config->GetProperty<bool>("shm-throw-bad-alloc", true) : true) , fThrowOnBadAlloc(config ? config->GetProperty<bool>("shm-throw-bad-alloc", true) : true)
, fNoCleanup(config ? config->GetProperty<bool>("shm-no-cleanup", false) : false) , fNoCleanup(config ? config->GetProperty<bool>("shm-no-cleanup", false) : false)
, fMsgCounterNew(0)
, fMsgCounterDelete(0)
{ {
using namespace boost::interprocess; using namespace boost::interprocess;
@ -263,11 +263,13 @@ class Manager
void Resume() { fInterrupted.store(false); } void Resume() { fInterrupted.store(false); }
void Reset() void Reset()
{ {
#ifdef FAIRMQ_DEBUG_MODE
auto diff = fMsgCounterNew.load() - fMsgCounterDelete.load(); auto diff = fMsgCounterNew.load() - fMsgCounterDelete.load();
if (diff != 0) { if (diff != 0) {
LOG(error) << "Message counter during Reset expected to be 0, found: " << diff; LOG(error) << "Message counter during Reset expected to be 0, found: " << diff;
throw MessageError(tools::ToString("Message counter during Reset expected to be 0, found: ", diff)); throw MessageError(tools::ToString("Message counter during Reset expected to be 0, found: ", diff));
} }
#endif
} }
bool Interrupted() { return fInterrupted.load(); } bool Interrupted() { return fInterrupted.load(); }
@ -306,12 +308,12 @@ class Manager
return {nullptr, id}; return {nullptr, id};
} }
// create region info
fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));
auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, id, size, false, callback, bulkCallback, path, flags)); auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, id, size, false, callback, bulkCallback, path, flags));
// LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'"; // LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));
r.first->second->InitializeQueues();
r.first->second->StartReceivingAcks(); r.first->second->StartReceivingAcks();
result.first = &(r.first->second->fRegion); result.first = &(r.first->second->fRegion);
result.second = id; result.second = id;
@ -319,7 +321,7 @@ class Manager
(fEventCounter->fCount)++; (fEventCounter->fCount)++;
} }
fRegionsGen += 1; // signal TL cache invalidation fRegionsGen += 1; // signal TL cache invalidation
fRegionEventsCV.notify_all(); fRegionEventsShmCV.notify_all();
return result; return result;
} catch (interprocess_exception& e) { } catch (interprocess_exception& e) {
@ -384,14 +386,19 @@ class Manager
void RemoveRegion(const uint16_t id) void RemoveRegion(const uint16_t id)
{ {
{ try {
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx); fRegions.at(id)->StopAcks();
fShmRegions->at(id).fDestroyed = true; {
fRegions.erase(id); boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
(fEventCounter->fCount)++; fShmRegions->at(id).fDestroyed = true;
fRegions.erase(id);
(fEventCounter->fCount)++;
}
fRegionEventsShmCV.notify_all();
} catch(std::out_of_range& oor) {
LOG(debug) << "RemoveRegion() could not locate region with id'" << id << "'";
} }
fRegionsGen += 1; // signal TL cache invalidation fRegionsGen += 1; // signal TL cache invalidation
fRegionEventsCV.notify_all();
} }
std::vector<fair::mq::RegionInfo> GetRegionInfo() std::vector<fair::mq::RegionInfo> GetRegionInfo()
@ -452,7 +459,7 @@ class Manager
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx); boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
fRegionEventsSubscriptionActive = false; fRegionEventsSubscriptionActive = false;
lock.unlock(); lock.unlock();
fRegionEventsCV.notify_all(); fRegionEventsShmCV.notify_all();
fRegionEventThread.join(); fRegionEventThread.join();
} }
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx); boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
@ -469,7 +476,7 @@ class Manager
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx); boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
fRegionEventsSubscriptionActive = false; fRegionEventsSubscriptionActive = false;
lock.unlock(); lock.unlock();
fRegionEventsCV.notify_all(); fRegionEventsShmCV.notify_all();
fRegionEventThread.join(); fRegionEventThread.join();
lock.lock(); lock.lock();
fRegionEventCallback = nullptr; fRegionEventCallback = nullptr;
@ -500,26 +507,32 @@ class Manager
el->second = i.event; el->second = i.event;
++fNumObservedEvents; ++fNumObservedEvents;
} else { } else {
// LOG(debug) << "ignoring event for id " << i.id << ":" // LOG(debug) << "ignoring event " << i.id << ": incoming: " << i.event << ", stored: " << el->second;
// << " incoming: " << i.event << ","
// << " stored: " << el->second;
} }
} }
} }
fRegionEventsCV.wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; }); fRegionEventsShmCV.wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; });
} }
} }
void IncrementMsgCounter() { fMsgCounterNew.fetch_add(1, std::memory_order_relaxed); } void IncrementMsgCounter()
void DecrementMsgCounter() { fMsgCounterDelete.fetch_add(1, std::memory_order_relaxed); } {
#ifdef FAIRMQ_DEBUG_MODE
fMsgCounterNew.fetch_add(1, std::memory_order_relaxed);
#endif
}
void DecrementMsgCounter()
{
#ifdef FAIRMQ_DEBUG_MODE
fMsgCounterDelete.fetch_add(1, std::memory_order_relaxed);
#endif
}
#ifdef FAIRMQ_DEBUG_MODE #ifdef FAIRMQ_DEBUG_MODE
void IncrementShmMsgCounter(uint16_t segmentId) { ++((*fShmMsgCounters)[segmentId].fCount); } void IncrementShmMsgCounter(uint16_t segmentId) { ++((*fShmMsgCounters)[segmentId].fCount); }
void DecrementShmMsgCounter(uint16_t segmentId) { --((*fShmMsgCounters)[segmentId].fCount); } void DecrementShmMsgCounter(uint16_t segmentId) { --((*fShmMsgCounters)[segmentId].fCount); }
#endif #endif
boost::interprocess::named_mutex& GetMtx() { return fShmMtx; }
void SendHeartbeats() void SendHeartbeats()
{ {
std::string controlQueueName("fmq_" + fShmId + "_cq"); std::string controlQueueName("fmq_" + fShmId + "_cq");
@ -547,7 +560,7 @@ class Manager
auto it = fSegments.find(id); auto it = fSegments.find(id);
if (it == fSegments.end()) { if (it == fSegments.end()) {
try { try {
// get region info // get segment info
SegmentInfo segmentInfo = fShmSegments->at(id); SegmentInfo segmentInfo = fShmSegments->at(id);
LOG(debug) << "Located segment with id '" << id << "'"; LOG(debug) << "Located segment with id '" << id << "'";
@ -691,7 +704,7 @@ class Manager
VoidAlloc fShmVoidAlloc; VoidAlloc fShmVoidAlloc;
boost::interprocess::named_mutex fShmMtx; boost::interprocess::named_mutex fShmMtx;
boost::interprocess::named_condition fRegionEventsCV; boost::interprocess::named_condition fRegionEventsShmCV;
std::thread fRegionEventThread; std::thread fRegionEventThread;
bool fRegionEventsSubscriptionActive; bool fRegionEventsSubscriptionActive;
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback; std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
@ -712,8 +725,11 @@ class Manager
std::atomic<bool> fInterrupted; std::atomic<bool> fInterrupted;
#ifdef FAIRMQ_DEBUG_MODE #ifdef FAIRMQ_DEBUG_MODE
// make sure the counters are not thrashing the cache line between threads doing creation and deallocation
Uint16MsgDebugMapHashMap* fMsgDebug; Uint16MsgDebugMapHashMap* fMsgDebug;
Uint16MsgCounterHashMap* fShmMsgCounters; Uint16MsgCounterHashMap* fShmMsgCounters;
alignas(128) std::atomic_uint64_t fMsgCounterNew;
alignas(128) std::atomic_uint64_t fMsgCounterDelete;
#endif #endif
std::thread fHeartbeatThread; std::thread fHeartbeatThread;
@ -724,9 +740,7 @@ class Manager
bool fThrowOnBadAlloc; bool fThrowOnBadAlloc;
bool fNoCleanup; bool fNoCleanup;
// make sure the counters are not thrashing the cache line between threads doing creation and deallocation
alignas(128) std::atomic_uint64_t fMsgCounterNew; // TODO: find a better lifetime solution instead of the counter
alignas(128) std::atomic_uint64_t fMsgCounterDelete;
}; };
} // namespace fair::mq::shmem } // namespace fair::mq::shmem

View File

@ -309,6 +309,8 @@ class Message final : public fair::mq::Message
} }
if (fRegionPtr) { if (fRegionPtr) {
fRegionPtr->InitializeQueues();
fRegionPtr->StartSendingAcks();
fRegionPtr->ReleaseBlock({fMeta.fHandle, fMeta.fSize, fMeta.fHint}); fRegionPtr->ReleaseBlock({fMeta.fHandle, fMeta.fSize, fMeta.fHint});
} else { } else {
LOG(warn) << "region ack queue for id " << fMeta.fRegionId << " no longer exist. Not sending ack"; LOG(warn) << "region ack queue for id " << fMeta.fRegionId << " no longer exist. Not sending ack";
@ -324,7 +326,7 @@ class Message final : public fair::mq::Message
Deallocate(); Deallocate();
fAlignment = 0; fAlignment = 0;
fManager.DecrementMsgCounter(); // TODO: put this to debug mode fManager.DecrementMsgCounter();
} }
}; };

View File

@ -47,7 +47,7 @@ struct Region
Region(const std::string& shmId, uint16_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags) Region(const std::string& shmId, uint16_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags)
: fRemote(remote) : fRemote(remote)
, fLinger(100) , fLinger(100)
, fStop(false) , fStopAcks(false)
, fName("fmq_" + shmId + "_rg_" + std::to_string(id)) , fName("fmq_" + shmId + "_rg_" + std::to_string(id))
, fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(id)) , fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(id))
, fShmemObject() , fShmemObject()
@ -104,8 +104,6 @@ struct Region
} }
} }
InitializeQueues();
StartSendingAcks();
LOG(trace) << "shmem: initialized region: " << fName << " (" << (fRemote ? "remote" : "local") << ")"; LOG(trace) << "shmem: initialized region: " << fName << " (" << (fRemote ? "remote" : "local") << ")";
} }
@ -118,15 +116,22 @@ struct Region
{ {
using namespace boost::interprocess; using namespace boost::interprocess;
if (fRemote) { if (fQueue == nullptr) {
fQueue = std::make_unique<message_queue>(open_only, fQueueName.c_str()); if (fRemote) {
} else { fQueue = std::make_unique<message_queue>(open_only, fQueueName.c_str());
fQueue = std::make_unique<message_queue>(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock)); } else {
fQueue = std::make_unique<message_queue>(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock));
}
LOG(trace) << "shmem: initialized region queue: " << fQueueName << " (" << (fRemote ? "remote" : "local") << ")";
} }
LOG(trace) << "shmem: initialized region queue: " << fQueueName << " (" << (fRemote ? "remote" : "local") << ")";
} }
void StartSendingAcks() { fAcksSender = std::thread(&Region::SendAcks, this); } void StartSendingAcks()
{
if (!fAcksSender.joinable()) {
fAcksSender = std::thread(&Region::SendAcks, this);
}
}
void SendAcks() void SendAcks()
{ {
std::unique_ptr<RegionBlock[]> blocks = std::make_unique<RegionBlock[]>(fAckBunchSize); std::unique_ptr<RegionBlock[]> blocks = std::make_unique<RegionBlock[]>(fAckBunchSize);
@ -150,13 +155,13 @@ struct Region
} }
if (blocksToSend > 0) { if (blocksToSend > 0) {
while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStop) { while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStopAcks) {
// receiver slow? yield and try again... // receiver slow? yield and try again...
std::this_thread::yield(); std::this_thread::yield();
} }
// LOG(debug) << "Sent " << blocksToSend << " blocks."; // LOG(debug) << "Sent " << blocksToSend << " blocks.";
} else { // blocksToSend == 0 } else { // blocksToSend == 0
if (fStop) { if (fStopAcks) {
break; break;
} }
} }
@ -166,7 +171,12 @@ struct Region
<< " blocks left to send: " << blocksToSend << ")."; << " blocks left to send: " << blocksToSend << ").";
} }
void StartReceivingAcks() { fAcksReceiver = std::thread(&Region::ReceiveAcks, this); } void StartReceivingAcks()
{
if (!fAcksReceiver.joinable()) {
fAcksReceiver = std::thread(&Region::ReceiveAcks, this);
}
}
void ReceiveAcks() void ReceiveAcks()
{ {
unsigned int priority; unsigned int priority;
@ -178,7 +188,7 @@ struct Region
while (true) { while (true) {
uint32_t timeout = 100; uint32_t timeout = 100;
bool leave = false; bool leave = false;
if (fStop) { if (fStopAcks) {
timeout = fLinger; timeout = fLinger;
leave = true; leave = true;
} }
@ -223,9 +233,25 @@ struct Region
void SetLinger(uint32_t linger) { fLinger = linger; } void SetLinger(uint32_t linger) { fLinger = linger; }
uint32_t GetLinger() const { return fLinger; } uint32_t GetLinger() const { return fLinger; }
// Stops this region's acknowledgement threads: raises fStopAcks, then joins
// the sender thread and, for non-remote regions, the receiver thread.
// Both joins are guarded by joinable(), so calling this when a thread was
// never started (or has already been joined) skips that thread.
void StopAcks()
{
// fStopAcks is the flag the SendAcks and ReceiveAcks loops check before leaving.
fStopAcks = true;
if (fAcksSender.joinable()) {
// presumably wakes the sender if it is waiting on fBlockSendCV (the wait
// site is not visible in this view) so it can observe fStopAcks — TODO confirm
fBlockSendCV.notify_one();
fAcksSender.join();
}
// The receiver thread is only joined when this region is not remote.
if (!fRemote) {
if (fAcksReceiver.joinable()) {
// NOTE(review): ReceiveAcks switches its timeout to fLinger once fStopAcks
// is set, so this join appears to block for up to that linger time — confirm.
fAcksReceiver.join();
}
}
}
~Region() ~Region()
{ {
fStop = true; fStopAcks = true;
if (fAcksSender.joinable()) { if (fAcksSender.joinable()) {
fBlockSendCV.notify_one(); fBlockSendCV.notify_one();
@ -261,7 +287,7 @@ struct Region
bool fRemote; bool fRemote;
uint32_t fLinger; uint32_t fLinger;
std::atomic<bool> fStop; std::atomic<bool> fStopAcks;
std::string fName; std::string fName;
std::string fQueueName; std::string fQueueName;
boost::interprocess::shared_memory_object fShmemObject; boost::interprocess::shared_memory_object fShmemObject;

View File

@ -153,9 +153,11 @@ TEST(ErrorState, interactive_InReset)
EXPECT_EXIT(RunErrorStateIn("Reset", "interactive", "q"), ::testing::ExitedWithCode(1), ""); EXPECT_EXIT(RunErrorStateIn("Reset", "interactive", "q"), ::testing::ExitedWithCode(1), "");
} }
#ifdef FAIRMQ_DEBUG_MODE
TEST(ErrorState, OrphanMessages) TEST(ErrorState, OrphanMessages)
{ {
BadDevice badDevice; BadDevice badDevice;
} }
#endif
} // namespace } // namespace