From 5421922668cd9d4b56e8b96df93606541a09e7e2 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 27 Apr 2016 15:04:29 +0200 Subject: [PATCH] Add a cmd option to control state change mechanism --- fairmq/FairMQDevice.cxx | 1 + fairmq/options/FairMQProgOptions.cxx | 33 ++++++------ fairmq/run/runBenchmarkSampler.cxx | 47 ++++------------- fairmq/run/runMerger.cxx | 26 +++------ fairmq/run/runProxy.cxx | 28 +++------- fairmq/run/runSink.cxx | 47 ++++------------- fairmq/run/runSplitter.cxx | 26 +++------ fairmq/run/startFairMQBenchmark.sh.in | 10 ++-- fairmq/tools/runSimpleMQStateMachine.h | 73 ++++++++++---------------- 9 files changed, 94 insertions(+), 197 deletions(-) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index e9e88bea..02a80d13 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -614,6 +614,7 @@ void FairMQDevice::SetTransport(const string& transport) void FairMQDevice::SetConfig(FairMQProgOptions& config) { + LOG(DEBUG) << "PID: " << getpid(); fConfig = &config; fChannels = config.GetFairMQMap(); SetTransport(config.GetValue("transport")); diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index ec00c070..72b59758 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -173,29 +173,32 @@ void FairMQProgOptions::InitOptionDescription() if (fUseConfigFile) { fMQOptionsInCmd.add_options() - ("id", po::value(), "Device ID (required argument).") - ("io-threads", po::value()->default_value(1), "Number of I/O threads.") - ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').") - ("deployment", po::value()->default_value("static"), "Deployment ('static'/'dds').") - ("network-interface", po::value()->default_value("eth0"), "Network interface to bind on (e.g. eth0, ib0, wlan0, en0, lo...).") + ("id", po::value(), "Device ID (required argument).") + ("io-threads", po::value()->default_value(1), "Number of I/O threads.") + ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').") + ("deployment", po::value()->default_value("static"), "Deployment ('static'/'dds').") + ("control", po::value()->default_value("interactive"), "States control ('interactive'/'static'/'dds').") + ("network-interface", po::value()->default_value("eth0"), "Network interface to bind on (e.g. eth0, ib0, wlan0, en0, lo...).") ; fMQOptionsInCfg.add_options() - ("id", po::value()->required(), "Device ID (required argument).") - ("io-threads", po::value()->default_value(1), "Number of I/O threads.") - ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').") - ("deployment", po::value()->default_value("static"), "Deployment ('static'/'dds').") - ("network-interface", po::value()->default_value("eth0"), "Network interface to bind on (e.g. eth0, ib0, wlan0, en0, lo...).") + ("id", po::value()->required(), "Device ID (required argument).") + ("io-threads", po::value()->default_value(1), "Number of I/O threads.") + ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').") + ("deployment", po::value()->default_value("static"), "Deployment ('static'/'dds').") + ("control", po::value()->default_value("interactive"), "States control ('interactive'/'static'/'dds').") + ("network-interface", po::value()->default_value("eth0"), "Network interface to bind on (e.g. eth0, ib0, wlan0, en0, lo...).") ; } else { fMQOptionsInCmd.add_options() - ("id", po::value()->required(), "Device ID (required argument)") - ("io-threads", po::value()->default_value(1), "Number of I/O threads") - ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').") - ("deployment", po::value()->default_value("static"), "Deployment ('static'/'dds').") - ("network-interface", po::value()->default_value("eth0"), "Network interface to bind on (e.g. eth0, ib0, wlan0, en0, lo...).") + ("id", po::value()->required(), "Device ID (required argument)") + ("io-threads", po::value()->default_value(1), "Number of I/O threads") + ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').") + ("deployment", po::value()->default_value("static"), "Deployment ('static'/'dds').") + ("control", po::value()->default_value("interactive"), "States control ('interactive'/'static'/'dds').") + ("network-interface", po::value()->default_value("eth0"), "Network interface to bind on (e.g. eth0, ib0, wlan0, en0, lo...).") ; } diff --git a/fairmq/run/runBenchmarkSampler.cxx b/fairmq/run/runBenchmarkSampler.cxx index 819b521e..ded04ac6 100644 --- a/fairmq/run/runBenchmarkSampler.cxx +++ b/fairmq/run/runBenchmarkSampler.cxx @@ -17,69 +17,42 @@ #include "boost/program_options.hpp" #include "FairMQLogger.h" -#include "FairMQTools.h" -#include "FairMQParser.h" #include "FairMQProgOptions.h" #include "FairMQBenchmarkSampler.h" +#include "runSimpleMQStateMachine.h" -using namespace std; -using namespace FairMQParser; using namespace boost::program_options; int main(int argc, char** argv) { - FairMQBenchmarkSampler sampler; - sampler.CatchSignals(); - - FairMQProgOptions config; - try { int msgSize; int numMsgs; - options_description sampler_options("Sampler options"); - sampler_options.add_options() + options_description samplerOptions("Sampler options"); + samplerOptions.add_options() ("msg-size", value(&msgSize)->default_value(1000), "Message size in bytes") ("num-msgs", value(&numMsgs)->default_value(0), "Number of messages to send"); - config.AddToCmdLineOptions(sampler_options); + FairMQProgOptions config; + config.AddToCmdLineOptions(samplerOptions); if (config.ParseAll(argc, argv)) { return 0; } - string filename = config.GetValue("config-json-file"); - string id = config.GetValue("id"); - - config.UserParser(filename, id); - - sampler.fChannels = config.GetFairMQMap(); - - LOG(INFO) << "PID: " << getpid(); - - sampler.SetTransport(config.GetValue("transport")); - - sampler.SetProperty(FairMQBenchmarkSampler::Id, id); + FairMQBenchmarkSampler sampler; sampler.SetProperty(FairMQBenchmarkSampler::MsgSize, msgSize); sampler.SetProperty(FairMQBenchmarkSampler::NumMsgs, numMsgs); - sampler.SetProperty(FairMQBenchmarkSampler::NumIoThreads, config.GetValue("io-threads")); - sampler.ChangeState("INIT_DEVICE"); - sampler.WaitForEndOfState("INIT_DEVICE"); - - sampler.ChangeState("INIT_TASK"); - sampler.WaitForEndOfState("INIT_TASK"); - - sampler.ChangeState("RUN"); - sampler.InteractiveStateLoop(); + runStateMachine(sampler, config); } - catch (exception& e) + catch (std::exception& e) { - LOG(ERROR) << e.what(); - LOG(INFO) << "Command line options are the following : "; - config.PrintHelp(); + LOG(ERROR) << "Unhandled Exception reached the top of main: " + << e.what() << ", application will now exit"; return 1; } diff --git a/fairmq/run/runMerger.cxx b/fairmq/run/runMerger.cxx index 5a6b1604..26d188ab 100644 --- a/fairmq/run/runMerger.cxx +++ b/fairmq/run/runMerger.cxx @@ -15,40 +15,28 @@ #include #include "FairMQLogger.h" -#include "FairMQParser.h" #include "FairMQProgOptions.h" #include "FairMQMerger.h" +#include "runSimpleMQStateMachine.h" int main(int argc, char** argv) { - FairMQMerger merger; - merger.CatchSignals(); - - FairMQProgOptions config; - try { + FairMQProgOptions config; + if (config.ParseAll(argc, argv)) { return 0; } - merger.SetConfig(config); - - merger.ChangeState("INIT_DEVICE"); - merger.WaitForEndOfState("INIT_DEVICE"); - - merger.ChangeState("INIT_TASK"); - merger.WaitForEndOfState("INIT_TASK"); - - merger.ChangeState("RUN"); - merger.InteractiveStateLoop(); + FairMQMerger merger; + runStateMachine(merger, config); } catch (std::exception& e) { - LOG(ERROR) << e.what(); - LOG(INFO) << "Command line options are the following: "; - config.PrintHelp(); + LOG(ERROR) << "Unhandled Exception reached the top of main: " + << e.what() << ", application will now exit"; return 1; } diff --git a/fairmq/run/runProxy.cxx b/fairmq/run/runProxy.cxx index ae41d7bf..418195f2 100644 --- a/fairmq/run/runProxy.cxx +++ b/fairmq/run/runProxy.cxx @@ -15,42 +15,28 @@ #include #include "FairMQLogger.h" -#include "FairMQParser.h" #include "FairMQProgOptions.h" #include "FairMQProxy.h" - -using namespace std; +#include "runSimpleMQStateMachine.h" int main(int argc, char** argv) { - FairMQProxy proxy; - proxy.CatchSignals(); - - FairMQProgOptions config; - try { + FairMQProgOptions config; + if (config.ParseAll(argc, argv)) { return 0; } - proxy.SetConfig(config); - - proxy.ChangeState("INIT_DEVICE"); - proxy.WaitForEndOfState("INIT_DEVICE"); - - proxy.ChangeState("INIT_TASK"); - proxy.WaitForEndOfState("INIT_TASK"); - - proxy.ChangeState("RUN"); - proxy.InteractiveStateLoop(); + FairMQProxy proxy; + runStateMachine(proxy, config); } catch (std::exception& e) { - LOG(ERROR) << e.what(); - LOG(INFO) << "Command line options are the following: "; - config.PrintHelp(); + LOG(ERROR) << "Unhandled Exception reached the top of main: " + << e.what() << ", application will now exit"; return 1; } diff --git a/fairmq/run/runSink.cxx b/fairmq/run/runSink.cxx index f22cf567..30ecca9f 100644 --- a/fairmq/run/runSink.cxx +++ b/fairmq/run/runSink.cxx @@ -17,66 +17,39 @@ #include "boost/program_options.hpp" #include "FairMQLogger.h" -#include "FairMQParser.h" #include "FairMQProgOptions.h" #include "FairMQSink.h" +#include "runSimpleMQStateMachine.h" -using namespace std; -using namespace FairMQParser; using namespace boost::program_options; int main(int argc, char** argv) { - FairMQSink sink; - sink.CatchSignals(); - - FairMQProgOptions config; - try { int numMsgs; - options_description sink_options("Sink options"); - sink_options.add_options() + options_description sinkOptions("Sink options"); + sinkOptions.add_options() ("num-msgs", value(&numMsgs)->default_value(0), "Number of messages to receive"); - config.AddToCmdLineOptions(sink_options); + FairMQProgOptions config; + config.AddToCmdLineOptions(sinkOptions); if (config.ParseAll(argc, argv)) { return 0; } - string filename = config.GetValue("config-json-file"); - string id = config.GetValue("id"); -// int ioThreads = config.GetValue("io-threads"); - - config.UserParser(filename, id); - - sink.fChannels = config.GetFairMQMap(); - - LOG(INFO) << "PID: " << getpid(); - - sink.SetTransport(config.GetValue("transport")); - - sink.SetProperty(FairMQSink::Id, id); + FairMQSink sink; sink.SetProperty(FairMQSink::NumMsgs, numMsgs); - sink.SetProperty(FairMQSink::NumIoThreads, config.GetValue("io-threads")); - sink.ChangeState("INIT_DEVICE"); - sink.WaitForEndOfState("INIT_DEVICE"); - - sink.ChangeState("INIT_TASK"); - sink.WaitForEndOfState("INIT_TASK"); - - sink.ChangeState("RUN"); - sink.InteractiveStateLoop(); + runStateMachine(sink, config); } - catch (exception& e) + catch (std::exception& e) { - LOG(ERROR) << e.what(); - LOG(INFO) << "Started with: "; - config.PrintHelp(); + LOG(ERROR) << "Unhandled Exception reached the top of main: " + << e.what() << ", application will now exit"; return 1; } diff --git a/fairmq/run/runSplitter.cxx b/fairmq/run/runSplitter.cxx index 47414917..a4e49ea7 100644 --- a/fairmq/run/runSplitter.cxx +++ b/fairmq/run/runSplitter.cxx @@ -15,40 +15,28 @@ #include #include "FairMQLogger.h" -#include "FairMQParser.h" #include "FairMQProgOptions.h" #include "FairMQSplitter.h" +#include "runSimpleMQStateMachine.h" int main(int argc, char** argv) { - FairMQSplitter splitter; - splitter.CatchSignals(); - - FairMQProgOptions config; - try { + FairMQProgOptions config; + if (config.ParseAll(argc, argv)) { return 0; } - splitter.SetConfig(config); - - splitter.ChangeState("INIT_DEVICE"); - splitter.WaitForEndOfState("INIT_DEVICE"); - - splitter.ChangeState("INIT_TASK"); - splitter.WaitForEndOfState("INIT_TASK"); - - splitter.ChangeState("RUN"); - splitter.InteractiveStateLoop(); + FairMQSplitter splitter; + runStateMachine(splitter, config); } catch (std::exception& e) { - LOG(ERROR) << e.what(); - LOG(INFO) << "Command line options are the following: "; - config.PrintHelp(); + LOG(ERROR) << "Unhandled Exception reached the top of main: " + << e.what() << ", application will now exit"; return 1; } diff --git a/fairmq/run/startFairMQBenchmark.sh.in b/fairmq/run/startFairMQBenchmark.sh.in index 4e7a3201..d9a2e47a 100755 --- a/fairmq/run/startFairMQBenchmark.sh.in +++ b/fairmq/run/startFairMQBenchmark.sh.in @@ -25,16 +25,18 @@ echo "Usage: startBenchmark [message size=1000000] [number of messages=0]" SAMPLER="bsampler" SAMPLER+=" --id bsampler1" #SAMPLER+=" --io-threads 2" +#SAMPLER+=" --control static" #SAMPLER+=" --transport nanomsg" SAMPLER+=" --msg-size $msgSize" SAMPLER+=" --num-msgs $numMsgs" -SAMPLER+=" --config-json-file @CMAKE_BINARY_DIR@/bin/config/benchmark.json" -xterm -geometry 80x23+0+0 -hold -e taskset 0x1 @CMAKE_BINARY_DIR@/bin/$SAMPLER & +SAMPLER+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/benchmark.json" +xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SAMPLER & SINK="sink" SINK+=" --id sink1" #SINK+=" --io-threads 2" +#SINK+=" --control static" #SINK+=" --transport nanomsg" SINK+=" --num-msgs $numMsgs" -SINK+=" --config-json-file @CMAKE_BINARY_DIR@/bin/config/benchmark.json" -xterm -geometry 80x23+500+0 -hold -e taskset 0x2 @CMAKE_BINARY_DIR@/bin/$SINK & +SINK+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/benchmark.json" +xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK & diff --git a/fairmq/tools/runSimpleMQStateMachine.h b/fairmq/tools/runSimpleMQStateMachine.h index d743c247..e3fe8a26 100644 --- a/fairmq/tools/runSimpleMQStateMachine.h +++ b/fairmq/tools/runSimpleMQStateMachine.h @@ -21,24 +21,14 @@ #include "FairMQParser.h" #include "FairMQProgOptions.h" -// template function that take any device, -// and run a simple MQ state machine configured from a JSON file +// template function that takes any device +// and runs a simple MQ state machine configured from a JSON file template inline int runStateMachine(TMQDevice& device, FairMQProgOptions& config) { device.CatchSignals(); - std::string id = config.GetValue("id"); - int ioThreads = config.GetValue("io-threads"); - - device.fChannels = config.GetFairMQMap(); - - device.SetProperty(TMQDevice::Id, id); - device.SetProperty(TMQDevice::NumIoThreads, ioThreads); - - LOG(INFO) << "PID: " << getpid(); - - device.SetTransport(config.GetValue("transport")); + device.SetConfig(config); device.ChangeState(TMQDevice::INIT_DEVICE); device.WaitForEndOfState(TMQDevice::INIT_DEVICE); @@ -46,43 +36,36 @@ inline int runStateMachine(TMQDevice& device, FairMQProgOptions& config) device.WaitForEndOfState(TMQDevice::INIT_TASK); device.ChangeState(TMQDevice::RUN); - device.InteractiveStateLoop(); - return 0; -} + std::string control = config.GetValue("control"); -template -inline int runNonInteractiveStateMachine(TMQDevice& device, FairMQProgOptions& config) -{ - device.CatchSignals(); - std::string id = config.GetValue("id"); - int ioThreads = config.GetValue("io-threads"); + // TODO: Extend this with DDS (requires optional dependency?)? + if (control == "interactive") + { + device.InteractiveStateLoop(); + } + else if (control == "static") + { + device.ChangeState(TMQDevice::RUN); + device.WaitForEndOfState(TMQDevice::RUN); - device.fChannels = config.GetFairMQMap(); + device.ChangeState(TMQDevice::RESET_TASK); + device.WaitForEndOfState(TMQDevice::RESET_TASK); - device.SetProperty(TMQDevice::Id, id); - device.SetProperty(TMQDevice::NumIoThreads, ioThreads); + device.ChangeState(TMQDevice::RESET_DEVICE); + device.WaitForEndOfState(TMQDevice::RESET_DEVICE); - LOG(INFO) << "PID: " << getpid(); - - device.SetTransport(config.GetValue("transport")); - - device.ChangeState(TMQDevice::INIT_DEVICE); - device.WaitForEndOfState(TMQDevice::INIT_DEVICE); - - device.ChangeState(TMQDevice::INIT_TASK); - device.WaitForEndOfState(TMQDevice::INIT_TASK); - - device.ChangeState(TMQDevice::RUN); - device.WaitForEndOfState(TMQDevice::RUN); - - device.ChangeState(TMQDevice::RESET_TASK); - device.WaitForEndOfState(TMQDevice::RESET_TASK); - - device.ChangeState(TMQDevice::RESET_DEVICE); - device.WaitForEndOfState(TMQDevice::RESET_DEVICE); - - device.ChangeState(TMQDevice::END); + device.ChangeState(TMQDevice::END); + } + else + { + LOG(ERROR) << "Requested control mechanism '" << control << "' is not available."; + LOG(ERROR) << "Currently available are:" + << " 'interactive'" + << ", 'static'" + << ". Exiting."; + exit(EXIT_FAILURE); + } return 0; }