Rebased, cleaned up

This commit is contained in:
Alexey Rybalchenko
2019-07-10 17:00:36 +02:00
committed by Dennis Klein
parent ff581985f3
commit 07f7142ae2
6 changed files with 102 additions and 107 deletions

View File

@@ -40,7 +40,7 @@ DDS::DDS(const string& name, const Plugin::Version version, const string& mainta
, fConnectingChans()
, fStopMutex()
, fStopCondition()
, fCommands({ "BIND", "CONNECT", "INIT TASK", "RUN", "STOP", "RESET TASK", "RESET DEVICE" })
, fCommands({ "BIND", "CONNECT", "INIT TASK", "RUN", "STOP", "RESET TASK", "RESET DEVICE", "SHUTDOWN", "STARTUP" })
, fControllerThread()
, fEvents()
, fEventsMutex()
@@ -48,7 +48,8 @@ DDS::DDS(const string& name, const Plugin::Version version, const string& mainta
, fCurrentState(DeviceState::Idle)
, fLastState(DeviceState::Idle)
, fDeviceTerminationRequested(false)
, fHeartbeatInterval{100}
, fServiceStarted(false)
, fHeartbeatInterval(100)
{
try {
TakeDeviceControl();
@@ -83,9 +84,6 @@ auto DDS::HandleControl() -> void
SubscribeForConnectingChannels();
// start DDS service - subscriptions will only start firing after this step
fService.start(dds_session_id);
// subscribe to device state changes, pushing new state changes into the event queue
SubscribeToDeviceStateChange([&](DeviceState newState) {
{
@@ -97,17 +95,14 @@ auto DDS::HandleControl() -> void
fDeviceTerminationRequested = true;
}
{
if (fServiceStarted) {
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
string id = GetProperty<string>("id");
fLastState = fCurrentState;
fCurrentState = newState;
for (auto subscriberId : fStateChangeSubscribers) {
LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState
<< " to " << subscriberId;
fDDSCustomCmd.send("state-change: " + id + "," + ToStr(fLastState) + "->"
+ ToStr(newState),
to_string(subscriberId));
LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState << " to " << subscriberId;
fDDSCustomCmd.send("state-change: " + id + "," + ToStr(fLastState) + "->" + ToStr(newState), to_string(subscriberId));
}
}
});
@@ -123,6 +118,10 @@ auto DDS::HandleControl() -> void
// and propagate addresses of bound channels to DDS.
FillChannelContainers();
// start DDS service - subscriptions will only start firing after this step
fService.start(dds_session_id);
fServiceStarted = true;
// publish bound addresses via DDS at keys corresponding to the channel prefixes, e.g. 'data' in data[i]
PublishBoundChannels();
@@ -267,8 +266,7 @@ auto DDS::SubscribeForConnectingChannels() -> void
// when multiple subChannels are used, their order on every device should be the same, irregardless of arrival order from DDS.
sort(mi->second.fSubChannelAddresses.begin(), mi->second.fSubChannelAddresses.end());
auto it3 = mi->second.fDDSValues.begin();
for (unsigned int i = 0; i < mi->second.fSubChannelAddresses.size(); ++i)
{
for (unsigned int i = 0; i < mi->second.fSubChannelAddresses.size(); ++i) {
SetProperty<string>(string{"chans." + mi->first + "." + to_string(i) + ".address"}, it3->second);
++it3;
}
@@ -372,11 +370,8 @@ auto DDS::SubscribeForCustomCommands() -> void
fDDSCustomCmd.send("state-changes-subscription: " + id + ",OK", to_string(senderId));
{
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState
<< " to " << senderId;
fDDSCustomCmd.send("state-change: " + id + "," + ToStr(fLastState) + "->"
+ ToStr(fCurrentState),
to_string(senderId));
LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << senderId;
fDDSCustomCmd.send("state-change: " + id + "," + ToStr(fLastState) + "->" + ToStr(fCurrentState), to_string(senderId));
}
} else if (cmd == "unsubscribe-from-state-changes") {
{