From 9c4d64f3b178381937ffa80094055f7b146a4cc0 Mon Sep 17 00:00:00 2001 From: Mohammad Al-Turany Date: Fri, 1 Nov 2013 11:24:47 +0000 Subject: [PATCH] New files from Ralf Kliemt git-svn-id: https://subversion.gsi.de/fairroot/fairbase/trunk@22606 0381ead4-6506-0410-b988-94b70fbc4730 --- fairmq/CMakeLists.txt | 2 +- fairmq/FairMQStandaloneMerger.cxx | 35 ++++----- fairmq/runNToOneMerger.cxx | 124 ++++++++++++++++++++++++++++++ fairmq/runOneToNSplitter.cxx | 122 +++++++++++++++++++++++++++++ 4 files changed, 260 insertions(+), 23 deletions(-) create mode 100644 fairmq/runNToOneMerger.cxx create mode 100644 fairmq/runOneToNSplitter.cxx diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index ba3dd4ae..013fcf02 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -43,7 +43,7 @@ Set(DEPENDENCIES GENERATE_LIBRARY() Set(Exe_Names bsampler buffer splitter merger sink proxy) -Set(Exe_Source runBenchmarkSampler.cxx runBuffer.cxx runSplitter.cxx runMerger.cxx runSink.cxx runProxy.cxx) +Set(Exe_Source runBenchmarkSampler.cxx runBuffer.cxx runSplitter.cxx runMerger.cxx runSink.cxx runProxy.cxx runNToOneMerger.cxx runOneToNSplitter.cxx) List(LENGTH Exe_Names _length) Math(EXPR _length ${_length}-1) diff --git a/fairmq/FairMQStandaloneMerger.cxx b/fairmq/FairMQStandaloneMerger.cxx index 82036864..d9bcdbe2 100644 --- a/fairmq/FairMQStandaloneMerger.cxx +++ b/fairmq/FairMQStandaloneMerger.cxx @@ -25,11 +25,11 @@ void FairMQStandaloneMerger::Run() boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); - // Initialize poll set - zmq_pollitem_t items[] = { - { *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 }, - { *(fPayloadInputs->at(1)->GetSocket()), 0, ZMQ_POLLIN, 0 } - }; + zmq_pollitem_t items[fNumInputs]; + for (Int_t iInput = 0; iInput < fNumInputs; iInput++) { + zmq_pollitem_t tempitem( {*(fPayloadInputs->at(iInput)->GetSocket()), 0, ZMQ_POLLIN, 0}); + items[iInput] = tempitem; + } Bool_t received = false; @@ -38,24 +38,15 @@ void FairMQStandaloneMerger::Run() zmq_poll(items, fNumInputs, 100); - if (items[0].revents & ZMQ_POLLIN) { - received = fPayloadInputs->at(0)->Receive(&msg); + for(Int_t iItem = 0; iItem < fNumInputs; iItem++) { + if (items[iItem].revents & ZMQ_POLLIN) { + received = fPayloadInputs->at(iItem)->Receive(&msg); + } + if (received) { + fPayloadOutputs->at(0)->Send(&msg); + received = false; + } } - - if (received) { - fPayloadOutputs->at(0)->Send(&msg); - received = false; - } - - if (items[1].revents & ZMQ_POLLIN) { - received = fPayloadInputs->at(1)->Receive(&msg); - } - - if (received) { - fPayloadOutputs->at(0)->Send(&msg); - received = false; - } - } rateLogger.interrupt(); diff --git a/fairmq/runNToOneMerger.cxx b/fairmq/runNToOneMerger.cxx new file mode 100644 index 00000000..96a7dcaf --- /dev/null +++ b/fairmq/runNToOneMerger.cxx @@ -0,0 +1,124 @@ +/* + * runMerger.cxx + * + * Created on: Dec 6, 2012 + * Author: dklein + */ + +#include +#include + +#include "FairMQLogger.h" +#include "FairMQStandaloneMerger.h" + + +FairMQStandaloneMerger merger; + +static void s_signal_handler (int signal) +{ + std::cout << std::endl << "Caught signal " << signal << std::endl; + + merger.ChangeState(FairMQStandaloneMerger::STOP); + merger.ChangeState(FairMQStandaloneMerger::END); + + std::cout << "Shutdown complete. Bye!" << std::endl; + exit(1); +} + +static void s_catch_signals (void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} + +int main(int argc, char** argv) +{ + if ( argc < 16 || (argc-8)%4!=0 ) { + std::cout << "Usage: merger \tID numIoTreads numInputs\n" + << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" + << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" + << "\t\t...\n" + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" + << argc << std::endl; + return 1; + } + + s_catch_signals(); + + std::stringstream logmsg; + logmsg << "PID: " << getpid(); + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + + int i = 1; + + merger.SetProperty(FairMQStandaloneMerger::Id, argv[i]); + ++i; + + int numIoThreads; + std::stringstream(argv[i]) >> numIoThreads; + merger.SetProperty(FairMQStandaloneMerger::NumIoThreads, numIoThreads); + ++i; + + int numInputs; + std::stringstream(argv[i]) >> numInputs; + merger.SetProperty(FairMQStandaloneMerger::NumInputs, numInputs); + ++i; + + merger.SetProperty(FairMQStandaloneMerger::NumOutputs, 1); + + + merger.ChangeState(FairMQStandaloneMerger::INIT); + + + int inputSocketType; + for (int iInput = 0; iInput < numInputs; iInput++ ) { + inputSocketType = ZMQ_SUB; + if (strcmp(argv[i], "pull") == 0) { + inputSocketType = ZMQ_PULL; + } + merger.SetProperty(FairMQStandaloneMerger::InputSocketType, inputSocketType, iInput); + ++i; + int inputRcvBufSize; + std::stringstream(argv[i]) >> inputRcvBufSize; + merger.SetProperty(FairMQStandaloneMerger::InputRcvBufSize, inputRcvBufSize, iInput); + ++i; + merger.SetProperty(FairMQStandaloneMerger::InputMethod, argv[i], iInput); + ++i; + merger.SetProperty(FairMQStandaloneMerger::InputAddress, argv[i], iInput); + ++i; + } + + int outputSocketType = ZMQ_PUB; + if (strcmp(argv[i], "push") == 0) { + outputSocketType = ZMQ_PUSH; + } + merger.SetProperty(FairMQStandaloneMerger::OutputSocketType, outputSocketType, 0); + ++i; + int outputSndBufSize; + std::stringstream(argv[i]) >> outputSndBufSize; + merger.SetProperty(FairMQStandaloneMerger::OutputSndBufSize, outputSndBufSize, 0); + ++i; + merger.SetProperty(FairMQStandaloneMerger::OutputMethod, argv[i], 0); + ++i; + merger.SetProperty(FairMQStandaloneMerger::OutputAddress, argv[i], 0); + ++i; + + + merger.ChangeState(FairMQStandaloneMerger::SETOUTPUT); + merger.ChangeState(FairMQStandaloneMerger::SETINPUT); + merger.ChangeState(FairMQStandaloneMerger::RUN); + + + char ch; + std::cin.get(ch); + + merger.ChangeState(FairMQStandaloneMerger::STOP); + merger.ChangeState(FairMQStandaloneMerger::END); + + return 0; +} + diff --git a/fairmq/runOneToNSplitter.cxx b/fairmq/runOneToNSplitter.cxx new file mode 100644 index 00000000..0b39d825 --- /dev/null +++ b/fairmq/runOneToNSplitter.cxx @@ -0,0 +1,122 @@ +/* + * runSplitter.cxx + * + * Created on: Dec 6, 2012 + * Author: dklein + */ + +#include +#include + +#include "FairMQLogger.h" +#include "FairMQBalancedStandaloneSplitter.h" + + +FairMQBalancedStandaloneSplitter splitter; + +static void s_signal_handler (int signal) +{ + std::cout << std::endl << "Caught signal " << signal << std::endl; + + splitter.ChangeState(FairMQBalancedStandaloneSplitter::STOP); + splitter.ChangeState(FairMQBalancedStandaloneSplitter::END); + + std::cout << "Shutdown complete. Bye!" << std::endl; + exit(1); +} + +static void s_catch_signals (void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} + +int main(int argc, char** argv) +{ + if ( argc < 16 || (argc-8)%4!=0 ) { //argc{name,id,threads,nout,insock,inbuff,inmet,inadd, ... out} + std::cout << "Usage: splitter \tID numIoTreads numOutputs\n" + << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" + << "\t\t..." << argc << " arguments provided" << std::endl; + return 1; + } + + s_catch_signals(); + + std::stringstream logmsg; + logmsg << "PID: " << getpid(); + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + + int i = 1; + + splitter.SetProperty(FairMQBalancedStandaloneSplitter::Id, argv[i]); + ++i; + + int numIoThreads; + std::stringstream(argv[i]) >> numIoThreads; + splitter.SetProperty(FairMQBalancedStandaloneSplitter::NumIoThreads, numIoThreads); + ++i; + + splitter.SetProperty(FairMQBalancedStandaloneSplitter::NumInputs, 1); + + int numOutputs; + std::stringstream(argv[i]) >> numOutputs; + splitter.SetProperty(FairMQBalancedStandaloneSplitter::NumOutputs, numOutputs); + ++i; + + + splitter.ChangeState(FairMQBalancedStandaloneSplitter::INIT); + + + int inputSocketType = ZMQ_SUB; + if (strcmp(argv[i], "pull") == 0) { + inputSocketType = ZMQ_PULL; + } + splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputSocketType, inputSocketType, 0); + ++i; + int inputRcvBufSize; + std::stringstream(argv[i]) >> inputRcvBufSize; + splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputRcvBufSize, inputRcvBufSize, 0); + ++i; + splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputMethod, argv[i], 0); + ++i; + splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputAddress, argv[i], 0); + ++i; + + int outputSocketType; + int outputSndBufSize; + for (int iOutput = 0; iOutput < numOutputs; iOutput++) { + outputSocketType = ZMQ_PUB; + if (strcmp(argv[i], "push") == 0) { + outputSocketType = ZMQ_PUSH; + } + splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputSocketType, outputSocketType, iOutput); + ++i; + std::stringstream(argv[i]) >> outputSndBufSize; + splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputSndBufSize, outputSndBufSize, iOutput); + ++i; + splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputMethod, argv[i], iOutput); + ++i; + splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputAddress, argv[i], iOutput); + ++i; + } + + splitter.ChangeState(FairMQBalancedStandaloneSplitter::SETOUTPUT); + splitter.ChangeState(FairMQBalancedStandaloneSplitter::SETINPUT); + splitter.ChangeState(FairMQBalancedStandaloneSplitter::RUN); + + + char ch; + std::cin.get(ch); + + splitter.ChangeState(FairMQBalancedStandaloneSplitter::STOP); + splitter.ChangeState(FairMQBalancedStandaloneSplitter::END); + + return 0; +} +