From abcc5083f20cfd7094082516c327e600470f7000 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 13 Feb 2018 08:37:09 +0100 Subject: [PATCH] FairMQ DDS plugin: fix incomplete shutdown when dds server is terminated --- fairmq/FairMQStateMachine.cxx | 7 ++++-- fairmq/PluginServices.cxx | 9 +++---- fairmq/plugins/DDS/DDS.cxx | 47 +++++++++++++++++++++-------------- fairmq/plugins/DDS/DDS.h | 8 ++---- 4 files changed, 38 insertions(+), 33 deletions(-) diff --git a/fairmq/FairMQStateMachine.cxx b/fairmq/FairMQStateMachine.cxx index b524c993..bd9199b7 100644 --- a/fairmq/FairMQStateMachine.cxx +++ b/fairmq/FairMQStateMachine.cxx @@ -210,8 +210,11 @@ void FairMQStateMachine::SubscribeToStateChange(const std::string& key, std::fun } void FairMQStateMachine::UnsubscribeFromStateChange(const std::string& key) { - fStateChangeSignalsMap.at(key).disconnect(); - fStateChangeSignalsMap.erase(key); + if (fStateChangeSignalsMap.count(key)) + { + fStateChangeSignalsMap.at(key).disconnect(); + fStateChangeSignalsMap.erase(key); + } } int FairMQStateMachine::GetEventNumber(const std::string& event) diff --git a/fairmq/PluginServices.cxx b/fairmq/PluginServices.cxx index d47f0c6c..cd673763 100644 --- a/fairmq/PluginServices.cxx +++ b/fairmq/PluginServices.cxx @@ -104,7 +104,7 @@ auto PluginServices::ChangeDeviceState(const std::string& controller, const Devi { throw DeviceControlError{tools::ToString( "Plugin '", controller, "' is not allowed to change device states. ", - "Currently, plugin '", fDeviceController, "' has taken control." + "Currently, plugin '", *fDeviceController, "' has taken control." )}; } } @@ -125,7 +125,7 @@ auto PluginServices::TakeDeviceControl(const std::string& controller) -> void { throw DeviceControlError{tools::ToString( "Plugin '", controller, "' is not allowed to take over control. ", - "Currently, plugin '", fDeviceController, "' has taken control." + "Currently, plugin '", *fDeviceController, "' has taken control." )}; } } @@ -134,10 +134,7 @@ auto PluginServices::StealDeviceControl(const std::string& controller) -> void { lock_guard lock{fDeviceControllerMutex}; - if (!fDeviceController) - { - fDeviceController = controller; - } + fDeviceController = controller; } auto PluginServices::ReleaseDeviceControl(const std::string& controller) -> void diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 3827eb0e..4ba8f3a5 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -37,19 +37,13 @@ DDS::DDS(const string name, const Plugin::Version version, const string maintain , fEventsMutex() , fNewEvent() , fDeviceTerminationRequested(false) - , fIosWork{fIos} - , fHeartbeatTimer{fIos, fHeartbeatInterval} , fHeartbeatInterval{100} { try { TakeDeviceControl(); fControllerThread = thread(&DDS::HandleControl, this); - - fIosWorkerThread = thread([&]{ fIos.run(); }); - - fHeartbeatTimer.expires_from_now(chrono::milliseconds{0}); - fHeartbeatTimer.async_wait(bind(&DDS::Heartbeat, this, placeholders::_1)); + fHeartbeatThread = thread(&DDS::HeartbeatSender, this); } catch (PluginServices::DeviceControlError& e) { @@ -136,8 +130,15 @@ auto DDS::HandleControl() -> void fDDSKeyValue.unsubscribe(); fDDSCustomCmd.unsubscribe(); - UnsubscribeFromDeviceStateChange(); - ReleaseDeviceControl(); + try + { + UnsubscribeFromDeviceStateChange(); + ReleaseDeviceControl(); + } + catch (fair::mq::PluginServices::DeviceControlError& e) + { + LOG(error) << e.what(); + } } auto DDS::FillChannelContainers() -> void @@ -176,7 +177,8 @@ auto DDS::SubscribeForConnectingChannels() -> void { fDDSKeyValue.subscribe([&] (const string& propertyId, const string& key, const string& value) { - try { + try + { LOG(debug) << "Received update for " << propertyId << ": key=" << key << " value=" << value; fConnectingChans.at(propertyId).fDDSValues.insert(make_pair(key.c_str(), value.c_str())); @@ -201,7 +203,8 @@ auto DDS::SubscribeForConnectingChannels() -> void ++mi; } } - } catch (const exception& e) + } + catch (const exception& e) { LOG(error) << "Error on handling DDS property update for " << propertyId << ": key=" << key << " value=" << value << ": " << e.what(); } @@ -222,20 +225,24 @@ auto DDS::PublishBoundChannels() -> void } } -auto DDS::Heartbeat(const boost::system::error_code&) -> void +auto DDS::HeartbeatSender() -> void { string id = GetProperty("id"); string pid(to_string(getpid())); + while (!fDeviceTerminationRequested) { - lock_guard lock{fHeartbeatSubscriberMutex}; + { + lock_guard lock{fHeartbeatSubscriberMutex}; - for (const auto subscriberId : fHeartbeatSubscribers) { - fDDSCustomCmd.send("heartbeat: " + id + "," + pid, to_string(subscriberId)); + for (const auto subscriberId : fHeartbeatSubscribers) + { + fDDSCustomCmd.send("heartbeat: " + id + "," + pid, to_string(subscriberId)); + } } + + this_thread::sleep_for(chrono::milliseconds(fHeartbeatInterval)); } - fHeartbeatTimer.expires_at(fHeartbeatTimer.expires_at() + fHeartbeatInterval); - fHeartbeatTimer.async_wait(bind(&DDS::Heartbeat, this, placeholders::_1)); } auto DDS::SubscribeForCustomCommands() -> void @@ -342,8 +349,10 @@ DDS::~DDS() fControllerThread.join(); } - fIos.stop(); - fIosWorkerThread.join(); + if (fHeartbeatThread.joinable()) + { + fHeartbeatThread.join(); + } } } /* namespace plugins */ diff --git a/fairmq/plugins/DDS/DDS.h b/fairmq/plugins/DDS/DDS.h index 282ea3f9..335dea7a 100644 --- a/fairmq/plugins/DDS/DDS.h +++ b/fairmq/plugins/DDS/DDS.h @@ -21,7 +21,6 @@ #include #include #include -#include #include #include @@ -61,7 +60,7 @@ class DDS : public Plugin auto PublishBoundChannels() -> void; auto SubscribeForCustomCommands() -> void; - auto Heartbeat(const boost::system::error_code&) -> void; + auto HeartbeatSender() -> void; dds::intercom_api::CIntercomService fService; dds::intercom_api::CCustomCmd fDDSCustomCmd; @@ -87,10 +86,7 @@ class DDS : public Plugin std::set fStateChangeSubscribers; std::mutex fStateChangeSubscriberMutex; - boost::asio::io_service fIos; - boost::asio::io_service::work fIosWork; - std::thread fIosWorkerThread; - boost::asio::basic_waitable_timer fHeartbeatTimer; + std::thread fHeartbeatThread; std::chrono::milliseconds fHeartbeatInterval; };