diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 8b4d7166..6435acd7 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -87,7 +87,6 @@ DDS::DDS(const string& name, // subscribe to device state changes, pushing new state changes into the event queue SubscribeToDeviceStateChange([&](DeviceState newState) { - fStateQueue.Push(newState); switch (newState) { case DeviceState::Bound: // Receive addresses of connecting channels from DDS @@ -109,7 +108,11 @@ DDS::DDS(const string& name, } case DeviceState::Exiting: fWorkGuard.reset(); - fDeviceTerminationRequested = true; + { + unique_lock lock(fStopMutex); + fDeviceTerminationRequested = true; + } + fStopCondition.notify_one(); UnsubscribeFromDeviceStateChange(); ReleaseDeviceControl(); break; @@ -373,6 +376,7 @@ auto DDS::SubscribeForCustomCommands() -> void inCmds.Deserialize(cmdStr); for (const auto& cmd : inCmds) { + // LOG(info) << "Received command type: '" << cmd->GetType() << "' from " << senderId; switch (cmd->GetType()) { case Type::check_state: { fDDS.Send(Cmds(make(id, GetCurrentDeviceState())).Serialize(), to_string(senderId)); @@ -387,6 +391,10 @@ auto DDS::SubscribeForCustomCommands() -> void Cmds outCmds(make(id, Result::Failure, transition)); fDDS.Send(outCmds.Serialize(), to_string(senderId)); } + { + lock_guard lock{fStateChangeSubscriberMutex}; + fLastExternalController = senderId; + } } break; case Type::dump_config: { diff --git a/fairmq/plugins/DDS/DDS.h b/fairmq/plugins/DDS/DDS.h index 92feade4..242e715a 100644 --- a/fairmq/plugins/DDS/DDS.h +++ b/fairmq/plugins/DDS/DDS.h @@ -161,7 +161,6 @@ class DDS : public Plugin std::thread fControllerThread; DeviceState fCurrentState, fLastState; - fair::mq::StateQueue fStateQueue; std::atomic fDeviceTerminationRequested;