diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 2e6fe0fc..4256247b 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -40,13 +40,14 @@ DDS::DDS(const string name, const Plugin::Version version, const string maintain , fIosWork{fIos} , fHeartbeatInterval{100} , fHeartbeatTimer{fIos, fHeartbeatInterval} - , fIosWorkerThread([&](){ fIos.run(); }) { try { TakeDeviceControl(); fControllerThread = thread(&DDS::HandleControl, this); + fIosWorkerThread = thread([&]{ fIos.run(); }); + fHeartbeatTimer.expires_from_now(chrono::milliseconds{0}); fHeartbeatTimer.async_wait(bind(&DDS::Heartbeat, this, placeholders::_1)); } @@ -64,11 +65,17 @@ auto DDS::HandleControl() -> void { try { + // subscribe for state changes from DDS (subscriptions start firing after fService.start() is called) + SubscribeForCustomCommands(); + // subscribe for DDS service errors. fService.subscribeOnError([](const dds::intercom_api::EErrorCode errorCode, const string& errorMsg) { LOG(ERROR) << "DDS Error received: error code: " << errorCode << ", error message: " << errorMsg << endl; }); + LOG(DEBUG) << "Subscribing for DDS properties."; + SubscribeForConnectingChannels(); + // subscribe to device state changes, pushing new state chenges into the event queue SubscribeToDeviceStateChange([&](DeviceState newState) { @@ -81,6 +88,16 @@ auto DDS::HandleControl() -> void { fDeviceTerminationRequested = true; } + + { + lock_guard lock{fStateChangeSubscriberMutex}; + string id = GetProperty("id"); + for (auto subscriberId : fStateChangeSubscribers) + { + LOG(DEBUG) << "Publishing state-change: " << newState << " to " << subscriberId; + fDDSCustomCmd.send("state-change: " + id + "," + ToStr(newState), to_string(subscriberId)); + } + } }); ChangeDeviceState(DeviceStateTransition::InitDevice); @@ -90,16 +107,6 @@ auto DDS::HandleControl() -> void // and propagate addresses of bound channels to DDS. FillChannelContainers(); - if (fConnectingChans.size() > 0) - { - LOG(DEBUG) << "Subscribing for DDS properties."; - - SubscribeForConnectingChannels(); - } - - // subscribe for state changes from DDS (subscriptions start firing after fService.start() is called) - SubscribeForCustomCommands(); - // start DDS service - subscriptions will only start firing after this step fService.start(); @@ -169,29 +176,34 @@ auto DDS::SubscribeForConnectingChannels() -> void { fDDSKeyValue.subscribe([&] (const string& propertyId, const string& key, const string& value) { - LOG(DEBUG) << "Received update for " << propertyId << ": key=" << key << " value=" << value; - fConnectingChans.at(propertyId).fDDSValues.insert(make_pair(key.c_str(), value.c_str())); + try { + LOG(DEBUG) << "Received update for " << propertyId << ": key=" << key << " value=" << value; + fConnectingChans.at(propertyId).fDDSValues.insert(make_pair(key.c_str(), value.c_str())); - // update channels and remove them from unfinished container - for (auto mi = fConnectingChans.begin(); mi != fConnectingChans.end(); /* no increment */) - { - if (mi->second.fSubChannelAddresses.size() == mi->second.fDDSValues.size()) + // update channels and remove them from unfinished container + for (auto mi = fConnectingChans.begin(); mi != fConnectingChans.end(); /* no increment */) { - // when multiple subChannels are used, their order on every device should be the same, irregardless of arrival order from DDS. - sort(mi->second.fSubChannelAddresses.begin(), mi->second.fSubChannelAddresses.end()); - auto it = mi->second.fDDSValues.begin(); - for (unsigned int i = 0; i < mi->second.fSubChannelAddresses.size(); ++i) + if (mi->second.fSubChannelAddresses.size() == mi->second.fDDSValues.size()) { - string k = "chans." + mi->first + "." + to_string(i) + ".address"; - SetProperty(k, it->second); - ++it; + // when multiple subChannels are used, their order on every device should be the same, irregardless of arrival order from DDS. + sort(mi->second.fSubChannelAddresses.begin(), mi->second.fSubChannelAddresses.end()); + auto it = mi->second.fDDSValues.begin(); + for (unsigned int i = 0; i < mi->second.fSubChannelAddresses.size(); ++i) + { + string k = "chans." + mi->first + "." + to_string(i) + ".address"; + SetProperty(k, it->second); + ++it; + } + fConnectingChans.erase(mi++); + } + else + { + ++mi; } - fConnectingChans.erase(mi++); - } - else - { - ++mi; } + } catch (const exception& e) + { + LOG(ERROR) << "Error on handling DDS property update for " << propertyId << ": key=" << key << " value=" << value << ": " << e.what(); } }); } @@ -221,11 +233,9 @@ auto DDS::Heartbeat(const boost::system::error_code&) -> void for (const auto subscriberId : fHeartbeatSubscribers) { fDDSCustomCmd.send("heartbeat: " + id + "," + pid, to_string(subscriberId)); } - if (!fHeartbeatSubscribers.empty()) { - fHeartbeatTimer.expires_at(fHeartbeatTimer.expires_at() + fHeartbeatInterval); - fHeartbeatTimer.async_wait(bind(&DDS::Heartbeat, this, placeholders::_1)); - } } + fHeartbeatTimer.expires_at(fHeartbeatTimer.expires_at() + fHeartbeatInterval); + fHeartbeatTimer.async_wait(bind(&DDS::Heartbeat, this, placeholders::_1)); } auto DDS::SubscribeForCustomCommands() -> void @@ -273,7 +283,7 @@ auto DDS::SubscribeForCustomCommands() -> void std::lock_guard lock{fHeartbeatSubscriberMutex}; fHeartbeatSubscribers.insert(senderId); } - fDDSCustomCmd.send("heartbeat-subscription: OK", to_string(senderId)); + fDDSCustomCmd.send("heartbeat-subscription: " + id + ",OK", to_string(senderId)); } else if (cmd == "unsubscribe-from-heartbeats") { @@ -281,7 +291,27 @@ auto DDS::SubscribeForCustomCommands() -> void std::lock_guard lock{fHeartbeatSubscriberMutex}; fHeartbeatSubscribers.erase(senderId); } - fDDSCustomCmd.send("heartbeat-unsubscription: OK", to_string(senderId)); + fDDSCustomCmd.send("heartbeat-unsubscription: " + id + ",OK", to_string(senderId)); + } + else if (cmd == "subscribe-to-state-changes") + { + { + auto size = fStateChangeSubscribers.size(); + std::lock_guard lock{fStateChangeSubscriberMutex}; + fStateChangeSubscribers.insert(senderId); + } + fDDSCustomCmd.send("state-changes-subscription: " + id + ",OK", to_string(senderId)); + auto state = GetCurrentDeviceState(); + LOG(DEBUG) << "Publishing state-change: " << state << " to " << senderId; + fDDSCustomCmd.send("state-change: " + id + "," + ToStr(state), to_string(senderId)); + } + else if (cmd == "unsubscribe-from-state-changes") + { + { + std::lock_guard lock{fStateChangeSubscriberMutex}; + fStateChangeSubscribers.erase(senderId); + } + fDDSCustomCmd.send("state-changes-unsubscription: " + id + ",OK", to_string(senderId)); } else { diff --git a/fairmq/plugins/DDS/DDS.h b/fairmq/plugins/DDS/DDS.h index f74746e5..282ea3f9 100644 --- a/fairmq/plugins/DDS/DDS.h +++ b/fairmq/plugins/DDS/DDS.h @@ -84,6 +84,8 @@ class DDS : public Plugin std::set fHeartbeatSubscribers; std::mutex fHeartbeatSubscriberMutex; + std::set fStateChangeSubscribers; + std::mutex fStateChangeSubscriberMutex; boost::asio::io_service fIos; boost::asio::io_service::work fIosWork;