From 56c0b2fd2b89b97646845c7e294a725e7fae80bb Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Mon, 6 Nov 2017 21:42:06 +0100 Subject: [PATCH] FairMQ: Add heartbeats to DDS plugin --- fairmq/plugins/DDS/DDS.cxx | 45 ++++++++++++++++++++++++++++++++++++++ fairmq/plugins/DDS/DDS.h | 14 ++++++++++++ 2 files changed, 59 insertions(+) diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index fb8c4398..2e6fe0fc 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -37,11 +37,18 @@ DDS::DDS(const string name, const Plugin::Version version, const string maintain , fEventsMutex() , fNewEvent() , fDeviceTerminationRequested(false) + , fIosWork{fIos} + , fHeartbeatInterval{100} + , fHeartbeatTimer{fIos, fHeartbeatInterval} + , fIosWorkerThread([&](){ fIos.run(); }) { try { TakeDeviceControl(); fControllerThread = thread(&DDS::HandleControl, this); + + fHeartbeatTimer.expires_from_now(chrono::milliseconds{0}); + fHeartbeatTimer.async_wait(bind(&DDS::Heartbeat, this, placeholders::_1)); } catch (PluginServices::DeviceControlError& e) { @@ -203,6 +210,24 @@ auto DDS::PublishBoundChannels() -> void } } +auto DDS::Heartbeat(const boost::system::error_code&) -> void +{ + string id = GetProperty("id"); + string pid(to_string(getpid())); + + { + lock_guard lock{fHeartbeatSubscriberMutex}; + + for (const auto subscriberId : fHeartbeatSubscribers) { + fDDSCustomCmd.send("heartbeat: " + id + "," + pid, to_string(subscriberId)); + } + if (!fHeartbeatSubscribers.empty()) { + fHeartbeatTimer.expires_at(fHeartbeatTimer.expires_at() + fHeartbeatInterval); + fHeartbeatTimer.async_wait(bind(&DDS::Heartbeat, this, placeholders::_1)); + } + } +} + auto DDS::SubscribeForCustomCommands() -> void { string id = GetProperty("id"); @@ -241,6 +266,23 @@ auto DDS::SubscribeForCustomCommands() -> void } fDDSCustomCmd.send(ss.str(), to_string(senderId)); } + else if (cmd == "subscribe-to-heartbeats") + { + { + auto size = fHeartbeatSubscribers.size(); + std::lock_guard lock{fHeartbeatSubscriberMutex}; + fHeartbeatSubscribers.insert(senderId); + } + fDDSCustomCmd.send("heartbeat-subscription: OK", to_string(senderId)); + } + else if (cmd == "unsubscribe-from-heartbeats") + { + { + std::lock_guard lock{fHeartbeatSubscriberMutex}; + fHeartbeatSubscribers.erase(senderId); + } + fDDSCustomCmd.send("heartbeat-unsubscription: OK", to_string(senderId)); + } else { LOG(WARN) << "Unknown command: " << cmd; @@ -269,6 +311,9 @@ DDS::~DDS() { fControllerThread.join(); } + + fIos.stop(); + fIosWorkerThread.join(); } } /* namespace plugins */ diff --git a/fairmq/plugins/DDS/DDS.h b/fairmq/plugins/DDS/DDS.h index 01acf999..f74746e5 100644 --- a/fairmq/plugins/DDS/DDS.h +++ b/fairmq/plugins/DDS/DDS.h @@ -21,6 +21,9 @@ #include #include #include +#include +#include +#include namespace fair { @@ -58,6 +61,8 @@ class DDS : public Plugin auto PublishBoundChannels() -> void; auto SubscribeForCustomCommands() -> void; + auto Heartbeat(const boost::system::error_code&) -> void; + dds::intercom_api::CIntercomService fService; dds::intercom_api::CCustomCmd fDDSCustomCmd; dds::intercom_api::CKeyValue fDDSKeyValue; @@ -76,6 +81,15 @@ class DDS : public Plugin std::condition_variable fNewEvent; std::atomic fDeviceTerminationRequested; + + std::set fHeartbeatSubscribers; + std::mutex fHeartbeatSubscriberMutex; + + boost::asio::io_service fIos; + boost::asio::io_service::work fIosWork; + std::thread fIosWorkerThread; + boost::asio::basic_waitable_timer fHeartbeatTimer; + std::chrono::milliseconds fHeartbeatInterval; }; REGISTER_FAIRMQ_PLUGIN(