FairMQ DDS plugin: fix incomplete shutdown when dds server is terminated

This commit is contained in:
Alexey Rybalchenko 2018-02-13 08:37:09 +01:00 committed by Mohammad Al-Turany
parent f8d4fe01d0
commit abcc5083f2
4 changed files with 38 additions and 33 deletions

View File

@ -209,10 +209,13 @@ void FairMQStateMachine::SubscribeToStateChange(const std::string& key, std::fun
fStateChangeSignalsMap.insert({key, fStateChangeSignal.connect(callback)}); fStateChangeSignalsMap.insert({key, fStateChangeSignal.connect(callback)});
} }
void FairMQStateMachine::UnsubscribeFromStateChange(const std::string& key) void FairMQStateMachine::UnsubscribeFromStateChange(const std::string& key)
{
if (fStateChangeSignalsMap.count(key))
{ {
fStateChangeSignalsMap.at(key).disconnect(); fStateChangeSignalsMap.at(key).disconnect();
fStateChangeSignalsMap.erase(key); fStateChangeSignalsMap.erase(key);
} }
}
int FairMQStateMachine::GetEventNumber(const std::string& event) int FairMQStateMachine::GetEventNumber(const std::string& event)
{ {

View File

@ -104,7 +104,7 @@ auto PluginServices::ChangeDeviceState(const std::string& controller, const Devi
{ {
throw DeviceControlError{tools::ToString( throw DeviceControlError{tools::ToString(
"Plugin '", controller, "' is not allowed to change device states. ", "Plugin '", controller, "' is not allowed to change device states. ",
"Currently, plugin '", fDeviceController, "' has taken control." "Currently, plugin '", *fDeviceController, "' has taken control."
)}; )};
} }
} }
@ -125,7 +125,7 @@ auto PluginServices::TakeDeviceControl(const std::string& controller) -> void
{ {
throw DeviceControlError{tools::ToString( throw DeviceControlError{tools::ToString(
"Plugin '", controller, "' is not allowed to take over control. ", "Plugin '", controller, "' is not allowed to take over control. ",
"Currently, plugin '", fDeviceController, "' has taken control." "Currently, plugin '", *fDeviceController, "' has taken control."
)}; )};
} }
} }
@ -134,11 +134,8 @@ auto PluginServices::StealDeviceControl(const std::string& controller) -> void
{ {
lock_guard<mutex> lock{fDeviceControllerMutex}; lock_guard<mutex> lock{fDeviceControllerMutex};
if (!fDeviceController)
{
fDeviceController = controller; fDeviceController = controller;
} }
}
auto PluginServices::ReleaseDeviceControl(const std::string& controller) -> void auto PluginServices::ReleaseDeviceControl(const std::string& controller) -> void
{ {

View File

@ -37,19 +37,13 @@ DDS::DDS(const string name, const Plugin::Version version, const string maintain
, fEventsMutex() , fEventsMutex()
, fNewEvent() , fNewEvent()
, fDeviceTerminationRequested(false) , fDeviceTerminationRequested(false)
, fIosWork{fIos}
, fHeartbeatTimer{fIos, fHeartbeatInterval}
, fHeartbeatInterval{100} , fHeartbeatInterval{100}
{ {
try try
{ {
TakeDeviceControl(); TakeDeviceControl();
fControllerThread = thread(&DDS::HandleControl, this); fControllerThread = thread(&DDS::HandleControl, this);
fHeartbeatThread = thread(&DDS::HeartbeatSender, this);
fIosWorkerThread = thread([&]{ fIos.run(); });
fHeartbeatTimer.expires_from_now(chrono::milliseconds{0});
fHeartbeatTimer.async_wait(bind(&DDS::Heartbeat, this, placeholders::_1));
} }
catch (PluginServices::DeviceControlError& e) catch (PluginServices::DeviceControlError& e)
{ {
@ -136,9 +130,16 @@ auto DDS::HandleControl() -> void
fDDSKeyValue.unsubscribe(); fDDSKeyValue.unsubscribe();
fDDSCustomCmd.unsubscribe(); fDDSCustomCmd.unsubscribe();
try
{
UnsubscribeFromDeviceStateChange(); UnsubscribeFromDeviceStateChange();
ReleaseDeviceControl(); ReleaseDeviceControl();
} }
catch (fair::mq::PluginServices::DeviceControlError& e)
{
LOG(error) << e.what();
}
}
auto DDS::FillChannelContainers() -> void auto DDS::FillChannelContainers() -> void
{ {
@ -176,7 +177,8 @@ auto DDS::SubscribeForConnectingChannels() -> void
{ {
fDDSKeyValue.subscribe([&] (const string& propertyId, const string& key, const string& value) fDDSKeyValue.subscribe([&] (const string& propertyId, const string& key, const string& value)
{ {
try { try
{
LOG(debug) << "Received update for " << propertyId << ": key=" << key << " value=" << value; LOG(debug) << "Received update for " << propertyId << ": key=" << key << " value=" << value;
fConnectingChans.at(propertyId).fDDSValues.insert(make_pair<string, string>(key.c_str(), value.c_str())); fConnectingChans.at(propertyId).fDDSValues.insert(make_pair<string, string>(key.c_str(), value.c_str()));
@ -201,7 +203,8 @@ auto DDS::SubscribeForConnectingChannels() -> void
++mi; ++mi;
} }
} }
} catch (const exception& e) }
catch (const exception& e)
{ {
LOG(error) << "Error on handling DDS property update for " << propertyId << ": key=" << key << " value=" << value << ": " << e.what(); LOG(error) << "Error on handling DDS property update for " << propertyId << ": key=" << key << " value=" << value << ": " << e.what();
} }
@ -222,20 +225,24 @@ auto DDS::PublishBoundChannels() -> void
} }
} }
auto DDS::Heartbeat(const boost::system::error_code&) -> void auto DDS::HeartbeatSender() -> void
{ {
string id = GetProperty<string>("id"); string id = GetProperty<string>("id");
string pid(to_string(getpid())); string pid(to_string(getpid()));
while (!fDeviceTerminationRequested)
{
{ {
lock_guard<mutex> lock{fHeartbeatSubscriberMutex}; lock_guard<mutex> lock{fHeartbeatSubscriberMutex};
for (const auto subscriberId : fHeartbeatSubscribers) { for (const auto subscriberId : fHeartbeatSubscribers)
{
fDDSCustomCmd.send("heartbeat: " + id + "," + pid, to_string(subscriberId)); fDDSCustomCmd.send("heartbeat: " + id + "," + pid, to_string(subscriberId));
} }
} }
fHeartbeatTimer.expires_at(fHeartbeatTimer.expires_at() + fHeartbeatInterval);
fHeartbeatTimer.async_wait(bind(&DDS::Heartbeat, this, placeholders::_1)); this_thread::sleep_for(chrono::milliseconds(fHeartbeatInterval));
}
} }
auto DDS::SubscribeForCustomCommands() -> void auto DDS::SubscribeForCustomCommands() -> void
@ -342,8 +349,10 @@ DDS::~DDS()
fControllerThread.join(); fControllerThread.join();
} }
fIos.stop(); if (fHeartbeatThread.joinable())
fIosWorkerThread.join(); {
fHeartbeatThread.join();
}
} }
} /* namespace plugins */ } /* namespace plugins */

