FairMQ: Add state subscription to DDS plugin

This commit is contained in:
Dennis Klein 2017-11-08 15:25:36 +01:00 committed by Mohammad Al-Turany
parent 56c0b2fd2b
commit 80332583ee
2 changed files with 67 additions and 35 deletions

View File

@ -40,13 +40,14 @@ DDS::DDS(const string name, const Plugin::Version version, const string maintain
, fIosWork{fIos}
, fHeartbeatInterval{100}
, fHeartbeatTimer{fIos, fHeartbeatInterval}
, fIosWorkerThread([&](){ fIos.run(); })
{
try
{
TakeDeviceControl();
fControllerThread = thread(&DDS::HandleControl, this);
fIosWorkerThread = thread([&]{ fIos.run(); });
fHeartbeatTimer.expires_from_now(chrono::milliseconds{0});
fHeartbeatTimer.async_wait(bind(&DDS::Heartbeat, this, placeholders::_1));
}
@ -64,11 +65,17 @@ auto DDS::HandleControl() -> void
{
try
{
// subscribe for state changes from DDS (subscriptions start firing after fService.start() is called)
SubscribeForCustomCommands();
// subscribe for DDS service errors.
fService.subscribeOnError([](const dds::intercom_api::EErrorCode errorCode, const string& errorMsg) {
LOG(ERROR) << "DDS Error received: error code: " << errorCode << ", error message: " << errorMsg << endl;
});
LOG(DEBUG) << "Subscribing for DDS properties.";
SubscribeForConnectingChannels();
// subscribe to device state changes, pushing new state chenges into the event queue
SubscribeToDeviceStateChange([&](DeviceState newState)
{
@ -81,6 +88,16 @@ auto DDS::HandleControl() -> void
{
fDeviceTerminationRequested = true;
}
{
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
string id = GetProperty<string>("id");
for (auto subscriberId : fStateChangeSubscribers)
{
LOG(DEBUG) << "Publishing state-change: " << newState << " to " << subscriberId;
fDDSCustomCmd.send("state-change: " + id + "," + ToStr(newState), to_string(subscriberId));
}
}
});
ChangeDeviceState(DeviceStateTransition::InitDevice);
@ -90,16 +107,6 @@ auto DDS::HandleControl() -> void
// and propagate addresses of bound channels to DDS.
FillChannelContainers();
if (fConnectingChans.size() > 0)
{
LOG(DEBUG) << "Subscribing for DDS properties.";
SubscribeForConnectingChannels();
}
// subscribe for state changes from DDS (subscriptions start firing after fService.start() is called)
SubscribeForCustomCommands();
// start DDS service - subscriptions will only start firing after this step
fService.start();
@ -169,29 +176,34 @@ auto DDS::SubscribeForConnectingChannels() -> void
{
fDDSKeyValue.subscribe([&] (const string& propertyId, const string& key, const string& value)
{
LOG(DEBUG) << "Received update for " << propertyId << ": key=" << key << " value=" << value;
fConnectingChans.at(propertyId).fDDSValues.insert(make_pair<string, string>(key.c_str(), value.c_str()));
try {
LOG(DEBUG) << "Received update for " << propertyId << ": key=" << key << " value=" << value;
fConnectingChans.at(propertyId).fDDSValues.insert(make_pair<string, string>(key.c_str(), value.c_str()));
// update channels and remove them from unfinished container
for (auto mi = fConnectingChans.begin(); mi != fConnectingChans.end(); /* no increment */)
{
if (mi->second.fSubChannelAddresses.size() == mi->second.fDDSValues.size())
// update channels and remove them from unfinished container
for (auto mi = fConnectingChans.begin(); mi != fConnectingChans.end(); /* no increment */)
{
// when multiple subChannels are used, their order on every device should be the same, irregardless of arrival order from DDS.
sort(mi->second.fSubChannelAddresses.begin(), mi->second.fSubChannelAddresses.end());
auto it = mi->second.fDDSValues.begin();
for (unsigned int i = 0; i < mi->second.fSubChannelAddresses.size(); ++i)
if (mi->second.fSubChannelAddresses.size() == mi->second.fDDSValues.size())
{
string k = "chans." + mi->first + "." + to_string(i) + ".address";
SetProperty<string>(k, it->second);
++it;
// when multiple subChannels are used, their order on every device should be the same, irregardless of arrival order from DDS.
sort(mi->second.fSubChannelAddresses.begin(), mi->second.fSubChannelAddresses.end());
auto it = mi->second.fDDSValues.begin();
for (unsigned int i = 0; i < mi->second.fSubChannelAddresses.size(); ++i)
{
string k = "chans." + mi->first + "." + to_string(i) + ".address";
SetProperty<string>(k, it->second);
++it;
}
fConnectingChans.erase(mi++);
}
else
{
++mi;
}
fConnectingChans.erase(mi++);
}
else
{
++mi;
}
} catch (const exception& e)
{
LOG(ERROR) << "Error on handling DDS property update for " << propertyId << ": key=" << key << " value=" << value << ": " << e.what();
}
});
}
@ -221,11 +233,9 @@ auto DDS::Heartbeat(const boost::system::error_code&) -> void
for (const auto subscriberId : fHeartbeatSubscribers) {
fDDSCustomCmd.send("heartbeat: " + id + "," + pid, to_string(subscriberId));
}
if (!fHeartbeatSubscribers.empty()) {
fHeartbeatTimer.expires_at(fHeartbeatTimer.expires_at() + fHeartbeatInterval);
fHeartbeatTimer.async_wait(bind(&DDS::Heartbeat, this, placeholders::_1));
}
}
fHeartbeatTimer.expires_at(fHeartbeatTimer.expires_at() + fHeartbeatInterval);
fHeartbeatTimer.async_wait(bind(&DDS::Heartbeat, this, placeholders::_1));
}
auto DDS::SubscribeForCustomCommands() -> void
@ -273,7 +283,7 @@ auto DDS::SubscribeForCustomCommands() -> void
std::lock_guard<std::mutex> lock{fHeartbeatSubscriberMutex};
fHeartbeatSubscribers.insert(senderId);
}
fDDSCustomCmd.send("heartbeat-subscription: OK", to_string(senderId));
fDDSCustomCmd.send("heartbeat-subscription: " + id + ",OK", to_string(senderId));
}
else if (cmd == "unsubscribe-from-heartbeats")
{
@ -281,7 +291,27 @@ auto DDS::SubscribeForCustomCommands() -> void
std::lock_guard<std::mutex> lock{fHeartbeatSubscriberMutex};
fHeartbeatSubscribers.erase(senderId);
}
fDDSCustomCmd.send("heartbeat-unsubscription: OK", to_string(senderId));
fDDSCustomCmd.send("heartbeat-unsubscription: " + id + ",OK", to_string(senderId));
}
else if (cmd == "subscribe-to-state-changes")
{
{
auto size = fStateChangeSubscribers.size();
std::lock_guard<std::mutex> lock{fStateChangeSubscriberMutex};
fStateChangeSubscribers.insert(senderId);
}
fDDSCustomCmd.send("state-changes-subscription: " + id + ",OK", to_string(senderId));
auto state = GetCurrentDeviceState();
LOG(DEBUG) << "Publishing state-change: " << state << " to " << senderId;
fDDSCustomCmd.send("state-change: " + id + "," + ToStr(state), to_string(senderId));
}
else if (cmd == "unsubscribe-from-state-changes")
{
{
std::lock_guard<std::mutex> lock{fStateChangeSubscriberMutex};
fStateChangeSubscribers.erase(senderId);
}
fDDSCustomCmd.send("state-changes-unsubscription: " + id + ",OK", to_string(senderId));
}
else
{

View File

@ -84,6 +84,8 @@ class DDS : public Plugin
std::set<uint64_t> fHeartbeatSubscribers;
std::mutex fHeartbeatSubscriberMutex;
std::set<uint64_t> fStateChangeSubscribers;
std::mutex fStateChangeSubscriberMutex;
boost::asio::io_service fIos;
boost::asio::io_service::work fIosWork;