From d630fbb1e4537e4febc831b5b9cc7345f6963db7 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Thu, 18 Nov 2021 11:52:05 +0100 Subject: [PATCH] consolidate UnmanagedRegion options --- examples/region/sampler.cxx | 12 ++++------ fairmq/TransportFactory.h | 23 ++++++++++++++++-- fairmq/UnmanagedRegion.h | 21 ++++++++++------ fairmq/ofi/TransportFactory.h | 16 +++++++++++++ fairmq/shmem/Common.h | 6 ++--- fairmq/shmem/Manager.h | 29 ++++++++++------------ fairmq/shmem/Message.h | 7 +++--- fairmq/shmem/Monitor.cxx | 2 +- fairmq/shmem/Region.h | 23 +++++++++--------- fairmq/shmem/Socket.h | 8 +++---- fairmq/shmem/TransportFactory.h | 41 +++++++++++++++++++++++--------- fairmq/shmem/UnmanagedRegion.h | 13 ++++------ fairmq/zeromq/TransportFactory.h | 9 +++++++ 13 files changed, 136 insertions(+), 74 deletions(-) diff --git a/examples/region/sampler.cxx b/examples/region/sampler.cxx index 42f982c3..088c1092 100644 --- a/examples/region/sampler.cxx +++ b/examples/region/sampler.cxx @@ -32,22 +32,20 @@ struct Sampler : fair::mq::Device << ", flags: " << info.flags; }); + fair::mq::RegionConfig regionCfg; + regionCfg.linger = fLinger; // delay in ms before region destruction to collect outstanding events + regionCfg.lock = true; // mlock region after creation + regionCfg.zero = true; // zero region content after creation fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel... 0, // ... and this sub-channel 10000000, // region size [this](const std::vector& blocks) { // callback to be called when message buffers no longer needed by transport std::lock_guard lock(fMtx); fNumUnackedMsgs -= blocks.size(); - if (fMaxIterations > 0) { LOG(info) << "Received " << blocks.size() << " acks"; } - }, - "", // path, if a region is backed by a file - 0, // flags that are passed for region creation - fair::mq::RegionConfig{true, true} // additional config: { call mlock on the region, zero the region memory } - )); - fRegion->SetLinger(fLinger); + }, regionCfg)); } bool ConditionalRun() override diff --git a/fairmq/TransportFactory.h b/fairmq/TransportFactory.h index b63576ba..effe54bd 100644 --- a/fairmq/TransportFactory.h +++ b/fairmq/TransportFactory.h @@ -109,13 +109,15 @@ class TransportFactory /// @param path optional parameter to pass to the underlying transport /// @param flags optional parameter to pass to the underlying transport /// @return pointer to UnmanagedRegion + // [[deprecated("Use CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionConfig cfg)")]] virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, RegionConfig cfg = RegionConfig()) = 0; + // [[deprecated("Use CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionConfig cfg)")]] virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, - RegionBulkCallback callback = nullptr, + RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, RegionConfig cfg = RegionConfig()) = 0; @@ -128,19 +130,36 @@ class TransportFactory /// @param path optional parameter to pass to the underlying transport /// @param flags optional parameter to pass to the underlying transport /// @return pointer to UnmanagedRegion + // [[deprecated("Use CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionConfig cfg)")]] virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, RegionConfig cfg = RegionConfig()) = 0; + // [[deprecated("Use CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionConfig cfg)")]] virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, int64_t userFlags, - RegionBulkCallback callback = nullptr, + RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, RegionConfig cfg = RegionConfig()) = 0; + + /// @brief Create new UnmanagedRegion + /// @param size size of the region + /// @param callback callback to be called when a message belonging to this region is no longer needed by the transport + /// @param cfg region configuration + /// @return pointer to UnmanagedRegion + virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionConfig cfg) = 0; + + /// @brief Create new UnmanagedRegion + /// @param size size of the region + /// @param bulkCallback callback to be called when message(s) belonging to this region is no longer needed by the transport + /// @param cfg region configuration + /// @return pointer to UnmanagedRegion + virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionBulkCallback bulkCallback, RegionConfig cfg) = 0; + /// @brief Subscribe to region events (creation, destruction, ...) /// @param callback the callback that is called when a region event occurs virtual void SubscribeToRegionEvents(RegionEventCallback callback) = 0; diff --git a/fairmq/UnmanagedRegion.h b/fairmq/UnmanagedRegion.h index 11a7595b..f4c2e4e0 100644 --- a/fairmq/UnmanagedRegion.h +++ b/fairmq/UnmanagedRegion.h @@ -9,12 +9,15 @@ #ifndef FAIR_MQ_UNMANAGEDREGION_H #define FAIR_MQ_UNMANAGEDREGION_H +#include + #include // size_t #include // uint32_t -#include + #include // std::function #include // std::unique_ptr -#include // std::ostream +#include +#include #include namespace fair::mq { @@ -119,13 +122,17 @@ struct RegionConfig { RegionConfig() = default; - RegionConfig(bool l, bool z) - : lock(l) - , zero(z) + RegionConfig(bool _lock, bool _zero) + : lock(_lock) + , zero(_zero) {} - bool lock = false; - bool zero = false; + bool lock = false; /// mlock region after creation + bool zero = false; /// zero region content after creation + int creationFlags = 0; /// flags passed to the underlying transport on region creation + int64_t userFlags = 0; /// custom flags that have no effect on the transport, but can be retrieved from the region by the user + std::string path = ""; /// file path, if the region is backed by a file + uint32_t linger = 100; /// delay in ms before region destruction to collect outstanding events }; } // namespace fair::mq diff --git a/fairmq/ofi/TransportFactory.h b/fairmq/ofi/TransportFactory.h index 3931d9aa..ecb88d0e 100644 --- a/fairmq/ofi/TransportFactory.h +++ b/fairmq/ofi/TransportFactory.h @@ -166,6 +166,22 @@ struct TransportFactory final : mq::TransportFactory throw std::runtime_error("Not yet implemented UMR."); } + auto CreateUnmanagedRegion(std::size_t /*size*/, + RegionCallback /*callback*/, + RegionConfig /*cfg*/) + -> std::unique_ptr override + { + throw std::runtime_error("Not yet implemented UMR."); + } + + auto CreateUnmanagedRegion(std::size_t /*size*/, + RegionBulkCallback /*callback*/, + RegionConfig /*cfg*/) + -> std::unique_ptr override + { + throw std::runtime_error("Not yet implemented UMR."); + } + auto SubscribeToRegionEvents(RegionEventCallback /*callback*/) -> void override { throw std::runtime_error("Not yet implemented."); diff --git a/fairmq/shmem/Common.h b/fairmq/shmem/Common.h index b9fabe66..b90a93e2 100644 --- a/fairmq/shmem/Common.h +++ b/fairmq/shmem/Common.h @@ -60,20 +60,20 @@ struct RegionInfo { RegionInfo(const VoidAlloc& alloc) : fPath("", alloc) - , fFlags(0) + , fCreationFlags(0) , fUserFlags(0) , fDestroyed(false) {} RegionInfo(const char* path, const int flags, const uint64_t userFlags, const VoidAlloc& alloc) : fPath(path, alloc) - , fFlags(flags) + , fCreationFlags(flags) , fUserFlags(userFlags) , fDestroyed(false) {} Str fPath; - int fFlags; + int fCreationFlags; uint64_t fUserFlags; bool fDestroyed; }; diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index fa234167..8139c79d 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -12,12 +12,12 @@ #include "Common.h" #include "Monitor.h" #include "Region.h" - +#include #include #include #include -#include -#include + +#include #include #include @@ -369,19 +369,15 @@ class Manager bool Interrupted() { return fInterrupted.load(); } std::pair CreateRegion(const size_t size, - const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, - const std::string& path, - int flags, - fair::mq::RegionConfig cfg) + RegionConfig cfg) { using namespace boost::interprocess; try { std::pair result; { - uint16_t id = 0; boost::interprocess::scoped_lock lock(fShmMtx); RegionCounter* rc = fManagementSegment.find(unique_instance).first; @@ -396,7 +392,7 @@ class Manager LOG(debug) << "initialized region counter with: " << rc->fCount; } - id = rc->fCount; + uint16_t id = rc->fCount; auto it = fRegions.find(id); if (it != fRegions.end()) { @@ -404,8 +400,8 @@ class Manager return {nullptr, id}; } - auto r = fRegions.emplace(id, std::make_unique(fShmId, id, size, false, callback, bulkCallback, path, flags)); - // LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'"; + auto r = fRegions.emplace(id, std::make_unique(fShmId, id, size, false, callback, bulkCallback, cfg)); + // LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; if (cfg.lock) { LOG(debug) << "Locking region " << id << "..."; @@ -421,7 +417,7 @@ class Manager LOG(debug) << "Successfully zeroed free memory of region " << id << "."; } - fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc)); + fShmRegions->emplace(id, RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, fShmVoidAlloc)); r.first->second->StartReceivingAcks(); result.first = &(r.first->second->fRegion); @@ -476,11 +472,12 @@ class Manager try { // get region info RegionInfo regionInfo = fShmRegions->at(id); - std::string path = regionInfo.fPath.c_str(); - int flags = regionInfo.fFlags; - // LOG(debug) << "Located remote region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'"; + RegionConfig cfg; + cfg.creationFlags = regionInfo.fCreationFlags; + cfg.path = regionInfo.fPath.c_str(); + // LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; - auto r = fRegions.emplace(id, std::make_unique(fShmId, id, 0, true, nullptr, nullptr, path, flags)); + auto r = fRegions.emplace(id, std::make_unique(fShmId, id, 0, true, nullptr, nullptr, std::move(cfg))); return r.first->second.get(); } catch (std::out_of_range& oor) { LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?"; diff --git a/fairmq/shmem/Message.h b/fairmq/shmem/Message.h index b249455b..ab43630f 100644 --- a/fairmq/shmem/Message.h +++ b/fairmq/shmem/Message.h @@ -12,9 +12,10 @@ #include "Manager.h" #include "Region.h" #include "UnmanagedRegion.h" -#include -#include -#include +#include +#include + +#include #include diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index ccc6bfc9..ded11ca4 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -545,7 +545,7 @@ std::vector> Monitor::Cleanup(const ShmId& shmId, b if (m != nullptr) { RegionInfo ri = m->at(i); string path = ri.fPath.c_str(); - int flags = ri.fFlags; + int flags = ri.fCreationFlags; if (verbose) { LOG(info) << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << ri.fDestroyed << "."; } diff --git a/fairmq/shmem/Region.h b/fairmq/shmem/Region.h index 65fca6ac..97ffd799 100644 --- a/fairmq/shmem/Region.h +++ b/fairmq/shmem/Region.h @@ -10,11 +10,11 @@ #define FAIR_MQ_SHMEM_REGION_H_ #include "Common.h" - -#include -#include +#include #include +#include + #include #include #include @@ -38,9 +38,9 @@ namespace fair::mq::shmem struct Region { - Region(const std::string& shmId, uint16_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags) + Region(const std::string& shmId, uint16_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, RegionConfig cfg) : fRemote(remote) - , fLinger(100) + , fLinger(cfg.linger) , fStopAcks(false) , fName("fmq_" + shmId + "_rg_" + std::to_string(id)) , fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(id)) @@ -53,8 +53,8 @@ struct Region { using namespace boost::interprocess; - if (!path.empty()) { - fName = std::string(path + fName); + if (!cfg.path.empty()) { + fName = std::string(cfg.path + fName); if (!fRemote) { // create a file @@ -75,7 +75,7 @@ struct Region } fFileMapping = file_mapping(fName.c_str(), read_write); LOG(debug) << "shmem: initialized file: " << fName; - fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, flags); + fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, cfg.creationFlags); } else { try { if (fRemote) { @@ -84,19 +84,18 @@ struct Region fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write); fShmemObject.truncate(size); } - } catch(interprocess_exception& e) { + } catch (interprocess_exception& e) { LOG(error) << "Failed " << (fRemote ? "opening" : "creating") << " shared_memory_object for region id '" << id << "': " << e.what(); throw; } try { - fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, flags); - } catch(interprocess_exception& e) { + fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, cfg.creationFlags); + } catch (interprocess_exception& e) { LOG(error) << "Failed mapping shared_memory_object for region id '" << id << "': " << e.what(); throw; } } - InitializeQueues(); StartSendingAcks(); diff --git a/fairmq/shmem/Socket.h b/fairmq/shmem/Socket.h index df742293..65e07516 100644 --- a/fairmq/shmem/Socket.h +++ b/fairmq/shmem/Socket.h @@ -11,13 +11,13 @@ #include "Common.h" #include "Manager.h" #include "Message.h" - -#include -#include -#include +#include +#include #include #include +#include + #include #include diff --git a/fairmq/shmem/TransportFactory.h b/fairmq/shmem/TransportFactory.h index 3aedf39d..09f0d532 100644 --- a/fairmq/shmem/TransportFactory.h +++ b/fairmq/shmem/TransportFactory.h @@ -9,17 +9,17 @@ #ifndef FAIR_MQ_SHMEM_TRANSPORTFACTORY_H_ #define FAIR_MQ_SHMEM_TRANSPORTFACTORY_H_ -#include "Manager.h" #include "Common.h" +#include "Manager.h" #include "Message.h" -#include "Socket.h" #include "Poller.h" +#include "Socket.h" #include "UnmanagedRegion.h" - -#include #include -#include #include +#include + +#include #include @@ -145,27 +145,46 @@ class TransportFactory final : public fair::mq::TransportFactory UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override { - return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags, cfg); + cfg.path = path; + cfg.creationFlags = flags; + return CreateUnmanagedRegion(size, callback, nullptr, cfg); } UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override { - return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags, cfg); + cfg.path = path; + cfg.creationFlags = flags; + return CreateUnmanagedRegion(size, nullptr, bulkCallback, cfg); } UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override { - return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags, cfg); + cfg.path = path; + cfg.userFlags = userFlags; + cfg.creationFlags = flags; + return CreateUnmanagedRegion(size, callback, nullptr, cfg); } UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override { - return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags, cfg); + cfg.path = path; + cfg.userFlags = userFlags; + cfg.creationFlags = flags; + return CreateUnmanagedRegion(size, nullptr, bulkCallback, cfg); } - UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags, fair::mq::RegionConfig cfg) + UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionConfig cfg) override { - return std::make_unique(*fManager, size, userFlags, callback, bulkCallback, path, flags, this, cfg); + return CreateUnmanagedRegion(size, callback, nullptr, cfg); + } + UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionBulkCallback bulkCallback, RegionConfig cfg) override + { + return CreateUnmanagedRegion(size, nullptr, bulkCallback, cfg); + } + + UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionBulkCallback bulkCallback, fair::mq::RegionConfig cfg) + { + return std::make_unique(*fManager, size, callback, bulkCallback, std::move(cfg), this); } void SubscribeToRegionEvents(RegionEventCallback callback) override { fManager->SubscribeToRegionEvents(callback); } diff --git a/fairmq/shmem/UnmanagedRegion.h b/fairmq/shmem/UnmanagedRegion.h index 009899b4..956e7aa9 100644 --- a/fairmq/shmem/UnmanagedRegion.h +++ b/fairmq/shmem/UnmanagedRegion.h @@ -10,9 +10,9 @@ #define FAIR_MQ_SHMEM_UNMANAGEDREGION_H_ #include "Manager.h" +#include -#include -#include +#include #include #include @@ -34,19 +34,16 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion public: UnmanagedRegion(Manager& manager, const size_t size, - const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, - const std::string& path, - int flags, - FairMQTransportFactory* factory, - fair::mq::RegionConfig cfg) + fair::mq::RegionConfig cfg, + FairMQTransportFactory* factory) : FairMQUnmanagedRegion(factory) , fManager(manager) , fRegion(nullptr) , fRegionId(0) { - auto result = fManager.CreateRegion(size, userFlags, callback, bulkCallback, path, flags, cfg); + auto result = fManager.CreateRegion(size, callback, bulkCallback, std::move(cfg)); fRegion = result.first; fRegionId = result.second; } diff --git a/fairmq/zeromq/TransportFactory.h b/fairmq/zeromq/TransportFactory.h index d27ec167..ed8249c9 100644 --- a/fairmq/zeromq/TransportFactory.h +++ b/fairmq/zeromq/TransportFactory.h @@ -118,6 +118,15 @@ class TransportFactory final : public FairMQTransportFactory return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags, cfg); } + UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionConfig cfg) override + { + return CreateUnmanagedRegion(size, cfg.userFlags, callback, nullptr, cfg.path, cfg.creationFlags, cfg); + } + UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionBulkCallback bulkCallback, RegionConfig cfg) override + { + return CreateUnmanagedRegion(size, cfg.userFlags, nullptr, bulkCallback, cfg.path, cfg.creationFlags, cfg); + } + UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string&, int /* flags */, fair::mq::RegionConfig cfg) { UnmanagedRegionPtr ptr = std::make_unique(*fCtx, size, userFlags, callback, bulkCallback, this, cfg);