From 10f67e4c7295fd4e60ac0dcb473ecfe78af7c21a Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Thu, 14 Sep 2017 02:10:00 +0200 Subject: [PATCH] FairMQ: Move static and interactive control modes to plugin NOT YET FINISHED --- fairmq/CMakeLists.txt | 4 +- fairmq/PluginServices.cxx | 11 +- fairmq/options/FairMQProgOptions.cxx | 3 +- fairmq/plugins/Builtin.h | 2 +- fairmq/plugins/Control.cxx | 137 ++++++++++++++++++ fairmq/plugins/{ControlStatic.h => Control.h} | 34 +++-- fairmq/plugins/ControlStatic.cxx | 54 ------- fairmq/runFairMQDevice.h | 62 ++++++-- fairmq/tools/runSimpleMQStateMachine.h | 29 ---- 9 files changed, 223 insertions(+), 113 deletions(-) create mode 100644 fairmq/plugins/Control.cxx rename fairmq/plugins/{ControlStatic.h => Control.h} (65%) delete mode 100644 fairmq/plugins/ControlStatic.cxx diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index e7a48676..9ebd713f 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -86,7 +86,7 @@ set(FAIRMQ_HEADER_FILES PluginManager.h PluginServices.h plugins/Builtin.h - plugins/ControlStatic.h + plugins/Control.h runFairMQDevice.h shmem/FairMQMessageSHM.h shmem/FairMQPollerSHM.h @@ -148,7 +148,7 @@ set(FAIRMQ_SOURCE_FILES Plugin.cxx PluginManager.cxx PluginServices.cxx - plugins/ControlStatic.cxx + plugins/Control.cxx shmem/FairMQMessageSHM.cxx shmem/FairMQPollerSHM.cxx shmem/FairMQRegionSHM.cxx diff --git a/fairmq/PluginServices.cxx b/fairmq/PluginServices.cxx index 1e5428bb..b30c2b87 100644 --- a/fairmq/PluginServices.cxx +++ b/fairmq/PluginServices.cxx @@ -89,9 +89,9 @@ const std::unordered_map void { - lock_guard lock{fDeviceControllerMutex}; - - if(!fDeviceController) fDeviceController = controller; + // lock_guard lock{fDeviceControllerMutex}; + // + // if(!fDeviceController) fDeviceController = controller; if(fDeviceController == controller) { @@ -159,5 +159,8 @@ auto PluginServices::WaitForReleaseDeviceControl() -> void { unique_lock lock{fDeviceControllerMutex}; - fReleaseDeviceControlCondition.wait(lock, [&]{ return !GetDeviceController(); }); + while(GetDeviceController()) + { + fReleaseDeviceControlCondition.wait(lock); + } } diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index 6398ab25..9a5c781a 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -17,6 +17,7 @@ #include "FairMQParser.h" #include "FairMQSuboptParser.h" #include "FairMQLogger.h" +#include using namespace std; @@ -294,7 +295,7 @@ int FairMQProgOptions::NotifySwitchOption() { if (fVarMap.count("help")) { - LOG(INFO) << fHelpTitle << "\n" << fVisibleOptions; + std::cout << fHelpTitle << std::endl << fVisibleOptions; return 1; } diff --git a/fairmq/plugins/Builtin.h b/fairmq/plugins/Builtin.h index 987573ae..eb1ca6c3 100644 --- a/fairmq/plugins/Builtin.h +++ b/fairmq/plugins/Builtin.h @@ -8,4 +8,4 @@ // List of all builtin plugin headers (the ones which call REGISTER_FAIRMQ_PLUGIN macro) -#include +#include diff --git a/fairmq/plugins/Control.cxx b/fairmq/plugins/Control.cxx new file mode 100644 index 00000000..181a3a37 --- /dev/null +++ b/fairmq/plugins/Control.cxx @@ -0,0 +1,137 @@ +/******************************************************************************** + * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "Control.h" +#include +#include + + +using namespace std; + +namespace fair +{ +namespace mq +{ +namespace plugins +{ + +Control::Control( + const string name, + const Plugin::Version version, + const string maintainer, + const string homepage, + PluginServices* pluginServices) +: Plugin(name, version, maintainer, homepage, pluginServices) +{ + try + { + TakeDeviceControl(); + + auto control = GetProperty("control"); + + if(control == "static") + { + LOG(DEBUG) << "Running builtin controller: static"; + thread t(&Control::StaticMode, this); + t.detach(); + } + else if(control == "interactive") + { + LOG(DEBUG) << "Running builtin controller: interactive"; + thread t(&Control::InteractiveMode, this); + t.detach(); + } + else + { + LOG(ERROR) << "Unrecognized control mode '" << control << "' requested via command line. " + << "Ignoring and falling back to interactive control mode."; + thread t(&Control::InteractiveMode, this); + t.detach(); + } + } + catch(PluginServices::DeviceControlError& e) + { + LOG(DEBUG) << e.what(); + } +} + +auto ControlPluginProgramOptions() -> Plugin::ProgOptions +{ + auto plugin_options = boost::program_options::options_description{"Control (builtin) Plugin"}; + plugin_options.add_options() + ("ctrlmode", boost::program_options::value(), "Control mode, 'static' or 'interactive'"); + // should rename to --control and remove control from device options ? + return plugin_options; +} + +auto Control::InteractiveMode() -> void +{ + LOG(ERROR) << "NOT YET IMPLEMENTED"; + ReleaseDeviceControl(); +} + +auto Control::WaitForNextState() -> DeviceState +{ + unique_lock lock{fEventsMutex}; + while(fEvents.empty()) + { + fNewEvent.wait(lock); + } + // lock.lock(); + auto result = fEvents.front(); + fEvents.pop(); + return result; +} + +auto Control::StaticMode() -> void +{ + clock_t cStart = clock(); + auto tStart = chrono::high_resolution_clock::now(); + + SubscribeToDeviceStateChange( + [&](DeviceState newState){ + { + lock_guard lock{fEventsMutex}; + fEvents.push(newState); + } + fNewEvent.notify_one(); + } + ); + + ChangeDeviceState(DeviceStateTransition::InitDevice); + while(WaitForNextState() != DeviceState::DeviceReady) {}; + + clock_t cEnd = std::clock(); + auto tEnd = chrono::high_resolution_clock::now(); + + LOG(DEBUG) << "Init time (CPU) : " << fixed << setprecision(2) << 1000.0 * (cEnd - cStart) / CLOCKS_PER_SEC << " ms"; + LOG(DEBUG) << "Init time (Wall): " << chrono::duration(tEnd - tStart).count() << " ms"; + + ChangeDeviceState(DeviceStateTransition::InitTask); + while(WaitForNextState() != DeviceState::Ready) {}; + ChangeDeviceState(DeviceStateTransition::Run); + // WaitForNextState(); + // ChangeDeviceState(DeviceStateTransition::ResetTask); + // WaitForNextState(); + // WaitForNextState(); + // ChangeDeviceState(DeviceStateTransition::ResetDevice); + // WaitForNextState(); + // WaitForNextState(); + // ChangeDeviceState(DeviceStateTransition::End); + while(WaitForNextState() != DeviceState::Exiting) {}; + LOG(WARN) << "1"; + + UnsubscribeFromDeviceStateChange(); + LOG(WARN) << "2"; + ReleaseDeviceControl(); + LOG(WARN) << "3"; +} + +} /* namespace plugins */ +} /* namespace mq */ +} /* namespace fair */ diff --git a/fairmq/plugins/ControlStatic.h b/fairmq/plugins/Control.h similarity index 65% rename from fairmq/plugins/ControlStatic.h rename to fairmq/plugins/Control.h index e51e965c..3c27092d 100644 --- a/fairmq/plugins/ControlStatic.h +++ b/fairmq/plugins/Control.h @@ -6,11 +6,14 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#ifndef FAIR_MQ_PLUGINS_CONTROLSTATIC -#define FAIR_MQ_PLUGINS_CONTROLSTATIC +#ifndef FAIR_MQ_PLUGINS_CONTROL +#define FAIR_MQ_PLUGINS_CONTROL #include +#include +#include #include +#include namespace fair { @@ -19,28 +22,39 @@ namespace mq namespace plugins { -class ControlStatic : public Plugin +class Control : public Plugin { public: - ControlStatic( + Control( const std::string name, const Plugin::Version version, const std::string maintainer, const std::string homepage, PluginServices* pluginServices ); -}; /* class ControlStatic */ -auto ControlStaticPluginProgramOptions() -> Plugin::ProgOptions; + private: + + auto InteractiveMode() -> void; + auto StaticMode() -> void; + auto WaitForNextState() -> DeviceState; + + std::queue fEvents; + std::mutex fEventsMutex; + std::condition_variable fNewEvent; + +}; /* class Control */ + +auto ControlPluginProgramOptions() -> Plugin::ProgOptions; REGISTER_FAIRMQ_PLUGIN( - ControlStatic, // Class name - control_static, // Plugin name (string, lower case chars only) + Control, // Class name + control, // Plugin name (string, lower case chars only) (Plugin::Version{1,0,0}), // Version "FairRootGroup ", // Maintainer "https://github.com/FairRootGroup/FairRoot", // Homepage - ControlStaticPluginProgramOptions // Free function which declares custom program options for the plugin + ControlPluginProgramOptions // Free function which declares custom program options for the plugin // signature: () -> boost::optional ) @@ -48,4 +62,4 @@ REGISTER_FAIRMQ_PLUGIN( } /* namespace mq */ } /* namespace fair */ -#endif /* FAIR_MQ_PLUGINS_CONTROLSTATIC */ +#endif /* FAIR_MQ_PLUGINS_CONTROL */ diff --git a/fairmq/plugins/ControlStatic.cxx b/fairmq/plugins/ControlStatic.cxx deleted file mode 100644 index eece3993..00000000 --- a/fairmq/plugins/ControlStatic.cxx +++ /dev/null @@ -1,54 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#include "ControlStatic.h" - -namespace fair -{ -namespace mq -{ -namespace plugins -{ - -ControlStatic::ControlStatic( - const std::string name, - const Plugin::Version version, - const std::string maintainer, - const std::string homepage, - PluginServices* pluginServices) -: Plugin(name, version, maintainer, homepage, pluginServices) -{ - SubscribeToDeviceStateChange( - [&](DeviceState newState){ - LOG(WARN) << newState; - switch (newState) - { - case DeviceState::InitializingDevice: - LOG(WARN) << GetPropertyAsString("custom-example-option"); - SetProperty("custom-example-option", std::string{"new value"}); - break; - case DeviceState::Exiting: - LOG(WARN) << GetProperty("custom-example-option"); - UnsubscribeFromDeviceStateChange(); - break; - } - } - ); -} - -auto ControlStaticPluginProgramOptions() -> Plugin::ProgOptions -{ - auto plugin_options = boost::program_options::options_description{"Control Static Plugin"}; - plugin_options.add_options() - ("custom-example-option", boost::program_options::value(), "Custom option."); - return plugin_options; -} - -} /* namespace plugins */ -} /* namespace mq */ -} /* namespace fair */ diff --git a/fairmq/runFairMQDevice.h b/fairmq/runFairMQDevice.h index 47183efb..a47758b8 100644 --- a/fairmq/runFairMQDevice.h +++ b/fairmq/runFairMQDevice.h @@ -9,10 +9,12 @@ #include #include #include -#include #include +#include #include #include +#include +#include template class GenericFairMQDevice : public FairMQDevice @@ -45,43 +47,79 @@ int main(int argc, const char** argv) { try { + // Call custom program options hook boost::program_options::options_description customOptions("Custom options"); addCustomOptions(customOptions); - // Plugin manager needs to be destroyed after config ! - // TODO Investigate, why + // Create plugin manager and load command line supplied plugins + // Plugin manager needs to be destroyed after config! TODO Investigate why auto pluginManager = fair::mq::PluginManager::MakeFromCommandLineOptions(fair::mq::tools::ToStrVector(argc, argv)); + + // Load builtin plugins last + pluginManager->LoadPlugin("s:control"); + + // Construct command line options parser FairMQProgOptions config; config.AddToCmdLineOptions(customOptions); - pluginManager->ForEachPluginProgOptions([&config](boost::program_options::options_description options){ config.AddToCmdLineOptions(options); }); config.AddToCmdLineOptions(pluginManager->ProgramOptions()); + // Parse command line options config.ParseAll(argc, argv, true); + // Call device creation hook std::shared_ptr device{getDevice(config)}; if (!device) { LOG(ERROR) << "getDevice(): no valid device provided. Exiting."; return 1; } + + // Handle --print-channels + device->RegisterChannelEndpoints(); + if (config.Count("print-channels")) + { + device->PrintRegisteredChannels(); + device->ChangeState(FairMQDevice::END); + return 0; + } - pluginManager->EmplacePluginServices(&config, device); - pluginManager->InstantiatePlugins(); - - int result = runStateMachine(*device, config); - + // Handle --version if (config.Count("version")) { - pluginManager->ForEachPlugin([](fair::mq::Plugin& plugin){ std::cout << "plugin: " << plugin << std::endl; }); + std::cout << "User device version: " << device->GetVersion() << std::endl; + std::cout << "FAIRMQ_INTERFACE_VERSION: " << FAIRMQ_INTERFACE_VERSION << std::endl; + device->ChangeState(FairMQDevice::END); + return 0; } - if (result > 0) + // Handle --catch-signals + if (config.GetValue("catch-signals") > 0) { - return 1; + device->CatchSignals(); } + else + { + LOG(WARN) << "Signal handling (e.g. ctrl+C) has been deactivated via command line argument"; + } + + LOG(DEBUG) << "PID: " << getpid(); + + // Configure device + device->SetConfig(config); + + // Initialize plugin services + pluginManager->EmplacePluginServices(&config, device); + + // Instantiate and run plugins + pluginManager->InstantiatePlugins(); + + // Wait for control plugin to release device control + LOG(ERROR) << "1"; + pluginManager->WaitForPluginsToReleaseDeviceControl(); + LOG(ERROR) << "2"; } catch (std::exception& e) { diff --git a/fairmq/tools/runSimpleMQStateMachine.h b/fairmq/tools/runSimpleMQStateMachine.h index 931172b2..bba18e22 100644 --- a/fairmq/tools/runSimpleMQStateMachine.h +++ b/fairmq/tools/runSimpleMQStateMachine.h @@ -24,35 +24,6 @@ template inline int runStateMachine(TMQDevice& device, FairMQProgOptions& cfg) { - device.RegisterChannelEndpoints(); - if (cfg.Count("print-channels")) - { - device.PrintRegisteredChannels(); - device.ChangeState(TMQDevice::END); - return 0; - } - - if (cfg.Count("version")) - { - std::cout << "User device version: " << device.GetVersion() << std::endl; - std::cout << "FAIRMQ_INTERFACE_VERSION: " << FAIRMQ_INTERFACE_VERSION << std::endl; - - device.ChangeState(TMQDevice::END); - return 0; - } - - if (cfg.GetValue("catch-signals") > 0) - { - device.CatchSignals(); - } - else - { - LOG(WARN) << "Signal handling (e.g. ctrl+C) has been deactivated via command line argument"; - } - - LOG(DEBUG) << "PID: " << getpid(); - - device.SetConfig(cfg); std::string config = cfg.GetValue("config"); std::string control = cfg.GetValue("control");