diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index ff560ca2..9073d62b 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -97,6 +97,9 @@ class FairMQTransportFactory /// @brief Subscribe to region events (creation, destruction, ...) /// @param callback the callback that is called when a region event occurs virtual void SubscribeToRegionEvents(FairMQRegionEventCallback callback) = 0; + /// @brief Check if there is an active subscription to region events + /// @return true/false + virtual bool SubscribedToRegionEvents() = 0; /// @brief Unsubscribe from region events virtual void UnsubscribeFromRegionEvents() = 0; diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index b80a3f33..af7b2f5a 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -40,6 +40,7 @@ class FairMQTransportFactoryNN final : public FairMQTransportFactory FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override; void SubscribeToRegionEvents(FairMQRegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for nanomsg"; } + bool SubscribedToRegionEvents() override { LOG(error) << "Region event subscriptions not yet implemented for nanomsg"; return false; } void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for nanomsg"; } std::vector GetRegionInfo() override { LOG(error) << "GetRegionInfo not yet implemented for nanomsg, returning empty vector"; return std::vector(); } diff --git a/fairmq/ofi/TransportFactory.h b/fairmq/ofi/TransportFactory.h index 2b3fcf89..bee2a814 100644 --- a/fairmq/ofi/TransportFactory.h +++ b/fairmq/ofi/TransportFactory.h @@ -50,6 +50,7 @@ class TransportFactory final : public FairMQTransportFactory auto CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) -> UnmanagedRegionPtr override; void SubscribeToRegionEvents(RegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for OFI"; } + bool SubscribedToRegionEvents() override { LOG(error) << "Region event subscriptions not yet implemented for OFI"; return false; } void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for OFI"; } std::vector GetRegionInfo() override { LOG(error) << "GetRegionInfo not yet implemented for OFI, returning empty vector"; return std::vector(); } diff --git a/fairmq/shmem/Manager.cxx b/fairmq/shmem/Manager.cxx index 645eacac..c239f4a6 100644 --- a/fairmq/shmem/Manager.cxx +++ b/fairmq/shmem/Manager.cxx @@ -247,6 +247,11 @@ void Manager::SubscribeToRegionEvents(RegionEventCallback callback) fRegionEventThread = thread(&Manager::RegionEventsSubscription, this); } +bool Manager::SubscribedToRegionEvents() +{ + return fRegionEventThread.joinable(); +} + void Manager::UnsubscribeFromRegionEvents() { if (fRegionEventThread.joinable()) { diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index bd0a5e4b..95eef9df 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -78,6 +78,7 @@ class Manager std::vector GetRegionInfo(); std::vector GetRegionInfoUnsafe(); void SubscribeToRegionEvents(RegionEventCallback callback); + bool SubscribedToRegionEvents(); void UnsubscribeFromRegionEvents(); void RegionEventsSubscription(); diff --git a/fairmq/shmem/TransportFactory.cxx b/fairmq/shmem/TransportFactory.cxx index 46dc3675..faeb5711 100644 --- a/fairmq/shmem/TransportFactory.cxx +++ b/fairmq/shmem/TransportFactory.cxx @@ -174,6 +174,11 @@ void TransportFactory::SubscribeToRegionEvents(RegionEventCallback callback) fManager->SubscribeToRegionEvents(callback); } +bool TransportFactory::SubscribedToRegionEvents() +{ + return fManager->SubscribedToRegionEvents(); +} + void TransportFactory::UnsubscribeFromRegionEvents() { fManager->UnsubscribeFromRegionEvents(); diff --git a/fairmq/shmem/TransportFactory.h b/fairmq/shmem/TransportFactory.h index fbcc309b..991fe7da 100644 --- a/fairmq/shmem/TransportFactory.h +++ b/fairmq/shmem/TransportFactory.h @@ -53,6 +53,7 @@ class TransportFactory final : public fair::mq::TransportFactory UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override; void SubscribeToRegionEvents(RegionEventCallback callback) override; + bool SubscribedToRegionEvents() override; void UnsubscribeFromRegionEvents() override; std::vector GetRegionInfo() override; diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index 87e40609..371297c4 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -132,6 +132,11 @@ void FairMQTransportFactoryZMQ::SubscribeToRegionEvents(FairMQRegionEventCallbac fRegionEventThread = thread(&FairMQTransportFactoryZMQ::RegionEventsSubscription, this); } +bool FairMQTransportFactoryZMQ::SubscribedToRegionEvents() +{ + return fRegionEventThread.joinable(); +} + void FairMQTransportFactoryZMQ::UnsubscribeFromRegionEvents() { if (fRegionEventThread.joinable()) { diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index 07423acd..79c5b77e 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -52,6 +52,7 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) override; void SubscribeToRegionEvents(FairMQRegionEventCallback callback) override; + bool SubscribedToRegionEvents() override; void UnsubscribeFromRegionEvents() override; void RegionEventsSubscription(); std::vector GetRegionInfo() override;