diff --git a/examples/region/Sampler.cxx b/examples/region/Sampler.cxx index 9a047922..c5e82366 100644 --- a/examples/region/Sampler.cxx +++ b/examples/region/Sampler.cxx @@ -35,6 +35,14 @@ void Sampler::InitTask() fMsgSize = fConfig->GetProperty("msg-size"); fMaxIterations = fConfig->GetProperty("max-iterations"); + fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) { + LOG(warn) << ">>>" << info.event; + LOG(warn) << "id: " << info.id; + LOG(warn) << "ptr: " << info.ptr; + LOG(warn) << "size: " << info.size; + LOG(warn) << "flags: " << info.flags; + }); + fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", 0, 10000000, @@ -82,6 +90,7 @@ void Sampler::ResetTask() LOG(debug) << "done, still unacked: " << fNumUnackedMsgs; } fRegion.reset(); + fChannels.at("data").at(0).Transport()->UnsubscribeFromRegionEvents(); } Sampler::~Sampler() diff --git a/examples/region/Sink.cxx b/examples/region/Sink.cxx index 3163fe57..fd06b639 100644 --- a/examples/region/Sink.cxx +++ b/examples/region/Sink.cxx @@ -29,6 +29,13 @@ void Sink::InitTask() { // Get the fMaxIterations value from the command line options (via fConfig) fMaxIterations = fConfig->GetProperty("max-iterations"); + fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) { + LOG(warn) << ">>>" << info.event; + LOG(warn) << "id: " << info.id; + LOG(warn) << "ptr: " << info.ptr; + LOG(warn) << "size: " << info.size; + LOG(warn) << "flags: " << info.flags; + }); } void Sink::Run() @@ -50,6 +57,11 @@ void Sink::Run() } } +void Sink::ResetTask() +{ + fChannels.at("data").at(0).Transport()->UnsubscribeFromRegionEvents(); +} + Sink::~Sink() { } diff --git a/examples/region/Sink.h b/examples/region/Sink.h index 7e372e7a..5eb7dd42 100644 --- a/examples/region/Sink.h +++ b/examples/region/Sink.h @@ -31,6 +31,7 @@ class Sink : public FairMQDevice protected: virtual void Run(); virtual void InitTask(); + virtual void ResetTask(); private: uint64_t fMaxIterations; diff --git a/examples/region/test-ex-region.sh.in b/examples/region/test-ex-region.sh.in index e1139154..783d0a79 100755 --- a/examples/region/test-ex-region.sh.in +++ b/examples/region/test-ex-region.sh.in @@ -19,6 +19,7 @@ SAMPLER+=" --id sampler1" SAMPLER+=" --transport $transport" SAMPLER+=" --severity debug" SAMPLER+=" --session $SESSION" +SAMPLER+=" --verbosity veryhigh" SAMPLER+=" --control static --color false" SAMPLER+=" --max-iterations 1" SAMPLER+=" --msg-size $msgSize" diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 878f4619..e3136be5 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -245,7 +245,6 @@ if(BUILD_FAIRMQ) plugins/Control.cxx shmem/Message.cxx shmem/Poller.cxx - shmem/UnmanagedRegion.cxx shmem/Socket.cxx shmem/TransportFactory.cxx shmem/Manager.cxx diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 8e7a927e..8054d8b5 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -340,6 +340,11 @@ class FairMQChannel return Transport()->CreateUnmanagedRegion(size, callback, path, flags); } + FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) + { + return Transport()->CreateUnmanagedRegion(size, userFlags, callback, path, flags); + } + static constexpr fair::mq::Transport DefaultTransportType = fair::mq::Transport::DEFAULT; static constexpr const char* DefaultTransportName = "default"; static constexpr const char* DefaultName = ""; diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index d3792594..d475d74d 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -217,17 +217,47 @@ class FairMQDevice } // creates unamanaged region with the default device transport - FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) + FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size, + FairMQRegionCallback callback = nullptr, + const std::string& path = "", + int flags = 0) { - return Transport()->CreateUnmanagedRegion(size, callback); + return Transport()->CreateUnmanagedRegion(size, callback, path, flags); + } + + // creates unamanaged region with the default device transport + FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size, + const int64_t userFlags, + FairMQRegionCallback callback = nullptr, + const std::string& path = "", + int flags = 0) + { + return Transport()->CreateUnmanagedRegion(size, userFlags, callback, path, flags); } // creates unmanaged region with the transport of the specified channel - FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel, int index, const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) + FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel, + int index, + const size_t size, + FairMQRegionCallback callback = nullptr, + const std::string& path = "", + int flags = 0) { return GetChannel(channel, index).NewUnmanagedRegion(size, callback, path, flags); } + // creates unmanaged region with the transport of the specified channel + FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel, + int index, + const size_t size, + const int64_t userFlags, + FairMQRegionCallback callback = nullptr, + const std::string& path = "", + int flags = 0) + { + return GetChannel(channel, index).NewUnmanagedRegion(size, userFlags, callback, path, flags); + } + template FairMQPollerPtr NewPoller(const Ts&... inputs) { diff --git a/fairmq/FairMQTransportFactory.cxx b/fairmq/FairMQTransportFactory.cxx index 0e6c5452..4b53a9aa 100644 --- a/fairmq/FairMQTransportFactory.cxx +++ b/fairmq/FairMQTransportFactory.cxx @@ -7,8 +7,8 @@ ********************************************************************************/ #include -#include #include +#include #ifdef BUILD_NANOMSG_TRANSPORT #include #endif /* BUILD_NANOMSG_TRANSPORT */ @@ -17,50 +17,46 @@ #endif #include #include - #include #include -FairMQTransportFactory::FairMQTransportFactory(const std::string& id) +using namespace std; + +FairMQTransportFactory::FairMQTransportFactory(const string& id) : fkId(id) -{ -} +{} -auto FairMQTransportFactory::CreateTransportFactory(const std::string& type, const std::string& id, const fair::mq::ProgOptions* config) -> std::shared_ptr +auto FairMQTransportFactory::CreateTransportFactory(const string& type, + const string& id, + const fair::mq::ProgOptions* config) + -> shared_ptr { - using namespace std; - auto finalId = id; // Generate uuid if empty - if (finalId == "") - { + if (finalId == "") { finalId = fair::mq::tools::Uuid(); } - if (type == "zeromq") - { + if (type == "zeromq") { return make_shared(finalId, config); - } - else if (type == "shmem") - { + } else if (type == "shmem") { return make_shared(finalId, config); } #ifdef BUILD_NANOMSG_TRANSPORT - else if (type == "nanomsg") - { + else if (type == "nanomsg") { return make_shared(finalId, config); } #endif /* BUILD_NANOMSG_TRANSPORT */ #ifdef BUILD_OFI_TRANSPORT - else if (type == "ofi") - { + else if (type == "ofi") { return make_shared(finalId, config); } #endif /* BUILD_OFI_TRANSPORT */ - else - { - LOG(error) << "Unavailable transport requested: " << "\"" << type << "\"" << ". Available are: " + else { + LOG(error) << "Unavailable transport requested: " + << "\"" << type << "\"" + << ". Available are: " << "\"zeromq\"" << "\"shmem\"" #ifdef BUILD_NANOMSG_TRANSPORT diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 78e9b53b..cc5e3fa2 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -75,6 +75,12 @@ class FairMQTransportFactory virtual FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const = 0; virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const = 0; + virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const = 0; + + virtual void SubscribeToRegionEvents(FairMQRegionEventCallback callback) = 0; + virtual void UnsubscribeFromRegionEvents() = 0; + + virtual std::vector GetRegionInfo() = 0; /// Get transport type virtual fair::mq::Transport GetType() const = 0; diff --git a/fairmq/FairMQUnmanagedRegion.h b/fairmq/FairMQUnmanagedRegion.h index 59e14f85..11d02ac1 100644 --- a/fairmq/FairMQUnmanagedRegion.h +++ b/fairmq/FairMQUnmanagedRegion.h @@ -12,8 +12,24 @@ #include // size_t #include // std::unique_ptr #include // std::function +#include // std::ostream + +enum class FairMQRegionEvent : int +{ + created, + destroyed +}; + +struct FairMQRegionInfo { + uint64_t id; // id of the region + void* ptr; // pointer to the start of the region + size_t size; // region size + int64_t flags; // custom flags set by the creator + FairMQRegionEvent event; +}; using FairMQRegionCallback = std::function; +using FairMQRegionEventCallback = std::function; class FairMQUnmanagedRegion { @@ -26,12 +42,24 @@ class FairMQUnmanagedRegion using FairMQUnmanagedRegionPtr = std::unique_ptr; +inline std::ostream& operator<<(std::ostream& os, const FairMQRegionEvent& event) +{ + if (event == FairMQRegionEvent::created) { + return os << "created"; + } else { + return os << "destroyed"; + } +} + namespace fair { namespace mq { using RegionCallback = FairMQRegionCallback; +using RegionEventCallback = FairMQRegionEventCallback; +using RegionEvent = FairMQRegionEvent; +using RegionInfo = FairMQRegionInfo; using UnmanagedRegion = FairMQUnmanagedRegion; using UnmanagedRegionPtr = FairMQUnmanagedRegionPtr; diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index fe13e4f5..f2f3dbb1 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -70,6 +70,11 @@ FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const s return unique_ptr(new FairMQUnmanagedRegionNN(size, callback, path, flags)); } +FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const +{ + return unique_ptr(new FairMQUnmanagedRegionNN(size, userFlags, callback, path, flags)); +} + fair::mq::Transport FairMQTransportFactoryNN::GetType() const { return fTransportType; diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index a4afa245..d573ec20 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -37,6 +37,11 @@ class FairMQTransportFactoryNN final : public FairMQTransportFactory FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) const override; + FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override; + + void SubscribeToRegionEvents(FairMQRegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for nanomsg"; } + void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for nanomsg"; } + std::vector GetRegionInfo() override { LOG(error) << "GetRegionInfo not yet implemented for nanomsg, returning empty vector"; return std::vector(); } fair::mq::Transport GetType() const override; diff --git a/fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx b/fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx index 4490b776..b651b6cd 100644 --- a/fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx +++ b/fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx @@ -18,6 +18,13 @@ FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(const size_t size, FairMQRegion { } +FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(const size_t size, const int64_t /*userFlags*/, FairMQRegionCallback callback, const std::string& /*path = "" */, int /*flags = 0 */) + : fBuffer(malloc(size)) + , fSize(size) + , fCallback(callback) +{ +} + void* FairMQUnmanagedRegionNN::GetData() const { return fBuffer; diff --git a/fairmq/nanomsg/FairMQUnmanagedRegionNN.h b/fairmq/nanomsg/FairMQUnmanagedRegionNN.h index 1f2da647..ca9f54de 100644 --- a/fairmq/nanomsg/FairMQUnmanagedRegionNN.h +++ b/fairmq/nanomsg/FairMQUnmanagedRegionNN.h @@ -20,6 +20,8 @@ class FairMQUnmanagedRegionNN final : public FairMQUnmanagedRegion public: FairMQUnmanagedRegionNN(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0); + FairMQUnmanagedRegionNN(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0); + FairMQUnmanagedRegionNN(const FairMQUnmanagedRegionNN&) = delete; FairMQUnmanagedRegionNN operator=(const FairMQUnmanagedRegionNN&) = delete; @@ -34,4 +36,4 @@ class FairMQUnmanagedRegionNN final : public FairMQUnmanagedRegion FairMQRegionCallback fCallback; }; -#endif /* FAIRMQUNMANAGEDREGIONNN_H_ */ \ No newline at end of file +#endif /* FAIRMQUNMANAGEDREGIONNN_H_ */ diff --git a/fairmq/ofi/TransportFactory.cxx b/fairmq/ofi/TransportFactory.cxx index 93437162..7be2ea51 100644 --- a/fairmq/ofi/TransportFactory.cxx +++ b/fairmq/ofi/TransportFactory.cxx @@ -90,6 +90,11 @@ auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegion throw runtime_error{"Not yet implemented UMR."}; } +auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) const -> UnmanagedRegionPtr +{ + throw runtime_error{"Not yet implemented UMR."}; +} + auto TransportFactory::GetType() const -> Transport { return Transport::OFI; diff --git a/fairmq/ofi/TransportFactory.h b/fairmq/ofi/TransportFactory.h index e183a96d..c1afe174 100644 --- a/fairmq/ofi/TransportFactory.h +++ b/fairmq/ofi/TransportFactory.h @@ -46,7 +46,12 @@ class TransportFactory final : public FairMQTransportFactory auto CreatePoller(const std::vector& channels) const -> PollerPtr override; auto CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const -> PollerPtr override; - auto CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const -> UnmanagedRegionPtr override; + auto CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const -> UnmanagedRegionPtr override; + auto CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const -> UnmanagedRegionPtr override; + + void SubscribeToRegionEvents(RegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for OFI"; } + void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for OFI"; } + std::vector GetRegionInfo() override { LOG(error) << "GetRegionInfo not yet implemented for OFI, returning empty vector"; return std::vector(); } auto GetType() const -> Transport override; diff --git a/fairmq/shmem/Common.h b/fairmq/shmem/Common.h index ba98b3c8..956cadf5 100644 --- a/fairmq/shmem/Common.h +++ b/fairmq/shmem/Common.h @@ -41,15 +41,21 @@ struct RegionInfo RegionInfo(const VoidAlloc& alloc) : fPath("", alloc) , fFlags(0) + , fUserFlags(0) + , fDestroyed(false) {} - RegionInfo(const char* path, int flags, const VoidAlloc& alloc) + RegionInfo(const char* path, const int flags, const uint64_t userFlags, const VoidAlloc& alloc) : fPath(path, alloc) , fFlags(flags) + , fUserFlags(userFlags) + , fDestroyed(false) {} Str fPath; int fFlags; + uint64_t fUserFlags; + bool fDestroyed; }; using Uint64RegionInfoPairAlloc = boost::interprocess::allocator, SegmentManager>; diff --git a/fairmq/shmem/Manager.cxx b/fairmq/shmem/Manager.cxx index f4364f67..741e14f7 100644 --- a/fairmq/shmem/Manager.cxx +++ b/fairmq/shmem/Manager.cxx @@ -7,7 +7,6 @@ ********************************************************************************/ #include "Manager.h" -#include "Common.h" #include #include @@ -19,6 +18,7 @@ using namespace std; using bie = ::boost::interprocess::interprocess_exception; namespace bipc = ::boost::interprocess; namespace bfs = ::boost::filesystem; +namespace bpt = ::boost::posix_time; namespace fair { @@ -27,19 +27,26 @@ namespace mq namespace shmem { -std::unordered_map> Manager::fRegions; - -Manager::Manager(const std::string& id, size_t size) +Manager::Manager(const string& id, size_t size) : fShmId(id) , fSegmentName("fmq_" + fShmId + "_main") , fManagementSegmentName("fmq_" + fShmId + "_mng") , fSegment(bipc::open_or_create, fSegmentName.c_str(), size) - , fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 65536) + , fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 655360) + , fShmVoidAlloc(fManagementSegment.get_segment_manager()) , fShmMtx(bipc::open_or_create, string("fmq_" + fShmId + "_mtx").c_str()) + , fRegionEventsCV(bipc::open_or_create, string("fmq_" + fShmId + "_cv").c_str()) + , fRegionEventsSubscriptionActive(false) , fDeviceCounter(nullptr) + , fRegionInfos(nullptr) + , fInterrupted(false) { LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << size << " bytes. Available are " << fSegment.get_free_memory() << " bytes."; + fRegionInfos = fManagementSegment.find_or_construct(bipc::unique_instance)(fShmVoidAlloc); + // store info about the managed segment as region with id 0 + fRegionInfos->emplace(0, RegionInfo("", 0, 0, fShmVoidAlloc)); + bipc::scoped_lock lock(fShmMtx); fDeviceCounter = fManagementSegment.find(bipc::unique_instance).first; @@ -55,7 +62,7 @@ Manager::Manager(const std::string& id, size_t size) } } -void Manager::StartMonitor(const std::string& id) +void Manager::StartMonitor(const string& id) { try { bipc::named_mutex monitorStatus(bipc::open_only, string("fmq_" + id + "_ms").c_str()); @@ -94,47 +101,74 @@ void Manager::StartMonitor(const std::string& id) } } -void Manager::Interrupt() +pair Manager::CreateRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const string& path /* = "" */, int flags /* = 0 */) { -} + try { -void Manager::Resume() -{ - // close remote regions before processing new transfers - for (auto it = fRegions.begin(); it != fRegions.end(); /**/) { - if (it->second->fRemote) { - it = fRegions.erase(it); - } else { - ++it; - } - } -} + pair result; -bipc::mapped_region* Manager::CreateRegion(const size_t size, const uint64_t id, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) -{ - auto it = fRegions.find(id); - if (it != fRegions.end()) { - LOG(error) << "Trying to create a region that already exists"; - return nullptr; - } else { - // create region info { + uint64_t id = 0; bipc::scoped_lock lock(fShmMtx); - VoidAlloc voidAlloc(fManagementSegment.get_segment_manager()); - Uint64RegionInfoMap* infoMap = fManagementSegment.find_or_construct(bipc::unique_instance)(voidAlloc); - infoMap->emplace(id, RegionInfo(path.c_str(), flags, voidAlloc)); + + RegionCounter* rc = fManagementSegment.find(bipc::unique_instance).first; + + if (rc) { + LOG(debug) << "region counter found, with value of " << rc->fCount << ". incrementing."; + (rc->fCount)++; + LOG(debug) << "incremented region counter, now: " << rc->fCount; + } else { + LOG(debug) << "no region counter found, creating one and initializing with 1"; + rc = fManagementSegment.construct(bipc::unique_instance)(1); + LOG(debug) << "initialized region counter with: " << rc->fCount; + } + + id = rc->fCount; + + auto it = fRegions.find(id); + if (it != fRegions.end()) { + LOG(error) << "Trying to create a region that already exists"; + return {nullptr, id}; + } + + // create region info + fRegionInfos->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc)); + + auto r = fRegions.emplace(id, tools::make_unique(*this, id, size, false, callback, path, flags)); + // LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'"; + + r.first->second->StartReceivingAcks(); + result.first = &(r.first->second->fRegion); + result.second = id; } - // LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'"; + fRegionEventsCV.notify_all(); - auto r = fRegions.emplace(id, tools::make_unique(*this, id, size, false, callback, path, flags)); + return result; - r.first->second->StartReceivingAcks(); - - return &(r.first->second->fRegion); + } catch (bipc::interprocess_exception& e) { + LOG(error) << "cannot create region. Already created/not cleaned up?"; + LOG(error) << e.what(); + throw; } } -Region* Manager::GetRemoteRegion(const uint64_t id) +void Manager::RemoveRegion(const uint64_t id) +{ + { + bipc::scoped_lock lock(fShmMtx); + fRegions.erase(id); + fRegionInfos->at(id).fDestroyed = true; + } + fRegionEventsCV.notify_all(); +} + +Region* Manager::GetRegion(const uint64_t id) +{ + bipc::scoped_lock lock(fShmMtx); + return GetRegionUnsafe(id); +} + +Region* Manager::GetRegionUnsafe(const uint64_t id) { // remote region could actually be a local one if a message originates from this device (has been sent out and returned) auto it = fRegions.find(id); @@ -142,36 +176,108 @@ Region* Manager::GetRemoteRegion(const uint64_t id) return it->second.get(); } else { try { - string path; - int flags; - // get region info - { - bipc::scoped_lock lock(fShmMtx); - Uint64RegionInfoMap* infoMap = fManagementSegment.find(bipc::unique_instance).first; - if (infoMap == nullptr) { - LOG(error) << "Unable to locate the region info"; - throw SharedMemoryError("Unable to locate remote region info"); - } - RegionInfo regionInfo = infoMap->at(id); - path = regionInfo.fPath.c_str(); - flags = regionInfo.fFlags; - } + RegionInfo regionInfo = fRegionInfos->at(id); + string path = regionInfo.fPath.c_str(); + int flags = regionInfo.fFlags; // LOG(debug) << "Located remote region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'"; auto r = fRegions.emplace(id, tools::make_unique(*this, id, 0, true, nullptr, path, flags)); + r.first->second->StartSendingAcks(); return r.first->second.get(); } catch (bie& e) { LOG(warn) << "Could not get remote region for id: " << id; return nullptr; } - } } -void Manager::RemoveRegion(const uint64_t id) +vector Manager::GetRegionInfo() { - fRegions.erase(id); + bipc::scoped_lock lock(fShmMtx); + return GetRegionInfoUnsafe(); +} + +vector Manager::GetRegionInfoUnsafe() +{ + vector result; + + for (const auto& e : *fRegionInfos) { + fair::mq::RegionInfo info; + info.id = e.first; + info.flags = e.second.fUserFlags; + info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created; + if (info.id != 0) { + if (!e.second.fDestroyed) { + auto region = GetRegionUnsafe(info.id); + info.ptr = region->fRegion.get_address(); + info.size = region->fRegion.get_size(); + } else { + info.ptr = nullptr; + info.size = 0; + } + result.push_back(info); + } else { + if (!e.second.fDestroyed) { + info.ptr = fSegment.get_address(); + info.size = fSegment.get_size(); + } else { + info.ptr = nullptr; + info.size = 0; + } + result.push_back(info); + } + } + + return result; +} + +void Manager::SubscribeToRegionEvents(RegionEventCallback callback) +{ + bipc::scoped_lock lock(fShmMtx); + if (fRegionEventThread.joinable()) { + fRegionEventsSubscriptionActive.store(false); + fRegionEventThread.join(); + } + fRegionEventCallback = callback; + fRegionEventsSubscriptionActive.store(true); + fRegionEventThread = thread(&Manager::RegionEventsSubscription, this); +} + +void Manager::UnsubscribeFromRegionEvents() +{ + if (fRegionEventThread.joinable()) { + fRegionEventsSubscriptionActive.store(false); + fRegionEventsCV.notify_all(); + fRegionEventThread.join(); + } + bipc::scoped_lock lock(fShmMtx); + fRegionEventCallback = nullptr; +} + +void Manager::RegionEventsSubscription() +{ + while (fRegionEventsSubscriptionActive.load()) { + bipc::scoped_lock lock(fShmMtx); + auto infos = GetRegionInfoUnsafe(); + for (const auto& i : infos) { + auto el = fObservedRegionEvents.find(i.id); + if (el == fObservedRegionEvents.end()) { + fRegionEventCallback(i); + fObservedRegionEvents.emplace(i.id, i.event); + } else { + if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) { + fRegionEventCallback(i); + el->second = i.event; + } else { + // LOG(debug) << "ignoring event for id" << i.id << ":"; + // LOG(debug) << "incoming event: " << i.event; + // LOG(debug) << "stored event: " << el->second; + } + } + } + fRegionEventsCV.wait(lock); + } } void Manager::RemoveSegments() @@ -193,6 +299,12 @@ Manager::~Manager() { bool lastRemoved = false; + if (fRegionEventThread.joinable()) { + fRegionEventsSubscriptionActive.store(false); + fRegionEventsCV.notify_all(); + fRegionEventThread.join(); + } + try { bipc::scoped_lock lock(fShmMtx); @@ -212,6 +324,7 @@ Manager::~Manager() if (lastRemoved) { bipc::named_mutex::remove(string("fmq_" + fShmId + "_mtx").c_str()); + bipc::named_condition::remove(string("fmq_" + fShmId + "_cv").c_str()); } } diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index c9bb5e92..ccce4e85 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -15,18 +15,24 @@ #ifndef FAIR_MQ_SHMEM_MANAGER_H_ #define FAIR_MQ_SHMEM_MANAGER_H_ -#include "Region.h" #include "Common.h" +#include "Region.h" #include +#include -#include #include +#include +#include #include -#include -#include +#include #include +#include +#include +#include +#include +#include namespace fair { @@ -56,17 +62,25 @@ class Manager static void StartMonitor(const std::string&); - static void Interrupt(); - static void Resume(); + void Interrupt() { fInterrupted.store(true); } + void Resume() { fInterrupted.store(false); } + bool Interrupted() { return fInterrupted.load(); } int GetDeviceCounter(); int IncrementDeviceCounter(); int DecrementDeviceCounter(); - boost::interprocess::mapped_region* CreateRegion(const size_t size, const uint64_t id, RegionCallback callback, const std::string& path = "", int flags = 0); - Region* GetRemoteRegion(const uint64_t id); + std::pair CreateRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0); + Region* GetRegion(const uint64_t id); + Region* GetRegionUnsafe(const uint64_t id); void RemoveRegion(const uint64_t id); + std::vector GetRegionInfo(); + std::vector GetRegionInfoUnsafe(); + void SubscribeToRegionEvents(RegionEventCallback callback); + void UnsubscribeFromRegionEvents(); + void RegionEventsSubscription(); + void RemoveSegments(); private: @@ -75,9 +89,20 @@ class Manager std::string fManagementSegmentName; boost::interprocess::managed_shared_memory fSegment; boost::interprocess::managed_shared_memory fManagementSegment; + VoidAlloc fShmVoidAlloc; boost::interprocess::named_mutex fShmMtx; + + boost::interprocess::named_condition fRegionEventsCV; + std::thread fRegionEventThread; + std::atomic fRegionEventsSubscriptionActive; + std::function fRegionEventCallback; + std::unordered_map fObservedRegionEvents; + DeviceCounter* fDeviceCounter; - static std::unordered_map> fRegions; + Uint64RegionInfoMap* fRegionInfos; + std::unordered_map> fRegions; + + std::atomic fInterrupted; }; } // namespace shmem diff --git a/fairmq/shmem/Message.cxx b/fairmq/shmem/Message.cxx index 73f1403d..3d55d7e3 100644 --- a/fairmq/shmem/Message.cxx +++ b/fairmq/shmem/Message.cxx @@ -29,7 +29,6 @@ namespace mq namespace shmem { -atomic Message::fInterrupted(false); Transport Message::fTransportType = Transport::SHM; Message::Message(Manager& manager, FairMQTransportFactory* factory) @@ -113,7 +112,7 @@ bool Message::InitializeChunk(const size_t size) } catch (bipc::bad_alloc& ba) { // LOG(warn) << "Shared memory full..."; this_thread::sleep_for(chrono::milliseconds(50)); - if (fInterrupted) { + if (fManager.Interrupted()) { return false; } else { continue; @@ -164,7 +163,7 @@ void* Message::GetData() const fLocalPtr = nullptr; } } else { - fRegionPtr = fManager.GetRemoteRegion(fMeta.fRegionId); + fRegionPtr = fManager.GetRegion(fMeta.fRegionId); if (fRegionPtr) { fLocalPtr = reinterpret_cast(fRegionPtr->fRegion.get_address()) + fMeta.fHandle; } else { @@ -221,7 +220,7 @@ void Message::CloseMessage() fMeta.fHandle = -1; } else { if (!fRegionPtr) { - fRegionPtr = fManager.GetRemoteRegion(fMeta.fRegionId); + fRegionPtr = fManager.GetRegion(fMeta.fRegionId); } if (fRegionPtr) { diff --git a/fairmq/shmem/Message.h b/fairmq/shmem/Message.h index 8ebeb9b4..3b0e37bf 100644 --- a/fairmq/shmem/Message.h +++ b/fairmq/shmem/Message.h @@ -65,7 +65,6 @@ class Message final : public fair::mq::Message mutable Region* fRegionPtr; mutable char* fLocalPtr; - static std::atomic fInterrupted; static Transport fTransportType; bool InitializeChunk(const size_t size); diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index f62d76a9..4f3b6cc5 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -15,6 +15,7 @@ #include #include +#include #include #include @@ -427,6 +428,15 @@ void Monitor::RemoveMutex(const string& name) } } +void Monitor::RemoveCondition(const string& name) +{ + if (bipc::named_condition::remove(name.c_str())) { + cout << "Successfully removed \"" << name << "\"." << endl; + } else { + cout << "Did not remove \"" << name << "\". Already removed?" << endl; + } +} + void Monitor::Cleanup(const string& shmId) { string managementSegmentName("fmq_" + shmId + "_mng"); @@ -469,6 +479,7 @@ void Monitor::Cleanup(const string& shmId) RemoveObject("fmq_" + shmId + "_main"); RemoveMutex("fmq_" + shmId + "_mtx"); + RemoveCondition("fmq_" + shmId + "_cv"); cout << endl; } diff --git a/fairmq/shmem/Monitor.h b/fairmq/shmem/Monitor.h index cbcedb0f..4492668d 100644 --- a/fairmq/shmem/Monitor.h +++ b/fairmq/shmem/Monitor.h @@ -42,6 +42,7 @@ class Monitor static void RemoveFileMapping(const std::string&); static void RemoveQueue(const std::string&); static void RemoveMutex(const std::string&); + static void RemoveCondition(const std::string&); struct DaemonPresent : std::runtime_error { using std::runtime_error::runtime_error; }; diff --git a/fairmq/shmem/Region.cxx b/fairmq/shmem/Region.cxx index cd046dc1..4259fee4 100644 --- a/fairmq/shmem/Region.cxx +++ b/fairmq/shmem/Region.cxx @@ -71,7 +71,6 @@ Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, Region InitializeQueues(); LOG(debug) << "shmem: initialized region: " << fName; - fSendAcksWorker = thread(&Region::SendAcks, this); } void Region::InitializeQueues() @@ -84,6 +83,11 @@ void Region::InitializeQueues() LOG(debug) << "shmem: initialized region queue: " << fQueueName; } +void Region::StartSendingAcks() +{ + fSendAcksWorker = thread(&Region::SendAcks, this); +} + void Region::StartReceivingAcks() { fReceiveAcksWorker = thread(&Region::ReceiveAcks, this); @@ -114,12 +118,12 @@ void Region::ReceiveAcks() void Region::ReleaseBlock(const RegionBlock &block) { - unique_lock lock(fBlockLock); + unique_lock lock(fBlockMtx); fBlocksToFree.emplace_back(block); if (fBlocksToFree.size() >= fAckBunchSize) { - lock.unlock(); // reduces contention on fBlockLock + lock.unlock(); // reduces contention on fBlockMtx fBlockSendCV.notify_one(); } } @@ -132,7 +136,7 @@ void Region::SendAcks() size_t blocksToSend = 0; { // mutex locking block - unique_lock lock(fBlockLock); + unique_lock lock(fBlockMtx); // try to get more blocks without waiting (we can miss a notify from CloseMessage()) if (!fStop && (fBlocksToFree.size() < fAckBunchSize)) { @@ -166,6 +170,7 @@ Region::~Region() fStop = true; if (fSendAcksWorker.joinable()) { + fBlockSendCV.notify_one(); fSendAcksWorker.join(); } diff --git a/fairmq/shmem/Region.h b/fairmq/shmem/Region.h index ea2f83dd..99d047b4 100644 --- a/fairmq/shmem/Region.h +++ b/fairmq/shmem/Region.h @@ -49,11 +49,11 @@ struct Region void InitializeQueues(); + void StartSendingAcks(); + void SendAcks(); void StartReceivingAcks(); void ReceiveAcks(); - void ReleaseBlock(const RegionBlock &); - void SendAcks(); ~Region(); @@ -67,7 +67,7 @@ struct Region boost::interprocess::file_mapping fFileMapping; boost::interprocess::mapped_region fRegion; - std::mutex fBlockLock; + std::mutex fBlockMtx; std::condition_variable fBlockSendCV; std::vector fBlocksToFree; const std::size_t fAckBunchSize = 256; diff --git a/fairmq/shmem/Socket.cxx b/fairmq/shmem/Socket.cxx index b58df26c..42e183a4 100644 --- a/fairmq/shmem/Socket.cxx +++ b/fairmq/shmem/Socket.cxx @@ -28,8 +28,6 @@ namespace mq namespace shmem { -atomic Socket::fInterrupted(false); - struct ZMsg { ZMsg() { int rc __attribute__((unused)) = zmq_msg_init(&fMsg); assert(rc == 0); } @@ -133,7 +131,7 @@ int Socket::Send(MessagePtr& msg, const int timeout) ZMsg zmqMsg(sizeof(MetaHeader)); std::memcpy(zmqMsg.Data(), &(shmMsg->fMeta), sizeof(MetaHeader)); - while (true && !fInterrupted) { + while (true && !fManager.Interrupted()) { int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags); if (nbytes > 0) { shmMsg->fQueued = true; @@ -142,7 +140,7 @@ int Socket::Send(MessagePtr& msg, const int timeout) fBytesTx += size; return size; } else if (zmq_errno() == EAGAIN) { - if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { if (timeout > 0) { elapsed += fSndTimeout; if (elapsed >= timeout) { @@ -198,7 +196,7 @@ int Socket::Receive(MessagePtr& msg, const int timeout) ++fMessagesRx; return size; } else if (zmq_errno() == EAGAIN) { - if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { if (timeout > 0) { elapsed += fRcvTimeout; if (elapsed >= timeout) { @@ -242,7 +240,7 @@ int64_t Socket::Send(vector& msgVec, const int timeout) std::memcpy(metas++, &(shmMsg->fMeta), sizeof(MetaHeader)); } - while (!fInterrupted) { + while (!fManager.Interrupted()) { int64_t totalSize = 0; int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags); if (nbytes > 0) { @@ -260,7 +258,7 @@ int64_t Socket::Send(vector& msgVec, const int timeout) return totalSize; } else if (zmq_errno() == EAGAIN) { - if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { if (timeout > 0) { elapsed += fSndTimeout; if (elapsed >= timeout) { @@ -296,7 +294,7 @@ int64_t Socket::Receive(vector& msgVec, const int timeout) ZMsg zmqMsg; - while (!fInterrupted) { + while (!fManager.Interrupted()) { int64_t totalSize = 0; int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags); if (nbytes > 0) { @@ -327,7 +325,7 @@ int64_t Socket::Receive(vector& msgVec, const int timeout) return totalSize; } else if (zmq_errno() == EAGAIN) { - if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { if (timeout > 0) { elapsed += fRcvTimeout; if (elapsed >= timeout) { @@ -365,20 +363,6 @@ void Socket::Close() fSocket = nullptr; } -void Socket::Interrupt() -{ - Manager::Interrupt(); - Message::fInterrupted = true; - fInterrupted = true; -} - -void Socket::Resume() -{ - Manager::Resume(); - Message::fInterrupted = false; - fInterrupted = false; -} - void Socket::SetOption(const string& option, const void* value, size_t valueSize) { if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) { diff --git a/fairmq/shmem/Socket.h b/fairmq/shmem/Socket.h index 76940f6d..143ffdb4 100644 --- a/fairmq/shmem/Socket.h +++ b/fairmq/shmem/Socket.h @@ -46,9 +46,6 @@ class Socket final : public fair::mq::Socket void Close() override; - static void Interrupt(); - static void Resume(); - void SetOption(const std::string& option, const void* value, size_t valueSize) override; void GetOption(const std::string& option, void* value, size_t* valueSize) override; @@ -81,8 +78,6 @@ class Socket final : public fair::mq::Socket std::atomic fMessagesTx; std::atomic fMessagesRx; - static std::atomic fInterrupted; - int fSndTimeout; int fRcvTimeout; }; diff --git a/fairmq/shmem/TransportFactory.cxx b/fairmq/shmem/TransportFactory.cxx index 8502dcb7..ed6bca02 100644 --- a/fairmq/shmem/TransportFactory.cxx +++ b/fairmq/shmem/TransportFactory.cxx @@ -164,6 +164,26 @@ UnmanagedRegionPtr TransportFactory::CreateUnmanagedRegion(const size_t size, Re return tools::make_unique(*fManager, size, callback, path, flags); } +UnmanagedRegionPtr TransportFactory::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const +{ + return tools::make_unique(*fManager, size, userFlags, callback, path, flags); +} + +void TransportFactory::SubscribeToRegionEvents(RegionEventCallback callback) +{ + fManager->SubscribeToRegionEvents(callback); +} + +void TransportFactory::UnsubscribeFromRegionEvents() +{ + fManager->UnsubscribeFromRegionEvents(); +} + +vector TransportFactory::GetRegionInfo() +{ + return fManager->GetRegionInfo(); +} + Transport TransportFactory::GetType() const { return fTransportType; diff --git a/fairmq/shmem/TransportFactory.h b/fairmq/shmem/TransportFactory.h index 2864004e..97136e66 100644 --- a/fairmq/shmem/TransportFactory.h +++ b/fairmq/shmem/TransportFactory.h @@ -50,11 +50,16 @@ class TransportFactory final : public fair::mq::TransportFactory PollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override; + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override; + + void SubscribeToRegionEvents(RegionEventCallback callback) override; + void UnsubscribeFromRegionEvents() override; + std::vector GetRegionInfo() override; Transport GetType() const override; - void Interrupt() override { Socket::Interrupt(); } - void Resume() override { Socket::Resume(); } + void Interrupt() override { fManager->Interrupt(); } + void Resume() override { fManager->Resume(); } void Reset() override; void IncrementMsgCounter() { ++fMsgCounter; } diff --git a/fairmq/shmem/UnmanagedRegion.cxx b/fairmq/shmem/UnmanagedRegion.cxx deleted file mode 100644 index 88a7df8d..00000000 --- a/fairmq/shmem/UnmanagedRegion.cxx +++ /dev/null @@ -1,52 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#include "Common.h" -#include "UnmanagedRegion.h" - -using namespace std; - -namespace bipc = ::boost::interprocess; - -namespace fair -{ -namespace mq -{ -namespace shmem -{ - -UnmanagedRegion::UnmanagedRegion(Manager& manager, const size_t size, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) - : fManager(manager) - , fRegion(nullptr) - , fRegionId(0) -{ - try { - RegionCounter* rc = fManager.ManagementSegment().find(bipc::unique_instance).first; - if (rc) { - LOG(debug) << "region counter found, with value of " << rc->fCount << ". incrementing."; - (rc->fCount)++; - LOG(debug) << "incremented region counter, now: " << rc->fCount; - } else { - LOG(debug) << "no region counter found, creating one and initializing with 1"; - rc = fManager.ManagementSegment().construct(bipc::unique_instance)(1); - LOG(debug) << "initialized region counter with: " << rc->fCount; - } - - fRegionId = rc->fCount; - - fRegion = fManager.CreateRegion(size, fRegionId, callback, path, flags); - } catch (bipc::interprocess_exception& e) { - LOG(error) << "cannot create region. Already created/not cleaned up?"; - LOG(error) << e.what(); - throw; - } -} - -} -} -} diff --git a/fairmq/shmem/UnmanagedRegion.h b/fairmq/shmem/UnmanagedRegion.h index cb583373..8d7a47a6 100644 --- a/fairmq/shmem/UnmanagedRegion.h +++ b/fairmq/shmem/UnmanagedRegion.h @@ -36,7 +36,19 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion friend class Socket; public: - UnmanagedRegion(Manager& manager, const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0); + UnmanagedRegion(Manager& manager, const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0) + : UnmanagedRegion(manager, size, 0, callback, path, flags) + {} + + UnmanagedRegion(Manager& manager, const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0) + : fManager(manager) + , fRegion(nullptr) + , fRegionId(0) + { + auto result = fManager.CreateRegion(size, userFlags, callback, path, flags); + fRegion = result.first; + fRegionId = result.second; + } void* GetData() const override { return fRegion->get_address(); } size_t GetSize() const override { return fRegion->get_size(); } @@ -53,4 +65,4 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion } } -#endif /* FAIR_MQ_SHMEM_UNMANAGEDREGION_H_ */ \ No newline at end of file +#endif /* FAIR_MQ_SHMEM_UNMANAGEDREGION_H_ */ diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index 458c6353..73ebc90f 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -95,6 +95,11 @@ FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const return unique_ptr(new FairMQUnmanagedRegionZMQ(size, callback, path, flags)); } +FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const +{ + return unique_ptr(new FairMQUnmanagedRegionZMQ(size, userFlags, callback, path, flags)); +} + fair::mq::Transport FairMQTransportFactoryZMQ::GetType() const { return fTransportType; diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index 139f9697..6a05163d 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -46,6 +46,11 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) const override; + FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override; + + void SubscribeToRegionEvents(FairMQRegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for ZeroMQ"; } + void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for ZeroMQ"; } + std::vector GetRegionInfo() override { LOG(error) << "GetRegionInfo not yet implemented for ZeroMQ, returning empty vector"; return std::vector(); } fair::mq::Transport GetType() const override; diff --git a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx index ed9ce453..66184e51 100644 --- a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx +++ b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx @@ -18,6 +18,13 @@ FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegi { } +FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(const size_t size, int64_t /* userFlags */, FairMQRegionCallback callback, const std::string& /* path = "" */, int /* flags = 0 */) + : fBuffer(malloc(size)) + , fSize(size) + , fCallback(callback) +{ +} + void* FairMQUnmanagedRegionZMQ::GetData() const { return fBuffer; diff --git a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h index bf7c4a1d..025690ed 100644 --- a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h +++ b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h @@ -21,6 +21,7 @@ class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion public: FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0); + FairMQUnmanagedRegionZMQ(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0); FairMQUnmanagedRegionZMQ(const FairMQUnmanagedRegionZMQ&) = delete; FairMQUnmanagedRegionZMQ operator=(const FairMQUnmanagedRegionZMQ&) = delete; @@ -35,4 +36,4 @@ class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion FairMQRegionCallback fCallback; }; -#endif /* FAIRMQUNMANAGEDREGIONZMQ_H_ */ \ No newline at end of file +#endif /* FAIRMQUNMANAGEDREGIONZMQ_H_ */