diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 3d370f73..7b6beb96 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -32,7 +32,6 @@ FairMQDevice::FairMQDevice() void FairMQDevice::Init() { LOG(INFO) << ">>>>>>> Init <<<<<<<"; - LOG(INFO) << "numIoThreads: " << fNumIoThreads; for (int i = 0; i < fNumInputs; ++i) { diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index 561ba5a4..701f8e97 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -73,7 +73,7 @@ namespace FairMQFSM template void operator()(EVT const&, FSM&, SourceState&, TargetState&) { - LOG(STATE) << "Transition from " << typeid(SourceState).name() << " to " << typeid(TargetState).name() << " with event:" << typeid(EVT).name(); + // LOG(STATE) << "Transition from " << typeid(SourceState).name() << " to " << typeid(TargetState).name() << " with event:" << typeid(EVT).name(); } }; struct InitFct diff --git a/fairmq/devices/FairMQBenchmarkSampler.cxx b/fairmq/devices/FairMQBenchmarkSampler.cxx index a0b77b11..6e5c8f64 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.cxx +++ b/fairmq/devices/FairMQBenchmarkSampler.cxx @@ -39,7 +39,6 @@ void FairMQBenchmarkSampler::Init() void FairMQBenchmarkSampler::Run() { LOG(INFO) << ">>>>>>> Run <<<<<<<"; - // boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); boost::thread resetEventCounter(boost::bind(&FairMQBenchmarkSampler::ResetEventCounter, this)); diff --git a/fairmq/run/runBenchmarkSampler.cxx b/fairmq/run/runBenchmarkSampler.cxx index 5c52a718..51351eb0 100644 --- a/fairmq/run/runBenchmarkSampler.cxx +++ b/fairmq/run/runBenchmarkSampler.cxx @@ -15,6 +15,8 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQBenchmarkSampler.h" @@ -24,10 +26,7 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; FairMQBenchmarkSampler sampler; @@ -52,18 +51,93 @@ static void s_catch_signals(void) sigaction(SIGTERM, &action, NULL); } +typedef struct DeviceOptions +{ + string id; + int eventSize; + int eventRate; + int ioThreads; + string outputSocketType; + int outputBufSize; + string outputMethod; + string outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("event-size", bpo::value()->default_value(1000), "Event size in bytes") + ("event-rate", bpo::value()->default_value(0), "Event rate limit in maximum number of events per second") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value()->required(), "Output method: bind/connect") + ("output-address", bpo::value()->required(), "Output address, e.g.: \"tcp://*:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) + { + LOG(INFO) << "FairMQ Benchmark Sampler" << endl << desc; + return false; + } + + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("event-size") ) + _options->eventSize = vm["event-size"].as(); + + if ( vm.count("event-rate") ) + _options->eventRate = vm["event-rate"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("output-socket-type") ) + _options->outputSocketType = vm["output-socket-type"].as(); + + if ( vm.count("output-buff-size") ) + _options->outputBufSize = vm["output-buff-size"].as(); + + if ( vm.count("output-method") ) + _options->outputMethod = vm["output-method"].as(); + + if ( vm.count("output-address") ) + _options->outputAddress = vm["output-address"].as(); + + return true; +} + int main(int argc, char** argv) { - if (argc != 9) + s_catch_signals(); + + DeviceOptions_t options; + try { - cout << "Usage: bsampler ID eventSize eventRate numIoTreads\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); return 1; } - s_catch_signals(); - LOG(INFO) << "PID: " << getpid(); + LOG(INFO) << "CONFIG: " << "id: " << options.id << ", event size: " << options.eventSize << ", event rate: " << options.eventRate << ", I/O threads: " << options.ioThreads; + LOG(INFO) << "OUTPUT: " << options.outputSocketType << " " << options.outputBufSize << " " << options.outputMethod << " " << options.outputAddress; #ifdef NANOMSG FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); @@ -73,41 +147,20 @@ int main(int argc, char** argv) sampler.SetTransport(transportFactory); - int i = 1; - - sampler.SetProperty(FairMQBenchmarkSampler::Id, argv[i]); - ++i; - - int eventSize; - stringstream(argv[i]) >> eventSize; - sampler.SetProperty(FairMQBenchmarkSampler::EventSize, eventSize); - ++i; - - int eventRate; - stringstream(argv[i]) >> eventRate; - sampler.SetProperty(FairMQBenchmarkSampler::EventRate, eventRate); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - sampler.SetProperty(FairMQBenchmarkSampler::NumIoThreads, numIoThreads); - ++i; + sampler.SetProperty(FairMQBenchmarkSampler::Id, options.id); + sampler.SetProperty(FairMQBenchmarkSampler::EventSize, options.eventSize); + sampler.SetProperty(FairMQBenchmarkSampler::EventRate, options.eventRate); + sampler.SetProperty(FairMQBenchmarkSampler::NumIoThreads, options.ioThreads); sampler.SetProperty(FairMQBenchmarkSampler::NumInputs, 0); sampler.SetProperty(FairMQBenchmarkSampler::NumOutputs, 1); sampler.ChangeState(FairMQBenchmarkSampler::INIT); - sampler.SetProperty(FairMQBenchmarkSampler::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - sampler.SetProperty(FairMQBenchmarkSampler::OutputSndBufSize, outputSndBufSize, 0); - ++i; - sampler.SetProperty(FairMQBenchmarkSampler::OutputMethod, argv[i], 0); - ++i; - sampler.SetProperty(FairMQBenchmarkSampler::OutputAddress, argv[i], 0); - ++i; + sampler.SetProperty(FairMQBenchmarkSampler::OutputSocketType, options.outputSocketType); + sampler.SetProperty(FairMQBenchmarkSampler::OutputSndBufSize, options.outputBufSize); + sampler.SetProperty(FairMQBenchmarkSampler::OutputMethod, options.outputMethod); + sampler.SetProperty(FairMQBenchmarkSampler::OutputAddress, options.outputAddress); sampler.ChangeState(FairMQBenchmarkSampler::SETOUTPUT); sampler.ChangeState(FairMQBenchmarkSampler::SETINPUT); diff --git a/fairmq/run/runBinSampler.cxx b/fairmq/run/runBinSampler.cxx index 38c51606..cfd704fc 100644 --- a/fairmq/run/runBinSampler.cxx +++ b/fairmq/run/runBinSampler.cxx @@ -15,6 +15,8 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQBinSampler.h" @@ -24,10 +26,7 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; FairMQBinSampler sampler; @@ -52,18 +51,93 @@ static void s_catch_signals(void) sigaction(SIGTERM, &action, NULL); } +typedef struct DeviceOptions +{ + string id; + int eventSize; + int eventRate; + int ioThreads; + string outputSocketType; + int outputBufSize; + string outputMethod; + string outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("event-size", bpo::value()->default_value(1000), "Event size in bytes") + ("event-rate", bpo::value()->default_value(0), "Event rate limit in maximum number of events per second") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value()->required(), "Output method: bind/connect") + ("output-address", bpo::value()->required(), "Output address, e.g.: \"tcp://*:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) + { + LOG(INFO) << "FairMQ Bin Sampler" << endl << desc; + return false; + } + + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("event-size") ) + _options->eventSize = vm["event-size"].as(); + + if ( vm.count("event-rate") ) + _options->eventRate = vm["event-rate"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("output-socket-type") ) + _options->outputSocketType = vm["output-socket-type"].as(); + + if ( vm.count("output-buff-size") ) + _options->outputBufSize = vm["output-buff-size"].as(); + + if ( vm.count("output-method") ) + _options->outputMethod = vm["output-method"].as(); + + if ( vm.count("output-address") ) + _options->outputAddress = vm["output-address"].as(); + + return true; +} + int main(int argc, char** argv) { - if (argc != 9) + s_catch_signals(); + + DeviceOptions_t options; + try { - cout << "Usage: bsampler ID eventSize eventRate numIoTreads\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); return 1; } - s_catch_signals(); - LOG(INFO) << "PID: " << getpid(); + LOG(INFO) << "CONFIG: " << "id: " << options.id << ", event size: " << options.eventSize << ", event rate: " << options.eventRate << ", I/O threads: " << options.ioThreads; + LOG(INFO) << "OUTPUT: " << options.outputSocketType << " " << options.outputBufSize << " " << options.outputMethod << " " << options.outputAddress; #ifdef NANOMSG FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); @@ -73,41 +147,20 @@ int main(int argc, char** argv) sampler.SetTransport(transportFactory); - int i = 1; - - sampler.SetProperty(FairMQBinSampler::Id, argv[i]); - ++i; - - int eventSize; - stringstream(argv[i]) >> eventSize; - sampler.SetProperty(FairMQBinSampler::EventSize, eventSize); - ++i; - - int eventRate; - stringstream(argv[i]) >> eventRate; - sampler.SetProperty(FairMQBinSampler::EventRate, eventRate); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - sampler.SetProperty(FairMQBinSampler::NumIoThreads, numIoThreads); - ++i; + sampler.SetProperty(FairMQBinSampler::Id, options.id); + sampler.SetProperty(FairMQBinSampler::EventSize, options.eventSize); + sampler.SetProperty(FairMQBinSampler::EventRate, options.eventRate); + sampler.SetProperty(FairMQBinSampler::NumIoThreads, options.ioThreads); sampler.SetProperty(FairMQBinSampler::NumInputs, 0); sampler.SetProperty(FairMQBinSampler::NumOutputs, 1); sampler.ChangeState(FairMQBinSampler::INIT); - sampler.SetProperty(FairMQBinSampler::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - sampler.SetProperty(FairMQBinSampler::OutputSndBufSize, outputSndBufSize, 0); - ++i; - sampler.SetProperty(FairMQBinSampler::OutputMethod, argv[i], 0); - ++i; - sampler.SetProperty(FairMQBinSampler::OutputAddress, argv[i], 0); - ++i; + sampler.SetProperty(FairMQBinSampler::OutputSocketType, options.outputSocketType); + sampler.SetProperty(FairMQBinSampler::OutputSndBufSize, options.outputBufSize); + sampler.SetProperty(FairMQBinSampler::OutputMethod, options.outputMethod); + sampler.SetProperty(FairMQBinSampler::OutputAddress, options.outputAddress); sampler.ChangeState(FairMQBinSampler::SETOUTPUT); sampler.ChangeState(FairMQBinSampler::SETINPUT); diff --git a/fairmq/run/runBinSink.cxx b/fairmq/run/runBinSink.cxx index 7b0c0fcb..dcd7f79e 100644 --- a/fairmq/run/runBinSink.cxx +++ b/fairmq/run/runBinSink.cxx @@ -15,6 +15,8 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQBinSink.h" @@ -24,10 +26,7 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; FairMQBinSink sink; @@ -52,17 +51,80 @@ static void s_catch_signals(void) sigaction(SIGTERM, &action, NULL); } -int main(int argc, char** argv) +typedef struct DeviceOptions { - if (argc != 7) + string id; + int ioThreads; + string inputSocketType; + int inputBufSize; + string inputMethod; + string inputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value()->required(), "Input method: bind/connect") + ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://*:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) { - cout << "Usage: sink \tID numIoTreads\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << endl; - return 1; + LOG(INFO) << "FairMQ Bin Sink" << endl << desc; + return false; } + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("input-socket-type") ) + _options->inputSocketType = vm["input-socket-type"].as(); + + if ( vm.count("input-buff-size") ) + _options->inputBufSize = vm["input-buff-size"].as(); + + if ( vm.count("input-method") ) + _options->inputMethod = vm["input-method"].as(); + + if ( vm.count("input-address") ) + _options->inputAddress = vm["input-address"].as(); + + return true; +} + +int main(int argc, char** argv) +{ s_catch_signals(); + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + LOG(INFO) << "PID: " << getpid(); #ifdef NANOMSG @@ -73,31 +135,18 @@ int main(int argc, char** argv) sink.SetTransport(transportFactory); - int i = 1; - - sink.SetProperty(FairMQBinSink::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - sink.SetProperty(FairMQBinSink::NumIoThreads, numIoThreads); - ++i; + sink.SetProperty(FairMQBinSink::Id, options.id); + sink.SetProperty(FairMQBinSink::NumIoThreads, options.ioThreads); sink.SetProperty(FairMQBinSink::NumInputs, 1); sink.SetProperty(FairMQBinSink::NumOutputs, 0); sink.ChangeState(FairMQBinSink::INIT); - sink.SetProperty(FairMQBinSink::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - sink.SetProperty(FairMQBinSink::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - sink.SetProperty(FairMQBinSink::InputMethod, argv[i], 0); - ++i; - sink.SetProperty(FairMQBinSink::InputAddress, argv[i], 0); - ++i; + sink.SetProperty(FairMQBinSink::InputSocketType, options.inputSocketType); + sink.SetProperty(FairMQBinSink::InputSndBufSize, options.inputBufSize); + sink.SetProperty(FairMQBinSink::InputMethod, options.inputMethod); + sink.SetProperty(FairMQBinSink::InputAddress, options.inputAddress); sink.ChangeState(FairMQBinSink::SETOUTPUT); sink.ChangeState(FairMQBinSink::SETINPUT); diff --git a/fairmq/run/runBuffer.cxx b/fairmq/run/runBuffer.cxx index 85eb17c0..928cab01 100644 --- a/fairmq/run/runBuffer.cxx +++ b/fairmq/run/runBuffer.cxx @@ -15,6 +15,8 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQBuffer.h" @@ -24,10 +26,7 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; FairMQBuffer buffer; @@ -52,18 +51,100 @@ static void s_catch_signals(void) sigaction(SIGTERM, &action, NULL); } -int main(int argc, char** argv) +typedef struct DeviceOptions { - if (argc != 11) + string id; + int ioThreads; + string inputSocketType; + int inputBufSize; + string inputMethod; + string inputAddress; + string outputSocketType; + int outputBufSize; + string outputMethod; + string outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value(), "Device ID") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value()->required(), "Input method: bind/connect") + ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") + ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value()->required(), "Output method: bind/connect") + ("output-address", bpo::value()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) { - cout << "Usage: buffer \tID numIoTreads\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; - return 1; + LOG(INFO) << "FairMQ Buffer" << endl << desc; + return false; } + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("input-socket-type") ) + _options->inputSocketType = vm["input-socket-type"].as(); + + if ( vm.count("input-buff-size") ) + _options->inputBufSize = vm["input-buff-size"].as(); + + if ( vm.count("input-method") ) + _options->inputMethod = vm["input-method"].as(); + + if ( vm.count("input-address") ) + _options->inputAddress = vm["input-address"].as(); + + if ( vm.count("output-socket-type") ) + _options->outputSocketType = vm["output-socket-type"].as(); + + if ( vm.count("output-buff-size") ) + _options->outputBufSize = vm["output-buff-size"].as(); + + if ( vm.count("output-method") ) + _options->outputMethod = vm["output-method"].as(); + + if ( vm.count("output-address") ) + _options->outputAddress = vm["output-address"].as(); + + return true; +} + +int main(int argc, char** argv) +{ s_catch_signals(); + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + LOG(INFO) << "PID: " << getpid(); #ifdef NANOMSG @@ -74,41 +155,23 @@ int main(int argc, char** argv) buffer.SetTransport(transportFactory); - int i = 1; + buffer.SetProperty(FairMQBuffer::Id, options.id); + buffer.SetProperty(FairMQBuffer::NumIoThreads, options.ioThreads); - buffer.SetProperty(FairMQBuffer::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - buffer.SetProperty(FairMQBuffer::NumIoThreads, numIoThreads); - ++i; buffer.SetProperty(FairMQBuffer::NumInputs, 1); buffer.SetProperty(FairMQBuffer::NumOutputs, 1); buffer.ChangeState(FairMQBuffer::INIT); - buffer.SetProperty(FairMQBuffer::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - buffer.SetProperty(FairMQBuffer::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - buffer.SetProperty(FairMQBuffer::InputMethod, argv[i], 0); - ++i; - buffer.SetProperty(FairMQBuffer::InputAddress, argv[i], 0); - ++i; + buffer.SetProperty(FairMQBuffer::InputSocketType, options.inputSocketType); + buffer.SetProperty(FairMQBuffer::InputSndBufSize, options.inputBufSize); + buffer.SetProperty(FairMQBuffer::InputMethod, options.inputMethod); + buffer.SetProperty(FairMQBuffer::InputAddress, options.inputAddress); - buffer.SetProperty(FairMQBuffer::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - buffer.SetProperty(FairMQBuffer::OutputSndBufSize, outputSndBufSize, 0); - ++i; - buffer.SetProperty(FairMQBuffer::OutputMethod, argv[i], 0); - ++i; - buffer.SetProperty(FairMQBuffer::OutputAddress, argv[i], 0); - ++i; + buffer.SetProperty(FairMQBuffer::OutputSocketType, options.outputSocketType); + buffer.SetProperty(FairMQBuffer::OutputSndBufSize, options.outputBufSize); + buffer.SetProperty(FairMQBuffer::OutputMethod, options.outputMethod); + buffer.SetProperty(FairMQBuffer::OutputAddress, options.outputAddress); buffer.ChangeState(FairMQBuffer::SETOUTPUT); buffer.ChangeState(FairMQBuffer::SETINPUT); diff --git a/fairmq/run/runMerger.cxx b/fairmq/run/runMerger.cxx index 3c04da92..a6bc874e 100644 --- a/fairmq/run/runMerger.cxx +++ b/fairmq/run/runMerger.cxx @@ -15,6 +15,8 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQMerger.h" @@ -24,10 +26,7 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; FairMQMerger merger; @@ -52,20 +51,105 @@ static void s_catch_signals(void) sigaction(SIGTERM, &action, NULL); } -int main(int argc, char** argv) +typedef struct DeviceOptions { - if (argc < 16 || (argc - 8) % 4 != 0) + string id; + int ioThreads; + int numInputs; + vector inputSocketType; + vector inputBufSize; + vector inputMethod; + vector inputAddress; + string outputSocketType; + int outputBufSize; + string outputMethod; + string outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("num-inputs", bpo::value()->required(), "Number of Merger input sockets") + ("input-socket-type", bpo::value< vector >()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value< vector >()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value< vector >()->required(), "Input method: bind/connect") + ("input-address", bpo::value< vector >()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") + ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value()->required(), "Output method: bind/connect") + ("output-address", bpo::value()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) { - cout << "Usage: merger \tID numIoTreads numInputs\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\t...\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << argc << " arguments provided" << endl; - return 1; + LOG(INFO) << "FairMQ Merger" << endl << desc; + return false; } + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("num-inputs") ) + _options->numInputs = vm["num-inputs"].as(); + + if ( vm.count("input-socket-type") ) + _options->inputSocketType = vm["input-socket-type"].as< vector >(); + + if ( vm.count("input-buff-size") ) + _options->inputBufSize = vm["input-buff-size"].as< vector >(); + + if ( vm.count("input-method") ) + _options->inputMethod = vm["input-method"].as< vector >(); + + if ( vm.count("input-address") ) + _options->inputAddress = vm["input-address"].as< vector >(); + + if ( vm.count("output-socket-type") ) + _options->outputSocketType = vm["output-socket-type"].as(); + + if ( vm.count("output-buff-size") ) + _options->outputBufSize = vm["output-buff-size"].as(); + + if ( vm.count("output-method") ) + _options->outputMethod = vm["output-method"].as(); + + if ( vm.count("output-address") ) + _options->outputAddress = vm["output-address"].as(); + + return true; +} + +int main(int argc, char** argv) +{ s_catch_signals(); + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + LOG(INFO) << "PID: " << getpid(); #ifdef NANOMSG @@ -76,49 +160,26 @@ int main(int argc, char** argv) merger.SetTransport(transportFactory); - int i = 1; - - merger.SetProperty(FairMQMerger::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - merger.SetProperty(FairMQMerger::NumIoThreads, numIoThreads); - ++i; - - int numInputs; - stringstream(argv[i]) >> numInputs; - merger.SetProperty(FairMQMerger::NumInputs, numInputs); - ++i; + merger.SetProperty(FairMQMerger::Id, options.id); + merger.SetProperty(FairMQMerger::NumIoThreads, options.ioThreads); + merger.SetProperty(FairMQMerger::NumInputs, options.numInputs); merger.SetProperty(FairMQMerger::NumOutputs, 1); merger.ChangeState(FairMQMerger::INIT); - for (int iInput = 0; iInput < numInputs; iInput++) + for (int i = 0; i < options.numInputs; ++i) { - merger.SetProperty(FairMQMerger::InputSocketType, argv[i], iInput); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, iInput); - ++i; - merger.SetProperty(FairMQMerger::InputMethod, argv[i], iInput); - ++i; - merger.SetProperty(FairMQMerger::InputAddress, argv[i], iInput); - ++i; + merger.SetProperty(FairMQMerger::InputSocketType, options.inputSocketType.at(i), i); + merger.SetProperty(FairMQMerger::InputRcvBufSize, options.inputBufSize.at(i), i); + merger.SetProperty(FairMQMerger::InputMethod, options.inputMethod.at(i), i); + merger.SetProperty(FairMQMerger::InputAddress, options.inputAddress.at(i), i); } - merger.SetProperty(FairMQMerger::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - merger.SetProperty(FairMQMerger::OutputSndBufSize, outputSndBufSize, 0); - ++i; - merger.SetProperty(FairMQMerger::OutputMethod, argv[i], 0); - ++i; - merger.SetProperty(FairMQMerger::OutputAddress, argv[i], 0); - ++i; + merger.SetProperty(FairMQMerger::OutputSocketType, options.outputSocketType); + merger.SetProperty(FairMQMerger::OutputSndBufSize, options.outputBufSize); + merger.SetProperty(FairMQMerger::OutputMethod, options.outputMethod); + merger.SetProperty(FairMQMerger::OutputAddress, options.outputAddress); merger.ChangeState(FairMQMerger::SETOUTPUT); merger.ChangeState(FairMQMerger::SETINPUT); diff --git a/fairmq/run/runProtoSampler.cxx b/fairmq/run/runProtoSampler.cxx index bbe5cb8d..eccd0777 100644 --- a/fairmq/run/runProtoSampler.cxx +++ b/fairmq/run/runProtoSampler.cxx @@ -15,6 +15,8 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQProtoSampler.h" @@ -24,10 +26,7 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; FairMQProtoSampler sampler; @@ -52,18 +51,93 @@ static void s_catch_signals(void) sigaction(SIGTERM, &action, NULL); } +typedef struct DeviceOptions +{ + string id; + int eventSize; + int eventRate; + int ioThreads; + string outputSocketType; + int outputBufSize; + string outputMethod; + string outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("event-size", bpo::value()->default_value(1000), "Event size in bytes") + ("event-rate", bpo::value()->default_value(0), "Event rate limit in maximum number of events per second") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value()->required(), "Output method: bind/connect") + ("output-address", bpo::value()->required(), "Output address, e.g.: \"tcp://*:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) + { + LOG(INFO) << "FairMQ Proto Sampler" << endl << desc; + return false; + } + + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("event-size") ) + _options->eventSize = vm["event-size"].as(); + + if ( vm.count("event-rate") ) + _options->eventRate = vm["event-rate"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("output-socket-type") ) + _options->outputSocketType = vm["output-socket-type"].as(); + + if ( vm.count("output-buff-size") ) + _options->outputBufSize = vm["output-buff-size"].as(); + + if ( vm.count("output-method") ) + _options->outputMethod = vm["output-method"].as(); + + if ( vm.count("output-address") ) + _options->outputAddress = vm["output-address"].as(); + + return true; +} + int main(int argc, char** argv) { - if (argc != 9) + s_catch_signals(); + + DeviceOptions_t options; + try { - cout << "Usage: bsampler ID eventSize eventRate numIoTreads\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); return 1; } - s_catch_signals(); - LOG(INFO) << "PID: " << getpid(); + LOG(INFO) << "CONFIG: " << "id: " << options.id << ", event size: " << options.eventSize << ", event rate: " << options.eventRate << ", I/O threads: " << options.ioThreads; + LOG(INFO) << "OUTPUT: " << options.outputSocketType << " " << options.outputBufSize << " " << options.outputMethod << " " << options.outputAddress; #ifdef NANOMSG FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); @@ -73,41 +147,20 @@ int main(int argc, char** argv) sampler.SetTransport(transportFactory); - int i = 1; - - sampler.SetProperty(FairMQProtoSampler::Id, argv[i]); - ++i; - - int eventSize; - stringstream(argv[i]) >> eventSize; - sampler.SetProperty(FairMQProtoSampler::EventSize, eventSize); - ++i; - - int eventRate; - stringstream(argv[i]) >> eventRate; - sampler.SetProperty(FairMQProtoSampler::EventRate, eventRate); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - sampler.SetProperty(FairMQProtoSampler::NumIoThreads, numIoThreads); - ++i; + sampler.SetProperty(FairMQProtoSampler::Id, options.id); + sampler.SetProperty(FairMQProtoSampler::EventSize, options.eventSize); + sampler.SetProperty(FairMQProtoSampler::EventRate, options.eventRate); + sampler.SetProperty(FairMQProtoSampler::NumIoThreads, options.ioThreads); sampler.SetProperty(FairMQProtoSampler::NumInputs, 0); sampler.SetProperty(FairMQProtoSampler::NumOutputs, 1); sampler.ChangeState(FairMQProtoSampler::INIT); - sampler.SetProperty(FairMQProtoSampler::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - sampler.SetProperty(FairMQProtoSampler::OutputSndBufSize, outputSndBufSize, 0); - ++i; - sampler.SetProperty(FairMQProtoSampler::OutputMethod, argv[i], 0); - ++i; - sampler.SetProperty(FairMQProtoSampler::OutputAddress, argv[i], 0); - ++i; + sampler.SetProperty(FairMQProtoSampler::OutputSocketType, options.outputSocketType); + sampler.SetProperty(FairMQProtoSampler::OutputSndBufSize, options.outputBufSize); + sampler.SetProperty(FairMQProtoSampler::OutputMethod, options.outputMethod); + sampler.SetProperty(FairMQProtoSampler::OutputAddress, options.outputAddress); sampler.ChangeState(FairMQProtoSampler::SETOUTPUT); sampler.ChangeState(FairMQProtoSampler::SETINPUT); diff --git a/fairmq/run/runProtoSink.cxx b/fairmq/run/runProtoSink.cxx index dc694146..f40635ee 100644 --- a/fairmq/run/runProtoSink.cxx +++ b/fairmq/run/runProtoSink.cxx @@ -15,6 +15,8 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQProtoSink.h" @@ -24,10 +26,7 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; FairMQProtoSink sink; @@ -52,17 +51,80 @@ static void s_catch_signals(void) sigaction(SIGTERM, &action, NULL); } -int main(int argc, char** argv) +typedef struct DeviceOptions { - if (argc != 7) + string id; + int ioThreads; + string inputSocketType; + int inputBufSize; + string inputMethod; + string inputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value()->required(), "Input method: bind/connect") + ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://*:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) { - cout << "Usage: sink \tID numIoTreads\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << endl; - return 1; + LOG(INFO) << "FairMQ Proto Sink" << endl << desc; + return false; } + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("input-socket-type") ) + _options->inputSocketType = vm["input-socket-type"].as(); + + if ( vm.count("input-buff-size") ) + _options->inputBufSize = vm["input-buff-size"].as(); + + if ( vm.count("input-method") ) + _options->inputMethod = vm["input-method"].as(); + + if ( vm.count("input-address") ) + _options->inputAddress = vm["input-address"].as(); + + return true; +} + +int main(int argc, char** argv) +{ s_catch_signals(); + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + LOG(INFO) << "PID: " << getpid(); #ifdef NANOMSG @@ -73,31 +135,18 @@ int main(int argc, char** argv) sink.SetTransport(transportFactory); - int i = 1; - - sink.SetProperty(FairMQProtoSink::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - sink.SetProperty(FairMQProtoSink::NumIoThreads, numIoThreads); - ++i; + sink.SetProperty(FairMQProtoSink::Id, options.id); + sink.SetProperty(FairMQProtoSink::NumIoThreads, options.ioThreads); sink.SetProperty(FairMQProtoSink::NumInputs, 1); sink.SetProperty(FairMQProtoSink::NumOutputs, 0); sink.ChangeState(FairMQProtoSink::INIT); - sink.SetProperty(FairMQProtoSink::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - sink.SetProperty(FairMQProtoSink::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - sink.SetProperty(FairMQProtoSink::InputMethod, argv[i], 0); - ++i; - sink.SetProperty(FairMQProtoSink::InputAddress, argv[i], 0); - ++i; + sink.SetProperty(FairMQProtoSink::InputSocketType, options.inputSocketType); + sink.SetProperty(FairMQProtoSink::InputSndBufSize, options.inputBufSize); + sink.SetProperty(FairMQProtoSink::InputMethod, options.inputMethod); + sink.SetProperty(FairMQProtoSink::InputAddress, options.inputAddress); sink.ChangeState(FairMQProtoSink::SETOUTPUT); sink.ChangeState(FairMQProtoSink::SETINPUT); diff --git a/fairmq/run/runProxy.cxx b/fairmq/run/runProxy.cxx index e3df0125..434274fa 100644 --- a/fairmq/run/runProxy.cxx +++ b/fairmq/run/runProxy.cxx @@ -15,6 +15,8 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQProxy.h" @@ -24,10 +26,7 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; FairMQProxy proxy; @@ -52,18 +51,100 @@ static void s_catch_signals(void) sigaction(SIGTERM, &action, NULL); } -int main(int argc, char** argv) +typedef struct DeviceOptions { - if (argc != 11) + string id; + int ioThreads; + string inputSocketType; + int inputBufSize; + string inputMethod; + string inputAddress; + string outputSocketType; + int outputBufSize; + string outputMethod; + string outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value()->required(), "Input method: bind/connect") + ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") + ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value()->required(), "Output method: bind/connect") + ("output-address", bpo::value()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) { - cout << "Usage: proxy \tID numIoTreads\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; - return 1; + LOG(INFO) << "FairMQ Proxy" << endl << desc; + return false; } + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("input-socket-type") ) + _options->inputSocketType = vm["input-socket-type"].as(); + + if ( vm.count("input-buff-size") ) + _options->inputBufSize = vm["input-buff-size"].as(); + + if ( vm.count("input-method") ) + _options->inputMethod = vm["input-method"].as(); + + if ( vm.count("input-address") ) + _options->inputAddress = vm["input-address"].as(); + + if ( vm.count("output-socket-type") ) + _options->outputSocketType = vm["output-socket-type"].as(); + + if ( vm.count("output-buff-size") ) + _options->outputBufSize = vm["output-buff-size"].as(); + + if ( vm.count("output-method") ) + _options->outputMethod = vm["output-method"].as(); + + if ( vm.count("output-address") ) + _options->outputAddress = vm["output-address"].as(); + + return true; +} + +int main(int argc, char** argv) +{ s_catch_signals(); + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + LOG(INFO) << "PID: " << getpid(); #ifdef NANOMSG @@ -74,42 +155,23 @@ int main(int argc, char** argv) proxy.SetTransport(transportFactory); - int i = 1; - - proxy.SetProperty(FairMQProxy::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - proxy.SetProperty(FairMQProxy::NumIoThreads, numIoThreads); - ++i; + proxy.SetProperty(FairMQProxy::Id, options.id); + proxy.SetProperty(FairMQProxy::NumIoThreads, options.ioThreads); proxy.SetProperty(FairMQProxy::NumInputs, 1); proxy.SetProperty(FairMQProxy::NumOutputs, 1); proxy.ChangeState(FairMQProxy::INIT); - proxy.SetProperty(FairMQProxy::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - proxy.SetProperty(FairMQProxy::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - proxy.SetProperty(FairMQProxy::InputMethod, argv[i], 0); - ++i; - proxy.SetProperty(FairMQProxy::InputAddress, argv[i], 0); - ++i; + proxy.SetProperty(FairMQProxy::InputSocketType, options.inputSocketType); + proxy.SetProperty(FairMQProxy::InputSndBufSize, options.inputBufSize); + proxy.SetProperty(FairMQProxy::InputMethod, options.inputMethod); + proxy.SetProperty(FairMQProxy::InputAddress, options.inputAddress); - proxy.SetProperty(FairMQProxy::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - proxy.SetProperty(FairMQProxy::OutputSndBufSize, outputSndBufSize, 0); - ++i; - proxy.SetProperty(FairMQProxy::OutputMethod, argv[i], 0); - ++i; - proxy.SetProperty(FairMQProxy::OutputAddress, argv[i], 0); - ++i; + proxy.SetProperty(FairMQProxy::OutputSocketType, options.outputSocketType); + proxy.SetProperty(FairMQProxy::OutputSndBufSize, options.outputBufSize); + proxy.SetProperty(FairMQProxy::OutputMethod, options.outputMethod); + proxy.SetProperty(FairMQProxy::OutputAddress, options.outputAddress); proxy.ChangeState(FairMQProxy::SETOUTPUT); proxy.ChangeState(FairMQProxy::SETINPUT); diff --git a/fairmq/run/runSink.cxx b/fairmq/run/runSink.cxx index 02c35ce9..8e9de458 100644 --- a/fairmq/run/runSink.cxx +++ b/fairmq/run/runSink.cxx @@ -15,6 +15,8 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQSink.h" @@ -24,10 +26,7 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; FairMQSink sink; @@ -52,17 +51,80 @@ static void s_catch_signals(void) sigaction(SIGTERM, &action, NULL); } -int main(int argc, char** argv) +typedef struct DeviceOptions { - if (argc != 7) + string id; + int ioThreads; + string inputSocketType; + int inputBufSize; + string inputMethod; + string inputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value()->required(), "Input method: bind/connect") + ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://*:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) { - cout << "Usage: sink \tID numIoTreads\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << endl; - return 1; + LOG(INFO) << "FairMQ Sink" << endl << desc; + return false; } + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("input-socket-type") ) + _options->inputSocketType = vm["input-socket-type"].as(); + + if ( vm.count("input-buff-size") ) + _options->inputBufSize = vm["input-buff-size"].as(); + + if ( vm.count("input-method") ) + _options->inputMethod = vm["input-method"].as(); + + if ( vm.count("input-address") ) + _options->inputAddress = vm["input-address"].as(); + + return true; +} + +int main(int argc, char** argv) +{ s_catch_signals(); + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + LOG(INFO) << "PID: " << getpid(); #ifdef NANOMSG @@ -73,31 +135,18 @@ int main(int argc, char** argv) sink.SetTransport(transportFactory); - int i = 1; - - sink.SetProperty(FairMQSink::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - sink.SetProperty(FairMQSink::NumIoThreads, numIoThreads); - ++i; + sink.SetProperty(FairMQSink::Id, options.id); + sink.SetProperty(FairMQSink::NumIoThreads, options.ioThreads); sink.SetProperty(FairMQSink::NumInputs, 1); sink.SetProperty(FairMQSink::NumOutputs, 0); sink.ChangeState(FairMQSink::INIT); - sink.SetProperty(FairMQSink::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - sink.SetProperty(FairMQSink::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - sink.SetProperty(FairMQSink::InputMethod, argv[i], 0); - ++i; - sink.SetProperty(FairMQSink::InputAddress, argv[i], 0); - ++i; + sink.SetProperty(FairMQSink::InputSocketType, options.inputSocketType); + sink.SetProperty(FairMQSink::InputRcvBufSize, options.inputBufSize); + sink.SetProperty(FairMQSink::InputMethod, options.inputMethod); + sink.SetProperty(FairMQSink::InputAddress, options.inputAddress); sink.ChangeState(FairMQSink::SETOUTPUT); sink.ChangeState(FairMQSink::SETINPUT); diff --git a/fairmq/run/runSplitter.cxx b/fairmq/run/runSplitter.cxx index 8babee06..8a28655b 100644 --- a/fairmq/run/runSplitter.cxx +++ b/fairmq/run/runSplitter.cxx @@ -15,6 +15,8 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQSplitter.h" @@ -24,10 +26,7 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; FairMQSplitter splitter; @@ -52,20 +51,105 @@ static void s_catch_signals(void) sigaction(SIGTERM, &action, NULL); } -int main(int argc, char** argv) +typedef struct DeviceOptions { - if (argc < 16 || (argc - 8) % 4 != 0) + string id; + int ioThreads; + int numOutputs; + string inputSocketType; + int inputBufSize; + string inputMethod; + string inputAddress; + vector outputSocketType; + vector outputBufSize; + vector outputMethod; + vector outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("num-outputs", bpo::value()->required(), "Number of Splitter output sockets") + ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value()->required(), "Input method: bind/connect") + ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") + ("output-socket-type", bpo::value< vector >()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value< vector >()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value< vector >()->required(), "Output method: bind/connect") + ("output-address", bpo::value< vector >()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) { - cout << "Usage: splitter \tID numIoTreads numOutputs\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" - << "\t\t..." << argc << " arguments provided" << endl; - return 1; + LOG(INFO) << "FairMQ Splitter" << endl << desc; + return false; } + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("num-outputs") ) + _options->numOutputs = vm["num-outputs"].as(); + + if ( vm.count("input-socket-type") ) + _options->inputSocketType = vm["input-socket-type"].as(); + + if ( vm.count("input-buff-size") ) + _options->inputBufSize = vm["input-buff-size"].as(); + + if ( vm.count("input-method") ) + _options->inputMethod = vm["input-method"].as(); + + if ( vm.count("input-address") ) + _options->inputAddress = vm["input-address"].as(); + + if ( vm.count("output-socket-type") ) + _options->outputSocketType = vm["output-socket-type"].as< vector >(); + + if ( vm.count("output-buff-size") ) + _options->outputBufSize = vm["output-buff-size"].as< vector >(); + + if ( vm.count("output-method") ) + _options->outputMethod = vm["output-method"].as< vector >(); + + if ( vm.count("output-address") ) + _options->outputAddress = vm["output-address"].as< vector >(); + + return true; +} + +int main(int argc, char** argv) +{ s_catch_signals(); + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + LOG(INFO) << "PID: " << getpid(); #ifdef NANOMSG @@ -76,48 +160,25 @@ int main(int argc, char** argv) splitter.SetTransport(transportFactory); - int i = 1; - - splitter.SetProperty(FairMQSplitter::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - splitter.SetProperty(FairMQSplitter::NumIoThreads, numIoThreads); - ++i; + splitter.SetProperty(FairMQSplitter::Id, options.id); + splitter.SetProperty(FairMQSplitter::NumIoThreads, options.ioThreads); splitter.SetProperty(FairMQSplitter::NumInputs, 1); - - int numOutputs; - stringstream(argv[i]) >> numOutputs; - splitter.SetProperty(FairMQSplitter::NumOutputs, numOutputs); - ++i; + splitter.SetProperty(FairMQSplitter::NumOutputs, options.numOutputs); splitter.ChangeState(FairMQSplitter::INIT); - splitter.SetProperty(FairMQSplitter::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - splitter.SetProperty(FairMQSplitter::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - splitter.SetProperty(FairMQSplitter::InputMethod, argv[i], 0); - ++i; - splitter.SetProperty(FairMQSplitter::InputAddress, argv[i], 0); - ++i; + splitter.SetProperty(FairMQSplitter::InputSocketType, options.inputSocketType); + splitter.SetProperty(FairMQSplitter::InputSndBufSize, options.inputBufSize); + splitter.SetProperty(FairMQSplitter::InputMethod, options.inputMethod); + splitter.SetProperty(FairMQSplitter::InputAddress, options.inputAddress); - int outputSndBufSize; - for (int iOutput = 0; iOutput < numOutputs; iOutput++) + for (int i = 0; i < options.numOutputs; ++i) { - splitter.SetProperty(FairMQSplitter::OutputSocketType, argv[i], iOutput); - ++i; - stringstream(argv[i]) >> outputSndBufSize; - splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, iOutput); - ++i; - splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], iOutput); - ++i; - splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], iOutput); - ++i; + splitter.SetProperty(FairMQSplitter::OutputSocketType, options.outputSocketType.at(i), i); + splitter.SetProperty(FairMQSplitter::OutputRcvBufSize, options.outputBufSize.at(i), i); + splitter.SetProperty(FairMQSplitter::OutputMethod, options.outputMethod.at(i), i); + splitter.SetProperty(FairMQSplitter::OutputAddress, options.outputAddress.at(i), i); } splitter.ChangeState(FairMQSplitter::SETOUTPUT);