From 0cef5692b13528a9708d276aff3165d24102e056 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 30 Sep 2014 11:37:18 +0200 Subject: [PATCH] Use boost::program_options for managing command line options of the executables. Existing scripts in example/Tutorial3/macro have been updated to use the new format. Your own executables are not affected, but your scripts which use FairMQ executables have to be updated to the new format. Use the `--help` option with any FairMQ executable to find out the available options. --- fairmq/FairMQDevice.cxx | 1 - fairmq/FairMQStateMachine.h | 2 +- fairmq/devices/FairMQBenchmarkSampler.cxx | 1 - fairmq/run/runBenchmarkSampler.cxx | 129 ++++++++++++------ fairmq/run/runBinSampler.cxx | 129 ++++++++++++------ fairmq/run/runBinSink.cxx | 105 +++++++++++---- fairmq/run/runBuffer.cxx | 139 +++++++++++++------ fairmq/run/runMerger.cxx | 155 +++++++++++++++------- fairmq/run/runProtoSampler.cxx | 129 ++++++++++++------ fairmq/run/runProtoSink.cxx | 105 +++++++++++---- fairmq/run/runProxy.cxx | 140 +++++++++++++------ fairmq/run/runSink.cxx | 105 +++++++++++---- fairmq/run/runSplitter.cxx | 155 +++++++++++++++------- 13 files changed, 923 insertions(+), 372 deletions(-) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 3d370f73..7b6beb96 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -32,7 +32,6 @@ FairMQDevice::FairMQDevice() void FairMQDevice::Init() { LOG(INFO) << ">>>>>>> Init <<<<<<<"; - LOG(INFO) << "numIoThreads: " << fNumIoThreads; for (int i = 0; i < fNumInputs; ++i) { diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index 561ba5a4..701f8e97 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -73,7 +73,7 @@ namespace FairMQFSM template void operator()(EVT const&, FSM&, SourceState&, TargetState&) { - LOG(STATE) << "Transition from " << typeid(SourceState).name() << " to " << typeid(TargetState).name() << " with event:" << typeid(EVT).name(); + // LOG(STATE) << "Transition from " << typeid(SourceState).name() << " to " << typeid(TargetState).name() << " with event:" << typeid(EVT).name(); } }; struct InitFct diff --git a/fairmq/devices/FairMQBenchmarkSampler.cxx b/fairmq/devices/FairMQBenchmarkSampler.cxx index a0b77b11..6e5c8f64 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.cxx +++ b/fairmq/devices/FairMQBenchmarkSampler.cxx @@ -39,7 +39,6 @@ void FairMQBenchmarkSampler::Init() void FairMQBenchmarkSampler::Run() { LOG(INFO) << ">>>>>>> Run <<<<<<<"; - // boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); boost::thread resetEventCounter(boost::bind(&FairMQBenchmarkSampler::ResetEventCounter, this)); diff --git a/fairmq/run/runBenchmarkSampler.cxx b/fairmq/run/runBenchmarkSampler.cxx index 5c52a718..51351eb0 100644 --- a/fairmq/run/runBenchmarkSampler.cxx +++ b/fairmq/run/runBenchmarkSampler.cxx @@ -15,6 +15,8 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQBenchmarkSampler.h" @@ -24,10 +26,7 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; FairMQBenchmarkSampler sampler; @@ -52,18 +51,93 @@ static void s_catch_signals(void) sigaction(SIGTERM, &action, NULL); } +typedef struct DeviceOptions +{ + string id; + int eventSize; + int eventRate; + int ioThreads; + string outputSocketType; + int outputBufSize; + string outputMethod; + string outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("event-size", bpo::value()->default_value(1000), "Event size in bytes") + ("event-rate", bpo::value()->default_value(0), "Event rate limit in maximum number of events per second") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value()->required(), "Output method: bind/connect") + ("output-address", bpo::value()->required(), "Output address, e.g.: \"tcp://*:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) + { + LOG(INFO) << "FairMQ Benchmark Sampler" << endl << desc; + return false; + } + + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("event-size") ) + _options->eventSize = vm["event-size"].as(); + + if ( vm.count("event-rate") ) + _options->eventRate = vm["event-rate"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("output-socket-type") ) + _options->outputSocketType = vm["output-socket-type"].as(); + + if ( vm.count("output-buff-size") ) + _options->outputBufSize = vm["output-buff-size"].as(); + + if ( vm.count("output-method") ) + _options->outputMethod = vm["output-method"].as(); + + if ( vm.count("output-address") ) + _options->outputAddress = vm["output-address"].as(); + + return true; +} + int main(int argc, char** argv) { - if (argc != 9) + s_catch_signals(); + + DeviceOptions_t options; + try { - cout << "Usage: bsampler ID eventSize eventRate numIoTreads\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); return 1; } - s_catch_signals(); - LOG(INFO) << "PID: " << getpid(); + LOG(INFO) << "CONFIG: " << "id: " << options.id << ", event size: " << options.eventSize << ", event rate: " << options.eventRate << ", I/O threads: " << options.ioThreads; + LOG(INFO) << "OUTPUT: " << options.outputSocketType << " " << options.outputBufSize << " " << options.outputMethod << " " << options.outputAddress; #ifdef NANOMSG FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); @@ -73,41 +147,20 @@ int main(int argc, char** argv) sampler.SetTransport(transportFactory); - int i = 1; - - sampler.SetProperty(FairMQBenchmarkSampler::Id, argv[i]); - ++i; - - int eventSize; - stringstream(argv[i]) >> eventSize; - sampler.SetProperty(FairMQBenchmarkSampler::EventSize, eventSize); - ++i; - - int eventRate; - stringstream(argv[i]) >> eventRate; - sampler.SetProperty(FairMQBenchmarkSampler::EventRate, eventRate); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - sampler.SetProperty(FairMQBenchmarkSampler::NumIoThreads, numIoThreads); - ++i; + sampler.SetProperty(FairMQBenchmarkSampler::Id, options.id); + sampler.SetProperty(FairMQBenchmarkSampler::EventSize, options.eventSize); + sampler.SetProperty(FairMQBenchmarkSampler::EventRate, options.eventRate); + sampler.SetProperty(FairMQBenchmarkSampler::NumIoThreads, options.ioThreads); sampler.SetProperty(FairMQBenchmarkSampler::NumInputs, 0); sampler.SetProperty(FairMQBenchmarkSampler::NumOutputs, 1); sampler.ChangeState(FairMQBenchmarkSampler::INIT); - sampler.SetProperty(FairMQBenchmarkSampler::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - sampler.SetProperty(FairMQBenchmarkSampler::OutputSndBufSize, outputSndBufSize, 0); - ++i; - sampler.SetProperty(FairMQBenchmarkSampler::OutputMethod, argv[i], 0); - ++i; - sampler.SetProperty(FairMQBenchmarkSampler::OutputAddress, argv[i], 0); - ++i; + sampler.SetProperty(FairMQBenchmarkSampler::OutputSocketType, options.outputSocketType); + sampler.SetProperty(FairMQBenchmarkSampler::OutputSndBufSize, options.outputBufSize); + sampler.SetProperty(FairMQBenchmarkSampler::OutputMethod, options.outputMethod); + sampler.SetProperty(FairMQBenchmarkSampler::OutputAddress, options.outputAddress); sampler.ChangeState(FairMQBenchmarkSampler::SETOUTPUT); sampler.ChangeState(FairMQBenchmarkSampler::SETINPUT); diff --git a/fairmq/run/runBinSampler.cxx b/fairmq/run/runBinSampler.cxx index 38c51606..cfd704fc 100644 --- a/fairmq/run/runBinSampler.cxx +++ b/fairmq/run/runBinSampler.cxx @@ -15,6 +15,8 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQBinSampler.h" @@ -24,10 +26,7 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; FairMQBinSampler sampler; @@ -52,18 +51,93 @@ static void s_catch_signals(void) sigaction(SIGTERM, &action, NULL); } +typedef struct DeviceOptions +{ + string id; + int eventSize; + int eventRate; + int ioThreads; + string outputSocketType; + int outputBufSize; + string outputMethod; + string outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("event-size", bpo::value()->default_value(1000), "Event size in bytes") + ("event-rate", bpo::value()->default_value(0), "Event rate limit in maximum number of events per second") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value()->required(), "Output method: bind/connect") + ("output-address", bpo::value()->required(), "Output address, e.g.: \"tcp://*:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) + { + LOG(INFO) << "FairMQ Bin Sampler" << endl << desc; + return false; + } + + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("event-size") ) + _options->eventSize = vm["event-size"].as(); + + if ( vm.count("event-rate") ) + _options->eventRate = vm["event-rate"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("output-socket-type") ) + _options->outputSocketType = vm["output-socket-type"].as(); + + if ( vm.count("output-buff-size") ) + _options->outputBufSize = vm["output-buff-size"].as(); + + if ( vm.count("output-method") ) + _options->outputMethod = vm["output-method"].as(); + + if ( vm.count("output-address") ) + _options->outputAddress = vm["output-address"].as(); + + return true; +} + int main(int argc, char** argv) { - if (argc != 9) + s_catch_signals(); + + DeviceOptions_t options; + try { - cout << "Usage: bsampler ID eventSize eventRate numIoTreads\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); return 1; } - s_catch_signals(); - LOG(INFO) << "PID: " << getpid(); + LOG(INFO) << "CONFIG: " << "id: " << options.id << ", event size: " << options.eventSize << ", event rate: " << options.eventRate << ", I/O threads: " << options.ioThreads; + LOG(INFO) << "OUTPUT: " << options.outputSocketType << " " << options.outputBufSize << " " << options.outputMethod << " " << options.outputAddress; #ifdef NANOMSG FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); @@ -73,41 +147,20 @@ int main(int argc, char** argv) sampler.SetTransport(transportFactory); - int i = 1; - - sampler.SetProperty(FairMQBinSampler::Id, argv[i]); - ++i; - - int eventSize; - stringstream(argv[i]) >> eventSize; - sampler.SetProperty(FairMQBinSampler::EventSize, eventSize); - ++i; - - int eventRate; - stringstream(argv[i]) >> eventRate; - sampler.SetProperty(FairMQBinSampler::EventRate, eventRate); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - sampler.SetProperty(FairMQBinSampler::NumIoThreads, numIoThreads); - ++i; + sampler.SetProperty(FairMQBinSampler::Id, options.id); + sampler.SetProperty(FairMQBinSampler::EventSize, options.eventSize); + sampler.SetProperty(FairMQBinSampler::EventRate, options.eventRate); + sampler.SetProperty(FairMQBinSampler::NumIoThreads, options.ioThreads); sampler.SetProperty(FairMQBinSampler::NumInputs, 0); sampler.SetProperty(FairMQBinSampler::NumOutputs, 1); sampler.ChangeState(FairMQBinSampler::INIT); - sampler.SetProperty(FairMQBinSampler::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - sampler.SetProperty(FairMQBinSampler::OutputSndBufSize, outputSndBufSize, 0); - ++i; - sampler.SetProperty(FairMQBinSampler::OutputMethod, argv[i], 0); - ++i; - sampler.SetProperty(FairMQBinSampler::OutputAddress, argv[i], 0); - ++i; + sampler.SetProperty(FairMQBinSampler::OutputSocketType, options.outputSocketType); + sampler.SetProperty(FairMQBinSampler::OutputSndBufSize, options.outputBufSize); + sampler.SetProperty(FairMQBinSampler::OutputMethod, options.outputMethod); + sampler.SetProperty(FairMQBinSampler::OutputAddress, options.outputAddress); sampler.ChangeState(FairMQBinSampler::SETOUTPUT); sampler.ChangeState(FairMQBinSampler::SETINPUT); diff --git a/fairmq/run/runBinSink.cxx b/fairmq/run/runBinSink.cxx index 7b0c0fcb..dcd7f79e 100644 --- a/fairmq/run/runBinSink.cxx +++ b/fairmq/run/runBinSink.cxx @@ -15,6 +15,8 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQBinSink.h" @@ -24,10 +26,7 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; FairMQBinSink sink; @@ -52,17 +51,80 @@ static void s_catch_signals(void) sigaction(SIGTERM, &action, NULL); } -int main(int argc, char** argv) +typedef struct DeviceOptions { - if (argc != 7) + string id; + int ioThreads; + string inputSocketType; + int inputBufSize; + string inputMethod; + string inputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value()->required(), "Input method: bind/connect") + ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://*:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) { - cout << "Usage: sink \tID numIoTreads\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << endl; - return 1; + LOG(INFO) << "FairMQ Bin Sink" << endl << desc; + return false; } + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("input-socket-type") ) + _options->inputSocketType = vm["input-socket-type"].as(); + + if ( vm.count("input-buff-size") ) + _options->inputBufSize = vm["input-buff-size"].as(); + + if ( vm.count("input-method") ) + _options->inputMethod = vm["input-method"].as(); + + if ( vm.count("input-address") ) + _options->inputAddress = vm["input-address"].as(); + + return true; +} + +int main(int argc, char** argv) +{ s_catch_signals(); + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + LOG(INFO) << "PID: " << getpid(); #ifdef NANOMSG @@ -73,31 +135,18 @@ int main(int argc, char** argv) sink.SetTransport(transportFactory); - int i = 1; - - sink.SetProperty(FairMQBinSink::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - sink.SetProperty(FairMQBinSink::NumIoThreads, numIoThreads); - ++i; + sink.SetProperty(FairMQBinSink::Id, options.id); + sink.SetProperty(FairMQBinSink::NumIoThreads, options.ioThreads); sink.SetProperty(FairMQBinSink::NumInputs, 1); sink.SetProperty(FairMQBinSink::NumOutputs, 0); sink.ChangeState(FairMQBinSink::INIT); - sink.SetProperty(FairMQBinSink::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - sink.SetProperty(FairMQBinSink::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - sink.SetProperty(FairMQBinSink::InputMethod, argv[i], 0); - ++i; - sink.SetProperty(FairMQBinSink::InputAddress, argv[i], 0); - ++i; + sink.SetProperty(FairMQBinSink::InputSocketType, options.inputSocketType); + sink.SetProperty(FairMQBinSink::InputSndBufSize, options.inputBufSize); + sink.SetProperty(FairMQBinSink::InputMethod, options.inputMethod); + sink.SetProperty(FairMQBinSink::InputAddress, options.inputAddress); sink.ChangeState(FairMQBinSink::SETOUTPUT); sink.ChangeState(FairMQBinSink::SETINPUT); diff --git a/fairmq/run/runBuffer.cxx b/fairmq/run/runBuffer.cxx index 85eb17c0..928cab01 100644 --- a/fairmq/run/runBuffer.cxx +++ b/fairmq/run/runBuffer.cxx @@ -15,6 +15,8 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQBuffer.h" @@ -24,10 +26,7 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; FairMQBuffer buffer; @@ -52,18 +51,100 @@ static void s_catch_signals(void) sigaction(SIGTERM, &action, NULL); } -int main(int argc, char** argv) +typedef struct DeviceOptions { - if (argc != 11) + string id; + int ioThreads; + string inputSocketType; + int inputBufSize; + string inputMethod; + string inputAddress; + string outputSocketType; + int outputBufSize; + string outputMethod; + string outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value(), "Device ID") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value()->required(), "Input method: bind/connect") + ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") + ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value()->required(), "Output method: bind/connect") + ("output-address", bpo::value()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) { - cout << "Usage: buffer \tID numIoTreads\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; - return 1; + LOG(INFO) << "FairMQ Buffer" << endl << desc; + return false; } + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("input-socket-type") ) + _options->inputSocketType = vm["input-socket-type"].as(); + + if ( vm.count("input-buff-size") ) + _options->inputBufSize = vm["input-buff-size"].as(); + + if ( vm.count("input-method") ) + _options->inputMethod = vm["input-method"].as(); + + if ( vm.count("input-address") ) + _options->inputAddress = vm["input-address"].as(); + + if ( vm.count("output-socket-type") ) + _options->outputSocketType = vm["output-socket-type"].as(); + + if ( vm.count("output-buff-size") ) + _options->outputBufSize = vm["output-buff-size"].as(); + + if ( vm.count("output-method") ) + _options->outputMethod = vm["output-method"].as(); + + if ( vm.count("output-address") ) + _options->outputAddress = vm["output-address"].as(); + + return true; +} + +int main(int argc, char** argv) +{ s_catch_signals(); + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + LOG(INFO) << "PID: " << getpid(); #ifdef NANOMSG @@ -74,41 +155,23 @@ int main(int argc, char** argv) buffer.SetTransport(transportFactory); - int i = 1; + buffer.SetProperty(FairMQBuffer::Id, options.id); + buffer.SetProperty(FairMQBuffer::NumIoThreads, options.ioThreads); - buffer.SetProperty(FairMQBuffer::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - buffer.SetProperty(FairMQBuffer::NumIoThreads, numIoThreads); - ++i; buffer.SetProperty(FairMQBuffer::NumInputs, 1); buffer.SetProperty(FairMQBuffer::NumOutputs, 1); buffer.ChangeState(FairMQBuffer::INIT); - buffer.SetProperty(FairMQBuffer::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - buffer.SetProperty(FairMQBuffer::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - buffer.SetProperty(FairMQBuffer::InputMethod, argv[i], 0); - ++i; - buffer.SetProperty(FairMQBuffer::InputAddress, argv[i], 0); - ++i; + buffer.SetProperty(FairMQBuffer::InputSocketType, options.inputSocketType); + buffer.SetProperty(FairMQBuffer::InputSndBufSize, options.inputBufSize); + buffer.SetProperty(FairMQBuffer::InputMethod, options.inputMethod); + buffer.SetProperty(FairMQBuffer::InputAddress, options.inputAddress); - buffer.SetProperty(FairMQBuffer::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - buffer.SetProperty(FairMQBuffer::OutputSndBufSize, outputSndBufSize, 0); - ++i; - buffer.SetProperty(FairMQBuffer::OutputMethod, argv[i], 0); - ++i; - buffer.SetProperty(FairMQBuffer::OutputAddress, argv[i], 0); - ++i; + buffer.SetProperty(FairMQBuffer::OutputSocketType, options.outputSocketType); + buffer.SetProperty(FairMQBuffer::OutputSndBufSize, options.outputBufSize); + buffer.SetProperty(FairMQBuffer::OutputMethod, options.outputMethod); + buffer.SetProperty(FairMQBuffer::OutputAddress, options.outputAddress); buffer.ChangeState(FairMQBuffer::SETOUTPUT); buffer.ChangeState(FairMQBuffer::SETINPUT); diff --git a/fairmq/run/runMerger.cxx b/fairmq/run/runMerger.cxx index 3c04da92..a6bc874e 100644 --- a/fairmq/run/runMerger.cxx +++ b/fairmq/run/runMerger.cxx @@ -15,6 +15,8 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQMerger.h" @@ -24,10 +26,7 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; FairMQMerger merger; @@ -52,20 +51,105 @@ static void s_catch_signals(void) sigaction(SIGTERM, &action, NULL); } -int main(int argc, char** argv) +typedef struct DeviceOptions { - if (argc < 16 || (argc - 8) % 4 != 0) + string id; + int ioThreads; + int numInputs; + vector inputSocketType; + vector inputBufSize; + vector inputMethod; + vector inputAddress; + string outputSocketType; + int outputBufSize; + string outputMethod; + string outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("num-inputs", bpo::value()->required(), "Number of Merger input sockets") + ("input-socket-type", bpo::value< vector >()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value< vector >()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value< vector >()->required(), "Input method: bind/connect") + ("input-address", bpo::value< vector >()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") + ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value()->required(), "Output method: bind/connect") + ("output-address", bpo::value()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) { - cout << "Usage: merger \tID numIoTreads numInputs\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\t...\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << argc << " arguments provided" << endl; - return 1; + LOG(INFO) << "FairMQ Merger" << endl << desc; + return false; } + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("num-inputs") ) + _options->numInputs = vm["num-inputs"].as(); + + if ( vm.count("input-socket-type") ) + _options->inputSocketType = vm["input-socket-type"].as< vector >(); + + if ( vm.count("input-buff-size") ) + _options->inputBufSize = vm["input-buff-size"].as< vector >(); + + if ( vm.count("input-method") ) + _options->inputMethod = vm["input-method"].as< vector >(); + + if ( vm.count("input-address") ) + _options->inputAddress = vm["input-address"].as< vector >(); + + if ( vm.count("output-socket-type") ) + _options->outputSocketType = vm["output-socket-type"].as(); + + if ( vm.count("output-buff-size") ) + _options->outputBufSize = vm["output-buff-size"].as(); + + if ( vm.count("output-method") ) + _options->outputMethod = vm["output-method"].as(); + + if ( vm.count("output-address") ) + _options->outputAddress = vm["output-address"].as(); + + return true; +} + +int main(int argc, char** argv) +{ s_catch_signals(); + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + LOG(INFO) << "PID: " << getpid(); #ifdef NANOMSG @@ -76,49 +160,26 @@ int main(int argc, char** argv) merger.SetTransport(transportFactory); - int i = 1; - - merger.SetProperty(FairMQMerger::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - merger.SetProperty(FairMQMerger::NumIoThreads, numIoThreads); - ++i; - - int numInputs; - stringstream(argv[i]) >> numInputs; - merger.SetProperty(FairMQMerger::NumInputs, numInputs); - ++i; + merger.SetProperty(FairMQMerger::Id, options.id); + merger.SetProperty(FairMQMerger::NumIoThreads, options.ioThreads); + merger.SetProperty(FairMQMerger::NumInputs, options.numInputs); merger.SetProperty(FairMQMerger::NumOutputs, 1); merger.ChangeState(FairMQMerger::INIT); - for (int iInput = 0; iInput < numInputs; iInput++) + for (int i = 0; i < options.numInputs; ++i) { - merger.SetProperty(FairMQMerger::InputSocketType, argv[i], iInput); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, iInput); - ++i; - merger.SetProperty(FairMQMerger::InputMethod, argv[i], iInput); - ++i; - merger.SetProperty(FairMQMerger::InputAddress, argv[i], iInput); - ++i; + merger.SetProperty(FairMQMerger::InputSocketType, options.inputSocketType.at(i), i); + merger.SetProperty(FairMQMerger::InputRcvBufSize, options.inputBufSize.at(i), i); + merger.SetProperty(FairMQMerger::InputMethod, options.inputMethod.at(i), i); + merger.SetProperty(FairMQMerger::InputAddress, options.inputAddress.at(i), i); } - merger.SetProperty(FairMQMerger::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - merger.SetProperty(FairMQMerger::OutputSndBufSize, outputSndBufSize, 0); - ++i; - merger.SetProperty(FairMQMerger::OutputMethod, argv[i], 0); - ++i; - merger.SetProperty(FairMQMerger::OutputAddress, argv[i], 0); - ++i; + merger.SetProperty(FairMQMerger::OutputSocketType, options.outputSocketType); + merger.SetProperty(FairMQMerger::OutputSndBufSize, options.outputBufSize); + merger.SetProperty(FairMQMerger::OutputMethod, options.outputMethod); + merger.SetProperty(FairMQMerger::OutputAddress, options.outputAddress); merger.ChangeState(FairMQMerger::SETOUTPUT); merger.ChangeState(FairMQMerger::SETINPUT); diff --git a/fairmq/run/runProtoSampler.cxx b/fairmq/run/runProtoSampler.cxx index bbe5cb8d..eccd0777 100644 --- a/fairmq/run/runProtoSampler.cxx +++ b/fairmq/run/runProtoSampler.cxx @@ -15,6 +15,8 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQProtoSampler.h" @@ -24,10 +26,7 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; FairMQProtoSampler sampler; @@ -52,18 +51,93 @@ static void s_catch_signals(void) sigaction(SIGTERM, &action, NULL); } +typedef struct DeviceOptions +{ + string id; + int eventSize; + int eventRate; + int ioThreads; + string outputSocketType; + int outputBufSize; + string outputMethod; + string outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("event-size", bpo::value()->default_value(1000), "Event size in bytes") + ("event-rate", bpo::value()->default_value(0), "Event rate limit in maximum number of events per second") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value()->required(), "Output method: bind/connect") + ("output-address", bpo::value()->required(), "Output address, e.g.: \"tcp://*:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) + { + LOG(INFO) << "FairMQ Proto Sampler" << endl << desc; + return false; + } + + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("event-size") ) + _options->eventSize = vm["event-size"].as(); + + if ( vm.count("event-rate") ) + _options->eventRate = vm["event-rate"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("output-socket-type") ) + _options->outputSocketType = vm["output-socket-type"].as(); + + if ( vm.count("output-buff-size") ) + _options->outputBufSize = vm["output-buff-size"].as(); + + if ( vm.count("output-method") ) + _options->outputMethod = vm["output-method"].as(); + + if ( vm.count("output-address") ) + _options->outputAddress = vm["output-address"].as(); + + return true; +} + int main(int argc, char** argv) { - if (argc != 9) + s_catch_signals(); + + DeviceOptions_t options; + try { - cout << "Usage: bsampler ID eventSize eventRate numIoTreads\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); return 1; } - s_catch_signals(); - LOG(INFO) << "PID: " << getpid(); + LOG(INFO) << "CONFIG: " << "id: " << options.id << ", event size: " << options.eventSize << ", event rate: " << options.eventRate << ", I/O threads: " << options.ioThreads; + LOG(INFO) << "OUTPUT: " << options.outputSocketType << " " << options.outputBufSize << " " << options.outputMethod << " " << options.outputAddress; #ifdef NANOMSG FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); @@ -73,41 +147,20 @@ int main(int argc, char** argv) sampler.SetTransport(transportFactory); - int i = 1; - - sampler.SetProperty(FairMQProtoSampler::Id, argv[i]); - ++i; - - int eventSize; - stringstream(argv[i]) >> eventSize; - sampler.SetProperty(FairMQProtoSampler::EventSize, eventSize); - ++i; - - int eventRate; - stringstream(argv[i]) >> eventRate; - sampler.SetProperty(FairMQProtoSampler::EventRate, eventRate); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - sampler.SetProperty(FairMQProtoSampler::NumIoThreads, numIoThreads); - ++i; + sampler.SetProperty(FairMQProtoSampler::Id, options.id); + sampler.SetProperty(FairMQProtoSampler::EventSize, options.eventSize); + sampler.SetProperty(FairMQProtoSampler::EventRate, options.eventRate); + sampler.SetProperty(FairMQProtoSampler::NumIoThreads, options.ioThreads); sampler.SetProperty(FairMQProtoSampler::NumInputs, 0); sampler.SetProperty(FairMQProtoSampler::NumOutputs, 1); sampler.ChangeState(FairMQProtoSampler::INIT); - sampler.SetProperty(FairMQProtoSampler::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - sampler.SetProperty(FairMQProtoSampler::OutputSndBufSize, outputSndBufSize, 0); - ++i; - sampler.SetProperty(FairMQProtoSampler::OutputMethod, argv[i], 0); - ++i; - sampler.SetProperty(FairMQProtoSampler::OutputAddress, argv[i], 0); - ++i; + sampler.SetProperty(FairMQProtoSampler::OutputSocketType, options.outputSocketType); + sampler.SetProperty(FairMQProtoSampler::OutputSndBufSize, options.outputBufSize); + sampler.SetProperty(FairMQProtoSampler::OutputMethod, options.outputMethod); + sampler.SetProperty(FairMQProtoSampler::OutputAddress, options.outputAddress); sampler.ChangeState(FairMQProtoSampler::SETOUTPUT); sampler.ChangeState(FairMQProtoSampler::SETINPUT); diff --git a/fairmq/run/runProtoSink.cxx b/fairmq/run/runProtoSink.cxx index dc694146..f40635ee 100644 --- a/fairmq/run/runProtoSink.cxx +++ b/fairmq/run/runProtoSink.cxx @@ -15,6 +15,8 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQProtoSink.h" @@ -24,10 +26,7 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; FairMQProtoSink sink; @@ -52,17 +51,80 @@ static void s_catch_signals(void) sigaction(SIGTERM, &action, NULL); } -int main(int argc, char** argv) +typedef struct DeviceOptions { - if (argc != 7) + string id; + int ioThreads; + string inputSocketType; + int inputBufSize; + string inputMethod; + string inputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value()->required(), "Input method: bind/connect") + ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://*:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) { - cout << "Usage: sink \tID numIoTreads\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << endl; - return 1; + LOG(INFO) << "FairMQ Proto Sink" << endl << desc; + return false; } + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("input-socket-type") ) + _options->inputSocketType = vm["input-socket-type"].as(); + + if ( vm.count("input-buff-size") ) + _options->inputBufSize = vm["input-buff-size"].as(); + + if ( vm.count("input-method") ) + _options->inputMethod = vm["input-method"].as(); + + if ( vm.count("input-address") ) + _options->inputAddress = vm["input-address"].as(); + + return true; +} + +int main(int argc, char** argv) +{ s_catch_signals(); + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + LOG(INFO) << "PID: " << getpid(); #ifdef NANOMSG @@ -73,31 +135,18 @@ int main(int argc, char** argv) sink.SetTransport(transportFactory); - int i = 1; - - sink.SetProperty(FairMQProtoSink::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - sink.SetProperty(FairMQProtoSink::NumIoThreads, numIoThreads); - ++i; + sink.SetProperty(FairMQProtoSink::Id, options.id); + sink.SetProperty(FairMQProtoSink::NumIoThreads, options.ioThreads); sink.SetProperty(FairMQProtoSink::NumInputs, 1); sink.SetProperty(FairMQProtoSink::NumOutputs, 0); sink.ChangeState(FairMQProtoSink::INIT); - sink.SetProperty(FairMQProtoSink::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - sink.SetProperty(FairMQProtoSink::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - sink.SetProperty(FairMQProtoSink::InputMethod, argv[i], 0); - ++i; - sink.SetProperty(FairMQProtoSink::InputAddress, argv[i], 0); - ++i; + sink.SetProperty(FairMQProtoSink::InputSocketType, options.inputSocketType); + sink.SetProperty(FairMQProtoSink::InputSndBufSize, options.inputBufSize); + sink.SetProperty(FairMQProtoSink::InputMethod, options.inputMethod); + sink.SetProperty(FairMQProtoSink::InputAddress, options.inputAddress); sink.ChangeState(FairMQProtoSink::SETOUTPUT); sink.ChangeState(FairMQProtoSink::SETINPUT); diff --git a/fairmq/run/runProxy.cxx b/fairmq/run/runProxy.cxx index e3df0125..434274fa 100644 --- a/fairmq/run/runProxy.cxx +++ b/fairmq/run/runProxy.cxx @@ -15,6 +15,8 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQProxy.h" @@ -24,10 +26,7 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; FairMQProxy proxy; @@ -52,18 +51,100 @@ static void s_catch_signals(void) sigaction(SIGTERM, &action, NULL); } -int main(int argc, char** argv) +typedef struct DeviceOptions { - if (argc != 11) + string id; + int ioThreads; + string inputSocketType; + int inputBufSize; + string inputMethod; + string inputAddress; + string outputSocketType; + int outputBufSize; + string outputMethod; + string outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value()->required(), "Input method: bind/connect") + ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") + ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value()->required(), "Output method: bind/connect") + ("output-address", bpo::value()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) { - cout << "Usage: proxy \tID numIoTreads\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; - return 1; + LOG(INFO) << "FairMQ Proxy" << endl << desc; + return false; } + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("input-socket-type") ) + _options->inputSocketType = vm["input-socket-type"].as(); + + if ( vm.count("input-buff-size") ) + _options->inputBufSize = vm["input-buff-size"].as(); + + if ( vm.count("input-method") ) + _options->inputMethod = vm["input-method"].as(); + + if ( vm.count("input-address") ) + _options->inputAddress = vm["input-address"].as(); + + if ( vm.count("output-socket-type") ) + _options->outputSocketType = vm["output-socket-type"].as(); + + if ( vm.count("output-buff-size") ) + _options->outputBufSize = vm["output-buff-size"].as(); + + if ( vm.count("output-method") ) + _options->outputMethod = vm["output-method"].as(); + + if ( vm.count("output-address") ) + _options->outputAddress = vm["output-address"].as(); + + return true; +} + +int main(int argc, char** argv) +{ s_catch_signals(); + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + LOG(INFO) << "PID: " << getpid(); #ifdef NANOMSG @@ -74,42 +155,23 @@ int main(int argc, char** argv) proxy.SetTransport(transportFactory); - int i = 1; - - proxy.SetProperty(FairMQProxy::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - proxy.SetProperty(FairMQProxy::NumIoThreads, numIoThreads); - ++i; + proxy.SetProperty(FairMQProxy::Id, options.id); + proxy.SetProperty(FairMQProxy::NumIoThreads, options.ioThreads); proxy.SetProperty(FairMQProxy::NumInputs, 1); proxy.SetProperty(FairMQProxy::NumOutputs, 1); proxy.ChangeState(FairMQProxy::INIT); - proxy.SetProperty(FairMQProxy::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - proxy.SetProperty(FairMQProxy::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - proxy.SetProperty(FairMQProxy::InputMethod, argv[i], 0); - ++i; - proxy.SetProperty(FairMQProxy::InputAddress, argv[i], 0); - ++i; + proxy.SetProperty(FairMQProxy::InputSocketType, options.inputSocketType); + proxy.SetProperty(FairMQProxy::InputSndBufSize, options.inputBufSize); + proxy.SetProperty(FairMQProxy::InputMethod, options.inputMethod); + proxy.SetProperty(FairMQProxy::InputAddress, options.inputAddress); - proxy.SetProperty(FairMQProxy::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - proxy.SetProperty(FairMQProxy::OutputSndBufSize, outputSndBufSize, 0); - ++i; - proxy.SetProperty(FairMQProxy::OutputMethod, argv[i], 0); - ++i; - proxy.SetProperty(FairMQProxy::OutputAddress, argv[i], 0); - ++i; + proxy.SetProperty(FairMQProxy::OutputSocketType, options.outputSocketType); + proxy.SetProperty(FairMQProxy::OutputSndBufSize, options.outputBufSize); + proxy.SetProperty(FairMQProxy::OutputMethod, options.outputMethod); + proxy.SetProperty(FairMQProxy::OutputAddress, options.outputAddress); proxy.ChangeState(FairMQProxy::SETOUTPUT); proxy.ChangeState(FairMQProxy::SETINPUT); diff --git a/fairmq/run/runSink.cxx b/fairmq/run/runSink.cxx index 02c35ce9..8e9de458 100644 --- a/fairmq/run/runSink.cxx +++ b/fairmq/run/runSink.cxx @@ -15,6 +15,8 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQSink.h" @@ -24,10 +26,7 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; FairMQSink sink; @@ -52,17 +51,80 @@ static void s_catch_signals(void) sigaction(SIGTERM, &action, NULL); } -int main(int argc, char** argv) +typedef struct DeviceOptions { - if (argc != 7) + string id; + int ioThreads; + string inputSocketType; + int inputBufSize; + string inputMethod; + string inputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value()->required(), "Input method: bind/connect") + ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://*:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) { - cout << "Usage: sink \tID numIoTreads\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << endl; - return 1; + LOG(INFO) << "FairMQ Sink" << endl << desc; + return false; } + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("input-socket-type") ) + _options->inputSocketType = vm["input-socket-type"].as(); + + if ( vm.count("input-buff-size") ) + _options->inputBufSize = vm["input-buff-size"].as(); + + if ( vm.count("input-method") ) + _options->inputMethod = vm["input-method"].as(); + + if ( vm.count("input-address") ) + _options->inputAddress = vm["input-address"].as(); + + return true; +} + +int main(int argc, char** argv) +{ s_catch_signals(); + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + LOG(INFO) << "PID: " << getpid(); #ifdef NANOMSG @@ -73,31 +135,18 @@ int main(int argc, char** argv) sink.SetTransport(transportFactory); - int i = 1; - - sink.SetProperty(FairMQSink::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - sink.SetProperty(FairMQSink::NumIoThreads, numIoThreads); - ++i; + sink.SetProperty(FairMQSink::Id, options.id); + sink.SetProperty(FairMQSink::NumIoThreads, options.ioThreads); sink.SetProperty(FairMQSink::NumInputs, 1); sink.SetProperty(FairMQSink::NumOutputs, 0); sink.ChangeState(FairMQSink::INIT); - sink.SetProperty(FairMQSink::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - sink.SetProperty(FairMQSink::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - sink.SetProperty(FairMQSink::InputMethod, argv[i], 0); - ++i; - sink.SetProperty(FairMQSink::InputAddress, argv[i], 0); - ++i; + sink.SetProperty(FairMQSink::InputSocketType, options.inputSocketType); + sink.SetProperty(FairMQSink::InputRcvBufSize, options.inputBufSize); + sink.SetProperty(FairMQSink::InputMethod, options.inputMethod); + sink.SetProperty(FairMQSink::InputAddress, options.inputAddress); sink.ChangeState(FairMQSink::SETOUTPUT); sink.ChangeState(FairMQSink::SETINPUT); diff --git a/fairmq/run/runSplitter.cxx b/fairmq/run/runSplitter.cxx index 8babee06..8a28655b 100644 --- a/fairmq/run/runSplitter.cxx +++ b/fairmq/run/runSplitter.cxx @@ -15,6 +15,8 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQSplitter.h" @@ -24,10 +26,7 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; FairMQSplitter splitter; @@ -52,20 +51,105 @@ static void s_catch_signals(void) sigaction(SIGTERM, &action, NULL); } -int main(int argc, char** argv) +typedef struct DeviceOptions { - if (argc < 16 || (argc - 8) % 4 != 0) + string id; + int ioThreads; + int numOutputs; + string inputSocketType; + int inputBufSize; + string inputMethod; + string inputAddress; + vector outputSocketType; + vector outputBufSize; + vector outputMethod; + vector outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("num-outputs", bpo::value()->required(), "Number of Splitter output sockets") + ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value()->required(), "Input method: bind/connect") + ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") + ("output-socket-type", bpo::value< vector >()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value< vector >()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value< vector >()->required(), "Output method: bind/connect") + ("output-address", bpo::value< vector >()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) { - cout << "Usage: splitter \tID numIoTreads numOutputs\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" - << "\t\t..." << argc << " arguments provided" << endl; - return 1; + LOG(INFO) << "FairMQ Splitter" << endl << desc; + return false; } + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("num-outputs") ) + _options->numOutputs = vm["num-outputs"].as(); + + if ( vm.count("input-socket-type") ) + _options->inputSocketType = vm["input-socket-type"].as(); + + if ( vm.count("input-buff-size") ) + _options->inputBufSize = vm["input-buff-size"].as(); + + if ( vm.count("input-method") ) + _options->inputMethod = vm["input-method"].as(); + + if ( vm.count("input-address") ) + _options->inputAddress = vm["input-address"].as(); + + if ( vm.count("output-socket-type") ) + _options->outputSocketType = vm["output-socket-type"].as< vector >(); + + if ( vm.count("output-buff-size") ) + _options->outputBufSize = vm["output-buff-size"].as< vector >(); + + if ( vm.count("output-method") ) + _options->outputMethod = vm["output-method"].as< vector >(); + + if ( vm.count("output-address") ) + _options->outputAddress = vm["output-address"].as< vector >(); + + return true; +} + +int main(int argc, char** argv) +{ s_catch_signals(); + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + LOG(INFO) << "PID: " << getpid(); #ifdef NANOMSG @@ -76,48 +160,25 @@ int main(int argc, char** argv) splitter.SetTransport(transportFactory); - int i = 1; - - splitter.SetProperty(FairMQSplitter::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - splitter.SetProperty(FairMQSplitter::NumIoThreads, numIoThreads); - ++i; + splitter.SetProperty(FairMQSplitter::Id, options.id); + splitter.SetProperty(FairMQSplitter::NumIoThreads, options.ioThreads); splitter.SetProperty(FairMQSplitter::NumInputs, 1); - - int numOutputs; - stringstream(argv[i]) >> numOutputs; - splitter.SetProperty(FairMQSplitter::NumOutputs, numOutputs); - ++i; + splitter.SetProperty(FairMQSplitter::NumOutputs, options.numOutputs); splitter.ChangeState(FairMQSplitter::INIT); - splitter.SetProperty(FairMQSplitter::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - splitter.SetProperty(FairMQSplitter::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - splitter.SetProperty(FairMQSplitter::InputMethod, argv[i], 0); - ++i; - splitter.SetProperty(FairMQSplitter::InputAddress, argv[i], 0); - ++i; + splitter.SetProperty(FairMQSplitter::InputSocketType, options.inputSocketType); + splitter.SetProperty(FairMQSplitter::InputSndBufSize, options.inputBufSize); + splitter.SetProperty(FairMQSplitter::InputMethod, options.inputMethod); + splitter.SetProperty(FairMQSplitter::InputAddress, options.inputAddress); - int outputSndBufSize; - for (int iOutput = 0; iOutput < numOutputs; iOutput++) + for (int i = 0; i < options.numOutputs; ++i) { - splitter.SetProperty(FairMQSplitter::OutputSocketType, argv[i], iOutput); - ++i; - stringstream(argv[i]) >> outputSndBufSize; - splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, iOutput); - ++i; - splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], iOutput); - ++i; - splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], iOutput); - ++i; + splitter.SetProperty(FairMQSplitter::OutputSocketType, options.outputSocketType.at(i), i); + splitter.SetProperty(FairMQSplitter::OutputRcvBufSize, options.outputBufSize.at(i), i); + splitter.SetProperty(FairMQSplitter::OutputMethod, options.outputMethod.at(i), i); + splitter.SetProperty(FairMQSplitter::OutputAddress, options.outputAddress.at(i), i); } splitter.ChangeState(FairMQSplitter::SETOUTPUT);