diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 052752b3..ba90ca14 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -396,7 +396,7 @@ class Manager const uint16_t id = cfg.id.value(); std::lock_guard lock(fLocalRegionsMtx); - auto& region = fRegions[id] = std::make_unique(fShmId, size, false, cfg); + auto& region = fRegions[id] = std::make_unique(fShmId, size, true, cfg); // LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; // start ack receiver only if a callback has been provided. @@ -463,7 +463,7 @@ class Manager } // LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; - auto r = fRegions.emplace(id, std::make_unique(fShmId, 0, true, std::move(cfg))); + auto r = fRegions.emplace(id, std::make_unique(fShmId, 0, false, std::move(cfg))); r.first->second->InitializeQueues(); r.first->second->StartAckSender(); return r.first->second.get(); @@ -554,7 +554,7 @@ class Manager if (it != fRegions.end()) { region = it->second.get(); } else { - auto r = fRegions.emplace(cfgIt->first, std::make_unique(fShmId, 0, true, cfgIt->second)); + auto r = fRegions.emplace(cfgIt->first, std::make_unique(fShmId, 0, false, cfgIt->second)); region = r.first->second.get(); region->InitializeQueues(); region->StartAckSender(); diff --git a/fairmq/shmem/UnmanagedRegion.h b/fairmq/shmem/UnmanagedRegion.h index 52b681f8..2981cf39 100644 --- a/fairmq/shmem/UnmanagedRegion.h +++ b/fairmq/shmem/UnmanagedRegion.h @@ -44,19 +44,19 @@ struct UnmanagedRegion friend class Monitor; UnmanagedRegion(const std::string& shmId, uint16_t id, uint64_t size) - : UnmanagedRegion(shmId, size, false, makeRegionConfig(id)) + : UnmanagedRegion(shmId, size, true, makeRegionConfig(id)) {} UnmanagedRegion(const std::string& shmId, uint64_t size, RegionConfig cfg) - : UnmanagedRegion(shmId, size, false, std::move(cfg)) + : UnmanagedRegion(shmId, size, true, std::move(cfg)) {} UnmanagedRegion(const std::string& shmId, RegionConfig cfg) - : UnmanagedRegion(shmId, cfg.size, false, std::move(cfg)) + : UnmanagedRegion(shmId, cfg.size, true, std::move(cfg)) {} - UnmanagedRegion(const std::string& shmId, uint64_t size, bool remote, RegionConfig cfg) - : fRemote(remote) + UnmanagedRegion(const std::string& shmId, uint64_t size, bool controlling, RegionConfig cfg) + : fControlling(controlling) , fRemoveOnDestruction(cfg.removeOnDestruction) , fLinger(cfg.linger) , fStopAcks(false) @@ -76,12 +76,12 @@ struct UnmanagedRegion const uint16_t id = cfg.id.value(); bool created = false; - LOG(debug) << "UnmanagedRegion(): " << fName << " | remote: " << remote << "."; + LOG(debug) << "UnmanagedRegion(): " << fName << " (" << (fControlling ? "controller" : "viewer") << ")"; if (!cfg.path.empty()) { fName = std::string(cfg.path + fName); - if (!fRemote) { + if (fControlling) { // create a file std::filebuf fbuf; if (fbuf.open(fName, std::ios_base::in | std::ios_base::out | std::ios_base::trunc | std::ios_base::binary)) { @@ -108,10 +108,15 @@ struct UnmanagedRegion fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write); created = false; } catch (interprocess_exception& e) { - LOG(debug) << "Could not open " << (remote ? "remote" : "local") << " shared_memory_object for region id '" << id << "': " << e.what() << ", creating..."; - fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write); - fShmemObject.truncate(size); - created = true; + if (fControlling) { + LOG(debug) << "Could not open controlling shared_memory_object for region " << id << ": " << e.what() << ", creating..."; + fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write); + fShmemObject.truncate(size); + created = true; + } else { + LOG(error) << "Could not open view for shared_memory_object for region " << id << ": " << e.what(); + throw TransportError(tools::ToString("Could not open view for shared_memory_object for region ", id, ": ", e.what())); + } } } catch (interprocess_exception& e) { LOG(error) << "Failed initializing shared_memory_object for region id " << id << ": " << e.what(); @@ -141,11 +146,11 @@ struct UnmanagedRegion LOG(debug) << "Successfully zeroed free memory of region " << id << "."; } - if (!remote) { + if (fControlling) { Register(shmId, cfg); } - LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (remote ? "remote" : "local") << ")"; + LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (fControlling ? "controller" : "viewer") << ")"; } UnmanagedRegion() = delete; @@ -177,7 +182,7 @@ struct UnmanagedRegion ~UnmanagedRegion() { - LOG(debug) << "~UnmanagedRegion(): " << fName << " | remote: " << fRemote << "."; + LOG(debug) << "~UnmanagedRegion(): " << fName << " (" << (fControlling ? "controller" : "viewer") << ")"; fStopAcks = true; if (fAcksSender.joinable()) { @@ -185,7 +190,7 @@ struct UnmanagedRegion fAcksSender.join(); } - if (!fRemote) { + if (fControlling) { if (fAcksReceiver.joinable()) { fAcksReceiver.join(); } @@ -211,14 +216,14 @@ struct UnmanagedRegion fclose(fFile); } } else { - // LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary"; + // LOG(debug) << "Region queue '" << fQueueName << "' is viewer, no cleanup necessary"; } - // LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed."; + // LOG(debug) << "Region '" << fName << "' (" << (fControlling ? "controller" : "viewer") << ") destructed."; } private: - bool fRemote; + bool fControlling; bool fRemoveOnDestruction; uint32_t fLinger; std::atomic fStopAcks;