mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
FairMQ: Move static and interactive control modes to plugin (3)
This commit is contained in:
parent
01327426c3
commit
7dcd09692c
|
@ -16,9 +16,6 @@
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
|
||||||
#include <termios.h> // for the InteractiveStateLoop
|
|
||||||
#include <poll.h>
|
|
||||||
|
|
||||||
#include <boost/algorithm/string.hpp> // join/split
|
#include <boost/algorithm/string.hpp> // join/split
|
||||||
|
|
||||||
#include <boost/uuid/uuid.hpp>
|
#include <boost/uuid/uuid.hpp>
|
||||||
|
@ -62,7 +59,7 @@ FairMQDevice::FairMQDevice()
|
||||||
, fDefaultTransport()
|
, fDefaultTransport()
|
||||||
, fInitializationTimeoutInS(120)
|
, fInitializationTimeoutInS(120)
|
||||||
, fCatchingSignals(false)
|
, fCatchingSignals(false)
|
||||||
, fInteractiveRunning(false)
|
, fTerminationRequested(false)
|
||||||
, fDataCallbacks(false)
|
, fDataCallbacks(false)
|
||||||
, fDeviceCmdSockets()
|
, fDeviceCmdSockets()
|
||||||
, fMsgInputs()
|
, fMsgInputs()
|
||||||
|
@ -93,7 +90,7 @@ FairMQDevice::FairMQDevice(const fair::mq::tools::Version version)
|
||||||
, fDefaultTransport()
|
, fDefaultTransport()
|
||||||
, fInitializationTimeoutInS(120)
|
, fInitializationTimeoutInS(120)
|
||||||
, fCatchingSignals(false)
|
, fCatchingSignals(false)
|
||||||
, fInteractiveRunning(false)
|
, fTerminationRequested(false)
|
||||||
, fDataCallbacks(false)
|
, fDataCallbacks(false)
|
||||||
, fDeviceCmdSockets()
|
, fDeviceCmdSockets()
|
||||||
, fMsgInputs()
|
, fMsgInputs()
|
||||||
|
@ -138,7 +135,6 @@ void FairMQDevice::SignalHandler(int signal)
|
||||||
ChangeState(END);
|
ChangeState(END);
|
||||||
|
|
||||||
// exit(EXIT_FAILURE);
|
// exit(EXIT_FAILURE);
|
||||||
fInteractiveRunning = false;
|
|
||||||
LOG(INFO) << "Exiting.";
|
LOG(INFO) << "Exiting.";
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -1136,103 +1132,6 @@ void FairMQDevice::LogSocketRates()
|
||||||
// LOG(DEBUG) << "FairMQDevice::LogSocketRates() stopping";
|
// LOG(DEBUG) << "FairMQDevice::LogSocketRates() stopping";
|
||||||
}
|
}
|
||||||
|
|
||||||
void FairMQDevice::InteractiveStateLoop()
|
|
||||||
{
|
|
||||||
fInteractiveRunning = true;
|
|
||||||
char c; // hold the user console input
|
|
||||||
pollfd cinfd[1];
|
|
||||||
cinfd[0].fd = fileno(stdin);
|
|
||||||
cinfd[0].events = POLLIN;
|
|
||||||
|
|
||||||
struct termios t;
|
|
||||||
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
|
|
||||||
t.c_lflag &= ~ICANON; // disable canonical input
|
|
||||||
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
|
|
||||||
|
|
||||||
PrintInteractiveStateLoopHelp();
|
|
||||||
|
|
||||||
while (fInteractiveRunning)
|
|
||||||
{
|
|
||||||
if (poll(cinfd, 1, 500))
|
|
||||||
{
|
|
||||||
if (!fInteractiveRunning)
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
cin >> c;
|
|
||||||
|
|
||||||
switch (c)
|
|
||||||
{
|
|
||||||
case 'i':
|
|
||||||
LOG(INFO) << "[i] init device";
|
|
||||||
ChangeState(INIT_DEVICE);
|
|
||||||
break;
|
|
||||||
case 'j':
|
|
||||||
LOG(INFO) << "[j] init task";
|
|
||||||
ChangeState(INIT_TASK);
|
|
||||||
break;
|
|
||||||
case 'p':
|
|
||||||
LOG(INFO) << "[p] pause";
|
|
||||||
ChangeState(PAUSE);
|
|
||||||
break;
|
|
||||||
case 'r':
|
|
||||||
LOG(INFO) << "[r] run";
|
|
||||||
ChangeState(RUN);
|
|
||||||
break;
|
|
||||||
case 's':
|
|
||||||
LOG(INFO) << "[s] stop";
|
|
||||||
ChangeState(STOP);
|
|
||||||
break;
|
|
||||||
case 't':
|
|
||||||
LOG(INFO) << "[t] reset task";
|
|
||||||
ChangeState(RESET_TASK);
|
|
||||||
break;
|
|
||||||
case 'd':
|
|
||||||
LOG(INFO) << "[d] reset device";
|
|
||||||
ChangeState(RESET_DEVICE);
|
|
||||||
break;
|
|
||||||
case 'h':
|
|
||||||
LOG(INFO) << "[h] help";
|
|
||||||
PrintInteractiveStateLoopHelp();
|
|
||||||
break;
|
|
||||||
// case 'x':
|
|
||||||
// LOG(INFO) << "[x] ERROR";
|
|
||||||
// ChangeState(ERROR_FOUND);
|
|
||||||
// break;
|
|
||||||
case 'q':
|
|
||||||
LOG(INFO) << "[q] end";
|
|
||||||
|
|
||||||
ChangeState(STOP);
|
|
||||||
|
|
||||||
ChangeState(RESET_TASK);
|
|
||||||
WaitForEndOfState(RESET_TASK);
|
|
||||||
|
|
||||||
ChangeState(RESET_DEVICE);
|
|
||||||
WaitForEndOfState(RESET_DEVICE);
|
|
||||||
|
|
||||||
ChangeState(END);
|
|
||||||
|
|
||||||
if (CheckCurrentState(EXITING))
|
|
||||||
{
|
|
||||||
fInteractiveRunning = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG(INFO) << "Exiting.";
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
LOG(INFO) << "Invalid input: [" << c << "]";
|
|
||||||
PrintInteractiveStateLoopHelp();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
|
|
||||||
t.c_lflag |= ICANON; // re-enable canonical input
|
|
||||||
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQDevice::Unblock()
|
void FairMQDevice::Unblock()
|
||||||
{
|
{
|
||||||
FairMQChannel::fInterrupted = true;
|
FairMQChannel::fInterrupted = true;
|
||||||
|
|
|
@ -299,16 +299,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
|
||||||
/// Waits for the first initialization run to finish
|
/// Waits for the first initialization run to finish
|
||||||
void WaitForInitialValidation();
|
void WaitForInitialValidation();
|
||||||
|
|
||||||
/// Starts interactive (console) loop for controlling the device
|
|
||||||
/// Works only when running in a terminal. Running in background would exit, because no interactive input (std::cin) is possible.
|
|
||||||
void InteractiveStateLoop();
|
|
||||||
/// Prints the available commands of the InteractiveStateLoop()
|
|
||||||
void PrintInteractiveStateLoopHelp()
|
|
||||||
{
|
|
||||||
LOG(INFO) << "Use keys to control the state machine:";
|
|
||||||
LOG(INFO) << "[h] help, [p] pause, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device";
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Set Device properties stored as strings
|
/// Set Device properties stored as strings
|
||||||
/// @param key Property key
|
/// @param key Property key
|
||||||
/// @param value Property value
|
/// @param value Property value
|
||||||
|
@ -569,8 +559,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
|
||||||
/// Signal handler
|
/// Signal handler
|
||||||
void SignalHandler(int signal);
|
void SignalHandler(int signal);
|
||||||
bool fCatchingSignals;
|
bool fCatchingSignals;
|
||||||
// Interactive state loop helper
|
std::atomic<bool> fTerminationRequested;
|
||||||
std::atomic<bool> fInteractiveRunning;
|
|
||||||
|
|
||||||
bool fDataCallbacks;
|
bool fDataCallbacks;
|
||||||
std::unordered_map<FairMQ::Transport, FairMQSocketPtr> fDeviceCmdSockets; ///< Sockets used for the internal unblocking mechanism
|
std::unordered_map<FairMQ::Transport, FairMQSocketPtr> fDeviceCmdSockets; ///< Sockets used for the internal unblocking mechanism
|
||||||
|
|
|
@ -100,8 +100,8 @@ auto PluginServices::ChangeDeviceState(const std::string& controller, const Devi
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
throw DeviceControlError{tools::ToString(
|
throw DeviceControlError{tools::ToString(
|
||||||
"Plugin ", controller, " is not allowed to change device states. ",
|
"Plugin '", controller, "' is not allowed to change device states. ",
|
||||||
"Currently, plugin ", fDeviceController, " has taken control."
|
"Currently, plugin '", fDeviceController, "' has taken control."
|
||||||
)};
|
)};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -121,8 +121,8 @@ auto PluginServices::TakeDeviceControl(const std::string& controller) -> void
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
throw DeviceControlError{tools::ToString(
|
throw DeviceControlError{tools::ToString(
|
||||||
"Plugin ", controller, " is not allowed to take over control. ",
|
"Plugin '", controller, "' is not allowed to take over control. ",
|
||||||
"Currently, plugin ", fDeviceController, " has taken control."
|
"Currently, plugin '", fDeviceController, "' has taken control."
|
||||||
)};
|
)};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -139,7 +139,7 @@ auto PluginServices::ReleaseDeviceControl(const std::string& controller) -> void
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
throw DeviceControlError{tools::ToString("Plugin ", controller, " cannot release control because it has not taken over control.")};
|
throw DeviceControlError{tools::ToString("Plugin '", controller, "' cannot release control because it has not taken over control.")};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -323,7 +323,6 @@ void FairMQProgOptions::InitOptionDescription()
|
||||||
("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.")
|
("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.")
|
||||||
("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').")
|
("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').")
|
||||||
("config", po::value<string>()->default_value("static"), "Config source ('static'/<config library filename>).")
|
("config", po::value<string>()->default_value("static"), "Config source ('static'/<config library filename>).")
|
||||||
("control", po::value<string>()->default_value("interactive"), "States control ('interactive'/'static'/<control library filename>).")
|
|
||||||
("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
|
("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
|
||||||
("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.")
|
("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.")
|
||||||
("catch-signals", po::value<int >()->default_value(1), "Enable signal handling (1/0).")
|
("catch-signals", po::value<int >()->default_value(1), "Enable signal handling (1/0).")
|
||||||
|
@ -341,7 +340,6 @@ void FairMQProgOptions::InitOptionDescription()
|
||||||
("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.")
|
("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.")
|
||||||
("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').")
|
("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').")
|
||||||
("config", po::value<string>()->default_value("static"), "Config source ('static'/<config library filename>).")
|
("config", po::value<string>()->default_value("static"), "Config source ('static'/<config library filename>).")
|
||||||
("control", po::value<string>()->default_value("interactive"), "States control ('interactive'/'static'/<control library filename>).")
|
|
||||||
("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
|
("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
|
||||||
("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.")
|
("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.")
|
||||||
("catch-signals", po::value<int >()->default_value(1), "Enable signal handling (1/0).")
|
("catch-signals", po::value<int >()->default_value(1), "Enable signal handling (1/0).")
|
||||||
|
@ -361,7 +359,6 @@ void FairMQProgOptions::InitOptionDescription()
|
||||||
("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.")
|
("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.")
|
||||||
("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').")
|
("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').")
|
||||||
("config", po::value<string>()->default_value("static"), "Config source ('static'/<config library filename>).")
|
("config", po::value<string>()->default_value("static"), "Config source ('static'/<config library filename>).")
|
||||||
("control", po::value<string>()->default_value("interactive"), "States control ('interactive'/'static'/<control library filename>).")
|
|
||||||
("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
|
("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
|
||||||
("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.")
|
("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.")
|
||||||
("catch-signals", po::value<int >()->default_value(1), "Enable signal handling (1/0).")
|
("catch-signals", po::value<int >()->default_value(1), "Enable signal handling (1/0).")
|
||||||
|
|
|
@ -51,6 +51,7 @@ Control::Control(const string name, const Plugin::Version version, const string
|
||||||
}
|
}
|
||||||
catch (PluginServices::DeviceControlError& e)
|
catch (PluginServices::DeviceControlError& e)
|
||||||
{
|
{
|
||||||
|
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
|
||||||
LOG(DEBUG) << e.what();
|
LOG(DEBUG) << e.what();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -59,8 +60,7 @@ auto ControlPluginProgramOptions() -> Plugin::ProgOptions
|
||||||
{
|
{
|
||||||
auto pluginOptions = boost::program_options::options_description{"Control (builtin) Plugin"};
|
auto pluginOptions = boost::program_options::options_description{"Control (builtin) Plugin"};
|
||||||
pluginOptions.add_options()
|
pluginOptions.add_options()
|
||||||
("ctrlmode", boost::program_options::value<string>(), "Control mode, 'static' or 'interactive'");
|
("control", boost::program_options::value<string>()->default_value("interactive"), "Control mode, 'static' or 'interactive'");
|
||||||
// should rename to --control and remove control from device options ?
|
|
||||||
return pluginOptions;
|
return pluginOptions;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user