diff --git a/fairmq/plugins/DDS/CMakeLists.txt b/fairmq/plugins/DDS/CMakeLists.txt index d3d3a0ca..88335e3d 100644 --- a/fairmq/plugins/DDS/CMakeLists.txt +++ b/fairmq/plugins/DDS/CMakeLists.txt @@ -19,7 +19,7 @@ set_target_properties(${plugin} PROPERTIES set(exe fairmq-dds-command-ui) add_executable(${exe} ${CMAKE_CURRENT_SOURCE_DIR}/runDDSCommandUI.cxx) -target_link_libraries(${exe} FairMQ DDS::dds_intercom_lib DDS::dds_protocol_lib) +target_link_libraries(${exe} FairMQ DDS::dds_intercom_lib DDS::dds_protocol_lib Boost::boost) target_include_directories(${exe} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}) install(TARGETS ${plugin} ${exe} diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index a702ae6d..264d866b 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -13,6 +13,7 @@ #include #include #include +#include #include // for the interactive mode #include // for the interactive mode @@ -54,6 +55,7 @@ DDS::DDS(const string& name, , fExitingAckedByLastExternalController(false) , fHeartbeatInterval(100) , fUpdatesAllowed(false) + , fWorkGuard(fWorkerQueue.get_executor()) { try { TakeDeviceControl(); @@ -83,7 +85,6 @@ DDS::DDS(const string& name, SubscribeForCustomCommands(); SubscribeForConnectingChannels(); - fDDS.Start(); // subscribe to device state changes, pushing new state changes into the event queue SubscribeToDeviceStateChange([&](DeviceState newState) { @@ -104,6 +105,7 @@ DDS::DDS(const string& name, break; } case DeviceState::Exiting: + fWorkGuard.reset(); fDeviceTerminationRequested = true; UnsubscribeFromDeviceStateChange(); ReleaseDeviceControl(); @@ -124,7 +126,17 @@ DDS::DDS(const string& name, if (staticMode) { fControllerThread = thread(&DDS::StaticControl, this); + } else { + fWorkerThread = thread([this]() { + { + std::unique_lock lk(fUpdateMutex); + fUpdateCondition.wait(lk, [&]{ return fUpdatesAllowed; }); + } + fWorkerQueue.run(); + }); } + + fDDS.Start(); } catch (PluginServices::DeviceControlError& e) { LOG(debug) << e.what(); } catch (exception& e) { @@ -241,59 +253,58 @@ auto DDS::SubscribeForConnectingChannels() -> void LOG(debug) << "Subscribing for DDS properties."; fDDS.SubscribeKeyValue([&] (const string& propertyId, const string& value, uint64_t senderTaskID) { - try { - LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID; + LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID; - std::unique_lock lk(fUpdateMutex); - fUpdateCondition.wait(lk, [&]{ return fUpdatesAllowed; }); - - string val = value; - // check if it is to handle as one out of multiple values - auto it = fIofN.find(propertyId); - if (it != fIofN.end()) { - it->second.fEntries.push_back(value); - if (it->second.fEntries.size() == it->second.fN) { - sort(it->second.fEntries.begin(), it->second.fEntries.end()); - val = it->second.fEntries.at(it->second.fI); - } else { - LOG(debug) << "received " << it->second.fEntries.size() << " values for " << propertyId << ", expecting total of " << it->second.fN; - return; - } - } - - vector connectionStrings; - boost::algorithm::split(connectionStrings, val, boost::algorithm::is_any_of(",")); - if (connectionStrings.size() > 1) { // multiple bound channels received - auto it2 = fI.find(propertyId); - if (it2 != fI.end()) { - LOG(debug) << "adding connecting channel " << propertyId << " : " << connectionStrings.at(it2->second); - fConnectingChans.at(propertyId).fDDSValues.insert({senderTaskID, connectionStrings.at(it2->second).c_str()}); - } else { - LOG(error) << "multiple bound channels received, but no task index specified, only assigning the first"; - fConnectingChans.at(propertyId).fDDSValues.insert({senderTaskID, connectionStrings.at(0).c_str()}); - } - } else { // only one bound channel received - fConnectingChans.at(propertyId).fDDSValues.insert({senderTaskID, val.c_str()}); - } - - // update channels and remove them from unfinished container - for (auto mi = fConnectingChans.begin(); mi != fConnectingChans.end(); /* no increment */) { - if (mi->second.fSubChannelAddresses.size() == mi->second.fDDSValues.size()) { - // when multiple subChannels are used, their order on every device should be the same, irregardless of arrival order from DDS. - sort(mi->second.fSubChannelAddresses.begin(), mi->second.fSubChannelAddresses.end()); - auto it3 = mi->second.fDDSValues.begin(); - for (unsigned int i = 0; i < mi->second.fSubChannelAddresses.size(); ++i) { - SetProperty(string{"chans." + mi->first + "." + to_string(i) + ".address"}, it3->second); - ++it3; + boost::asio::post(fWorkerQueue, [=]() { + try { + string val = value; + // check if it is to handle as one out of multiple values + auto it = fIofN.find(propertyId); + if (it != fIofN.end()) { + it->second.fEntries.push_back(value); + if (it->second.fEntries.size() == it->second.fN) { + sort(it->second.fEntries.begin(), it->second.fEntries.end()); + val = it->second.fEntries.at(it->second.fI); + } else { + LOG(debug) << "received " << it->second.fEntries.size() << " values for " << propertyId << ", expecting total of " << it->second.fN; + return; } - fConnectingChans.erase(mi++); - } else { - ++mi; } + + vector connectionStrings; + boost::algorithm::split(connectionStrings, val, boost::algorithm::is_any_of(",")); + if (connectionStrings.size() > 1) { // multiple bound channels received + auto it2 = fI.find(propertyId); + if (it2 != fI.end()) { + LOG(debug) << "adding connecting channel " << propertyId << " : " << connectionStrings.at(it2->second); + fConnectingChans.at(propertyId).fDDSValues.insert({senderTaskID, connectionStrings.at(it2->second).c_str()}); + } else { + LOG(error) << "multiple bound channels received, but no task index specified, only assigning the first"; + fConnectingChans.at(propertyId).fDDSValues.insert({senderTaskID, connectionStrings.at(0).c_str()}); + } + } else { // only one bound channel received + fConnectingChans.at(propertyId).fDDSValues.insert({senderTaskID, val.c_str()}); + } + + // update channels and remove them from unfinished container + for (auto mi = fConnectingChans.begin(); mi != fConnectingChans.end(); /* no increment */) { + if (mi->second.fSubChannelAddresses.size() == mi->second.fDDSValues.size()) { + // when multiple subChannels are used, their order on every device should be the same, irregardless of arrival order from DDS. + sort(mi->second.fSubChannelAddresses.begin(), mi->second.fSubChannelAddresses.end()); + auto it3 = mi->second.fDDSValues.begin(); + for (unsigned int i = 0; i < mi->second.fSubChannelAddresses.size(); ++i) { + SetProperty(string{"chans." + mi->first + "." + to_string(i) + ".address"}, it3->second); + ++it3; + } + fConnectingChans.erase(mi++); + } else { + ++mi; + } + } + } catch (const exception& e) { + LOG(error) << "Error on handling DDS property update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID << ": " << e.what(); } - } catch (const exception& e) { - LOG(error) << "Error on handling DDS property update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID << ": " << e.what(); - } + }); }); } @@ -421,6 +432,11 @@ DDS::~DDS() if (fHeartbeatThread.joinable()) { fHeartbeatThread.join(); } + + fWorkGuard.reset(); + if (fWorkerThread.joinable()) { + fWorkerThread.join(); + } } } /* namespace plugins */ diff --git a/fairmq/plugins/DDS/DDS.h b/fairmq/plugins/DDS/DDS.h index f1564896..36ba637a 100644 --- a/fairmq/plugins/DDS/DDS.h +++ b/fairmq/plugins/DDS/DDS.h @@ -11,6 +11,9 @@ #include #include +#include +#include +#include #include #include #include @@ -172,6 +175,10 @@ class DDS : public Plugin bool fUpdatesAllowed; std::mutex fUpdateMutex; std::condition_variable fUpdateCondition; + + std::thread fWorkerThread; + boost::asio::io_context fWorkerQueue; + boost::asio::executor_work_guard fWorkGuard; }; Plugin::ProgOptions DDSProgramOptions()