diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index de7fc802..ff560ca2 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -84,7 +84,7 @@ class FairMQTransportFactory /// @param path optional parameter to pass to the underlying transport /// @param flags optional parameter to pass to the underlying transport /// @return pointer to UnmanagedRegion - virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const = 0; + virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0; /// @brief Create new UnmanagedRegion /// @param size size of the region /// @param userFlags flags to be stored with the region, have no effect on the transport, but can be retrieved from the region by the user @@ -92,7 +92,7 @@ class FairMQTransportFactory /// @param path optional parameter to pass to the underlying transport /// @param flags optional parameter to pass to the underlying transport /// @return pointer to UnmanagedRegion - virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const = 0; + virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0; /// @brief Subscribe to region events (creation, destruction, ...) /// @param callback the callback that is called when a region event occurs diff --git a/fairmq/FairMQUnmanagedRegion.h b/fairmq/FairMQUnmanagedRegion.h index 11d02ac1..b91c7836 100644 --- a/fairmq/FairMQUnmanagedRegion.h +++ b/fairmq/FairMQUnmanagedRegion.h @@ -14,13 +14,32 @@ #include // std::function #include // std::ostream +class FairMQTransportFactory; + enum class FairMQRegionEvent : int { created, destroyed }; -struct FairMQRegionInfo { +struct FairMQRegionInfo +{ + FairMQRegionInfo() + : id(0) + , ptr(nullptr) + , size(0) + , flags(0) + , event(FairMQRegionEvent::created) + {} + + FairMQRegionInfo(uint64_t _id, void* _ptr, size_t _size, int64_t _flags, FairMQRegionEvent _event) + : id(_id) + , ptr(_ptr) + , size(_size) + , flags(_flags) + , event (_event) + {} + uint64_t id; // id of the region void* ptr; // pointer to the start of the region size_t size; // region size @@ -34,10 +53,19 @@ using FairMQRegionEventCallback = std::function; class FairMQUnmanagedRegion { public: + FairMQUnmanagedRegion() {} + FairMQUnmanagedRegion(FairMQTransportFactory* factory): fTransport(factory) {} + virtual void* GetData() const = 0; virtual size_t GetSize() const = 0; + FairMQTransportFactory* GetTransport() { return fTransport; } + void SetTransport(FairMQTransportFactory* transport) { fTransport = transport; } + virtual ~FairMQUnmanagedRegion() {}; + + private: + FairMQTransportFactory* fTransport{nullptr}; }; using FairMQUnmanagedRegionPtr = std::unique_ptr; diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index f2f3dbb1..6eb53966 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -65,14 +65,14 @@ FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const unordered_map(new FairMQPollerNN(channelsMap, channelList)); } -FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const +FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) { - return unique_ptr(new FairMQUnmanagedRegionNN(size, callback, path, flags)); + return unique_ptr(new FairMQUnmanagedRegionNN(size, callback, path, flags, this)); } -FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const +FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) { - return unique_ptr(new FairMQUnmanagedRegionNN(size, userFlags, callback, path, flags)); + return unique_ptr(new FairMQUnmanagedRegionNN(size, userFlags, callback, path, flags, this)); } fair::mq::Transport FairMQTransportFactoryNN::GetType() const diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index d573ec20..b80a3f33 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -36,8 +36,8 @@ class FairMQTransportFactoryNN final : public FairMQTransportFactory FairMQPollerPtr CreatePoller(const std::vector& channels) const override; FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) const override; - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override; + FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) override; + FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override; void SubscribeToRegionEvents(FairMQRegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for nanomsg"; } void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for nanomsg"; } diff --git a/fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx b/fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx index b651b6cd..638f0646 100644 --- a/fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx +++ b/fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx @@ -11,15 +11,17 @@ using namespace std; -FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(const size_t size, FairMQRegionCallback callback, const std::string& /*path = "" */, int /*flags = 0 */) - : fBuffer(malloc(size)) +FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(const size_t size, FairMQRegionCallback callback, const std::string& /*path = "" */, int /*flags = 0 */, FairMQTransportFactory* factory /* = nullptr */) + : FairMQUnmanagedRegion(factory) + , fBuffer(malloc(size)) , fSize(size) , fCallback(callback) { } -FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(const size_t size, const int64_t /*userFlags*/, FairMQRegionCallback callback, const std::string& /*path = "" */, int /*flags = 0 */) - : fBuffer(malloc(size)) +FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(const size_t size, const int64_t /*userFlags*/, FairMQRegionCallback callback, const std::string& /*path = "" */, int /*flags = 0 */, FairMQTransportFactory* factory /* = nullptr */) + : FairMQUnmanagedRegion(factory) + , fBuffer(malloc(size)) , fSize(size) , fCallback(callback) { diff --git a/fairmq/nanomsg/FairMQUnmanagedRegionNN.h b/fairmq/nanomsg/FairMQUnmanagedRegionNN.h index ca9f54de..6a4ac668 100644 --- a/fairmq/nanomsg/FairMQUnmanagedRegionNN.h +++ b/fairmq/nanomsg/FairMQUnmanagedRegionNN.h @@ -19,8 +19,8 @@ class FairMQUnmanagedRegionNN final : public FairMQUnmanagedRegion friend class FairMQSocketNN; public: - FairMQUnmanagedRegionNN(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0); - FairMQUnmanagedRegionNN(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0); + FairMQUnmanagedRegionNN(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0, FairMQTransportFactory* factory = nullptr); + FairMQUnmanagedRegionNN(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0, FairMQTransportFactory* factory = nullptr); FairMQUnmanagedRegionNN(const FairMQUnmanagedRegionNN&) = delete; FairMQUnmanagedRegionNN operator=(const FairMQUnmanagedRegionNN&) = delete; diff --git a/fairmq/ofi/TransportFactory.cxx b/fairmq/ofi/TransportFactory.cxx index 7be2ea51..918f76df 100644 --- a/fairmq/ofi/TransportFactory.cxx +++ b/fairmq/ofi/TransportFactory.cxx @@ -85,12 +85,12 @@ auto TransportFactory::CreatePoller(const unordered_map UnmanagedRegionPtr +auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr { throw runtime_error{"Not yet implemented UMR."}; } -auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) const -> UnmanagedRegionPtr +auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr { throw runtime_error{"Not yet implemented UMR."}; } diff --git a/fairmq/ofi/TransportFactory.h b/fairmq/ofi/TransportFactory.h index c1afe174..2b3fcf89 100644 --- a/fairmq/ofi/TransportFactory.h +++ b/fairmq/ofi/TransportFactory.h @@ -46,8 +46,8 @@ class TransportFactory final : public FairMQTransportFactory auto CreatePoller(const std::vector& channels) const -> PollerPtr override; auto CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const -> PollerPtr override; - auto CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const -> UnmanagedRegionPtr override; - auto CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const -> UnmanagedRegionPtr override; + auto CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) -> UnmanagedRegionPtr override; + auto CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) -> UnmanagedRegionPtr override; void SubscribeToRegionEvents(RegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for OFI"; } void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for OFI"; } diff --git a/fairmq/shmem/TransportFactory.cxx b/fairmq/shmem/TransportFactory.cxx index ed6bca02..46dc3675 100644 --- a/fairmq/shmem/TransportFactory.cxx +++ b/fairmq/shmem/TransportFactory.cxx @@ -159,14 +159,14 @@ PollerPtr TransportFactory::CreatePoller(const unordered_map(channelsMap, channelList); } -UnmanagedRegionPtr TransportFactory::CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const +UnmanagedRegionPtr TransportFactory::CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) { - return tools::make_unique(*fManager, size, callback, path, flags); + return tools::make_unique(*fManager, size, callback, path, flags, this); } -UnmanagedRegionPtr TransportFactory::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const +UnmanagedRegionPtr TransportFactory::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) { - return tools::make_unique(*fManager, size, userFlags, callback, path, flags); + return tools::make_unique(*fManager, size, userFlags, callback, path, flags, this); } void TransportFactory::SubscribeToRegionEvents(RegionEventCallback callback) diff --git a/fairmq/shmem/TransportFactory.h b/fairmq/shmem/TransportFactory.h index 97136e66..fbcc309b 100644 --- a/fairmq/shmem/TransportFactory.h +++ b/fairmq/shmem/TransportFactory.h @@ -49,8 +49,8 @@ class TransportFactory final : public fair::mq::TransportFactory PollerPtr CreatePoller(const std::vector& channels) const override; PollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; - UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override; - UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override; + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override; + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override; void SubscribeToRegionEvents(RegionEventCallback callback) override; void UnsubscribeFromRegionEvents() override; diff --git a/fairmq/shmem/UnmanagedRegion.h b/fairmq/shmem/UnmanagedRegion.h index 8d7a47a6..c034e9d8 100644 --- a/fairmq/shmem/UnmanagedRegion.h +++ b/fairmq/shmem/UnmanagedRegion.h @@ -36,12 +36,13 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion friend class Socket; public: - UnmanagedRegion(Manager& manager, const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0) - : UnmanagedRegion(manager, size, 0, callback, path, flags) + UnmanagedRegion(Manager& manager, const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0, FairMQTransportFactory* factory = nullptr) + : UnmanagedRegion(manager, size, 0, callback, path, flags, factory) {} - UnmanagedRegion(Manager& manager, const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0) - : fManager(manager) + UnmanagedRegion(Manager& manager, const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0, FairMQTransportFactory* factory = nullptr) + : FairMQUnmanagedRegion(factory) + , fManager(manager) , fRegion(nullptr) , fRegionId(0) { diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index 73ebc90f..1bd434ba 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -9,6 +9,8 @@ #include "FairMQTransportFactoryZMQ.h" #include +#include // find_if + using namespace std; fair::mq::Transport FairMQTransportFactoryZMQ::fTransportType = fair::mq::Transport::ZMQ; @@ -16,6 +18,7 @@ fair::mq::Transport FairMQTransportFactoryZMQ::fTransportType = fair::mq::Transp FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id, const fair::mq::ProgOptions* config) : FairMQTransportFactory(id) , fContext(zmq_ctx_new()) + , fRegionCounter(0) { int major, minor, patch; zmq_version(&major, &minor, &patch); @@ -80,7 +83,7 @@ FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const vector(new FairMQPollerZMQ(channels)); } -FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const std::vector& channels) const +FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const vector& channels) const { return unique_ptr(new FairMQPollerZMQ(channels)); } @@ -90,14 +93,93 @@ FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const unordered_map(new FairMQPollerZMQ(channelsMap, channelList)); } -FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const +FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const string& path /* = "" */, int flags /* = 0 */) { - return unique_ptr(new FairMQUnmanagedRegionZMQ(size, callback, path, flags)); + return CreateUnmanagedRegion(size, 0, callback, path, flags); } -FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const +FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const string& path /* = "" */, int flags /* = 0 */) { - return unique_ptr(new FairMQUnmanagedRegionZMQ(size, userFlags, callback, path, flags)); + unique_ptr ptr = nullptr; + { + lock_guard lock(fMtx); + + ++fRegionCounter; + ptr = unique_ptr(new FairMQUnmanagedRegionZMQ(fRegionCounter, size, userFlags, callback, path, flags, this)); + auto zPtr = static_cast(ptr.get()); + fRegionInfos.emplace_back(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), fair::mq::RegionEvent::created); + fRegionEvents.emplace(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), fair::mq::RegionEvent::created); + } + fRegionEventsCV.notify_one(); + return ptr; +} + +void FairMQTransportFactoryZMQ::SubscribeToRegionEvents(FairMQRegionEventCallback callback) +{ + if (fRegionEventThread.joinable()) { + LOG(debug) << "Already subscribed. Overwriting previous subscription."; + { + lock_guard lock(fMtx); + fRegionEventsSubscriptionActive = false; + } + fRegionEventsCV.notify_one(); + fRegionEventThread.join(); + } + lock_guard lock(fMtx); + fRegionEventCallback = callback; + fRegionEventsSubscriptionActive = true; + fRegionEventThread = thread(&FairMQTransportFactoryZMQ::RegionEventsSubscription, this); +} + +void FairMQTransportFactoryZMQ::UnsubscribeFromRegionEvents() +{ + if (fRegionEventThread.joinable()) { + unique_lock lock(fMtx); + fRegionEventsSubscriptionActive = false; + lock.unlock(); + fRegionEventsCV.notify_one(); + fRegionEventThread.join(); + lock.lock(); + fRegionEventCallback = nullptr; + } +} + +void FairMQTransportFactoryZMQ::RegionEventsSubscription() +{ + unique_lock lock(fMtx); + while (fRegionEventsSubscriptionActive) { + + while (!fRegionEvents.empty()) { + auto i = fRegionEvents.front(); + fRegionEventCallback(i); + fRegionEvents.pop(); + } + fRegionEventsCV.wait(lock, [&]() { return !fRegionEventsSubscriptionActive || !fRegionEvents.empty(); }); + } +} + +vector FairMQTransportFactoryZMQ::GetRegionInfo() +{ + lock_guard lock(fMtx); + return fRegionInfos; +} + +void FairMQTransportFactoryZMQ::RemoveRegion(uint64_t id) +{ + { + lock_guard lock(fMtx); + auto it = find_if(fRegionInfos.begin(), fRegionInfos.end(), [id](const fair::mq::RegionInfo& i) { + return i.id == id; + }); + if (it != fRegionInfos.end()) { + fRegionEvents.push(*it); + fRegionEvents.back().event = fair::mq::RegionEvent::destroyed; + fRegionInfos.erase(it); + } else { + LOG(error) << "RemoveRegion: given id (" << id << ") not found."; + } + } + fRegionEventsCV.notify_one(); } fair::mq::Transport FairMQTransportFactoryZMQ::GetType() const @@ -108,23 +190,19 @@ fair::mq::Transport FairMQTransportFactoryZMQ::GetType() const FairMQTransportFactoryZMQ::~FairMQTransportFactoryZMQ() { LOG(debug) << "Destroying ZeroMQ transport..."; - if (fContext) - { - if (zmq_ctx_term(fContext) != 0) - { - if (errno == EINTR) - { + + UnsubscribeFromRegionEvents(); + + if (fContext) { + if (zmq_ctx_term(fContext) != 0) { + if (errno == EINTR) { LOG(error) << " failed closing context, reason: " << zmq_strerror(errno); - } - else - { + } else { fContext = nullptr; return; } } - } - else - { + } else { LOG(error) << "context not available for shutdown"; } } diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index 6a05163d..07423acd 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -15,9 +15,6 @@ #ifndef FAIRMQTRANSPORTFACTORYZMQ_H_ #define FAIRMQTRANSPORTFACTORYZMQ_H_ -#include -#include - #include "FairMQTransportFactory.h" #include "FairMQMessageZMQ.h" #include "FairMQSocketZMQ.h" @@ -25,6 +22,14 @@ #include "FairMQUnmanagedRegionZMQ.h" #include +#include +#include +#include +#include +#include +#include +#include + class FairMQTransportFactoryZMQ final : public FairMQTransportFactory { public: @@ -32,8 +37,6 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory FairMQTransportFactoryZMQ(const FairMQTransportFactoryZMQ&) = delete; FairMQTransportFactoryZMQ operator=(const FairMQTransportFactoryZMQ&) = delete; - ~FairMQTransportFactoryZMQ() override; - FairMQMessagePtr CreateMessage() override; FairMQMessagePtr CreateMessage(const size_t size) override; FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; @@ -45,12 +48,14 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory FairMQPollerPtr CreatePoller(const std::vector& channels) const override; FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) const override; - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override; + FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) override; + FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) override; - void SubscribeToRegionEvents(FairMQRegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for ZeroMQ"; } - void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for ZeroMQ"; } - std::vector GetRegionInfo() override { LOG(error) << "GetRegionInfo not yet implemented for ZeroMQ, returning empty vector"; return std::vector(); } + void SubscribeToRegionEvents(FairMQRegionEventCallback callback) override; + void UnsubscribeFromRegionEvents() override; + void RegionEventsSubscription(); + std::vector GetRegionInfo() override; + void RemoveRegion(uint64_t id); fair::mq::Transport GetType() const override; @@ -58,9 +63,20 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory void Resume() override { FairMQSocketZMQ::Resume(); } void Reset() override {} + ~FairMQTransportFactoryZMQ() override; + private: static fair::mq::Transport fTransportType; void* fContext; + + std::mutex fMtx; + uint64_t fRegionCounter; + std::condition_variable fRegionEventsCV; + std::vector fRegionInfos; + std::queue fRegionEvents; + std::thread fRegionEventThread; + std::function fRegionEventCallback; + bool fRegionEventsSubscriptionActive; }; #endif /* FAIRMQTRANSPORTFACTORYZMQ_H_ */ diff --git a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx index 66184e51..ec02fb4c 100644 --- a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx +++ b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx @@ -7,23 +7,17 @@ ********************************************************************************/ #include "FairMQUnmanagedRegionZMQ.h" +#include "FairMQTransportFactoryZMQ.h" #include "FairMQLogger.h" -using namespace std; - -FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegionCallback callback, const std::string& /* path = "" */, int /* flags = 0 */) - : fBuffer(malloc(size)) +FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(uint64_t id, const size_t size, int64_t userFlags, FairMQRegionCallback callback, const std::string& /* path = "" */, int /* flags = 0 */, FairMQTransportFactory* factory /* = nullptr */) + : FairMQUnmanagedRegion(factory) + , fId(id) + , fBuffer(malloc(size)) , fSize(size) + , fUserFlags(userFlags) , fCallback(callback) -{ -} - -FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(const size_t size, int64_t /* userFlags */, FairMQRegionCallback callback, const std::string& /* path = "" */, int /* flags = 0 */) - : fBuffer(malloc(size)) - , fSize(size) - , fCallback(callback) -{ -} +{} void* FairMQUnmanagedRegionZMQ::GetData() const { @@ -38,5 +32,6 @@ size_t FairMQUnmanagedRegionZMQ::GetSize() const FairMQUnmanagedRegionZMQ::~FairMQUnmanagedRegionZMQ() { LOG(debug) << "destroying region"; + static_cast(GetTransport())->RemoveRegion(fId); free(fBuffer); } diff --git a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h index 025690ed..751f7149 100644 --- a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h +++ b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h @@ -13,6 +13,7 @@ #include // size_t #include +class FairMQTransportFactory; class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion { @@ -20,19 +21,23 @@ class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion friend class FairMQMessageZMQ; public: - FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0); - FairMQUnmanagedRegionZMQ(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0); + FairMQUnmanagedRegionZMQ(uint64_t id, const size_t size, int64_t userFlags, FairMQRegionCallback callback, const std::string& /* path = "" */, int /* flags = 0 */, FairMQTransportFactory* factory = nullptr); + FairMQUnmanagedRegionZMQ(const FairMQUnmanagedRegionZMQ&) = delete; FairMQUnmanagedRegionZMQ operator=(const FairMQUnmanagedRegionZMQ&) = delete; + uint64_t GetId() const { return fId; } virtual void* GetData() const override; virtual size_t GetSize() const override; + int64_t GetUserFlags() const { return fUserFlags; } virtual ~FairMQUnmanagedRegionZMQ(); private: + uint64_t fId; void* fBuffer; size_t fSize; + int64_t fUserFlags; FairMQRegionCallback fCallback; };