shm: improve exception handling

This commit is contained in:
Alexey Rybalchenko 2021-06-10 11:02:40 +02:00
parent 28a887a457
commit 37c059177f

View File

@ -19,10 +19,11 @@
#include "Region.h" #include "Region.h"
#include "Monitor.h" #include "Monitor.h"
#include <FairMQLogger.h>
#include <FairMQMessage.h>
#include <fairmq/ProgOptions.h> #include <fairmq/ProgOptions.h>
#include <fairmq/tools/Strings.h> #include <fairmq/tools/Strings.h>
#include <fairmq/Transports.h>
#include <FairMQLogger.h>
#include <FairMQMessage.h>
#include <boost/date_time/posix_time/posix_time.hpp> #include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
@ -108,7 +109,7 @@ class Manager
fHeartbeatThread = std::thread(&Manager::Heartbeats, this); fHeartbeatThread = std::thread(&Manager::Heartbeats, this);
{ try {
std::stringstream ss; std::stringstream ss;
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx); boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
@ -132,9 +133,25 @@ class Manager
LOG(debug) << "initialized event counter with: " << fEventCounter->fCount; LOG(debug) << "initialized event counter with: " << fEventCounter->fCount;
} }
fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
fDeviceCounter = fManagementSegment.find<DeviceCounter>(unique_instance).first;
if (fDeviceCounter) {
LOG(debug) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing.";
(fDeviceCounter->fCount)++;
LOG(debug) << "incremented device counter, now: " << fDeviceCounter->fCount;
} else {
LOG(debug) << "no device counter found, creating one and initializing with 1";
fDeviceCounter = fManagementSegment.construct<DeviceCounter>(unique_instance)(1);
LOG(debug) << "initialized device counter with: " << fDeviceCounter->fCount;
}
std::string op("create/open");
try { try {
auto it = fShmSegments->find(fSegmentId); auto it = fShmSegments->find(fSegmentId);
if (it == fShmSegments->end()) { if (it == fShmSegments->end()) {
op = "create";
// no segment with given id exists, creating // no segment with given id exists, creating
if (allocationAlgorithm == "rbtree_best_fit") { if (allocationAlgorithm == "rbtree_best_fit") {
fSegments.emplace(fSegmentId, RBTreeBestFitSegment(create_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str(), size)); fSegments.emplace(fSegmentId, RBTreeBestFitSegment(create_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str(), size));
@ -146,6 +163,7 @@ class Manager
ss << "Created "; ss << "Created ";
(fEventCounter->fCount)++; (fEventCounter->fCount)++;
} else { } else {
op = "open";
// found segment with the given id, opening // found segment with the given id, opening
if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) { if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
fSegments.emplace(fSegmentId, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str())); fSegments.emplace(fSegmentId, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str()));
@ -168,8 +186,8 @@ class Manager
<< " Allocation algorithm: " << allocationAlgorithm; << " Allocation algorithm: " << allocationAlgorithm;
LOG(debug) << ss.str(); LOG(debug) << ss.str();
} catch(interprocess_exception& bie) { } catch(interprocess_exception& bie) {
LOG(error) << "Failed to create/open shared memory segment (" << "fmq_" << fShmId << "_m_" << fSegmentId << "): " << bie.what(); LOG(error) << "Failed to " << op << " shared memory segment (" << "fmq_" << fShmId << "_m_" << fSegmentId << "): " << bie.what();
throw std::runtime_error(tools::ToString("Failed to create/open shared memory segment (", "fmq_", fShmId, "_m_", fSegmentId, "): ", bie.what())); throw TransportError(tools::ToString("Failed to ", op, " shared memory segment (", "fmq_", fShmId, "_m_", fSegmentId, "): ", bie.what()));
} }
if (mlockSegment) { if (mlockSegment) {
@ -185,23 +203,14 @@ class Manager
LOG(debug) << "Successfully zeroed the managed segment free memory."; LOG(debug) << "Successfully zeroed the managed segment free memory.";
} }
fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
fDeviceCounter = fManagementSegment.find<DeviceCounter>(unique_instance).first;
if (fDeviceCounter) {
LOG(debug) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing.";
(fDeviceCounter->fCount)++;
LOG(debug) << "incremented device counter, now: " << fDeviceCounter->fCount;
} else {
LOG(debug) << "no device counter found, creating one and initializing with 1";
fDeviceCounter = fManagementSegment.construct<DeviceCounter>(unique_instance)(1);
LOG(debug) << "initialized device counter with: " << fDeviceCounter->fCount;
}
#ifdef FAIRMQ_DEBUG_MODE #ifdef FAIRMQ_DEBUG_MODE
fMsgDebug = fManagementSegment.find_or_construct<Uint16MsgDebugMapHashMap>(unique_instance)(fShmVoidAlloc); fMsgDebug = fManagementSegment.find_or_construct<Uint16MsgDebugMapHashMap>(unique_instance)(fShmVoidAlloc);
fShmMsgCounters = fManagementSegment.find_or_construct<Uint16MsgCounterHashMap>(unique_instance)(fShmVoidAlloc); fShmMsgCounters = fManagementSegment.find_or_construct<Uint16MsgCounterHashMap>(unique_instance)(fShmVoidAlloc);
#endif #endif
} catch (...) {
StopHeartbeats();
CleanupIfLast();
throw;
} }
} }
@ -556,6 +565,18 @@ class Manager
} }
} }
void StopHeartbeats()
{
{
std::unique_lock<std::mutex> lock(fHeartbeatsMtx);
fBeatTheHeart = false;
}
fHeartbeatsCV.notify_one();
if (fHeartbeatThread.joinable()) {
fHeartbeatThread.join();
}
}
bool ThrowingOnBadAlloc() const { return fThrowOnBadAlloc; } bool ThrowingOnBadAlloc() const { return fThrowOnBadAlloc; }
void GetSegment(uint16_t id) void GetSegment(uint16_t id)
@ -660,23 +681,11 @@ class Manager
uint16_t GetSegmentId() const { return fSegmentId; } uint16_t GetSegmentId() const { return fSegmentId; }
~Manager() void CleanupIfLast()
{ {
using namespace boost::interprocess; using namespace boost::interprocess;
bool lastRemoved = false; bool lastRemoved = false;
fRegionsGen += 1; // signal TL cache invalidation
UnsubscribeFromRegionEvents();
{
std::unique_lock<std::mutex> lock(fHeartbeatsMtx);
fBeatTheHeart = false;
}
fHeartbeatsCV.notify_one();
if (fHeartbeatThread.joinable()) {
fHeartbeatThread.join();
}
try { try {
boost::interprocess::scoped_lock<named_mutex> lock(fShmMtx); boost::interprocess::scoped_lock<named_mutex> lock(fShmMtx);
@ -697,6 +706,16 @@ class Manager
} }
} }
~Manager()
{
fRegionsGen += 1; // signal TL cache invalidation
UnsubscribeFromRegionEvents();
StopHeartbeats();
CleanupIfLast();
}
private: private:
uint64_t fShmId64; uint64_t fShmId64;
std::string fShmId; std::string fShmId;