mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
DDS plugin: Synchronize FillChannelContainers and DDSKeyValue updates
This was a regression after introducing external control mode in f7cdf5e
.
This commit is contained in:
parent
7b773cde51
commit
9cbccface7
|
@ -51,6 +51,7 @@ DDS::DDS(const string& name,
|
||||||
, fLastState(DeviceState::Idle)
|
, fLastState(DeviceState::Idle)
|
||||||
, fDeviceTerminationRequested(false)
|
, fDeviceTerminationRequested(false)
|
||||||
, fHeartbeatInterval(100)
|
, fHeartbeatInterval(100)
|
||||||
|
, fUpdatesAllowed(false)
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
TakeDeviceControl();
|
TakeDeviceControl();
|
||||||
|
@ -85,22 +86,28 @@ auto DDS::HandleControl() -> void
|
||||||
// subscribe to device state changes, pushing new state changes into the event queue
|
// subscribe to device state changes, pushing new state changes into the event queue
|
||||||
SubscribeToDeviceStateChange([&](DeviceState newState) {
|
SubscribeToDeviceStateChange([&](DeviceState newState) {
|
||||||
fStateQueue.Push(newState);
|
fStateQueue.Push(newState);
|
||||||
switch(newState) {
|
switch (newState) {
|
||||||
case DeviceState::Bound:
|
case DeviceState::Bound:
|
||||||
// Receive addresses of connecting channels from DDS
|
// Receive addresses of connecting channels from DDS
|
||||||
// and propagate addresses of bound channels to DDS.
|
// and propagate addresses of bound channels to DDS.
|
||||||
FillChannelContainers();
|
FillChannelContainers();
|
||||||
|
|
||||||
// publish bound addresses via DDS at keys corresponding to the channel prefixes, e.g. 'data' in data[i]
|
// publish bound addresses via DDS at keys corresponding to the channel
|
||||||
PublishBoundChannels();
|
// prefixes, e.g. 'data' in data[i]
|
||||||
break;
|
PublishBoundChannels();
|
||||||
case DeviceState::Exiting:
|
break;
|
||||||
fDeviceTerminationRequested = true;
|
case DeviceState::ResettingDevice: {
|
||||||
UnsubscribeFromDeviceStateChange();
|
std::lock_guard<std::mutex> lk(fUpdateMutex);
|
||||||
ReleaseDeviceControl();
|
fUpdatesAllowed = false;
|
||||||
break;
|
break;
|
||||||
default:
|
}
|
||||||
break;
|
case DeviceState::Exiting:
|
||||||
|
fDeviceTerminationRequested = true;
|
||||||
|
UnsubscribeFromDeviceStateChange();
|
||||||
|
ReleaseDeviceControl();
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||||
|
@ -197,6 +204,11 @@ auto DDS::FillChannelContainers() -> void
|
||||||
LOG(debug) << "dds-i-n: adding " << chanName << " -> i: " << i << " n: " << n;
|
LOG(debug) << "dds-i-n: adding " << chanName << " -> i: " << i << " n: " << n;
|
||||||
fIofN.insert(make_pair(chanName, IofN(i, n)));
|
fIofN.insert(make_pair(chanName, IofN(i, n)));
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lk(fUpdateMutex);
|
||||||
|
fUpdatesAllowed = true;
|
||||||
|
}
|
||||||
|
fUpdateCondition.notify_one();
|
||||||
} catch (const exception& e) {
|
} catch (const exception& e) {
|
||||||
LOG(error) << "Error filling channel containers: " << e.what();
|
LOG(error) << "Error filling channel containers: " << e.what();
|
||||||
}
|
}
|
||||||
|
@ -209,6 +221,10 @@ auto DDS::SubscribeForConnectingChannels() -> void
|
||||||
fDDS.SubscribeKeyValue([&] (const string& propertyId, const string& value, uint64_t senderTaskID) {
|
fDDS.SubscribeKeyValue([&] (const string& propertyId, const string& value, uint64_t senderTaskID) {
|
||||||
try {
|
try {
|
||||||
LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID;
|
LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID;
|
||||||
|
|
||||||
|
std::unique_lock<std::mutex> lk(fUpdateMutex);
|
||||||
|
fUpdateCondition.wait(lk, [&]{ return fUpdatesAllowed; });
|
||||||
|
|
||||||
string val = value;
|
string val = value;
|
||||||
// check if it is to handle as one out of multiple values
|
// check if it is to handle as one out of multiple values
|
||||||
auto it = fIofN.find(propertyId);
|
auto it = fIofN.find(propertyId);
|
||||||
|
|
|
@ -163,6 +163,10 @@ class DDS : public Plugin
|
||||||
|
|
||||||
std::thread fHeartbeatThread;
|
std::thread fHeartbeatThread;
|
||||||
std::chrono::milliseconds fHeartbeatInterval;
|
std::chrono::milliseconds fHeartbeatInterval;
|
||||||
|
|
||||||
|
bool fUpdatesAllowed;
|
||||||
|
std::mutex fUpdateMutex;
|
||||||
|
std::condition_variable fUpdateCondition;
|
||||||
};
|
};
|
||||||
|
|
||||||
Plugin::ProgOptions DDSProgramOptions()
|
Plugin::ProgOptions DDSProgramOptions()
|
||||||
|
|
Loading…
Reference in New Issue
Block a user