View File

@ -21,7 +21,6 @@
#include <vector> #include <vector>
#include <unordered_map> #include <unordered_map>
#include <set> #include <set>
#include <boost/asio.hpp>
#include <chrono> #include <chrono>
#include <functional> #include <functional>
@ -61,7 +60,7 @@ class DDS : public Plugin
auto PublishBoundChannels() -> void; auto PublishBoundChannels() -> void;
auto SubscribeForCustomCommands() -> void; auto SubscribeForCustomCommands() -> void;
auto Heartbeat(const boost::system::error_code&) -> void; auto HeartbeatSender() -> void;
dds::intercom_api::CIntercomService fService; dds::intercom_api::CIntercomService fService;
dds::intercom_api::CCustomCmd fDDSCustomCmd; dds::intercom_api::CCustomCmd fDDSCustomCmd;
@ -87,10 +86,7 @@ class DDS : public Plugin
std::set<uint64_t> fStateChangeSubscribers; std::set<uint64_t> fStateChangeSubscribers;
std::mutex fStateChangeSubscriberMutex; std::mutex fStateChangeSubscriberMutex;
boost::asio::io_service fIos; std::thread fHeartbeatThread;
boost::asio::io_service::work fIosWork;
std::thread fIosWorkerThread;
boost::asio::basic_waitable_timer<std::chrono::steady_clock> fHeartbeatTimer;
std::chrono::milliseconds fHeartbeatInterval; std::chrono::milliseconds fHeartbeatInterval;
}; };