From 9cbccface71b9a4cf0eca3a3d314ce15a4a4ba4f Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Sun, 28 Jul 2019 14:31:00 +0200 Subject: [PATCH] DDS plugin: Synchronize FillChannelContainers and DDSKeyValue updates This was a regression after introducing external control mode in f7cdf5e. --- fairmq/plugins/DDS/DDS.cxx | 46 +++++++++++++++++++++++++------------- fairmq/plugins/DDS/DDS.h | 4 ++++ 2 files changed, 35 insertions(+), 15 deletions(-) diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index f471ee3a..fe91ae09 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -51,6 +51,7 @@ DDS::DDS(const string& name, , fLastState(DeviceState::Idle) , fDeviceTerminationRequested(false) , fHeartbeatInterval(100) + , fUpdatesAllowed(false) { try { TakeDeviceControl(); @@ -85,22 +86,28 @@ auto DDS::HandleControl() -> void // subscribe to device state changes, pushing new state changes into the event queue SubscribeToDeviceStateChange([&](DeviceState newState) { fStateQueue.Push(newState); - switch(newState) { - case DeviceState::Bound: - // Receive addresses of connecting channels from DDS - // and propagate addresses of bound channels to DDS. - FillChannelContainers(); + switch (newState) { + case DeviceState::Bound: + // Receive addresses of connecting channels from DDS + // and propagate addresses of bound channels to DDS. + FillChannelContainers(); - // publish bound addresses via DDS at keys corresponding to the channel prefixes, e.g. 'data' in data[i] - PublishBoundChannels(); - break; - case DeviceState::Exiting: - fDeviceTerminationRequested = true; - UnsubscribeFromDeviceStateChange(); - ReleaseDeviceControl(); - break; - default: - break; + // publish bound addresses via DDS at keys corresponding to the channel + // prefixes, e.g. 'data' in data[i] + PublishBoundChannels(); + break; + case DeviceState::ResettingDevice: { + std::lock_guard lk(fUpdateMutex); + fUpdatesAllowed = false; + break; + } + case DeviceState::Exiting: + fDeviceTerminationRequested = true; + UnsubscribeFromDeviceStateChange(); + ReleaseDeviceControl(); + break; + default: + break; } lock_guard lock{fStateChangeSubscriberMutex}; @@ -197,6 +204,11 @@ auto DDS::FillChannelContainers() -> void LOG(debug) << "dds-i-n: adding " << chanName << " -> i: " << i << " n: " << n; fIofN.insert(make_pair(chanName, IofN(i, n))); } + { + std::lock_guard lk(fUpdateMutex); + fUpdatesAllowed = true; + } + fUpdateCondition.notify_one(); } catch (const exception& e) { LOG(error) << "Error filling channel containers: " << e.what(); } @@ -209,6 +221,10 @@ auto DDS::SubscribeForConnectingChannels() -> void fDDS.SubscribeKeyValue([&] (const string& propertyId, const string& value, uint64_t senderTaskID) { try { LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID; + + std::unique_lock lk(fUpdateMutex); + fUpdateCondition.wait(lk, [&]{ return fUpdatesAllowed; }); + string val = value; // check if it is to handle as one out of multiple values auto it = fIofN.find(propertyId); diff --git a/fairmq/plugins/DDS/DDS.h b/fairmq/plugins/DDS/DDS.h index 3a1aa448..c24726b2 100644 --- a/fairmq/plugins/DDS/DDS.h +++ b/fairmq/plugins/DDS/DDS.h @@ -163,6 +163,10 @@ class DDS : public Plugin std::thread fHeartbeatThread; std::chrono::milliseconds fHeartbeatInterval; + + bool fUpdatesAllowed; + std::mutex fUpdateMutex; + std::condition_variable fUpdateCondition; }; Plugin::ProgOptions DDSProgramOptions()