diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 9509ff29..560c5f59 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -74,15 +74,22 @@ DDS::DDS(const string& name, // subscribe to device state changes, pushing new state changes into the event queue SubscribeToDeviceStateChange([&](DeviceState newState) { switch (newState) { - case DeviceState::Bound: + case DeviceState::Bound: { // Receive addresses of connecting channels from DDS // and propagate addresses of bound channels to DDS. FillChannelContainers(); + // allow updates from key value after channel containers are filled + { + lock_guard lk(fUpdateMutex); + fUpdatesAllowed = true; + } + fUpdateCondition.notify_one(); + // publish bound addresses via DDS at keys corresponding to the channel // prefixes, e.g. 'data' in data[i] PublishBoundChannels(); - break; + } break; case DeviceState::ResettingDevice: { { lock_guard lk(fUpdateMutex); @@ -90,9 +97,8 @@ DDS::DDS(const string& name, } EmptyChannelContainers(); - break; - } - case DeviceState::Exiting: + } break; + case DeviceState::Exiting: { if (!fControllerThread.joinable()) { fControllerThread = thread(&DDS::WaitForExitingAck, this); } @@ -100,7 +106,7 @@ DDS::DDS(const string& name, fDeviceTerminationRequested = true; UnsubscribeFromDeviceStateChange(); ReleaseDeviceControl(); - break; + } break; default: break; } @@ -213,11 +219,6 @@ auto DDS::FillChannelContainers() -> void LOG(debug) << "dds-i-n: adding " << chanName << " -> i: " << i << " n: " << n; fIofN.insert(make_pair(chanName, IofN(i, n))); } - { - lock_guard lk(fUpdateMutex); - fUpdatesAllowed = true; - } - fUpdateCondition.notify_one(); } catch (const exception& e) { LOG(error) << "Error filling channel containers: " << e.what(); }