mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
shm: refactor UnamangedRegion: rename fRemote to fController
This commit is contained in:
parent
19e40bd32e
commit
9a25c4d28a
|
@ -396,7 +396,7 @@ class Manager
|
||||||
const uint16_t id = cfg.id.value();
|
const uint16_t id = cfg.id.value();
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||||
auto& region = fRegions[id] = std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg);
|
auto& region = fRegions[id] = std::make_unique<UnmanagedRegion>(fShmId, size, true, cfg);
|
||||||
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
||||||
|
|
||||||
// start ack receiver only if a callback has been provided.
|
// start ack receiver only if a callback has been provided.
|
||||||
|
@ -463,7 +463,7 @@ class Manager
|
||||||
}
|
}
|
||||||
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
||||||
|
|
||||||
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, true, std::move(cfg)));
|
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, false, std::move(cfg)));
|
||||||
r.first->second->InitializeQueues();
|
r.first->second->InitializeQueues();
|
||||||
r.first->second->StartAckSender();
|
r.first->second->StartAckSender();
|
||||||
return r.first->second.get();
|
return r.first->second.get();
|
||||||
|
@ -554,7 +554,7 @@ class Manager
|
||||||
if (it != fRegions.end()) {
|
if (it != fRegions.end()) {
|
||||||
region = it->second.get();
|
region = it->second.get();
|
||||||
} else {
|
} else {
|
||||||
auto r = fRegions.emplace(cfgIt->first, std::make_unique<UnmanagedRegion>(fShmId, 0, true, cfgIt->second));
|
auto r = fRegions.emplace(cfgIt->first, std::make_unique<UnmanagedRegion>(fShmId, 0, false, cfgIt->second));
|
||||||
region = r.first->second.get();
|
region = r.first->second.get();
|
||||||
region->InitializeQueues();
|
region->InitializeQueues();
|
||||||
region->StartAckSender();
|
region->StartAckSender();
|
||||||
|
|
|
@ -44,19 +44,19 @@ struct UnmanagedRegion
|
||||||
friend class Monitor;
|
friend class Monitor;
|
||||||
|
|
||||||
UnmanagedRegion(const std::string& shmId, uint16_t id, uint64_t size)
|
UnmanagedRegion(const std::string& shmId, uint16_t id, uint64_t size)
|
||||||
: UnmanagedRegion(shmId, size, false, makeRegionConfig(id))
|
: UnmanagedRegion(shmId, size, true, makeRegionConfig(id))
|
||||||
{}
|
{}
|
||||||
|
|
||||||
UnmanagedRegion(const std::string& shmId, uint64_t size, RegionConfig cfg)
|
UnmanagedRegion(const std::string& shmId, uint64_t size, RegionConfig cfg)
|
||||||
: UnmanagedRegion(shmId, size, false, std::move(cfg))
|
: UnmanagedRegion(shmId, size, true, std::move(cfg))
|
||||||
{}
|
{}
|
||||||
|
|
||||||
UnmanagedRegion(const std::string& shmId, RegionConfig cfg)
|
UnmanagedRegion(const std::string& shmId, RegionConfig cfg)
|
||||||
: UnmanagedRegion(shmId, cfg.size, false, std::move(cfg))
|
: UnmanagedRegion(shmId, cfg.size, true, std::move(cfg))
|
||||||
{}
|
{}
|
||||||
|
|
||||||
UnmanagedRegion(const std::string& shmId, uint64_t size, bool remote, RegionConfig cfg)
|
UnmanagedRegion(const std::string& shmId, uint64_t size, bool controlling, RegionConfig cfg)
|
||||||
: fRemote(remote)
|
: fControlling(controlling)
|
||||||
, fRemoveOnDestruction(cfg.removeOnDestruction)
|
, fRemoveOnDestruction(cfg.removeOnDestruction)
|
||||||
, fLinger(cfg.linger)
|
, fLinger(cfg.linger)
|
||||||
, fStopAcks(false)
|
, fStopAcks(false)
|
||||||
|
@ -76,12 +76,12 @@ struct UnmanagedRegion
|
||||||
const uint16_t id = cfg.id.value();
|
const uint16_t id = cfg.id.value();
|
||||||
bool created = false;
|
bool created = false;
|
||||||
|
|
||||||
LOG(debug) << "UnmanagedRegion(): " << fName << " | remote: " << remote << ".";
|
LOG(debug) << "UnmanagedRegion(): " << fName << " (" << (fControlling ? "controller" : "viewer") << ")";
|
||||||
|
|
||||||
if (!cfg.path.empty()) {
|
if (!cfg.path.empty()) {
|
||||||
fName = std::string(cfg.path + fName);
|
fName = std::string(cfg.path + fName);
|
||||||
|
|
||||||
if (!fRemote) {
|
if (fControlling) {
|
||||||
// create a file
|
// create a file
|
||||||
std::filebuf fbuf;
|
std::filebuf fbuf;
|
||||||
if (fbuf.open(fName, std::ios_base::in | std::ios_base::out | std::ios_base::trunc | std::ios_base::binary)) {
|
if (fbuf.open(fName, std::ios_base::in | std::ios_base::out | std::ios_base::trunc | std::ios_base::binary)) {
|
||||||
|
@ -108,10 +108,15 @@ struct UnmanagedRegion
|
||||||
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
|
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
|
||||||
created = false;
|
created = false;
|
||||||
} catch (interprocess_exception& e) {
|
} catch (interprocess_exception& e) {
|
||||||
LOG(debug) << "Could not open " << (remote ? "remote" : "local") << " shared_memory_object for region id '" << id << "': " << e.what() << ", creating...";
|
if (fControlling) {
|
||||||
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
|
LOG(debug) << "Could not open controlling shared_memory_object for region " << id << ": " << e.what() << ", creating...";
|
||||||
fShmemObject.truncate(size);
|
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
|
||||||
created = true;
|
fShmemObject.truncate(size);
|
||||||
|
created = true;
|
||||||
|
} else {
|
||||||
|
LOG(error) << "Could not open view for shared_memory_object for region " << id << ": " << e.what();
|
||||||
|
throw TransportError(tools::ToString("Could not open view for shared_memory_object for region ", id, ": ", e.what()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (interprocess_exception& e) {
|
} catch (interprocess_exception& e) {
|
||||||
LOG(error) << "Failed initializing shared_memory_object for region id " << id << ": " << e.what();
|
LOG(error) << "Failed initializing shared_memory_object for region id " << id << ": " << e.what();
|
||||||
|
@ -141,11 +146,11 @@ struct UnmanagedRegion
|
||||||
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
|
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!remote) {
|
if (fControlling) {
|
||||||
Register(shmId, cfg);
|
Register(shmId, cfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (remote ? "remote" : "local") << ")";
|
LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (fControlling ? "controller" : "viewer") << ")";
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegion() = delete;
|
UnmanagedRegion() = delete;
|
||||||
|
@ -177,7 +182,7 @@ struct UnmanagedRegion
|
||||||
|
|
||||||
~UnmanagedRegion()
|
~UnmanagedRegion()
|
||||||
{
|
{
|
||||||
LOG(debug) << "~UnmanagedRegion(): " << fName << " | remote: " << fRemote << ".";
|
LOG(debug) << "~UnmanagedRegion(): " << fName << " (" << (fControlling ? "controller" : "viewer") << ")";
|
||||||
fStopAcks = true;
|
fStopAcks = true;
|
||||||
|
|
||||||
if (fAcksSender.joinable()) {
|
if (fAcksSender.joinable()) {
|
||||||
|
@ -185,7 +190,7 @@ struct UnmanagedRegion
|
||||||
fAcksSender.join();
|
fAcksSender.join();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!fRemote) {
|
if (fControlling) {
|
||||||
if (fAcksReceiver.joinable()) {
|
if (fAcksReceiver.joinable()) {
|
||||||
fAcksReceiver.join();
|
fAcksReceiver.join();
|
||||||
}
|
}
|
||||||
|
@ -211,14 +216,14 @@ struct UnmanagedRegion
|
||||||
fclose(fFile);
|
fclose(fFile);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary";
|
// LOG(debug) << "Region queue '" << fQueueName << "' is viewer, no cleanup necessary";
|
||||||
}
|
}
|
||||||
|
|
||||||
// LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed.";
|
// LOG(debug) << "Region '" << fName << "' (" << (fControlling ? "controller" : "viewer") << ") destructed.";
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool fRemote;
|
bool fControlling;
|
||||||
bool fRemoveOnDestruction;
|
bool fRemoveOnDestruction;
|
||||||
uint32_t fLinger;
|
uint32_t fLinger;
|
||||||
std::atomic<bool> fStopAcks;
|
std::atomic<bool> fStopAcks;
|
||||||
|
|
Loading…
Reference in New Issue
Block a user