FairMQ: Add heartbeats to DDS plugin

This commit is contained in:
Dennis Klein 2017-11-06 21:42:06 +01:00 committed by Mohammad Al-Turany
parent 72cdd1e3d7
commit 56c0b2fd2b
2 changed files with 59 additions and 0 deletions

View File

@ -37,11 +37,18 @@ DDS::DDS(const string name, const Plugin::Version version, const string maintain
, fEventsMutex()
, fNewEvent()
, fDeviceTerminationRequested(false)
, fIosWork{fIos}
, fHeartbeatInterval{100}
, fHeartbeatTimer{fIos, fHeartbeatInterval}
, fIosWorkerThread([&](){ fIos.run(); })
{
try
{
TakeDeviceControl();
fControllerThread = thread(&DDS::HandleControl, this);
fHeartbeatTimer.expires_from_now(chrono::milliseconds{0});
fHeartbeatTimer.async_wait(bind(&DDS::Heartbeat, this, placeholders::_1));
}
catch (PluginServices::DeviceControlError& e)
{
@ -203,6 +210,24 @@ auto DDS::PublishBoundChannels() -> void
}
}
auto DDS::Heartbeat(const boost::system::error_code&) -> void
{
string id = GetProperty<string>("id");
string pid(to_string(getpid()));
{
lock_guard<mutex> lock{fHeartbeatSubscriberMutex};
for (const auto subscriberId : fHeartbeatSubscribers) {
fDDSCustomCmd.send("heartbeat: " + id + "," + pid, to_string(subscriberId));
}
if (!fHeartbeatSubscribers.empty()) {
fHeartbeatTimer.expires_at(fHeartbeatTimer.expires_at() + fHeartbeatInterval);
fHeartbeatTimer.async_wait(bind(&DDS::Heartbeat, this, placeholders::_1));
}
}
}
auto DDS::SubscribeForCustomCommands() -> void
{
string id = GetProperty<string>("id");
@ -241,6 +266,23 @@ auto DDS::SubscribeForCustomCommands() -> void
}
fDDSCustomCmd.send(ss.str(), to_string(senderId));
}
else if (cmd == "subscribe-to-heartbeats")
{
{
auto size = fHeartbeatSubscribers.size();
std::lock_guard<std::mutex> lock{fHeartbeatSubscriberMutex};
fHeartbeatSubscribers.insert(senderId);
}
fDDSCustomCmd.send("heartbeat-subscription: OK", to_string(senderId));
}
else if (cmd == "unsubscribe-from-heartbeats")
{
{
std::lock_guard<std::mutex> lock{fHeartbeatSubscriberMutex};
fHeartbeatSubscribers.erase(senderId);
}
fDDSCustomCmd.send("heartbeat-unsubscription: OK", to_string(senderId));
}
else
{
LOG(WARN) << "Unknown command: " << cmd;
@ -269,6 +311,9 @@ DDS::~DDS()
{
fControllerThread.join();
}
fIos.stop();
fIosWorkerThread.join();
}
} /* namespace plugins */

View File

@ -21,6 +21,9 @@
#include <vector>
#include <unordered_map>
#include <set>
#include <boost/asio.hpp>
#include <chrono>
#include <functional>
namespace fair
{
@ -58,6 +61,8 @@ class DDS : public Plugin
auto PublishBoundChannels() -> void;
auto SubscribeForCustomCommands() -> void;
auto Heartbeat(const boost::system::error_code&) -> void;
dds::intercom_api::CIntercomService fService;
dds::intercom_api::CCustomCmd fDDSCustomCmd;
dds::intercom_api::CKeyValue fDDSKeyValue;
@ -76,6 +81,15 @@ class DDS : public Plugin
std::condition_variable fNewEvent;
std::atomic<bool> fDeviceTerminationRequested;
std::set<uint64_t> fHeartbeatSubscribers;
std::mutex fHeartbeatSubscriberMutex;
boost::asio::io_service fIos;
boost::asio::io_service::work fIosWork;
std::thread fIosWorkerThread;
boost::asio::basic_waitable_timer<std::chrono::steady_clock> fHeartbeatTimer;
std::chrono::milliseconds fHeartbeatInterval;
};
REGISTER_FAIRMQ_PLUGIN(