From 334b91785b6fd86c8ff54563eb4c986d73c1c5b6 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Thu, 14 Sep 2017 12:42:07 +0200 Subject: [PATCH] FairMQ: Move static and interactive control modes to plugin (2) --- fairmq/FairMQDevice.cxx | 5 - fairmq/FairMQDevice.h | 8 +- fairmq/Plugin.cxx | 14 +- fairmq/Plugin.h | 7 +- fairmq/PluginManager.h | 7 +- fairmq/PluginServices.cxx | 16 +- fairmq/PluginServices.h | 21 +- fairmq/devices/FairMQBenchmarkSampler.cxx | 15 +- fairmq/devices/FairMQBenchmarkSampler.h | 5 + fairmq/plugins/Control.cxx | 238 ++++++++++++++++------ fairmq/plugins/Control.h | 21 +- fairmq/runFairMQDevice.h | 8 +- fairmq/test/plugin_services/_control.cxx | 2 + fairmq/test/plugins/_plugin_manager.cxx | 8 +- fairmq/tools/runSimpleMQStateMachine.h | 9 - 15 files changed, 246 insertions(+), 138 deletions(-) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index c4e382e2..88606f56 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -1265,11 +1265,6 @@ void FairMQDevice::Reset() } } -bool FairMQDevice::Terminated() -{ - return fTerminationRequested; -} - const FairMQChannel& FairMQDevice::GetChannel(const std::string& channelName, const int index) const { return fChannels.at(channelName).at(index); diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 2f58e32e..33cd72d3 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -383,7 +383,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable } } - template void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQParts& parts, int index)) { @@ -410,7 +409,10 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable } } - bool Terminated(); + bool Terminated() + { + return fTerminationRequested; + } const FairMQChannel& GetChannel(const std::string& channelName, const int index = 0) const; @@ -572,7 +574,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// Signal handler void SignalHandler(int signal); bool fCatchingSignals; - bool fTerminationRequested; + std::atomic fTerminationRequested; // Interactive state loop helper std::atomic fInteractiveRunning; diff --git a/fairmq/Plugin.cxx b/fairmq/Plugin.cxx index 55b9cd10..8b70bef8 100644 --- a/fairmq/Plugin.cxx +++ b/fairmq/Plugin.cxx @@ -12,16 +12,16 @@ using namespace std; fair::mq::Plugin::Plugin(const string name, const Version version, const string maintainer, const string homepage, PluginServices* pluginServices) -: fkName{name} -, fkVersion(version) -, fkMaintainer{maintainer} -, fkHomepage{homepage} -, fPluginServices{pluginServices} + : fkName{name} + , fkVersion(version) + , fkMaintainer{maintainer} + , fkHomepage{homepage} + , fPluginServices{pluginServices} { - LOG(DEBUG) << "Loaded plugin: " << *this; + LOG(DEBUG) << "Loaded plugin: " << *this; } fair::mq::Plugin::~Plugin() { - LOG(DEBUG) << "Unloaded plugin: " << *this; + LOG(DEBUG) << "Unloaded plugin: " << *this; } diff --git a/fairmq/Plugin.h b/fairmq/Plugin.h index e4d0e9c7..966772ba 100644 --- a/fairmq/Plugin.h +++ b/fairmq/Plugin.h @@ -34,8 +34,7 @@ namespace mq */ class Plugin { - public: - + public: using ProgOptions = boost::optional; using Version = tools::Version; @@ -72,6 +71,7 @@ class Plugin auto ChangeDeviceState(const DeviceStateTransition next) -> void { fPluginServices->ChangeDeviceState(fkName, next); } auto SubscribeToDeviceStateChange(std::function callback) -> void { fPluginServices->SubscribeToDeviceStateChange(fkName, callback); } auto UnsubscribeFromDeviceStateChange() -> void { fPluginServices->UnsubscribeFromDeviceStateChange(fkName); } + auto DeviceTerminated() const -> bool { return fPluginServices->DeviceTerminated(); } // device config API // see for docs template @@ -84,8 +84,7 @@ class Plugin // template // auto UnsubscribeFromPropertyChange() -> void { fPluginServices.UnsubscribeFromPropertyChange(fkName); } - private: - + private: const std::string fkName; const Version fkVersion; const std::string fkMaintainer; diff --git a/fairmq/PluginManager.h b/fairmq/PluginManager.h index 90475067..346cb281 100644 --- a/fairmq/PluginManager.h +++ b/fairmq/PluginManager.h @@ -47,8 +47,7 @@ namespace mq */ class PluginManager { - public: - + public: using PluginFactory = std::shared_ptr(PluginServices&); PluginManager(); @@ -79,8 +78,7 @@ class PluginManager auto WaitForPluginsToReleaseDeviceControl() -> void { fPluginServices->WaitForReleaseDeviceControl(); } - private: - + private: static auto ValidateSearchPath(const boost::filesystem::path&) -> void; auto LoadPluginPrelinkedDynamic(const std::string& pluginName) -> void; @@ -118,7 +116,6 @@ class PluginManager std::vector fPluginOrder; std::map fPluginProgOptions; std::unique_ptr fPluginServices; - }; /* class PluginManager */ } /* namespace mq */ diff --git a/fairmq/PluginServices.cxx b/fairmq/PluginServices.cxx index b30c2b87..c57d0438 100644 --- a/fairmq/PluginServices.cxx +++ b/fairmq/PluginServices.cxx @@ -91,9 +91,9 @@ auto PluginServices::ChangeDeviceState(const std::string& controller, const Devi { // lock_guard lock{fDeviceControllerMutex}; // - // if(!fDeviceController) fDeviceController = controller; + // if (!fDeviceController) fDeviceController = controller; - if(fDeviceController == controller) + if (fDeviceController == controller) { fDevice->ChangeState(fkDeviceStateTransitionMap.at(next)); } @@ -110,11 +110,11 @@ auto PluginServices::TakeDeviceControl(const std::string& controller) -> void { lock_guard lock{fDeviceControllerMutex}; - if(!fDeviceController) + if (!fDeviceController) { fDeviceController = controller; } - else if(fDeviceController == controller) + else if (fDeviceController == controller) { // nothing to do } @@ -133,15 +133,13 @@ auto PluginServices::ReleaseDeviceControl(const std::string& controller) -> void { lock_guard lock{fDeviceControllerMutex}; - if(fDeviceController == controller) + if (fDeviceController == controller) { fDeviceController = boost::none; } else { - throw DeviceControlError{tools::ToString( - "Plugin ", controller, " cannot release control because it has not taken over control." - )}; + throw DeviceControlError{tools::ToString("Plugin ", controller, " cannot release control because it has not taken over control.")}; } } @@ -159,7 +157,7 @@ auto PluginServices::WaitForReleaseDeviceControl() -> void { unique_lock lock{fDeviceControllerMutex}; - while(GetDeviceController()) + while (fDeviceController) { fReleaseDeviceControlCondition.wait(lock); } diff --git a/fairmq/PluginServices.h b/fairmq/PluginServices.h index 8c4876b2..1a23c622 100644 --- a/fairmq/PluginServices.h +++ b/fairmq/PluginServices.h @@ -34,12 +34,11 @@ namespace mq */ class PluginServices { - public: - + public: PluginServices() = delete; PluginServices(FairMQProgOptions* config, std::shared_ptr device) - : fDevice{device} - , fConfig{config} + : fDevice{device} + , fConfig{config} { } @@ -77,24 +76,24 @@ class PluginServices /// @brief Convert string to DeviceState /// @param state to convert - /// @return DeviceState enum entry + /// @return DeviceState enum entry /// @throw std::out_of_range if a string cannot be resolved to a DeviceState static auto ToDeviceState(const std::string& state) -> DeviceState { return fkDeviceStateStrMap.at(state); } /// @brief Convert string to DeviceStateTransition /// @param transition to convert - /// @return DeviceStateTransition enum entry + /// @return DeviceStateTransition enum entry /// @throw std::out_of_range if a string cannot be resolved to a DeviceStateTransition static auto ToDeviceStateTransition(const std::string& transition) -> DeviceStateTransition { return fkDeviceStateTransitionStrMap.at(transition); } /// @brief Convert DeviceState to string /// @param state to convert - /// @return string representation of DeviceState enum entry + /// @return string representation of DeviceState enum entry static auto ToStr(DeviceState state) -> std::string { return fkStrDeviceStateMap.at(state); } /// @brief Convert DeviceStateTransition to string /// @param transition to convert - /// @return string representation of DeviceStateTransition enum entry + /// @return string representation of DeviceStateTransition enum entry static auto ToStr(DeviceStateTransition transition) -> std::string { return fkStrDeviceStateTransitionMap.at(transition); } friend auto operator<<(std::ostream& os, const DeviceState& state) -> std::ostream& { return os << ToStr(state); } @@ -149,6 +148,7 @@ class PluginServices /// @param subscriber id auto UnsubscribeFromDeviceStateChange(const std::string& subscriber) -> void { fDevice->UnsubscribeFromStateChange(subscriber); } + auto DeviceTerminated() const -> bool { return fDevice->Terminated(); } // Config API @@ -186,7 +186,7 @@ class PluginServices /// @brief Read config property as string /// @param key /// @return config property value converted to string - /// + /// /// If a type is not supported, the user can provide support by overloading the ostream operator for this type auto GetPropertyAsString(const std::string& key) const -> std::string { return fConfig->GetStringValue(key); } @@ -221,8 +221,7 @@ class PluginServices static const std::unordered_map> fkDeviceStateMap; static const std::unordered_map> fkDeviceStateTransitionMap; - private: - + private: FairMQProgOptions* fConfig; // TODO make it a shared pointer, once old AliceO2 code is cleaned up std::shared_ptr fDevice; boost::optional fDeviceController; diff --git a/fairmq/devices/FairMQBenchmarkSampler.cxx b/fairmq/devices/FairMQBenchmarkSampler.cxx index 80dbe845..0b453e13 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.cxx +++ b/fairmq/devices/FairMQBenchmarkSampler.cxx @@ -16,7 +16,6 @@ #include #include -#include #include "../FairMQLogger.h" #include "../options/FairMQProgOptions.h" @@ -30,6 +29,7 @@ FairMQBenchmarkSampler::FairMQBenchmarkSampler() , fMsgRate(1) , fNumMsgs(0) , fOutChannelName() + , fResetMsgCounter() { } @@ -46,10 +46,13 @@ void FairMQBenchmarkSampler::InitTask() fOutChannelName = fConfig->GetValue("out-channel"); } +void FairMQBenchmarkSampler::PreRun() +{ + fResetMsgCounter = std::thread(&FairMQBenchmarkSampler::ResetMsgCounter, this); +} + void FairMQBenchmarkSampler::Run() { - std::thread resetMsgCounter(&FairMQBenchmarkSampler::ResetMsgCounter, this); - uint64_t numSentMsgs = 0; // store the channel reference to avoid traversing the map on every loop iteration @@ -108,7 +111,11 @@ void FairMQBenchmarkSampler::Run() LOG(INFO) << "Leaving RUNNING state. Sent " << numSentMsgs << " messages in " << chrono::duration(tEnd - tStart).count() << "ms."; - resetMsgCounter.join(); +} + +void FairMQBenchmarkSampler::PostRun() +{ + fResetMsgCounter.join(); } void FairMQBenchmarkSampler::ResetMsgCounter() diff --git a/fairmq/devices/FairMQBenchmarkSampler.h b/fairmq/devices/FairMQBenchmarkSampler.h index f7301ab4..3bf66fb8 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.h +++ b/fairmq/devices/FairMQBenchmarkSampler.h @@ -16,6 +16,7 @@ #define FAIRMQBENCHMARKSAMPLER_H_ #include +#include #include "FairMQDevice.h" @@ -29,6 +30,9 @@ class FairMQBenchmarkSampler : public FairMQDevice FairMQBenchmarkSampler(); virtual ~FairMQBenchmarkSampler(); + void PreRun() override; + void PostRun() override; + void ResetMsgCounter(); protected: @@ -38,6 +42,7 @@ class FairMQBenchmarkSampler : public FairMQDevice int fMsgRate; uint64_t fNumMsgs; std::string fOutChannelName; + std::thread fResetMsgCounter; virtual void InitTask(); virtual void Run(); diff --git a/fairmq/plugins/Control.cxx b/fairmq/plugins/Control.cxx index 181a3a37..d189d35b 100644 --- a/fairmq/plugins/Control.cxx +++ b/fairmq/plugins/Control.cxx @@ -7,9 +7,11 @@ ********************************************************************************/ #include "Control.h" -#include -#include +#include + +#include // for the interactive mode +#include // for the interactive mode using namespace std; @@ -20,13 +22,12 @@ namespace mq namespace plugins { -Control::Control( - const string name, - const Plugin::Version version, - const string maintainer, - const string homepage, - PluginServices* pluginServices) -: Plugin(name, version, maintainer, homepage, pluginServices) +Control::Control(const string name, const Plugin::Version version, const string maintainer, const string homepage, PluginServices* pluginServices) + : Plugin(name, version, maintainer, homepage, pluginServices) + , fControllerThread() + , fEvents() + , fEventsMutex() + , fNewEvent() { try { @@ -34,55 +35,174 @@ Control::Control( auto control = GetProperty("control"); - if(control == "static") + if (control == "static") { LOG(DEBUG) << "Running builtin controller: static"; - thread t(&Control::StaticMode, this); - t.detach(); + fControllerThread = thread(&Control::StaticMode, this); } - else if(control == "interactive") + else if (control == "interactive") { LOG(DEBUG) << "Running builtin controller: interactive"; - thread t(&Control::InteractiveMode, this); - t.detach(); + fControllerThread = thread(&Control::InteractiveMode, this); } else { - LOG(ERROR) << "Unrecognized control mode '" << control << "' requested via command line. " - << "Ignoring and falling back to interactive control mode."; - thread t(&Control::InteractiveMode, this); - t.detach(); + LOG(ERROR) << "Unrecognized control mode '" << control << "' requested via command line. " << "Ignoring and falling back to static control mode."; + fControllerThread = thread(&Control::StaticMode, this); } } - catch(PluginServices::DeviceControlError& e) + catch (PluginServices::DeviceControlError& e) { LOG(DEBUG) << e.what(); - } + } } auto ControlPluginProgramOptions() -> Plugin::ProgOptions { - auto plugin_options = boost::program_options::options_description{"Control (builtin) Plugin"}; - plugin_options.add_options() + auto pluginOptions = boost::program_options::options_description{"Control (builtin) Plugin"}; + pluginOptions.add_options() ("ctrlmode", boost::program_options::value(), "Control mode, 'static' or 'interactive'"); // should rename to --control and remove control from device options ? - return plugin_options; + return pluginOptions; } - + auto Control::InteractiveMode() -> void { - LOG(ERROR) << "NOT YET IMPLEMENTED"; + SubscribeToDeviceStateChange([&](DeviceState newState) + { + { + lock_guard lock{fEventsMutex}; + fEvents.push(newState); + } + fNewEvent.notify_one(); + }); + + ChangeDeviceState(DeviceStateTransition::InitDevice); + while (WaitForNextState() != DeviceState::DeviceReady) {} + + ChangeDeviceState(DeviceStateTransition::InitTask); + while (WaitForNextState() != DeviceState::Ready) {} + ChangeDeviceState(DeviceStateTransition::Run); + + char input; // hold the user console input + pollfd cinfd[1]; + cinfd[0].fd = fileno(stdin); + cinfd[0].events = POLLIN; + + struct termios t; + tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure + t.c_lflag &= ~ICANON; // disable canonical input + tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings + + PrintInteractiveHelp(); + + bool keepRunning = true; + + while (keepRunning) + { + if (poll(cinfd, 1, 500)) + { + if (DeviceTerminated()) + { + keepRunning = false; + break; + } + + cin >> input; + + switch (input) + { + case 'i': + LOG(INFO) << "[i] init device"; + ChangeDeviceState(DeviceStateTransition::InitDevice); + break; + case 'j': + LOG(INFO) << "[j] init task"; + ChangeDeviceState(DeviceStateTransition::InitTask); + break; + case 'p': + LOG(INFO) << "[p] pause"; + ChangeDeviceState(DeviceStateTransition::Pause); + break; + case 'r': + LOG(INFO) << "[r] run"; + ChangeDeviceState(DeviceStateTransition::Run); + break; + case 's': + LOG(INFO) << "[s] stop"; + ChangeDeviceState(DeviceStateTransition::Stop); + break; + case 't': + LOG(INFO) << "[t] reset task"; + ChangeDeviceState(DeviceStateTransition::ResetTask); + break; + case 'd': + LOG(INFO) << "[d] reset device"; + ChangeDeviceState(DeviceStateTransition::ResetDevice); + break; + case 'h': + LOG(INFO) << "[h] help"; + PrintInteractiveHelp(); + break; + // case 'x': + // LOG(INFO) << "[x] ERROR"; + // ChangeDeviceState(DeviceStateTransition::ERROR_FOUND); + // break; + case 'q': + LOG(INFO) << "[q] end"; + + ChangeDeviceState(DeviceStateTransition::Stop); + + ChangeDeviceState(DeviceStateTransition::ResetTask); + while (WaitForNextState() != DeviceState::DeviceReady) {} + + ChangeDeviceState(DeviceStateTransition::ResetDevice); + while (WaitForNextState() != DeviceState::Idle) {} + + ChangeDeviceState(DeviceStateTransition::End); + + if (GetCurrentDeviceState() == DeviceState::Exiting) + { + keepRunning = false; + } + + LOG(INFO) << "Exiting."; + break; + default: + LOG(INFO) << "Invalid input: [" << input << "]"; + PrintInteractiveHelp(); + break; + } + } + + if (DeviceTerminated()) + { + keepRunning = false; + } + } + + tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure + t.c_lflag |= ICANON; // re-enable canonical input + tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings + + UnsubscribeFromDeviceStateChange(); ReleaseDeviceControl(); } +auto Control::PrintInteractiveHelp() -> void +{ + LOG(INFO) << "Use keys to control the state machine:"; + LOG(INFO) << "[h] help, [p] pause, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device"; +} + auto Control::WaitForNextState() -> DeviceState { unique_lock lock{fEventsMutex}; - while(fEvents.empty()) + while (fEvents.empty()) { fNewEvent.wait(lock); } - // lock.lock(); + auto result = fEvents.front(); fEvents.pop(); return result; @@ -90,46 +210,42 @@ auto Control::WaitForNextState() -> DeviceState auto Control::StaticMode() -> void { - clock_t cStart = clock(); - auto tStart = chrono::high_resolution_clock::now(); - - SubscribeToDeviceStateChange( - [&](DeviceState newState){ - { - lock_guard lock{fEventsMutex}; - fEvents.push(newState); - } - fNewEvent.notify_one(); + SubscribeToDeviceStateChange([&](DeviceState newState) + { + { + lock_guard lock{fEventsMutex}; + fEvents.push(newState); } - ); - - ChangeDeviceState(DeviceStateTransition::InitDevice); - while(WaitForNextState() != DeviceState::DeviceReady) {}; - - clock_t cEnd = std::clock(); - auto tEnd = chrono::high_resolution_clock::now(); + fNewEvent.notify_one(); + }); + + ChangeDeviceState(DeviceStateTransition::InitDevice); + while (WaitForNextState() != DeviceState::DeviceReady) {} - LOG(DEBUG) << "Init time (CPU) : " << fixed << setprecision(2) << 1000.0 * (cEnd - cStart) / CLOCKS_PER_SEC << " ms"; - LOG(DEBUG) << "Init time (Wall): " << chrono::duration(tEnd - tStart).count() << " ms"; - ChangeDeviceState(DeviceStateTransition::InitTask); - while(WaitForNextState() != DeviceState::Ready) {}; + while (WaitForNextState() != DeviceState::Ready) {} ChangeDeviceState(DeviceStateTransition::Run); - // WaitForNextState(); - // ChangeDeviceState(DeviceStateTransition::ResetTask); - // WaitForNextState(); - // WaitForNextState(); - // ChangeDeviceState(DeviceStateTransition::ResetDevice); - // WaitForNextState(); - // WaitForNextState(); - // ChangeDeviceState(DeviceStateTransition::End); - while(WaitForNextState() != DeviceState::Exiting) {}; - LOG(WARN) << "1"; + while (WaitForNextState() != DeviceState::Ready) {} + if (!DeviceTerminated()) + { + ChangeDeviceState(DeviceStateTransition::ResetTask); + while (WaitForNextState() != DeviceState::DeviceReady) {} + ChangeDeviceState(DeviceStateTransition::ResetDevice); + while (WaitForNextState() != DeviceState::Idle) {} + ChangeDeviceState(DeviceStateTransition::End); + while (WaitForNextState() != DeviceState::Exiting) {} + } UnsubscribeFromDeviceStateChange(); - LOG(WARN) << "2"; ReleaseDeviceControl(); - LOG(WARN) << "3"; +} + +Control::~Control() +{ + if (fControllerThread.joinable()) + { + fControllerThread.join(); + } } } /* namespace plugins */ diff --git a/fairmq/plugins/Control.h b/fairmq/plugins/Control.h index 3c27092d..d238744f 100644 --- a/fairmq/plugins/Control.h +++ b/fairmq/plugins/Control.h @@ -10,10 +10,12 @@ #define FAIR_MQ_PLUGINS_CONTROL #include + #include #include #include #include +#include namespace fair { @@ -24,26 +26,21 @@ namespace plugins class Control : public Plugin { - public: + public: + Control(const std::string name, const Plugin::Version version, const std::string maintainer, const std::string homepage, PluginServices* pluginServices); - Control( - const std::string name, - const Plugin::Version version, - const std::string maintainer, - const std::string homepage, - PluginServices* pluginServices - ); - - private: + ~Control(); + private: auto InteractiveMode() -> void; + auto PrintInteractiveHelp() -> void; auto StaticMode() -> void; auto WaitForNextState() -> DeviceState; - + + std::thread fControllerThread; std::queue fEvents; std::mutex fEventsMutex; std::condition_variable fNewEvent; - }; /* class Control */ auto ControlPluginProgramOptions() -> Plugin::ProgOptions; diff --git a/fairmq/runFairMQDevice.h b/fairmq/runFairMQDevice.h index a47758b8..442956d5 100644 --- a/fairmq/runFairMQDevice.h +++ b/fairmq/runFairMQDevice.h @@ -54,10 +54,10 @@ int main(int argc, const char** argv) // Create plugin manager and load command line supplied plugins // Plugin manager needs to be destroyed after config! TODO Investigate why auto pluginManager = fair::mq::PluginManager::MakeFromCommandLineOptions(fair::mq::tools::ToStrVector(argc, argv)); - + // Load builtin plugins last pluginManager->LoadPlugin("s:control"); - + // Construct command line options parser FairMQProgOptions config; config.AddToCmdLineOptions(customOptions); @@ -76,7 +76,7 @@ int main(int argc, const char** argv) LOG(ERROR) << "getDevice(): no valid device provided. Exiting."; return 1; } - + // Handle --print-channels device->RegisterChannelEndpoints(); if (config.Count("print-channels")) @@ -117,9 +117,7 @@ int main(int argc, const char** argv) pluginManager->InstantiatePlugins(); // Wait for control plugin to release device control - LOG(ERROR) << "1"; pluginManager->WaitForPluginsToReleaseDeviceControl(); - LOG(ERROR) << "2"; } catch (std::exception& e) { diff --git a/fairmq/test/plugin_services/_control.cxx b/fairmq/test/plugin_services/_control.cxx index aa115bb3..306f906f 100644 --- a/fairmq/test/plugin_services/_control.cxx +++ b/fairmq/test/plugin_services/_control.cxx @@ -34,6 +34,7 @@ TEST_F(PluginServices, OnlySingleController) ASSERT_NO_THROW(mServices.ReleaseDeviceControl("foo")); ASSERT_FALSE(mServices.GetDeviceController()); // take control implicitely + ASSERT_NO_THROW(mServices.TakeDeviceControl("foo")); ASSERT_NO_THROW(mServices.ChangeDeviceState("foo", DeviceStateTransition::InitDevice)); EXPECT_EQ(mServices.GetDeviceController(), string{"foo"}); @@ -47,6 +48,7 @@ TEST_F(PluginServices, OnlySingleController) TEST_F(PluginServices, Control) { ASSERT_EQ(mServices.GetCurrentDeviceState(), DeviceState::Idle); + ASSERT_NO_THROW(mServices.TakeDeviceControl("foo")); ASSERT_NO_THROW(mServices.ChangeDeviceState("foo", DeviceStateTransition::InitDevice)); DeviceState nextState; diff --git a/fairmq/test/plugins/_plugin_manager.cxx b/fairmq/test/plugins/_plugin_manager.cxx index 6311e422..1d8d9c75 100644 --- a/fairmq/test/plugins/_plugin_manager.cxx +++ b/fairmq/test/plugins/_plugin_manager.cxx @@ -72,12 +72,14 @@ TEST(PluginManager, LoadPluginStatic) auto device = make_shared(); mgr.EmplacePluginServices(&config, device); - ASSERT_NO_THROW(mgr.LoadPlugin("s:control_static")); + device->SetTransport("zeromq"); + + ASSERT_NO_THROW(mgr.LoadPlugin("s:control")); ASSERT_NO_THROW(mgr.InstantiatePlugins()); // check order - const auto expected = vector{"control_static"}; + const auto expected = vector{"control"}; auto actual = vector{}; mgr.ForEachPlugin([&](Plugin& plugin){ actual.push_back(plugin.GetName()); }); ASSERT_TRUE(actual == expected); @@ -87,7 +89,7 @@ TEST(PluginManager, LoadPluginStatic) mgr.ForEachPluginProgOptions([&count](const options_description& d){ ++count; }); ASSERT_EQ(count, 1); - control(device); + mgr.WaitForPluginsToReleaseDeviceControl(); } TEST(PluginManager, Factory) diff --git a/fairmq/tools/runSimpleMQStateMachine.h b/fairmq/tools/runSimpleMQStateMachine.h index bba18e22..296f6cd2 100644 --- a/fairmq/tools/runSimpleMQStateMachine.h +++ b/fairmq/tools/runSimpleMQStateMachine.h @@ -27,21 +27,12 @@ inline int runStateMachine(TMQDevice& device, FairMQProgOptions& cfg) std::string config = cfg.GetValue("config"); std::string control = cfg.GetValue("control"); - std::clock_t cStart = std::clock(); - auto tStart = std::chrono::high_resolution_clock::now(); - device.ChangeState(TMQDevice::INIT_DEVICE); // Wait for the binding channels to bind device.WaitForInitialValidation(); device.WaitForEndOfState(TMQDevice::INIT_DEVICE); - std::clock_t cEnd = std::clock(); - auto tEnd = std::chrono::high_resolution_clock::now(); - - LOG(DEBUG) << "Init time (CPU) : " << std::fixed << std::setprecision(2) << 1000.0 * (cEnd - cStart) / CLOCKS_PER_SEC << " ms"; - LOG(DEBUG) << "Init time (Wall): " << std::chrono::duration(tEnd - tStart).count() << " ms"; - device.ChangeState(TMQDevice::INIT_TASK); device.WaitForEndOfState(TMQDevice::INIT_TASK);