diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 4ce8b438..b41fdb86 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -3,15 +3,25 @@ set(INCLUDE_DIRECTORIES ${Boost_INCLUDE_DIR} ) +if(PROTOBUF_FOUND) + set(INCLUDE_DIRECTORIES + ${INCLUDE_DIRECTORIES} + ${PROTOBUF_INCLUDE_DIR} + ${CMAKE_SOURCE_DIR}/fairmq/prototest + ) +endif(PROTOBUF_FOUND) + if(NANOMSG_FOUND) set(INCLUDE_DIRECTORIES ${INCLUDE_DIRECTORIES} ${NANOMSG_LIBRARY_SHARED} + ${CMAKE_SOURCE_DIR}/fairmq/nanomsg ) else(NANOMSG_FOUND) set(INCLUDE_DIRECTORIES ${INCLUDE_DIRECTORIES} ${ZMQ_INCLUDE_DIR} + ${CMAKE_SOURCE_DIR}/fairmq/zeromq ) endif(NANOMSG_FOUND) @@ -40,6 +50,21 @@ set(SRCS "FairMQPoller.cxx" ) +if(PROTOBUF_FOUND) + set(SRCS + ${SRCS} + "prototest/payload.pb.cc" + "prototest/FairMQProtoSampler.cxx" + "prototest/FairMQBinSampler.cxx" + "prototest/FairMQBinSink.cxx" + "prototest/FairMQProtoSink.cxx" + ) + set(DEPENDENCIES + ${DEPENDENCIES} + ${PROTOBUF_LIBRARY} + ) +endif(PROTOBUF_FOUND) + if(NANOMSG_FOUND) set(SRCS ${SRCS} @@ -49,6 +74,7 @@ if(NANOMSG_FOUND) "nanomsg/FairMQPollerNN.cxx" ) set(DEPENDENCIES + ${DEPENDENCIES} ${NANOMSG_LIBRARY_SHARED} ) else(NANOMSG_FOUND) @@ -61,6 +87,7 @@ else(NANOMSG_FOUND) "zeromq/FairMQContextZMQ.cxx" ) set(DEPENDENCIES + ${DEPENDENCIES} ${ZMQ_LIBRARY_SHARED} ) endif(NANOMSG_FOUND) @@ -75,31 +102,36 @@ set(LIBRARY_NAME FairMQ) GENERATE_LIBRARY() -set(Exe_Names bsampler buffer splitter merger sink proxy n_one_merger one_n_splitter) +set(Exe_Names bsampler buffer splitter merger sink proxy) -if(NANOMSG_FOUND) - set(Exe_Source - nanomsg/runBenchmarkSampler.cxx - nanomsg/runBuffer.cxx - nanomsg/runSplitter.cxx - nanomsg/runMerger.cxx - nanomsg/runSink.cxx - nanomsg/runProxy.cxx - nanomsg/runNToOneMerger.cxx - nanomsg/runOneToNSplitter.cxx - ) -else(NANOMSG_FOUND) - set(Exe_Source - zeromq/runBenchmarkSampler.cxx - zeromq/runBuffer.cxx - zeromq/runSplitter.cxx - zeromq/runMerger.cxx - zeromq/runSink.cxx - zeromq/runProxy.cxx - zeromq/runNToOneMerger.cxx - zeromq/runOneToNSplitter.cxx - ) -endif(NANOMSG_FOUND) +if(PROTOBUF_FOUND) + set(Exe_Names + ${Exe_Names} + binsampler + protosampler + binsink + protosink + ) +endif(PROTOBUF_FOUND) + +set(Exe_Source + run/runBenchmarkSampler.cxx + run/runBuffer.cxx + run/runSplitter.cxx + run/runMerger.cxx + run/runSink.cxx + run/runProxy.cxx +) + +if(PROTOBUF_FOUND) + set(Exe_Source + ${Exe_Source} + run/runBinSampler.cxx + run/runProtoSampler.cxx + run/runBinSink.cxx + run/runProtoSink.cxx + ) +endif(PROTOBUF_FOUND) list(LENGTH Exe_Names _length) math(EXPR _length ${_length}-1) diff --git a/fairmq/FairMQBenchmarkSampler.cxx b/fairmq/FairMQBenchmarkSampler.cxx index fe4428f9..ec4ac995 100644 --- a/fairmq/FairMQBenchmarkSampler.cxx +++ b/fairmq/FairMQBenchmarkSampler.cxx @@ -39,13 +39,13 @@ void FairMQBenchmarkSampler::Run() boost::thread resetEventCounter(boost::bind(&FairMQBenchmarkSampler::ResetEventCounter, this)); void* buffer = operator new[](fEventSize); - FairMQMessage* base_event = fTransportFactory->CreateMessage(buffer, fEventSize); + FairMQMessage* base_msg = fTransportFactory->CreateMessage(buffer, fEventSize); while ( fState == RUNNING ) { - FairMQMessage* event = fTransportFactory->CreateMessage(); - event->Copy(base_event); + FairMQMessage* msg = fTransportFactory->CreateMessage(); + msg->Copy(base_msg); - fPayloadOutputs->at(0)->Send(event); + fPayloadOutputs->at(0)->Send(msg); --fEventCounter; @@ -53,10 +53,10 @@ void FairMQBenchmarkSampler::Run() boost::this_thread::sleep(boost::posix_time::milliseconds(1)); } - delete event; + delete msg; } - delete base_event; + delete base_msg; rateLogger.interrupt(); resetEventCounter.interrupt(); diff --git a/fairmq/FairMQBenchmarkSampler.h b/fairmq/FairMQBenchmarkSampler.h index 1d293c08..3274db0d 100644 --- a/fairmq/FairMQBenchmarkSampler.h +++ b/fairmq/FairMQBenchmarkSampler.h @@ -33,6 +33,7 @@ class FairMQBenchmarkSampler: public FairMQDevice virtual string GetProperty(const int key, const string& default_ = "", const int slot = 0); virtual void SetProperty(const int key, const int value, const int slot = 0); virtual int GetProperty(const int key, const int default_ = 0, const int slot = 0); + protected: int fEventSize; int fEventRate; diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 612d9ba9..0a5b0c73 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -315,6 +315,11 @@ void FairMQDevice::LogSocketRates() i = 0; for ( vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { + + // #ifdef NANOMSG + // LOG(ERROR) << "OK THEN"; + // #endif + bytesOutputNew[i] = (*itr)->GetBytesTx(); megabytesPerSecondOutput[i] = ((double) (bytesOutputNew[i] - bytesOutput[i]) / (1024. * 1024.)) / (double) timeSinceLastLog_ms * 1000.; bytesOutput[i] = bytesOutputNew[i]; diff --git a/fairmq/FairMQLogger.cxx b/fairmq/FairMQLogger.cxx index a1b94b4a..1fd53c6b 100644 --- a/fairmq/FairMQLogger.cxx +++ b/fairmq/FairMQLogger.cxx @@ -31,7 +31,7 @@ std::ostringstream& FairMQLogger::Log(int type) std::time_t t = s; std::size_t fractional_seconds = ms % 1000; char mbstr[100]; - std::strftime(mbstr, 100, "%H:%M:%S:", std::localtime(&t)); + std::strftime(mbstr, 100, "%H:%M:%S", std::localtime(&t)); string type_str; switch (type) { @@ -50,7 +50,7 @@ std::ostringstream& FairMQLogger::Log(int type) break; } - os << "[\033[01;36m" << mbstr << fractional_seconds << "\033[0m]" << "[" << type_str << "]" << " "; + os << "[\033[01;36m" << mbstr << "\033[0m]" << "[" << type_str << "]" << " "; return os; } diff --git a/fairmq/nanomsg/FairMQMessageNN.cxx b/fairmq/nanomsg/FairMQMessageNN.cxx index bb6320a8..313efdc9 100644 --- a/fairmq/nanomsg/FairMQMessageNN.cxx +++ b/fairmq/nanomsg/FairMQMessageNN.cxx @@ -14,7 +14,8 @@ FairMQMessageNN::FairMQMessageNN() : fSize(0), - fMessage(NULL) + fMessage(NULL), + fReceiving(false) { } @@ -25,6 +26,7 @@ FairMQMessageNN::FairMQMessageNN(size_t size) LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno); } fSize = size; + fReceiving = false; } FairMQMessageNN::FairMQMessageNN(void* data, size_t size) @@ -35,6 +37,7 @@ FairMQMessageNN::FairMQMessageNN(void* data, size_t size) } memcpy (fMessage, data, size); fSize = size; + fReceiving = false; } void FairMQMessageNN::Rebuild() @@ -42,6 +45,7 @@ void FairMQMessageNN::Rebuild() Clear(); fSize = 0; fMessage = NULL; + fReceiving = false; } void FairMQMessageNN::Rebuild(size_t size) @@ -52,6 +56,7 @@ void FairMQMessageNN::Rebuild(size_t size) LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno); } fSize = size; + fReceiving = false; } void FairMQMessageNN::Rebuild(void* data, size_t size) @@ -63,6 +68,7 @@ void FairMQMessageNN::Rebuild(void* data, size_t size) } memcpy (fMessage, data, size); fSize = size; + fReceiving = false; } void* FairMQMessageNN::GetMessage() @@ -118,4 +124,13 @@ inline void FairMQMessageNN::Clear() FairMQMessageNN::~FairMQMessageNN() { + if(fReceiving){ + int rc = nn_freemsg(fMessage); + if (rc < 0) { + LOG(ERROR) << "failed freeing message, reason: " << nn_strerror(errno); + } else { + fMessage = NULL; + fSize = 0; + } + } } diff --git a/fairmq/nanomsg/FairMQMessageNN.h b/fairmq/nanomsg/FairMQMessageNN.h index 4c91e1b2..09ac14b1 100644 --- a/fairmq/nanomsg/FairMQMessageNN.h +++ b/fairmq/nanomsg/FairMQMessageNN.h @@ -35,9 +35,12 @@ class FairMQMessageNN : public FairMQMessage virtual ~FairMQMessageNN(); + friend class FairMQSocketNN; + private: void* fMessage; size_t fSize; + bool fReceiving; void Clear(); }; diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 2d20c978..038742db 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -8,6 +8,7 @@ #include #include "FairMQSocketNN.h" +#include "FairMQMessageNN.h" #include "FairMQLogger.h" FairMQSocketNN::FairMQSocketNN(const string& type, int num) : @@ -62,6 +63,7 @@ size_t FairMQSocketNN::Send(FairMQMessage* msg) } else { fBytesTx += rc; ++fMessagesTx; + static_cast(msg)->fReceiving = false; } return rc; @@ -77,6 +79,7 @@ size_t FairMQSocketNN::Receive(FairMQMessage* msg) fBytesRx += rc; ++fMessagesRx; msg->SetMessage(ptr, rc); + static_cast(msg)->fReceiving = true; } return rc; diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index 052d5535..e094a6b4 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -41,7 +41,6 @@ class FairMQSocketNN : public FairMQSocket static int GetConstant(const string& constant); - virtual ~FairMQSocketNN(); private: diff --git a/fairmq/nanomsg/runBuffer.cxx b/fairmq/nanomsg/runBuffer.cxx deleted file mode 100644 index 265186db..00000000 --- a/fairmq/nanomsg/runBuffer.cxx +++ /dev/null @@ -1,113 +0,0 @@ -/** - * runBuffer.cxx - * - * @since 2012-10-26 - * @author: D. Klein, A. Rybalchenko - */ - -#include -#include - -#include "FairMQLogger.h" -#include "FairMQBuffer.h" -#include "FairMQTransportFactoryNN.h" - -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; - - -FairMQBuffer buffer; - -static void s_signal_handler (int signal) -{ - cout << endl << "Caught signal " << signal << endl; - - buffer.ChangeState(FairMQBuffer::STOP); - buffer.ChangeState(FairMQBuffer::END); - - cout << "Shutdown complete. Bye!" << endl; - exit(1); -} - -static void s_catch_signals (void) -{ - struct sigaction action; - action.sa_handler = s_signal_handler; - action.sa_flags = 0; - sigemptyset(&action.sa_mask); - sigaction(SIGINT, &action, NULL); - sigaction(SIGTERM, &action, NULL); -} - -int main(int argc, char** argv) -{ - if ( argc != 11 ) { - cout << "Usage: buffer \tID numIoTreads\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; - return 1; - } - - s_catch_signals(); - - LOG(INFO) << "PID: " << getpid(); - - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); - buffer.SetTransport(transportFactory); - - int i = 1; - - buffer.SetProperty(FairMQBuffer::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - buffer.SetProperty(FairMQBuffer::NumIoThreads, numIoThreads); - ++i; - buffer.SetProperty(FairMQBuffer::NumInputs, 1); - buffer.SetProperty(FairMQBuffer::NumOutputs, 1); - - - buffer.ChangeState(FairMQBuffer::INIT); - - - buffer.SetProperty(FairMQBuffer::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - buffer.SetProperty(FairMQBuffer::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - buffer.SetProperty(FairMQBuffer::InputMethod, argv[i], 0); - ++i; - buffer.SetProperty(FairMQBuffer::InputAddress, argv[i], 0); - ++i; - - buffer.SetProperty(FairMQBuffer::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - buffer.SetProperty(FairMQBuffer::OutputSndBufSize, outputSndBufSize, 0); - ++i; - buffer.SetProperty(FairMQBuffer::OutputMethod, argv[i], 0); - ++i; - buffer.SetProperty(FairMQBuffer::OutputAddress, argv[i], 0); - ++i; - - - buffer.ChangeState(FairMQBuffer::SETOUTPUT); - buffer.ChangeState(FairMQBuffer::SETINPUT); - buffer.ChangeState(FairMQBuffer::RUN); - - - - char ch; - cin.get(ch); - - buffer.ChangeState(FairMQBuffer::STOP); - buffer.ChangeState(FairMQBuffer::END); - - return 0; -} - diff --git a/fairmq/nanomsg/runMerger.cxx b/fairmq/nanomsg/runMerger.cxx deleted file mode 100644 index a8c56e39..00000000 --- a/fairmq/nanomsg/runMerger.cxx +++ /dev/null @@ -1,124 +0,0 @@ -/** - * runMerger.cxx - * - * @since 2012-12-06 - * @author D. Klein, A. Rybalchenko - */ - -#include -#include - -#include "FairMQLogger.h" -#include "FairMQMerger.h" -#include "FairMQTransportFactoryNN.h" - -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; - - -FairMQMerger merger; - -static void s_signal_handler (int signal) -{ - cout << endl << "Caught signal " << signal << endl; - - merger.ChangeState(FairMQMerger::STOP); - merger.ChangeState(FairMQMerger::END); - - cout << "Shutdown complete. Bye!" << endl; - exit(1); -} - -static void s_catch_signals (void) -{ - struct sigaction action; - action.sa_handler = s_signal_handler; - action.sa_flags = 0; - sigemptyset(&action.sa_mask); - sigaction(SIGINT, &action, NULL); - sigaction(SIGTERM, &action, NULL); -} - -int main(int argc, char** argv) -{ - if ( argc != 15 ) { - cout << "Usage: merger \tID numIoTreads\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; - return 1; - } - - s_catch_signals(); - - LOG(INFO) << "PID: " << getpid(); - - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); - merger.SetTransport(transportFactory); - - int i = 1; - - merger.SetProperty(FairMQMerger::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - merger.SetProperty(FairMQMerger::NumIoThreads, numIoThreads); - ++i; - - merger.SetProperty(FairMQMerger::NumInputs, 2); - merger.SetProperty(FairMQMerger::NumOutputs, 1); - - - merger.ChangeState(FairMQMerger::INIT); - - - merger.SetProperty(FairMQMerger::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - merger.SetProperty(FairMQMerger::InputMethod, argv[i], 0); - ++i; - merger.SetProperty(FairMQMerger::InputAddress, argv[i], 0); - ++i; - - merger.SetProperty(FairMQMerger::InputSocketType, argv[i], 1); - ++i; - stringstream(argv[i]) >> inputRcvBufSize; - merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, 1); - ++i; - merger.SetProperty(FairMQMerger::InputMethod, argv[i], 1); - ++i; - merger.SetProperty(FairMQMerger::InputAddress, argv[i], 1); - ++i; - - merger.SetProperty(FairMQMerger::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - merger.SetProperty(FairMQMerger::OutputSndBufSize, outputSndBufSize, 0); - ++i; - merger.SetProperty(FairMQMerger::OutputMethod, argv[i], 0); - ++i; - merger.SetProperty(FairMQMerger::OutputAddress, argv[i], 0); - ++i; - - - merger.ChangeState(FairMQMerger::SETOUTPUT); - merger.ChangeState(FairMQMerger::SETINPUT); - merger.ChangeState(FairMQMerger::RUN); - - - char ch; - cin.get(ch); - - merger.ChangeState(FairMQMerger::STOP); - merger.ChangeState(FairMQMerger::END); - - return 0; -} - diff --git a/fairmq/nanomsg/runNToOneMerger.cxx b/fairmq/nanomsg/runNToOneMerger.cxx deleted file mode 100644 index baa312d5..00000000 --- a/fairmq/nanomsg/runNToOneMerger.cxx +++ /dev/null @@ -1,122 +0,0 @@ -/** - * runNToOneMerger.cxx - * - * @since 2012-12-06 - * @author D. Klein, A. Rybalchenko - */ - -#include -#include - -#include "FairMQLogger.h" -#include "FairMQMerger.h" -#include "FairMQTransportFactoryNN.h" - -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; - - -FairMQMerger merger; - -static void s_signal_handler (int signal) -{ - cout << endl << "Caught signal " << signal << endl; - - merger.ChangeState(FairMQMerger::STOP); - merger.ChangeState(FairMQMerger::END); - - cout << "Shutdown complete. Bye!" << endl; - exit(1); -} - -static void s_catch_signals (void) -{ - struct sigaction action; - action.sa_handler = s_signal_handler; - action.sa_flags = 0; - sigemptyset(&action.sa_mask); - sigaction(SIGINT, &action, NULL); - sigaction(SIGTERM, &action, NULL); -} - -int main(int argc, char** argv) -{ - if ( argc < 16 || (argc-8)%4!=0 ) { - cout << "Usage: merger \tID numIoTreads numInputs\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\t...\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" - << argc << endl; - return 1; - } - - s_catch_signals(); - - LOG(INFO) << "PID: " << getpid(); - - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); - merger.SetTransport(transportFactory); - - int i = 1; - - merger.SetProperty(FairMQMerger::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - merger.SetProperty(FairMQMerger::NumIoThreads, numIoThreads); - ++i; - - int numInputs; - stringstream(argv[i]) >> numInputs; - merger.SetProperty(FairMQMerger::NumInputs, numInputs); - ++i; - - merger.SetProperty(FairMQMerger::NumOutputs, 1); - - - merger.ChangeState(FairMQMerger::INIT); - - - for (int iInput = 0; iInput < numInputs; iInput++ ) { - merger.SetProperty(FairMQMerger::InputSocketType, argv[i], iInput); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, iInput); - ++i; - merger.SetProperty(FairMQMerger::InputMethod, argv[i], iInput); - ++i; - merger.SetProperty(FairMQMerger::InputAddress, argv[i], iInput); - ++i; - } - - merger.SetProperty(FairMQMerger::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - merger.SetProperty(FairMQMerger::OutputSndBufSize, outputSndBufSize, 0); - ++i; - merger.SetProperty(FairMQMerger::OutputMethod, argv[i], 0); - ++i; - merger.SetProperty(FairMQMerger::OutputAddress, argv[i], 0); - ++i; - - - merger.ChangeState(FairMQMerger::SETOUTPUT); - merger.ChangeState(FairMQMerger::SETINPUT); - merger.ChangeState(FairMQMerger::RUN); - - - char ch; - cin.get(ch); - - merger.ChangeState(FairMQMerger::STOP); - merger.ChangeState(FairMQMerger::END); - - return 0; -} - diff --git a/fairmq/nanomsg/runOneToNSplitter.cxx b/fairmq/nanomsg/runOneToNSplitter.cxx deleted file mode 100644 index c2731e0d..00000000 --- a/fairmq/nanomsg/runOneToNSplitter.cxx +++ /dev/null @@ -1,120 +0,0 @@ -/** - * runOneToNSplitter.cxx - * - * @since 2012-12-06 - * @author D. Klein, A. Rybalchenko - */ - -#include -#include - -#include "FairMQLogger.h" -#include "FairMQSplitter.h" -#include "FairMQTransportFactoryNN.h" - -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; - - -FairMQSplitter splitter; - -static void s_signal_handler (int signal) -{ - cout << endl << "Caught signal " << signal << endl; - - splitter.ChangeState(FairMQSplitter::STOP); - splitter.ChangeState(FairMQSplitter::END); - - cout << "Shutdown complete. Bye!" << endl; - exit(1); -} - -static void s_catch_signals (void) -{ - struct sigaction action; - action.sa_handler = s_signal_handler; - action.sa_flags = 0; - sigemptyset(&action.sa_mask); - sigaction(SIGINT, &action, NULL); - sigaction(SIGTERM, &action, NULL); -} - -int main(int argc, char** argv) -{ - if ( argc < 16 || (argc - 8) % 4 != 0 ) { // argc{ name, id, threads, nout, insock, inbuff, inmet, inadd, ... out} - cout << "Usage: splitter \tID numIoTreads numOutputs\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" - << "\t\t..." << argc << " arguments provided" << endl; - return 1; - } - - s_catch_signals(); - - LOG(INFO) << "PID: " << getpid(); - - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); - splitter.SetTransport(transportFactory); - - int i = 1; - - splitter.SetProperty(FairMQSplitter::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - splitter.SetProperty(FairMQSplitter::NumIoThreads, numIoThreads); - ++i; - - splitter.SetProperty(FairMQSplitter::NumInputs, 1); - - int numOutputs; - stringstream(argv[i]) >> numOutputs; - splitter.SetProperty(FairMQSplitter::NumOutputs, numOutputs); - ++i; - - - splitter.ChangeState(FairMQSplitter::INIT); - - - splitter.SetProperty(FairMQSplitter::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - splitter.SetProperty(FairMQSplitter::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - splitter.SetProperty(FairMQSplitter::InputMethod, argv[i], 0); - ++i; - splitter.SetProperty(FairMQSplitter::InputAddress, argv[i], 0); - ++i; - - int outputSndBufSize; - for (int iOutput = 0; iOutput < numOutputs; iOutput++) { - splitter.SetProperty(FairMQSplitter::OutputSocketType, argv[i], iOutput); - ++i; - stringstream(argv[i]) >> outputSndBufSize; - splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, iOutput); - ++i; - splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], iOutput); - ++i; - splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], iOutput); - ++i; - } - - splitter.ChangeState(FairMQSplitter::SETOUTPUT); - splitter.ChangeState(FairMQSplitter::SETINPUT); - splitter.ChangeState(FairMQSplitter::RUN); - - - char ch; - cin.get(ch); - - splitter.ChangeState(FairMQSplitter::STOP); - splitter.ChangeState(FairMQSplitter::END); - - return 0; -} - diff --git a/fairmq/nanomsg/runProxy.cxx b/fairmq/nanomsg/runProxy.cxx deleted file mode 100644 index 39cd0fe8..00000000 --- a/fairmq/nanomsg/runProxy.cxx +++ /dev/null @@ -1,113 +0,0 @@ -/** - * runProxy.cxx - * - * @since 2013-10-07 - * @author A. Rybalchenko - */ - -#include -#include - -#include "FairMQLogger.h" -#include "FairMQProxy.h" -#include "FairMQTransportFactoryNN.h" - -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; - - -FairMQProxy proxy; - -static void s_signal_handler (int signal) -{ - cout << endl << "Caught signal " << signal << endl; - - proxy.ChangeState(FairMQProxy::STOP); - proxy.ChangeState(FairMQProxy::END); - - cout << "Shutdown complete. Bye!" << endl; - exit(1); -} - -static void s_catch_signals (void) -{ - struct sigaction action; - action.sa_handler = s_signal_handler; - action.sa_flags = 0; - sigemptyset(&action.sa_mask); - sigaction(SIGINT, &action, NULL); - sigaction(SIGTERM, &action, NULL); -} - -int main(int argc, char** argv) -{ - if ( argc != 11 ) { - cout << "Usage: proxy \tID numIoTreads\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; - return 1; - } - - s_catch_signals(); - - LOG(INFO) << "PID: " << getpid(); - - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); - proxy.SetTransport(transportFactory); - - int i = 1; - - proxy.SetProperty(FairMQProxy::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - proxy.SetProperty(FairMQProxy::NumIoThreads, numIoThreads); - ++i; - - proxy.SetProperty(FairMQProxy::NumInputs, 1); - proxy.SetProperty(FairMQProxy::NumOutputs, 1); - - - proxy.ChangeState(FairMQProxy::INIT); - - - proxy.SetProperty(FairMQProxy::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - proxy.SetProperty(FairMQProxy::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - proxy.SetProperty(FairMQProxy::InputMethod, argv[i], 0); - ++i; - proxy.SetProperty(FairMQProxy::InputAddress, argv[i], 0); - ++i; - - proxy.SetProperty(FairMQProxy::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - proxy.SetProperty(FairMQProxy::OutputSndBufSize, outputSndBufSize, 0); - ++i; - proxy.SetProperty(FairMQProxy::OutputMethod, argv[i], 0); - ++i; - proxy.SetProperty(FairMQProxy::OutputAddress, argv[i], 0); - ++i; - - - proxy.ChangeState(FairMQProxy::SETOUTPUT); - proxy.ChangeState(FairMQProxy::SETINPUT); - proxy.ChangeState(FairMQProxy::RUN); - - - char ch; - cin.get(ch); - - proxy.ChangeState(FairMQProxy::STOP); - proxy.ChangeState(FairMQProxy::END); - - return 0; -} - diff --git a/fairmq/nanomsg/runSplitter.cxx b/fairmq/nanomsg/runSplitter.cxx deleted file mode 100644 index 66930036..00000000 --- a/fairmq/nanomsg/runSplitter.cxx +++ /dev/null @@ -1,124 +0,0 @@ -/** - * runSplitter.cxx - * - * @since 2012-12-06 - * @author D. Klein, A. Rybalchenko - */ - -#include -#include - -#include "FairMQLogger.h" -#include "FairMQSplitter.h" -#include "FairMQTransportFactoryNN.h" - -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; - - -FairMQSplitter splitter; - -static void s_signal_handler (int signal) -{ - cout << endl << "Caught signal " << signal << endl; - - splitter.ChangeState(FairMQSplitter::STOP); - splitter.ChangeState(FairMQSplitter::END); - - cout << "Shutdown complete. Bye!" << endl; - exit(1); -} - -static void s_catch_signals (void) -{ - struct sigaction action; - action.sa_handler = s_signal_handler; - action.sa_flags = 0; - sigemptyset(&action.sa_mask); - sigaction(SIGINT, &action, NULL); - sigaction(SIGTERM, &action, NULL); -} - -int main(int argc, char** argv) -{ - if ( argc != 15 ) { - cout << "Usage: splitter \tID numIoTreads\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; - return 1; - } - - s_catch_signals(); - - LOG(INFO) << "PID: " << getpid(); - - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); - splitter.SetTransport(transportFactory); - - int i = 1; - - splitter.SetProperty(FairMQSplitter::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - splitter.SetProperty(FairMQSplitter::NumIoThreads, numIoThreads); - ++i; - - splitter.SetProperty(FairMQSplitter::NumInputs, 1); - splitter.SetProperty(FairMQSplitter::NumOutputs, 2); - - - splitter.ChangeState(FairMQSplitter::INIT); - - - splitter.SetProperty(FairMQSplitter::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - splitter.SetProperty(FairMQSplitter::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - splitter.SetProperty(FairMQSplitter::InputMethod, argv[i], 0); - ++i; - splitter.SetProperty(FairMQSplitter::InputAddress, argv[i], 0); - ++i; - - splitter.SetProperty(FairMQSplitter::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, 0); - ++i; - splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], 0); - ++i; - splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], 0); - ++i; - - splitter.SetProperty(FairMQSplitter::OutputSocketType, argv[i], 1); - ++i; - stringstream(argv[i]) >> outputSndBufSize; - splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, 1); - ++i; - splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], 1); - ++i; - splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], 1); - ++i; - - - splitter.ChangeState(FairMQSplitter::SETOUTPUT); - splitter.ChangeState(FairMQSplitter::SETINPUT); - splitter.ChangeState(FairMQSplitter::RUN); - - - char ch; - cin.get(ch); - - splitter.ChangeState(FairMQSplitter::STOP); - splitter.ChangeState(FairMQSplitter::END); - - return 0; -} - diff --git a/fairmq/prototest/FairMQBinSampler.cxx b/fairmq/prototest/FairMQBinSampler.cxx new file mode 100644 index 00000000..c618a98c --- /dev/null +++ b/fairmq/prototest/FairMQBinSampler.cxx @@ -0,0 +1,170 @@ +/** + * FairMQBinSampler.cpp + * + * @since 2013-04-23 + * @author D. Klein, A. Rybalchenko + */ + +#include +#include /* srand, rand */ +#include /* time */ + +#include +#include + +#include "FairMQBinSampler.h" +#include "FairMQLogger.h" + + +FairMQBinSampler::FairMQBinSampler() : + fEventSize(10000), + fEventRate(1), + fEventCounter(0) +{ +} + +FairMQBinSampler::~FairMQBinSampler() +{ +} + +void FairMQBinSampler::Init() +{ + FairMQDevice::Init(); +} + +void FairMQBinSampler::Run() +{ + LOG(INFO) << ">>>>>>> Run <<<<<<<"; + //boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + + boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); + boost::thread resetEventCounter(boost::bind(&FairMQBinSampler::ResetEventCounter, this)); + + srand(time(NULL)); + + LOG(DEBUG) << "Message size: " << fEventSize * sizeof(Content) << " bytes."; + + while ( fState == RUNNING ) { + + Content* payload = new Content[fEventSize]; + + for (int i = 0; i < fEventSize; ++i) { + (&payload[i])->x = rand() % 100 + 1; + (&payload[i])->y = rand() % 100 + 1; + (&payload[i])->z = rand() % 100 + 1; + (&payload[i])->a = (rand() % 100 + 1) / (rand() % 100 + 1); + (&payload[i])->b = (rand() % 100 + 1) / (rand() % 100 + 1); + // LOG(INFO) << (&payload[i])->x << " " << (&payload[i])->y << " " << (&payload[i])->z << " " << (&payload[i])->a << " " << (&payload[i])->b; + } + + FairMQMessage* msg = fTransportFactory->CreateMessage(fEventSize * sizeof(Content)); + memcpy(msg->GetData(), payload, fEventSize * sizeof(Content)); + + fPayloadOutputs->at(0)->Send(msg); + + --fEventCounter; + + while (fEventCounter == 0) { + boost::this_thread::sleep(boost::posix_time::milliseconds(1)); + } + + delete[] payload; + delete msg; + } + + rateLogger.interrupt(); + resetEventCounter.interrupt(); + + rateLogger.join(); + resetEventCounter.join(); +} + +void FairMQBinSampler::ResetEventCounter() +{ + while ( true ) { + try { + fEventCounter = fEventRate / 100; + boost::this_thread::sleep(boost::posix_time::milliseconds(10)); + } catch (boost::thread_interrupted&) { + break; + } + } +} + +void FairMQBinSampler::Log(int intervalInMs) +{ + timestamp_t t0; + timestamp_t t1; + unsigned long bytes = fPayloadOutputs->at(0)->GetBytesTx(); + unsigned long messages = fPayloadOutputs->at(0)->GetMessagesTx(); + unsigned long bytesNew = 0; + unsigned long messagesNew = 0; + double megabytesPerSecond = 0; + double messagesPerSecond = 0; + + t0 = get_timestamp(); + + while (true) { + boost::this_thread::sleep(boost::posix_time::milliseconds(intervalInMs)); + + t1 = get_timestamp(); + + bytesNew = fPayloadOutputs->at(0)->GetBytesTx(); + messagesNew = fPayloadOutputs->at(0)->GetMessagesTx(); + + timestamp_t timeSinceLastLog_ms = (t1 - t0) / 1000.0L; + + megabytesPerSecond = ((double) (bytesNew - bytes) / (1024. * 1024.)) / (double) timeSinceLastLog_ms * 1000.; + messagesPerSecond = (double) (messagesNew - messages) / (double) timeSinceLastLog_ms * 1000.; + + LOG(DEBUG) << "send " << messagesPerSecond << " msg/s, " << megabytesPerSecond << " MB/s"; + + bytes = bytesNew; + messages = messagesNew; + t0 = t1; + } +} + +void FairMQBinSampler::SetProperty(const int key, const string& value, const int slot/*= 0*/) +{ + switch (key) { + default: + FairMQDevice::SetProperty(key, value, slot); + break; + } +} + +string FairMQBinSampler::GetProperty(const int key, const string& default_/*= ""*/, const int slot/*= 0*/) +{ + switch (key) { + default: + return FairMQDevice::GetProperty(key, default_, slot); + } +} + +void FairMQBinSampler::SetProperty(const int key, const int value, const int slot/*= 0*/) +{ + switch (key) { + case EventSize: + fEventSize = value; + break; + case EventRate: + fEventRate = value; + break; + default: + FairMQDevice::SetProperty(key, value, slot); + break; + } +} + +int FairMQBinSampler::GetProperty(const int key, const int default_/*= 0*/, const int slot/*= 0*/) +{ + switch (key) { + case EventSize: + return fEventSize; + case EventRate: + return fEventRate; + default: + return FairMQDevice::GetProperty(key, default_, slot); + } +} diff --git a/fairmq/prototest/FairMQBinSampler.h b/fairmq/prototest/FairMQBinSampler.h new file mode 100644 index 00000000..66f9e95e --- /dev/null +++ b/fairmq/prototest/FairMQBinSampler.h @@ -0,0 +1,49 @@ +/** + * FairMQBinSampler.h + * + * @since 2014-02-24 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQBINSAMPLER_H_ +#define FAIRMQBINSAMPLER_H_ + +#include + +#include "FairMQDevice.h" + +struct Content { + double a; + double b; + int x; + int y; + int z; +}; + +class FairMQBinSampler: public FairMQDevice +{ + public: + enum { + InputFile = FairMQDevice::Last, + EventRate, + EventSize, + Last + }; + FairMQBinSampler(); + virtual ~FairMQBinSampler(); + void Log(int intervalInMs); + void ResetEventCounter(); + virtual void SetProperty(const int key, const string& value, const int slot = 0); + virtual string GetProperty(const int key, const string& default_ = "", const int slot = 0); + virtual void SetProperty(const int key, const int value, const int slot = 0); + virtual int GetProperty(const int key, const int default_ = 0, const int slot = 0); + + protected: + int fEventSize; + int fEventRate; + int fEventCounter; + virtual void Init(); + virtual void Run(); +}; + +#endif /* FAIRMQBINSAMPLER_H_ */ diff --git a/fairmq/prototest/FairMQBinSink.cxx b/fairmq/prototest/FairMQBinSink.cxx new file mode 100644 index 00000000..a54fbb55 --- /dev/null +++ b/fairmq/prototest/FairMQBinSink.cxx @@ -0,0 +1,46 @@ +/** + * FairMQBinSink.cxx + * + * @since 2013-01-09 + * @author D. Klein, A. Rybalchenko + */ + +#include +#include + +#include "FairMQBinSink.h" +#include "FairMQLogger.h" + +FairMQBinSink::FairMQBinSink() +{ +} + +void FairMQBinSink::Run() +{ + LOG(INFO) << ">>>>>>> Run <<<<<<<"; + + boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); + + while ( fState == RUNNING ) { + FairMQMessage* msg = fTransportFactory->CreateMessage(); + + fPayloadInputs->at(0)->Receive(msg); + + int inputSize = msg->GetSize(); + int numInput = inputSize / sizeof(Content); + Content* input = reinterpret_cast(msg->GetData()); + + // for (int i = 0; i < numInput; ++i) { + // LOG(INFO) << (&input[i])->x << " " << (&input[i])->y << " " << (&input[i])->z << " " << (&input[i])->a << " " << (&input[i])->b; + // } + + delete msg; + } + + rateLogger.interrupt(); + rateLogger.join(); +} + +FairMQBinSink::~FairMQBinSink() +{ +} diff --git a/fairmq/prototest/FairMQBinSink.h b/fairmq/prototest/FairMQBinSink.h new file mode 100644 index 00000000..64b76438 --- /dev/null +++ b/fairmq/prototest/FairMQBinSink.h @@ -0,0 +1,30 @@ +/** + * FairMQBinSink.h + * + * @since 2013-01-09 + * @author D. Klein, A. Rybalchenko + */ + +#ifndef FAIRMQPROTOSINK_H_ +#define FAIRMQPROTOSINK_H_ + +#include "FairMQDevice.h" + +struct Content { + double a; + double b; + int x; + int y; + int z; +}; + +class FairMQBinSink: public FairMQDevice +{ + public: + FairMQBinSink(); + virtual ~FairMQBinSink(); + protected: + virtual void Run(); +}; + +#endif /* FAIRMQPROTOSINK_H_ */ diff --git a/fairmq/prototest/FairMQProtoSampler.cxx b/fairmq/prototest/FairMQProtoSampler.cxx new file mode 100644 index 00000000..34460cf8 --- /dev/null +++ b/fairmq/prototest/FairMQProtoSampler.cxx @@ -0,0 +1,173 @@ +/** + * FairMQProtoSampler.cpp + * + * @since 2013-04-23 + * @author D. Klein, A. Rybalchenko + */ + +#include + +#include +#include + +#include "FairMQProtoSampler.h" +#include "FairMQLogger.h" + +#include "payload.pb.h" + + +FairMQProtoSampler::FairMQProtoSampler() : + fEventSize(10000), + fEventRate(1), + fEventCounter(0) +{ +} + +FairMQProtoSampler::~FairMQProtoSampler() +{ +} + +void FairMQProtoSampler::Init() +{ + FairMQDevice::Init(); +} + +void FairMQProtoSampler::Run() +{ + LOG(INFO) << ">>>>>>> Run <<<<<<<"; + //boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + + boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); + boost::thread resetEventCounter(boost::bind(&FairMQProtoSampler::ResetEventCounter, this)); + + srand(time(NULL)); + + while ( fState == RUNNING ) { + + sampler::Payload p; + + for (int i = 0; i < fEventSize; ++i) { + sampler::Content* content = p.add_data(); + + content->set_x(rand() % 100 + 1); + content->set_y(rand() % 100 + 1); + content->set_z(rand() % 100 + 1); + content->set_a((rand() % 100 + 1) / (rand() % 100 + 1)); + content->set_b((rand() % 100 + 1) / (rand() % 100 + 1)); + // LOG(INFO) << content->x() << " " << content->y() << " " << content->z() << " " << content->a() << " " << content->b(); + } + + std::string str; + p.SerializeToString(&str); + size_t size = str.length(); + + FairMQMessage* msg = fTransportFactory->CreateMessage(size); + memcpy(msg->GetData(), str.c_str(), size); + + fPayloadOutputs->at(0)->Send(msg); + + --fEventCounter; + + while (fEventCounter == 0) { + boost::this_thread::sleep(boost::posix_time::milliseconds(1)); + } + + delete msg; + } + + rateLogger.interrupt(); + resetEventCounter.interrupt(); + + rateLogger.join(); + resetEventCounter.join(); +} + +void FairMQProtoSampler::ResetEventCounter() +{ + while ( true ) { + try { + fEventCounter = fEventRate / 100; + boost::this_thread::sleep(boost::posix_time::milliseconds(10)); + } catch (boost::thread_interrupted&) { + break; + } + } +} + +void FairMQProtoSampler::Log(int intervalInMs) +{ + timestamp_t t0; + timestamp_t t1; + unsigned long bytes = fPayloadOutputs->at(0)->GetBytesTx(); + unsigned long messages = fPayloadOutputs->at(0)->GetMessagesTx(); + unsigned long bytesNew = 0; + unsigned long messagesNew = 0; + double megabytesPerSecond = 0; + double messagesPerSecond = 0; + + t0 = get_timestamp(); + + while (true) { + boost::this_thread::sleep(boost::posix_time::milliseconds(intervalInMs)); + + t1 = get_timestamp(); + + bytesNew = fPayloadOutputs->at(0)->GetBytesTx(); + messagesNew = fPayloadOutputs->at(0)->GetMessagesTx(); + + timestamp_t timeSinceLastLog_ms = (t1 - t0) / 1000.0L; + + megabytesPerSecond = ((double) (bytesNew - bytes) / (1024. * 1024.)) / (double) timeSinceLastLog_ms * 1000.; + messagesPerSecond = (double) (messagesNew - messages) / (double) timeSinceLastLog_ms * 1000.; + + LOG(DEBUG) << "send " << messagesPerSecond << " msg/s, " << megabytesPerSecond << " MB/s"; + + bytes = bytesNew; + messages = messagesNew; + t0 = t1; + } +} + +void FairMQProtoSampler::SetProperty(const int key, const string& value, const int slot/*= 0*/) +{ + switch (key) { + default: + FairMQDevice::SetProperty(key, value, slot); + break; + } +} + +string FairMQProtoSampler::GetProperty(const int key, const string& default_/*= ""*/, const int slot/*= 0*/) +{ + switch (key) { + default: + return FairMQDevice::GetProperty(key, default_, slot); + } +} + +void FairMQProtoSampler::SetProperty(const int key, const int value, const int slot/*= 0*/) +{ + switch (key) { + case EventSize: + fEventSize = value; + break; + case EventRate: + fEventRate = value; + break; + default: + FairMQDevice::SetProperty(key, value, slot); + break; + } +} + +int FairMQProtoSampler::GetProperty(const int key, const int default_/*= 0*/, const int slot/*= 0*/) +{ + switch (key) { + case EventSize: + return fEventSize; + case EventRate: + return fEventRate; + default: + return FairMQDevice::GetProperty(key, default_, slot); + } +} diff --git a/fairmq/prototest/FairMQProtoSampler.h b/fairmq/prototest/FairMQProtoSampler.h new file mode 100644 index 00000000..cb4a4677 --- /dev/null +++ b/fairmq/prototest/FairMQProtoSampler.h @@ -0,0 +1,41 @@ +/** + * FairMQProtoSampler.h + * + * @since 2013-04-23 + * @author D. Klein, A. Rybalchenko + */ + +#ifndef FAIRMQPROTOSAMPLER_H_ +#define FAIRMQPROTOSAMPLER_H_ + +#include + +#include "FairMQDevice.h" + +class FairMQProtoSampler: public FairMQDevice +{ + public: + enum { + InputFile = FairMQDevice::Last, + EventRate, + EventSize, + Last + }; + FairMQProtoSampler(); + virtual ~FairMQProtoSampler(); + void Log(int intervalInMs); + void ResetEventCounter(); + virtual void SetProperty(const int key, const string& value, const int slot = 0); + virtual string GetProperty(const int key, const string& default_ = "", const int slot = 0); + virtual void SetProperty(const int key, const int value, const int slot = 0); + virtual int GetProperty(const int key, const int default_ = 0, const int slot = 0); + + protected: + int fEventSize; + int fEventRate; + int fEventCounter; + virtual void Init(); + virtual void Run(); +}; + +#endif /* FAIRMQPROTOSAMPLER_H_ */ diff --git a/fairmq/prototest/FairMQProtoSink.cxx b/fairmq/prototest/FairMQProtoSink.cxx new file mode 100644 index 00000000..4480979e --- /dev/null +++ b/fairmq/prototest/FairMQProtoSink.cxx @@ -0,0 +1,49 @@ +/** + * FairMQProtoSink.cxx + * + * @since 2013-01-09 + * @author D. Klein, A. Rybalchenko + */ + +#include +#include + +#include "FairMQProtoSink.h" +#include "FairMQLogger.h" + +#include "payload.pb.h" + +FairMQProtoSink::FairMQProtoSink() +{ +} + +void FairMQProtoSink::Run() +{ + LOG(INFO) << ">>>>>>> Run <<<<<<<"; + + boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); + + while ( fState == RUNNING ) { + FairMQMessage* msg = fTransportFactory->CreateMessage(); + + fPayloadInputs->at(0)->Receive(msg); + + sampler::Payload p; + + p.ParseFromArray(msg->GetData(), msg->GetSize()); + + // for (int i = 0; i < p.data_size(); ++i) { + // const sampler::Payload::Content& content = p.data(i); + // LOG(INFO) << content.x() << " " << content.y() << " " << content.z() << " " << content.a() << " " << content.b(); + // } + + delete msg; + } + + rateLogger.interrupt(); + rateLogger.join(); +} + +FairMQProtoSink::~FairMQProtoSink() +{ +} diff --git a/fairmq/prototest/FairMQProtoSink.h b/fairmq/prototest/FairMQProtoSink.h new file mode 100644 index 00000000..64f8fba2 --- /dev/null +++ b/fairmq/prototest/FairMQProtoSink.h @@ -0,0 +1,30 @@ +/** + * FairMQProtoSink.h + * + * @since 2013-01-09 + * @author D. Klein, A. Rybalchenko + */ + +#ifndef FAIRMQPROTOSINK_H_ +#define FAIRMQPROTOSINK_H_ + +#include "FairMQDevice.h" + +struct Content { + double a; + double b; + int x; + int y; + int z; +}; + +class FairMQProtoSink: public FairMQDevice +{ + public: + FairMQProtoSink(); + virtual ~FairMQProtoSink(); + protected: + virtual void Run(); +}; + +#endif /* FAIRMQPROTOSINK_H_ */ diff --git a/fairmq/prototest/payload.pb.cc b/fairmq/prototest/payload.pb.cc new file mode 100644 index 00000000..3e6f004d --- /dev/null +++ b/fairmq/prototest/payload.pb.cc @@ -0,0 +1,698 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: payload.proto + +#define INTERNAL_SUPPRESS_PROTOBUF_FIELD_DEPRECATION +#include "payload.pb.h" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +// @@protoc_insertion_point(includes) + +namespace sampler { + +namespace { + +const ::google::protobuf::Descriptor* Content_descriptor_ = NULL; +const ::google::protobuf::internal::GeneratedMessageReflection* + Content_reflection_ = NULL; +const ::google::protobuf::Descriptor* Payload_descriptor_ = NULL; +const ::google::protobuf::internal::GeneratedMessageReflection* + Payload_reflection_ = NULL; + +} // namespace + + +void protobuf_AssignDesc_payload_2eproto() { + protobuf_AddDesc_payload_2eproto(); + const ::google::protobuf::FileDescriptor* file = + ::google::protobuf::DescriptorPool::generated_pool()->FindFileByName( + "payload.proto"); + GOOGLE_CHECK(file != NULL); + Content_descriptor_ = file->message_type(0); + static const int Content_offsets_[5] = { + GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(Content, a_), + GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(Content, b_), + GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(Content, x_), + GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(Content, y_), + GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(Content, z_), + }; + Content_reflection_ = + new ::google::protobuf::internal::GeneratedMessageReflection( + Content_descriptor_, + Content::default_instance_, + Content_offsets_, + GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(Content, _has_bits_[0]), + GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(Content, _unknown_fields_), + -1, + ::google::protobuf::DescriptorPool::generated_pool(), + ::google::protobuf::MessageFactory::generated_factory(), + sizeof(Content)); + Payload_descriptor_ = file->message_type(1); + static const int Payload_offsets_[1] = { + GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(Payload, data_), + }; + Payload_reflection_ = + new ::google::protobuf::internal::GeneratedMessageReflection( + Payload_descriptor_, + Payload::default_instance_, + Payload_offsets_, + GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(Payload, _has_bits_[0]), + GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(Payload, _unknown_fields_), + -1, + ::google::protobuf::DescriptorPool::generated_pool(), + ::google::protobuf::MessageFactory::generated_factory(), + sizeof(Payload)); +} + +namespace { + +GOOGLE_PROTOBUF_DECLARE_ONCE(protobuf_AssignDescriptors_once_); +inline void protobuf_AssignDescriptorsOnce() { + ::google::protobuf::GoogleOnceInit(&protobuf_AssignDescriptors_once_, + &protobuf_AssignDesc_payload_2eproto); +} + +void protobuf_RegisterTypes(const ::std::string&) { + protobuf_AssignDescriptorsOnce(); + ::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage( + Content_descriptor_, &Content::default_instance()); + ::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage( + Payload_descriptor_, &Payload::default_instance()); +} + +} // namespace + +void protobuf_ShutdownFile_payload_2eproto() { + delete Content::default_instance_; + delete Content_reflection_; + delete Payload::default_instance_; + delete Payload_reflection_; +} + +void protobuf_AddDesc_payload_2eproto() { + static bool already_here = false; + if (already_here) return; + already_here = true; + GOOGLE_PROTOBUF_VERIFY_VERSION; + + ::google::protobuf::DescriptorPool::InternalAddGeneratedFile( + "\n\rpayload.proto\022\007sampler\"@\n\007Content\022\t\n\001a" + "\030\001 \001(\001\022\t\n\001b\030\002 \001(\001\022\t\n\001x\030\003 \001(\005\022\t\n\001y\030\004 \001(\005\022" + "\t\n\001z\030\005 \001(\005\")\n\007Payload\022\036\n\004data\030\001 \003(\0132\020.sa" + "mpler.Content", 133); + ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile( + "payload.proto", &protobuf_RegisterTypes); + Content::default_instance_ = new Content(); + Payload::default_instance_ = new Payload(); + Content::default_instance_->InitAsDefaultInstance(); + Payload::default_instance_->InitAsDefaultInstance(); + ::google::protobuf::internal::OnShutdown(&protobuf_ShutdownFile_payload_2eproto); +} + +// Force AddDescriptors() to be called at static initialization time. +struct StaticDescriptorInitializer_payload_2eproto { + StaticDescriptorInitializer_payload_2eproto() { + protobuf_AddDesc_payload_2eproto(); + } +} static_descriptor_initializer_payload_2eproto_; + +// =================================================================== + +#ifndef _MSC_VER +const int Content::kAFieldNumber; +const int Content::kBFieldNumber; +const int Content::kXFieldNumber; +const int Content::kYFieldNumber; +const int Content::kZFieldNumber; +#endif // !_MSC_VER + +Content::Content() + : ::google::protobuf::Message() { + SharedCtor(); +} + +void Content::InitAsDefaultInstance() { +} + +Content::Content(const Content& from) + : ::google::protobuf::Message() { + SharedCtor(); + MergeFrom(from); +} + +void Content::SharedCtor() { + _cached_size_ = 0; + a_ = 0; + b_ = 0; + x_ = 0; + y_ = 0; + z_ = 0; + ::memset(_has_bits_, 0, sizeof(_has_bits_)); +} + +Content::~Content() { + SharedDtor(); +} + +void Content::SharedDtor() { + if (this != default_instance_) { + } +} + +void Content::SetCachedSize(int size) const { + GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN(); + _cached_size_ = size; + GOOGLE_SAFE_CONCURRENT_WRITES_END(); +} +const ::google::protobuf::Descriptor* Content::descriptor() { + protobuf_AssignDescriptorsOnce(); + return Content_descriptor_; +} + +const Content& Content::default_instance() { + if (default_instance_ == NULL) protobuf_AddDesc_payload_2eproto(); + return *default_instance_; +} + +Content* Content::default_instance_ = NULL; + +Content* Content::New() const { + return new Content; +} + +void Content::Clear() { + if (_has_bits_[0 / 32] & (0xffu << (0 % 32))) { + a_ = 0; + b_ = 0; + x_ = 0; + y_ = 0; + z_ = 0; + } + ::memset(_has_bits_, 0, sizeof(_has_bits_)); + mutable_unknown_fields()->Clear(); +} + +bool Content::MergePartialFromCodedStream( + ::google::protobuf::io::CodedInputStream* input) { +#define DO_(EXPRESSION) if (!(EXPRESSION)) return false + ::google::protobuf::uint32 tag; + while ((tag = input->ReadTag()) != 0) { + switch (::google::protobuf::internal::WireFormatLite::GetTagFieldNumber(tag)) { + // optional double a = 1; + case 1: { + if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) == + ::google::protobuf::internal::WireFormatLite::WIRETYPE_FIXED64) { + DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive< + double, ::google::protobuf::internal::WireFormatLite::TYPE_DOUBLE>( + input, &a_))); + set_has_a(); + } else { + goto handle_uninterpreted; + } + if (input->ExpectTag(17)) goto parse_b; + break; + } + + // optional double b = 2; + case 2: { + if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) == + ::google::protobuf::internal::WireFormatLite::WIRETYPE_FIXED64) { + parse_b: + DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive< + double, ::google::protobuf::internal::WireFormatLite::TYPE_DOUBLE>( + input, &b_))); + set_has_b(); + } else { + goto handle_uninterpreted; + } + if (input->ExpectTag(24)) goto parse_x; + break; + } + + // optional int32 x = 3; + case 3: { + if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) == + ::google::protobuf::internal::WireFormatLite::WIRETYPE_VARINT) { + parse_x: + DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive< + ::google::protobuf::int32, ::google::protobuf::internal::WireFormatLite::TYPE_INT32>( + input, &x_))); + set_has_x(); + } else { + goto handle_uninterpreted; + } + if (input->ExpectTag(32)) goto parse_y; + break; + } + + // optional int32 y = 4; + case 4: { + if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) == + ::google::protobuf::internal::WireFormatLite::WIRETYPE_VARINT) { + parse_y: + DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive< + ::google::protobuf::int32, ::google::protobuf::internal::WireFormatLite::TYPE_INT32>( + input, &y_))); + set_has_y(); + } else { + goto handle_uninterpreted; + } + if (input->ExpectTag(40)) goto parse_z; + break; + } + + // optional int32 z = 5; + case 5: { + if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) == + ::google::protobuf::internal::WireFormatLite::WIRETYPE_VARINT) { + parse_z: + DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive< + ::google::protobuf::int32, ::google::protobuf::internal::WireFormatLite::TYPE_INT32>( + input, &z_))); + set_has_z(); + } else { + goto handle_uninterpreted; + } + if (input->ExpectAtEnd()) return true; + break; + } + + default: { + handle_uninterpreted: + if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) == + ::google::protobuf::internal::WireFormatLite::WIRETYPE_END_GROUP) { + return true; + } + DO_(::google::protobuf::internal::WireFormat::SkipField( + input, tag, mutable_unknown_fields())); + break; + } + } + } + return true; +#undef DO_ +} + +void Content::SerializeWithCachedSizes( + ::google::protobuf::io::CodedOutputStream* output) const { + // optional double a = 1; + if (has_a()) { + ::google::protobuf::internal::WireFormatLite::WriteDouble(1, this->a(), output); + } + + // optional double b = 2; + if (has_b()) { + ::google::protobuf::internal::WireFormatLite::WriteDouble(2, this->b(), output); + } + + // optional int32 x = 3; + if (has_x()) { + ::google::protobuf::internal::WireFormatLite::WriteInt32(3, this->x(), output); + } + + // optional int32 y = 4; + if (has_y()) { + ::google::protobuf::internal::WireFormatLite::WriteInt32(4, this->y(), output); + } + + // optional int32 z = 5; + if (has_z()) { + ::google::protobuf::internal::WireFormatLite::WriteInt32(5, this->z(), output); + } + + if (!unknown_fields().empty()) { + ::google::protobuf::internal::WireFormat::SerializeUnknownFields( + unknown_fields(), output); + } +} + +::google::protobuf::uint8* Content::SerializeWithCachedSizesToArray( + ::google::protobuf::uint8* target) const { + // optional double a = 1; + if (has_a()) { + target = ::google::protobuf::internal::WireFormatLite::WriteDoubleToArray(1, this->a(), target); + } + + // optional double b = 2; + if (has_b()) { + target = ::google::protobuf::internal::WireFormatLite::WriteDoubleToArray(2, this->b(), target); + } + + // optional int32 x = 3; + if (has_x()) { + target = ::google::protobuf::internal::WireFormatLite::WriteInt32ToArray(3, this->x(), target); + } + + // optional int32 y = 4; + if (has_y()) { + target = ::google::protobuf::internal::WireFormatLite::WriteInt32ToArray(4, this->y(), target); + } + + // optional int32 z = 5; + if (has_z()) { + target = ::google::protobuf::internal::WireFormatLite::WriteInt32ToArray(5, this->z(), target); + } + + if (!unknown_fields().empty()) { + target = ::google::protobuf::internal::WireFormat::SerializeUnknownFieldsToArray( + unknown_fields(), target); + } + return target; +} + +int Content::ByteSize() const { + int total_size = 0; + + if (_has_bits_[0 / 32] & (0xffu << (0 % 32))) { + // optional double a = 1; + if (has_a()) { + total_size += 1 + 8; + } + + // optional double b = 2; + if (has_b()) { + total_size += 1 + 8; + } + + // optional int32 x = 3; + if (has_x()) { + total_size += 1 + + ::google::protobuf::internal::WireFormatLite::Int32Size( + this->x()); + } + + // optional int32 y = 4; + if (has_y()) { + total_size += 1 + + ::google::protobuf::internal::WireFormatLite::Int32Size( + this->y()); + } + + // optional int32 z = 5; + if (has_z()) { + total_size += 1 + + ::google::protobuf::internal::WireFormatLite::Int32Size( + this->z()); + } + + } + if (!unknown_fields().empty()) { + total_size += + ::google::protobuf::internal::WireFormat::ComputeUnknownFieldsSize( + unknown_fields()); + } + GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN(); + _cached_size_ = total_size; + GOOGLE_SAFE_CONCURRENT_WRITES_END(); + return total_size; +} + +void Content::MergeFrom(const ::google::protobuf::Message& from) { + GOOGLE_CHECK_NE(&from, this); + const Content* source = + ::google::protobuf::internal::dynamic_cast_if_available( + &from); + if (source == NULL) { + ::google::protobuf::internal::ReflectionOps::Merge(from, this); + } else { + MergeFrom(*source); + } +} + +void Content::MergeFrom(const Content& from) { + GOOGLE_CHECK_NE(&from, this); + if (from._has_bits_[0 / 32] & (0xffu << (0 % 32))) { + if (from.has_a()) { + set_a(from.a()); + } + if (from.has_b()) { + set_b(from.b()); + } + if (from.has_x()) { + set_x(from.x()); + } + if (from.has_y()) { + set_y(from.y()); + } + if (from.has_z()) { + set_z(from.z()); + } + } + mutable_unknown_fields()->MergeFrom(from.unknown_fields()); +} + +void Content::CopyFrom(const ::google::protobuf::Message& from) { + if (&from == this) return; + Clear(); + MergeFrom(from); +} + +void Content::CopyFrom(const Content& from) { + if (&from == this) return; + Clear(); + MergeFrom(from); +} + +bool Content::IsInitialized() const { + + return true; +} + +void Content::Swap(Content* other) { + if (other != this) { + std::swap(a_, other->a_); + std::swap(b_, other->b_); + std::swap(x_, other->x_); + std::swap(y_, other->y_); + std::swap(z_, other->z_); + std::swap(_has_bits_[0], other->_has_bits_[0]); + _unknown_fields_.Swap(&other->_unknown_fields_); + std::swap(_cached_size_, other->_cached_size_); + } +} + +::google::protobuf::Metadata Content::GetMetadata() const { + protobuf_AssignDescriptorsOnce(); + ::google::protobuf::Metadata metadata; + metadata.descriptor = Content_descriptor_; + metadata.reflection = Content_reflection_; + return metadata; +} + + +// =================================================================== + +#ifndef _MSC_VER +const int Payload::kDataFieldNumber; +#endif // !_MSC_VER + +Payload::Payload() + : ::google::protobuf::Message() { + SharedCtor(); +} + +void Payload::InitAsDefaultInstance() { +} + +Payload::Payload(const Payload& from) + : ::google::protobuf::Message() { + SharedCtor(); + MergeFrom(from); +} + +void Payload::SharedCtor() { + _cached_size_ = 0; + ::memset(_has_bits_, 0, sizeof(_has_bits_)); +} + +Payload::~Payload() { + SharedDtor(); +} + +void Payload::SharedDtor() { + if (this != default_instance_) { + } +} + +void Payload::SetCachedSize(int size) const { + GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN(); + _cached_size_ = size; + GOOGLE_SAFE_CONCURRENT_WRITES_END(); +} +const ::google::protobuf::Descriptor* Payload::descriptor() { + protobuf_AssignDescriptorsOnce(); + return Payload_descriptor_; +} + +const Payload& Payload::default_instance() { + if (default_instance_ == NULL) protobuf_AddDesc_payload_2eproto(); + return *default_instance_; +} + +Payload* Payload::default_instance_ = NULL; + +Payload* Payload::New() const { + return new Payload; +} + +void Payload::Clear() { + data_.Clear(); + ::memset(_has_bits_, 0, sizeof(_has_bits_)); + mutable_unknown_fields()->Clear(); +} + +bool Payload::MergePartialFromCodedStream( + ::google::protobuf::io::CodedInputStream* input) { +#define DO_(EXPRESSION) if (!(EXPRESSION)) return false + ::google::protobuf::uint32 tag; + while ((tag = input->ReadTag()) != 0) { + switch (::google::protobuf::internal::WireFormatLite::GetTagFieldNumber(tag)) { + // repeated .sampler.Content data = 1; + case 1: { + if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) == + ::google::protobuf::internal::WireFormatLite::WIRETYPE_LENGTH_DELIMITED) { + parse_data: + DO_(::google::protobuf::internal::WireFormatLite::ReadMessageNoVirtual( + input, add_data())); + } else { + goto handle_uninterpreted; + } + if (input->ExpectTag(10)) goto parse_data; + if (input->ExpectAtEnd()) return true; + break; + } + + default: { + handle_uninterpreted: + if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) == + ::google::protobuf::internal::WireFormatLite::WIRETYPE_END_GROUP) { + return true; + } + DO_(::google::protobuf::internal::WireFormat::SkipField( + input, tag, mutable_unknown_fields())); + break; + } + } + } + return true; +#undef DO_ +} + +void Payload::SerializeWithCachedSizes( + ::google::protobuf::io::CodedOutputStream* output) const { + // repeated .sampler.Content data = 1; + for (int i = 0; i < this->data_size(); i++) { + ::google::protobuf::internal::WireFormatLite::WriteMessageMaybeToArray( + 1, this->data(i), output); + } + + if (!unknown_fields().empty()) { + ::google::protobuf::internal::WireFormat::SerializeUnknownFields( + unknown_fields(), output); + } +} + +::google::protobuf::uint8* Payload::SerializeWithCachedSizesToArray( + ::google::protobuf::uint8* target) const { + // repeated .sampler.Content data = 1; + for (int i = 0; i < this->data_size(); i++) { + target = ::google::protobuf::internal::WireFormatLite:: + WriteMessageNoVirtualToArray( + 1, this->data(i), target); + } + + if (!unknown_fields().empty()) { + target = ::google::protobuf::internal::WireFormat::SerializeUnknownFieldsToArray( + unknown_fields(), target); + } + return target; +} + +int Payload::ByteSize() const { + int total_size = 0; + + // repeated .sampler.Content data = 1; + total_size += 1 * this->data_size(); + for (int i = 0; i < this->data_size(); i++) { + total_size += + ::google::protobuf::internal::WireFormatLite::MessageSizeNoVirtual( + this->data(i)); + } + + if (!unknown_fields().empty()) { + total_size += + ::google::protobuf::internal::WireFormat::ComputeUnknownFieldsSize( + unknown_fields()); + } + GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN(); + _cached_size_ = total_size; + GOOGLE_SAFE_CONCURRENT_WRITES_END(); + return total_size; +} + +void Payload::MergeFrom(const ::google::protobuf::Message& from) { + GOOGLE_CHECK_NE(&from, this); + const Payload* source = + ::google::protobuf::internal::dynamic_cast_if_available( + &from); + if (source == NULL) { + ::google::protobuf::internal::ReflectionOps::Merge(from, this); + } else { + MergeFrom(*source); + } +} + +void Payload::MergeFrom(const Payload& from) { + GOOGLE_CHECK_NE(&from, this); + data_.MergeFrom(from.data_); + mutable_unknown_fields()->MergeFrom(from.unknown_fields()); +} + +void Payload::CopyFrom(const ::google::protobuf::Message& from) { + if (&from == this) return; + Clear(); + MergeFrom(from); +} + +void Payload::CopyFrom(const Payload& from) { + if (&from == this) return; + Clear(); + MergeFrom(from); +} + +bool Payload::IsInitialized() const { + + return true; +} + +void Payload::Swap(Payload* other) { + if (other != this) { + data_.Swap(&other->data_); + std::swap(_has_bits_[0], other->_has_bits_[0]); + _unknown_fields_.Swap(&other->_unknown_fields_); + std::swap(_cached_size_, other->_cached_size_); + } +} + +::google::protobuf::Metadata Payload::GetMetadata() const { + protobuf_AssignDescriptorsOnce(); + ::google::protobuf::Metadata metadata; + metadata.descriptor = Payload_descriptor_; + metadata.reflection = Payload_reflection_; + return metadata; +} + + +// @@protoc_insertion_point(namespace_scope) + +} // namespace sampler + +// @@protoc_insertion_point(global_scope) diff --git a/fairmq/prototest/payload.pb.h b/fairmq/prototest/payload.pb.h new file mode 100644 index 00000000..76d96e05 --- /dev/null +++ b/fairmq/prototest/payload.pb.h @@ -0,0 +1,408 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: payload.proto + +#ifndef PROTOBUF_payload_2eproto__INCLUDED +#define PROTOBUF_payload_2eproto__INCLUDED + +#include + +#include + +#if GOOGLE_PROTOBUF_VERSION < 2005000 +#error This file was generated by a newer version of protoc which is +#error incompatible with your Protocol Buffer headers. Please update +#error your headers. +#endif +#if 2005000 < GOOGLE_PROTOBUF_MIN_PROTOC_VERSION +#error This file was generated by an older version of protoc which is +#error incompatible with your Protocol Buffer headers. Please +#error regenerate this file with a newer version of protoc. +#endif + +#include +#include +#include +#include +#include +// @@protoc_insertion_point(includes) + +namespace sampler { + +// Internal implementation detail -- do not call these. +void protobuf_AddDesc_payload_2eproto(); +void protobuf_AssignDesc_payload_2eproto(); +void protobuf_ShutdownFile_payload_2eproto(); + +class Content; +class Payload; + +// =================================================================== + +class Content : public ::google::protobuf::Message { + public: + Content(); + virtual ~Content(); + + Content(const Content& from); + + inline Content& operator=(const Content& from) { + CopyFrom(from); + return *this; + } + + inline const ::google::protobuf::UnknownFieldSet& unknown_fields() const { + return _unknown_fields_; + } + + inline ::google::protobuf::UnknownFieldSet* mutable_unknown_fields() { + return &_unknown_fields_; + } + + static const ::google::protobuf::Descriptor* descriptor(); + static const Content& default_instance(); + + void Swap(Content* other); + + // implements Message ---------------------------------------------- + + Content* New() const; + void CopyFrom(const ::google::protobuf::Message& from); + void MergeFrom(const ::google::protobuf::Message& from); + void CopyFrom(const Content& from); + void MergeFrom(const Content& from); + void Clear(); + bool IsInitialized() const; + + int ByteSize() const; + bool MergePartialFromCodedStream( + ::google::protobuf::io::CodedInputStream* input); + void SerializeWithCachedSizes( + ::google::protobuf::io::CodedOutputStream* output) const; + ::google::protobuf::uint8* SerializeWithCachedSizesToArray(::google::protobuf::uint8* output) const; + int GetCachedSize() const { return _cached_size_; } + private: + void SharedCtor(); + void SharedDtor(); + void SetCachedSize(int size) const; + public: + + ::google::protobuf::Metadata GetMetadata() const; + + // nested types ---------------------------------------------------- + + // accessors ------------------------------------------------------- + + // optional double a = 1; + inline bool has_a() const; + inline void clear_a(); + static const int kAFieldNumber = 1; + inline double a() const; + inline void set_a(double value); + + // optional double b = 2; + inline bool has_b() const; + inline void clear_b(); + static const int kBFieldNumber = 2; + inline double b() const; + inline void set_b(double value); + + // optional int32 x = 3; + inline bool has_x() const; + inline void clear_x(); + static const int kXFieldNumber = 3; + inline ::google::protobuf::int32 x() const; + inline void set_x(::google::protobuf::int32 value); + + // optional int32 y = 4; + inline bool has_y() const; + inline void clear_y(); + static const int kYFieldNumber = 4; + inline ::google::protobuf::int32 y() const; + inline void set_y(::google::protobuf::int32 value); + + // optional int32 z = 5; + inline bool has_z() const; + inline void clear_z(); + static const int kZFieldNumber = 5; + inline ::google::protobuf::int32 z() const; + inline void set_z(::google::protobuf::int32 value); + + // @@protoc_insertion_point(class_scope:sampler.Content) + private: + inline void set_has_a(); + inline void clear_has_a(); + inline void set_has_b(); + inline void clear_has_b(); + inline void set_has_x(); + inline void clear_has_x(); + inline void set_has_y(); + inline void clear_has_y(); + inline void set_has_z(); + inline void clear_has_z(); + + ::google::protobuf::UnknownFieldSet _unknown_fields_; + + double a_; + double b_; + ::google::protobuf::int32 x_; + ::google::protobuf::int32 y_; + ::google::protobuf::int32 z_; + + mutable int _cached_size_; + ::google::protobuf::uint32 _has_bits_[(5 + 31) / 32]; + + friend void protobuf_AddDesc_payload_2eproto(); + friend void protobuf_AssignDesc_payload_2eproto(); + friend void protobuf_ShutdownFile_payload_2eproto(); + + void InitAsDefaultInstance(); + static Content* default_instance_; +}; +// ------------------------------------------------------------------- + +class Payload : public ::google::protobuf::Message { + public: + Payload(); + virtual ~Payload(); + + Payload(const Payload& from); + + inline Payload& operator=(const Payload& from) { + CopyFrom(from); + return *this; + } + + inline const ::google::protobuf::UnknownFieldSet& unknown_fields() const { + return _unknown_fields_; + } + + inline ::google::protobuf::UnknownFieldSet* mutable_unknown_fields() { + return &_unknown_fields_; + } + + static const ::google::protobuf::Descriptor* descriptor(); + static const Payload& default_instance(); + + void Swap(Payload* other); + + // implements Message ---------------------------------------------- + + Payload* New() const; + void CopyFrom(const ::google::protobuf::Message& from); + void MergeFrom(const ::google::protobuf::Message& from); + void CopyFrom(const Payload& from); + void MergeFrom(const Payload& from); + void Clear(); + bool IsInitialized() const; + + int ByteSize() const; + bool MergePartialFromCodedStream( + ::google::protobuf::io::CodedInputStream* input); + void SerializeWithCachedSizes( + ::google::protobuf::io::CodedOutputStream* output) const; + ::google::protobuf::uint8* SerializeWithCachedSizesToArray(::google::protobuf::uint8* output) const; + int GetCachedSize() const { return _cached_size_; } + private: + void SharedCtor(); + void SharedDtor(); + void SetCachedSize(int size) const; + public: + + ::google::protobuf::Metadata GetMetadata() const; + + // nested types ---------------------------------------------------- + + // accessors ------------------------------------------------------- + + // repeated .sampler.Content data = 1; + inline int data_size() const; + inline void clear_data(); + static const int kDataFieldNumber = 1; + inline const ::sampler::Content& data(int index) const; + inline ::sampler::Content* mutable_data(int index); + inline ::sampler::Content* add_data(); + inline const ::google::protobuf::RepeatedPtrField< ::sampler::Content >& + data() const; + inline ::google::protobuf::RepeatedPtrField< ::sampler::Content >* + mutable_data(); + + // @@protoc_insertion_point(class_scope:sampler.Payload) + private: + + ::google::protobuf::UnknownFieldSet _unknown_fields_; + + ::google::protobuf::RepeatedPtrField< ::sampler::Content > data_; + + mutable int _cached_size_; + ::google::protobuf::uint32 _has_bits_[(1 + 31) / 32]; + + friend void protobuf_AddDesc_payload_2eproto(); + friend void protobuf_AssignDesc_payload_2eproto(); + friend void protobuf_ShutdownFile_payload_2eproto(); + + void InitAsDefaultInstance(); + static Payload* default_instance_; +}; +// =================================================================== + + +// =================================================================== + +// Content + +// optional double a = 1; +inline bool Content::has_a() const { + return (_has_bits_[0] & 0x00000001u) != 0; +} +inline void Content::set_has_a() { + _has_bits_[0] |= 0x00000001u; +} +inline void Content::clear_has_a() { + _has_bits_[0] &= ~0x00000001u; +} +inline void Content::clear_a() { + a_ = 0; + clear_has_a(); +} +inline double Content::a() const { + return a_; +} +inline void Content::set_a(double value) { + set_has_a(); + a_ = value; +} + +// optional double b = 2; +inline bool Content::has_b() const { + return (_has_bits_[0] & 0x00000002u) != 0; +} +inline void Content::set_has_b() { + _has_bits_[0] |= 0x00000002u; +} +inline void Content::clear_has_b() { + _has_bits_[0] &= ~0x00000002u; +} +inline void Content::clear_b() { + b_ = 0; + clear_has_b(); +} +inline double Content::b() const { + return b_; +} +inline void Content::set_b(double value) { + set_has_b(); + b_ = value; +} + +// optional int32 x = 3; +inline bool Content::has_x() const { + return (_has_bits_[0] & 0x00000004u) != 0; +} +inline void Content::set_has_x() { + _has_bits_[0] |= 0x00000004u; +} +inline void Content::clear_has_x() { + _has_bits_[0] &= ~0x00000004u; +} +inline void Content::clear_x() { + x_ = 0; + clear_has_x(); +} +inline ::google::protobuf::int32 Content::x() const { + return x_; +} +inline void Content::set_x(::google::protobuf::int32 value) { + set_has_x(); + x_ = value; +} + +// optional int32 y = 4; +inline bool Content::has_y() const { + return (_has_bits_[0] & 0x00000008u) != 0; +} +inline void Content::set_has_y() { + _has_bits_[0] |= 0x00000008u; +} +inline void Content::clear_has_y() { + _has_bits_[0] &= ~0x00000008u; +} +inline void Content::clear_y() { + y_ = 0; + clear_has_y(); +} +inline ::google::protobuf::int32 Content::y() const { + return y_; +} +inline void Content::set_y(::google::protobuf::int32 value) { + set_has_y(); + y_ = value; +} + +// optional int32 z = 5; +inline bool Content::has_z() const { + return (_has_bits_[0] & 0x00000010u) != 0; +} +inline void Content::set_has_z() { + _has_bits_[0] |= 0x00000010u; +} +inline void Content::clear_has_z() { + _has_bits_[0] &= ~0x00000010u; +} +inline void Content::clear_z() { + z_ = 0; + clear_has_z(); +} +inline ::google::protobuf::int32 Content::z() const { + return z_; +} +inline void Content::set_z(::google::protobuf::int32 value) { + set_has_z(); + z_ = value; +} + +// ------------------------------------------------------------------- + +// Payload + +// repeated .sampler.Content data = 1; +inline int Payload::data_size() const { + return data_.size(); +} +inline void Payload::clear_data() { + data_.Clear(); +} +inline const ::sampler::Content& Payload::data(int index) const { + return data_.Get(index); +} +inline ::sampler::Content* Payload::mutable_data(int index) { + return data_.Mutable(index); +} +inline ::sampler::Content* Payload::add_data() { + return data_.Add(); +} +inline const ::google::protobuf::RepeatedPtrField< ::sampler::Content >& +Payload::data() const { + return data_; +} +inline ::google::protobuf::RepeatedPtrField< ::sampler::Content >* +Payload::mutable_data() { + return &data_; +} + + +// @@protoc_insertion_point(namespace_scope) + +} // namespace sampler + +#ifndef SWIG +namespace google { +namespace protobuf { + + +} // namespace google +} // namespace protobuf +#endif // SWIG + +// @@protoc_insertion_point(global_scope) + +#endif // PROTOBUF_payload_2eproto__INCLUDED diff --git a/fairmq/prototest/payload.proto b/fairmq/prototest/payload.proto new file mode 100644 index 00000000..1ecfbfdd --- /dev/null +++ b/fairmq/prototest/payload.proto @@ -0,0 +1,13 @@ +package sampler; + +message Content { + optional double a = 1; + optional double b = 2; + optional int32 x = 3; + optional int32 y = 4; + optional int32 z = 5; +} + +message Payload { + repeated Content data = 1; +} diff --git a/fairmq/zeromq/runBenchmarkSampler.cxx b/fairmq/run/runBenchmarkSampler.cxx similarity index 92% rename from fairmq/zeromq/runBenchmarkSampler.cxx rename to fairmq/run/runBenchmarkSampler.cxx index 0ab12f90..e8c50398 100644 --- a/fairmq/zeromq/runBenchmarkSampler.cxx +++ b/fairmq/run/runBenchmarkSampler.cxx @@ -10,7 +10,12 @@ #include "FairMQLogger.h" #include "FairMQBenchmarkSampler.h" -#include "FairMQTransportFactoryZMQ.h" + +#ifdef NANOMSG + #include "FairMQTransportFactoryNN.h" +#else + #include "FairMQTransportFactoryZMQ.h" +#endif using std::cout; using std::cin; @@ -54,7 +59,12 @@ int main(int argc, char** argv) LOG(INFO) << "PID: " << getpid(); +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + sampler.SetTransport(transportFactory); int i = 1; diff --git a/fairmq/nanomsg/runBenchmarkSampler.cxx b/fairmq/run/runBinSampler.cxx similarity index 52% rename from fairmq/nanomsg/runBenchmarkSampler.cxx rename to fairmq/run/runBinSampler.cxx index 44ec3d51..0ccf5f97 100644 --- a/fairmq/nanomsg/runBenchmarkSampler.cxx +++ b/fairmq/run/runBinSampler.cxx @@ -9,8 +9,13 @@ #include #include "FairMQLogger.h" -#include "FairMQBenchmarkSampler.h" -#include "FairMQTransportFactoryNN.h" +#include "FairMQBinSampler.h" + +#ifdef NANOMSG + #include "FairMQTransportFactoryNN.h" +#else + #include "FairMQTransportFactoryZMQ.h" +#endif using std::cout; using std::cin; @@ -18,14 +23,14 @@ using std::endl; using std::stringstream; -FairMQBenchmarkSampler sampler; +FairMQBinSampler sampler; static void s_signal_handler (int signal) { cout << endl << "Caught signal " << signal << endl; - sampler.ChangeState(FairMQBenchmarkSampler::STOP); - sampler.ChangeState(FairMQBenchmarkSampler::END); + sampler.ChangeState(FairMQBinSampler::STOP); + sampler.ChangeState(FairMQBinSampler::END); cout << "Shutdown complete. Bye!" << endl; exit(1); @@ -54,59 +59,64 @@ int main(int argc, char** argv) LOG(INFO) << "PID: " << getpid(); +#ifdef NANOMSG FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + sampler.SetTransport(transportFactory); int i = 1; - sampler.SetProperty(FairMQBenchmarkSampler::Id, argv[i]); + sampler.SetProperty(FairMQBinSampler::Id, argv[i]); ++i; int eventSize; stringstream(argv[i]) >> eventSize; - sampler.SetProperty(FairMQBenchmarkSampler::EventSize, eventSize); + sampler.SetProperty(FairMQBinSampler::EventSize, eventSize); ++i; int eventRate; stringstream(argv[i]) >> eventRate; - sampler.SetProperty(FairMQBenchmarkSampler::EventRate, eventRate); + sampler.SetProperty(FairMQBinSampler::EventRate, eventRate); ++i; int numIoThreads; stringstream(argv[i]) >> numIoThreads; - sampler.SetProperty(FairMQBenchmarkSampler::NumIoThreads, numIoThreads); + sampler.SetProperty(FairMQBinSampler::NumIoThreads, numIoThreads); ++i; - sampler.SetProperty(FairMQBenchmarkSampler::NumInputs, 0); - sampler.SetProperty(FairMQBenchmarkSampler::NumOutputs, 1); + sampler.SetProperty(FairMQBinSampler::NumInputs, 0); + sampler.SetProperty(FairMQBinSampler::NumOutputs, 1); - sampler.ChangeState(FairMQBenchmarkSampler::INIT); + sampler.ChangeState(FairMQBinSampler::INIT); - sampler.SetProperty(FairMQBenchmarkSampler::OutputSocketType, argv[i], 0); + sampler.SetProperty(FairMQBinSampler::OutputSocketType, argv[i], 0); ++i; int outputSndBufSize; stringstream(argv[i]) >> outputSndBufSize; - sampler.SetProperty(FairMQBenchmarkSampler::OutputSndBufSize, outputSndBufSize, 0); + sampler.SetProperty(FairMQBinSampler::OutputSndBufSize, outputSndBufSize, 0); ++i; - sampler.SetProperty(FairMQBenchmarkSampler::OutputMethod, argv[i], 0); + sampler.SetProperty(FairMQBinSampler::OutputMethod, argv[i], 0); ++i; - sampler.SetProperty(FairMQBenchmarkSampler::OutputAddress, argv[i], 0); + sampler.SetProperty(FairMQBinSampler::OutputAddress, argv[i], 0); ++i; - sampler.ChangeState(FairMQBenchmarkSampler::SETOUTPUT); - sampler.ChangeState(FairMQBenchmarkSampler::SETINPUT); - sampler.ChangeState(FairMQBenchmarkSampler::RUN); + sampler.ChangeState(FairMQBinSampler::SETOUTPUT); + sampler.ChangeState(FairMQBinSampler::SETINPUT); + sampler.ChangeState(FairMQBinSampler::RUN); char ch; cin.get(ch); - sampler.ChangeState(FairMQBenchmarkSampler::STOP); - sampler.ChangeState(FairMQBenchmarkSampler::END); + sampler.ChangeState(FairMQBinSampler::STOP); + sampler.ChangeState(FairMQBinSampler::END); return 0; } diff --git a/fairmq/nanomsg/runSink.cxx b/fairmq/run/runBinSink.cxx similarity index 54% rename from fairmq/nanomsg/runSink.cxx rename to fairmq/run/runBinSink.cxx index cccb1928..85fc3b42 100644 --- a/fairmq/nanomsg/runSink.cxx +++ b/fairmq/run/runBinSink.cxx @@ -9,8 +9,13 @@ #include #include "FairMQLogger.h" -#include "FairMQSink.h" -#include "FairMQTransportFactoryNN.h" +#include "FairMQBinSink.h" + +#ifdef NANOMSG + #include "FairMQTransportFactoryNN.h" +#else + #include "FairMQTransportFactoryZMQ.h" +#endif using std::cout; using std::cin; @@ -18,14 +23,14 @@ using std::endl; using std::stringstream; -FairMQSink sink; +FairMQBinSink sink; static void s_signal_handler (int signal) { cout << endl << "Caught signal " << signal << endl; - sink.ChangeState(FairMQSink::STOP); - sink.ChangeState(FairMQSink::END); + sink.ChangeState(FairMQBinSink::STOP); + sink.ChangeState(FairMQBinSink::END); cout << "Shutdown complete. Bye!" << endl; exit(1); @@ -54,48 +59,53 @@ int main(int argc, char** argv) LOG(INFO) << "PID: " << getpid(); +#ifdef NANOMSG FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + sink.SetTransport(transportFactory); int i = 1; - sink.SetProperty(FairMQSink::Id, argv[i]); + sink.SetProperty(FairMQBinSink::Id, argv[i]); ++i; int numIoThreads; stringstream(argv[i]) >> numIoThreads; - sink.SetProperty(FairMQSink::NumIoThreads, numIoThreads); + sink.SetProperty(FairMQBinSink::NumIoThreads, numIoThreads); ++i; - sink.SetProperty(FairMQSink::NumInputs, 1); - sink.SetProperty(FairMQSink::NumOutputs, 0); + sink.SetProperty(FairMQBinSink::NumInputs, 1); + sink.SetProperty(FairMQBinSink::NumOutputs, 0); - sink.ChangeState(FairMQSink::INIT); + sink.ChangeState(FairMQBinSink::INIT); - sink.SetProperty(FairMQSink::InputSocketType, argv[i], 0); + sink.SetProperty(FairMQBinSink::InputSocketType, argv[i], 0); ++i; int inputRcvBufSize; stringstream(argv[i]) >> inputRcvBufSize; - sink.SetProperty(FairMQSink::InputRcvBufSize, inputRcvBufSize, 0); + sink.SetProperty(FairMQBinSink::InputRcvBufSize, inputRcvBufSize, 0); ++i; - sink.SetProperty(FairMQSink::InputMethod, argv[i], 0); + sink.SetProperty(FairMQBinSink::InputMethod, argv[i], 0); ++i; - sink.SetProperty(FairMQSink::InputAddress, argv[i], 0); + sink.SetProperty(FairMQBinSink::InputAddress, argv[i], 0); ++i; - sink.ChangeState(FairMQSink::SETOUTPUT); - sink.ChangeState(FairMQSink::SETINPUT); - sink.ChangeState(FairMQSink::RUN); + sink.ChangeState(FairMQBinSink::SETOUTPUT); + sink.ChangeState(FairMQBinSink::SETINPUT); + sink.ChangeState(FairMQBinSink::RUN); char ch; cin.get(ch); - sink.ChangeState(FairMQSink::STOP); - sink.ChangeState(FairMQSink::END); + sink.ChangeState(FairMQBinSink::STOP); + sink.ChangeState(FairMQBinSink::END); return 0; } diff --git a/fairmq/zeromq/runBuffer.cxx b/fairmq/run/runBuffer.cxx similarity index 92% rename from fairmq/zeromq/runBuffer.cxx rename to fairmq/run/runBuffer.cxx index a1f3c868..fee9d850 100644 --- a/fairmq/zeromq/runBuffer.cxx +++ b/fairmq/run/runBuffer.cxx @@ -10,7 +10,12 @@ #include "FairMQLogger.h" #include "FairMQBuffer.h" -#include "FairMQTransportFactoryZMQ.h" + +#ifdef NANOMSG + #include "FairMQTransportFactoryNN.h" +#else + #include "FairMQTransportFactoryZMQ.h" +#endif using std::cout; using std::cin; @@ -54,7 +59,12 @@ int main(int argc, char** argv) LOG(INFO) << "PID: " << getpid(); +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + buffer.SetTransport(transportFactory); int i = 1; diff --git a/fairmq/zeromq/runNToOneMerger.cxx b/fairmq/run/runMerger.cxx similarity index 89% rename from fairmq/zeromq/runNToOneMerger.cxx rename to fairmq/run/runMerger.cxx index 5d4249c2..58e33fd7 100644 --- a/fairmq/zeromq/runNToOneMerger.cxx +++ b/fairmq/run/runMerger.cxx @@ -1,5 +1,5 @@ /** - * runNToOneMerger.cxx + * runMerger.cxx * * @since 2012-12-06 * @author D. Klein, A. Rybalchenko @@ -10,7 +10,12 @@ #include "FairMQLogger.h" #include "FairMQMerger.h" -#include "FairMQTransportFactoryZMQ.h" + +#ifdef NANOMSG + #include "FairMQTransportFactoryNN.h" +#else + #include "FairMQTransportFactoryZMQ.h" +#endif using std::cout; using std::cin; @@ -43,13 +48,13 @@ static void s_catch_signals (void) int main(int argc, char** argv) { - if ( argc < 16 || (argc-8)%4!=0 ) { + if ( argc < 16 || (argc - 8) % 4 != 0 ) { cout << "Usage: merger \tID numIoTreads numInputs\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << "\t\t...\n" << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" - << argc << endl; + << argc << " arguments provided" << endl; return 1; } @@ -57,7 +62,12 @@ int main(int argc, char** argv) LOG(INFO) << "PID: " << getpid(); +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + merger.SetTransport(transportFactory); int i = 1; @@ -77,10 +87,8 @@ int main(int argc, char** argv) merger.SetProperty(FairMQMerger::NumOutputs, 1); - merger.ChangeState(FairMQMerger::INIT); - for (int iInput = 0; iInput < numInputs; iInput++ ) { merger.SetProperty(FairMQMerger::InputSocketType, argv[i], iInput); ++i; @@ -105,7 +113,6 @@ int main(int argc, char** argv) merger.SetProperty(FairMQMerger::OutputAddress, argv[i], 0); ++i; - merger.ChangeState(FairMQMerger::SETOUTPUT); merger.ChangeState(FairMQMerger::SETINPUT); merger.ChangeState(FairMQMerger::RUN); diff --git a/fairmq/run/runProtoSampler.cxx b/fairmq/run/runProtoSampler.cxx new file mode 100644 index 00000000..5aa5e332 --- /dev/null +++ b/fairmq/run/runProtoSampler.cxx @@ -0,0 +1,123 @@ +/** + * runBenchmarkSampler.cxx + * + * @since 2013-04-23 + * @author D. Klein, A. Rybalchenko + */ + +#include +#include + +#include "FairMQLogger.h" +#include "FairMQProtoSampler.h" + +#ifdef NANOMSG + #include "FairMQTransportFactoryNN.h" +#else + #include "FairMQTransportFactoryZMQ.h" +#endif + +using std::cout; +using std::cin; +using std::endl; +using std::stringstream; + + +FairMQProtoSampler sampler; + +static void s_signal_handler (int signal) +{ + cout << endl << "Caught signal " << signal << endl; + + sampler.ChangeState(FairMQProtoSampler::STOP); + sampler.ChangeState(FairMQProtoSampler::END); + + cout << "Shutdown complete. Bye!" << endl; + exit(1); +} + +static void s_catch_signals (void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} + +int main(int argc, char** argv) +{ + if ( argc != 9 ) { + cout << "Usage: bsampler ID eventSize eventRate numIoTreads\n" + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" + << endl; + return 1; + } + + s_catch_signals(); + + LOG(INFO) << "PID: " << getpid(); + +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + + sampler.SetTransport(transportFactory); + + int i = 1; + + sampler.SetProperty(FairMQProtoSampler::Id, argv[i]); + ++i; + + int eventSize; + stringstream(argv[i]) >> eventSize; + sampler.SetProperty(FairMQProtoSampler::EventSize, eventSize); + ++i; + + int eventRate; + stringstream(argv[i]) >> eventRate; + sampler.SetProperty(FairMQProtoSampler::EventRate, eventRate); + ++i; + + int numIoThreads; + stringstream(argv[i]) >> numIoThreads; + sampler.SetProperty(FairMQProtoSampler::NumIoThreads, numIoThreads); + ++i; + + sampler.SetProperty(FairMQProtoSampler::NumInputs, 0); + sampler.SetProperty(FairMQProtoSampler::NumOutputs, 1); + + + sampler.ChangeState(FairMQProtoSampler::INIT); + + + sampler.SetProperty(FairMQProtoSampler::OutputSocketType, argv[i], 0); + ++i; + int outputSndBufSize; + stringstream(argv[i]) >> outputSndBufSize; + sampler.SetProperty(FairMQProtoSampler::OutputSndBufSize, outputSndBufSize, 0); + ++i; + sampler.SetProperty(FairMQProtoSampler::OutputMethod, argv[i], 0); + ++i; + sampler.SetProperty(FairMQProtoSampler::OutputAddress, argv[i], 0); + ++i; + + + sampler.ChangeState(FairMQProtoSampler::SETOUTPUT); + sampler.ChangeState(FairMQProtoSampler::SETINPUT); + sampler.ChangeState(FairMQProtoSampler::RUN); + + + + char ch; + cin.get(ch); + + sampler.ChangeState(FairMQProtoSampler::STOP); + sampler.ChangeState(FairMQProtoSampler::END); + + return 0; +} + diff --git a/fairmq/run/runProtoSink.cxx b/fairmq/run/runProtoSink.cxx new file mode 100644 index 00000000..92b92f77 --- /dev/null +++ b/fairmq/run/runProtoSink.cxx @@ -0,0 +1,112 @@ +/** + * runSink.cxx + * + * @since 2013-01-21 + * @author D. Klein, A. Rybalchenko + */ + +#include +#include + +#include "FairMQLogger.h" +#include "FairMQProtoSink.h" + +#ifdef NANOMSG + #include "FairMQTransportFactoryNN.h" +#else + #include "FairMQTransportFactoryZMQ.h" +#endif + +using std::cout; +using std::cin; +using std::endl; +using std::stringstream; + + +FairMQProtoSink sink; + +static void s_signal_handler (int signal) +{ + cout << endl << "Caught signal " << signal << endl; + + sink.ChangeState(FairMQProtoSink::STOP); + sink.ChangeState(FairMQProtoSink::END); + + cout << "Shutdown complete. Bye!" << endl; + exit(1); +} + +static void s_catch_signals (void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} + +int main(int argc, char** argv) +{ + if ( argc != 7 ) { + cout << "Usage: sink \tID numIoTreads\n" + << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" + << endl; + return 1; + } + + s_catch_signals(); + + LOG(INFO) << "PID: " << getpid(); + +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + + sink.SetTransport(transportFactory); + + int i = 1; + + sink.SetProperty(FairMQProtoSink::Id, argv[i]); + ++i; + + int numIoThreads; + stringstream(argv[i]) >> numIoThreads; + sink.SetProperty(FairMQProtoSink::NumIoThreads, numIoThreads); + ++i; + + sink.SetProperty(FairMQProtoSink::NumInputs, 1); + sink.SetProperty(FairMQProtoSink::NumOutputs, 0); + + + sink.ChangeState(FairMQProtoSink::INIT); + + + sink.SetProperty(FairMQProtoSink::InputSocketType, argv[i], 0); + ++i; + int inputRcvBufSize; + stringstream(argv[i]) >> inputRcvBufSize; + sink.SetProperty(FairMQProtoSink::InputRcvBufSize, inputRcvBufSize, 0); + ++i; + sink.SetProperty(FairMQProtoSink::InputMethod, argv[i], 0); + ++i; + sink.SetProperty(FairMQProtoSink::InputAddress, argv[i], 0); + ++i; + + + sink.ChangeState(FairMQProtoSink::SETOUTPUT); + sink.ChangeState(FairMQProtoSink::SETINPUT); + sink.ChangeState(FairMQProtoSink::RUN); + + + char ch; + cin.get(ch); + + sink.ChangeState(FairMQProtoSink::STOP); + sink.ChangeState(FairMQProtoSink::END); + + return 0; +} + diff --git a/fairmq/zeromq/runProxy.cxx b/fairmq/run/runProxy.cxx similarity index 92% rename from fairmq/zeromq/runProxy.cxx rename to fairmq/run/runProxy.cxx index 993b755c..0465c0c8 100644 --- a/fairmq/zeromq/runProxy.cxx +++ b/fairmq/run/runProxy.cxx @@ -10,7 +10,12 @@ #include "FairMQLogger.h" #include "FairMQProxy.h" -#include "FairMQTransportFactoryZMQ.h" + +#ifdef NANOMSG + #include "FairMQTransportFactoryNN.h" +#else + #include "FairMQTransportFactoryZMQ.h" +#endif using std::cout; using std::cin; @@ -54,7 +59,12 @@ int main(int argc, char** argv) LOG(INFO) << "PID: " << getpid(); +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + proxy.SetTransport(transportFactory); int i = 1; diff --git a/fairmq/zeromq/runSink.cxx b/fairmq/run/runSink.cxx similarity index 90% rename from fairmq/zeromq/runSink.cxx rename to fairmq/run/runSink.cxx index 1285af83..a61af6c9 100644 --- a/fairmq/zeromq/runSink.cxx +++ b/fairmq/run/runSink.cxx @@ -10,7 +10,12 @@ #include "FairMQLogger.h" #include "FairMQSink.h" -#include "FairMQTransportFactoryZMQ.h" + +#ifdef NANOMSG + #include "FairMQTransportFactoryNN.h" +#else + #include "FairMQTransportFactoryZMQ.h" +#endif using std::cout; using std::cin; @@ -54,7 +59,12 @@ int main(int argc, char** argv) LOG(INFO) << "PID: " << getpid(); +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + sink.SetTransport(transportFactory); int i = 1; diff --git a/fairmq/zeromq/runOneToNSplitter.cxx b/fairmq/run/runSplitter.cxx similarity index 88% rename from fairmq/zeromq/runOneToNSplitter.cxx rename to fairmq/run/runSplitter.cxx index 78a6af6c..593f2444 100644 --- a/fairmq/zeromq/runOneToNSplitter.cxx +++ b/fairmq/run/runSplitter.cxx @@ -1,5 +1,5 @@ /** - * runOneToNSplitter.cxx + * runSplitter.cxx * * @since 2012-12-06 * @author D. Klein, A. Rybalchenko @@ -10,7 +10,12 @@ #include "FairMQLogger.h" #include "FairMQSplitter.h" -#include "FairMQTransportFactoryZMQ.h" + +#ifdef NANOMSG + #include "FairMQTransportFactoryNN.h" +#else + #include "FairMQTransportFactoryZMQ.h" +#endif using std::cout; using std::cin; @@ -43,12 +48,13 @@ static void s_catch_signals (void) int main(int argc, char** argv) { - if ( argc < 16 || (argc - 8) % 4 != 0 ) { // argc{ name, id, threads, nout, insock, inbuff, inmet, inadd, ... out} + if ( argc < 16 || (argc - 8) % 4 != 0 ) { cout << "Usage: splitter \tID numIoTreads numOutputs\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" - << "\t\t..." << argc << " arguments provided" << endl; + << "\t\t..." + << argc << " arguments provided" << endl; return 1; } @@ -56,7 +62,12 @@ int main(int argc, char** argv) LOG(INFO) << "PID: " << getpid(); +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + splitter.SetTransport(transportFactory); int i = 1; @@ -76,10 +87,8 @@ int main(int argc, char** argv) splitter.SetProperty(FairMQSplitter::NumOutputs, numOutputs); ++i; - splitter.ChangeState(FairMQSplitter::INIT); - splitter.SetProperty(FairMQSplitter::InputSocketType, argv[i], 0); ++i; int inputRcvBufSize; diff --git a/fairmq/zeromq/FairMQMessageZMQ.cxx b/fairmq/zeromq/FairMQMessageZMQ.cxx index b27e0b39..ee29fa88 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.cxx +++ b/fairmq/zeromq/FairMQMessageZMQ.cxx @@ -5,34 +5,32 @@ * @author D. Klein, A. Rybalchenko, N. Winckler */ +#include #include #include "FairMQMessageZMQ.h" #include "FairMQLogger.h" -FairMQMessageZMQ::FairMQMessageZMQ() : - fMessage(new zmq_msg_t()) +FairMQMessageZMQ::FairMQMessageZMQ() { - int rc = zmq_msg_init (fMessage); + int rc = zmq_msg_init (&fMessage); if (rc != 0) { LOG(ERROR) << "failed initializing message, reason: " << zmq_strerror(errno); } } -FairMQMessageZMQ::FairMQMessageZMQ(size_t size) : - fMessage(new zmq_msg_t()) +FairMQMessageZMQ::FairMQMessageZMQ(size_t size) { - int rc = zmq_msg_init_size (fMessage, size); + int rc = zmq_msg_init_size (&fMessage, size); if (rc != 0) { LOG(ERROR) << "failed initializing message with size, reason: " << zmq_strerror(errno); } } -FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size) : - fMessage(new zmq_msg_t()) +FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size) { - int rc = zmq_msg_init_data (fMessage, data, size, &CleanUp, NULL); + int rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL); if (rc != 0) { LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno); } @@ -41,7 +39,7 @@ FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size) : void FairMQMessageZMQ::Rebuild() { CloseMessage(); - int rc = zmq_msg_init (fMessage); + int rc = zmq_msg_init (&fMessage); if (rc != 0) { LOG(ERROR) << "failed initializing message, reason: " << zmq_strerror(errno); } @@ -50,7 +48,7 @@ void FairMQMessageZMQ::Rebuild() void FairMQMessageZMQ::Rebuild(size_t size) { CloseMessage(); - int rc = zmq_msg_init_size (fMessage, size); + int rc = zmq_msg_init_size (&fMessage, size); if (rc != 0) { LOG(ERROR) << "failed initializing message with size, reason: " << zmq_strerror(errno); } @@ -59,7 +57,7 @@ void FairMQMessageZMQ::Rebuild(size_t size) void FairMQMessageZMQ::Rebuild(void* data, size_t size) { CloseMessage(); - int rc = zmq_msg_init_data (fMessage, data, size, &CleanUp, NULL); + int rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL); if (rc != 0) { LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno); } @@ -67,17 +65,17 @@ void FairMQMessageZMQ::Rebuild(void* data, size_t size) void* FairMQMessageZMQ::GetMessage() { - return fMessage; + return &fMessage; } void* FairMQMessageZMQ::GetData() { - return zmq_msg_data (fMessage); + return zmq_msg_data (&fMessage); } size_t FairMQMessageZMQ::GetSize() { - return zmq_msg_size (fMessage); + return zmq_msg_size (&fMessage); } void FairMQMessageZMQ::SetMessage(void* data, size_t size) @@ -85,21 +83,22 @@ void FairMQMessageZMQ::SetMessage(void* data, size_t size) // dummy method to comply with the interface. functionality not allowed in zeromq. } +void FairMQMessageZMQ::Copy(FairMQMessage* msg) +{ + CloseMessage(); + size_t size = msg->GetSize(); + zmq_msg_init_size(&fMessage, size); + std::memcpy(zmq_msg_data(&fMessage), msg->GetData(), size); +} + inline void FairMQMessageZMQ::CloseMessage() { - int rc = zmq_msg_close (fMessage); + int rc = zmq_msg_close (&fMessage); if (rc != 0) { LOG(ERROR) << "failed closing message, reason: " << zmq_strerror(errno); } } -void FairMQMessageZMQ::Copy(FairMQMessage* msg) -{ - int rc = zmq_msg_copy (fMessage, (static_cast(msg)->fMessage)); - if (rc != 0) { - LOG(ERROR) << "failed copying message, reason: " << zmq_strerror(errno); - } -} void FairMQMessageZMQ::CleanUp(void* data, void* hint) { @@ -108,7 +107,7 @@ void FairMQMessageZMQ::CleanUp(void* data, void* hint) FairMQMessageZMQ::~FairMQMessageZMQ() { - int rc = zmq_msg_close (fMessage); + int rc = zmq_msg_close (&fMessage); if (rc != 0) { LOG(ERROR) << "failed closing message with data, reason: " << zmq_strerror(errno); } diff --git a/fairmq/zeromq/FairMQMessageZMQ.h b/fairmq/zeromq/FairMQMessageZMQ.h index f21af547..e0c68522 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.h +++ b/fairmq/zeromq/FairMQMessageZMQ.h @@ -40,7 +40,7 @@ class FairMQMessageZMQ : public FairMQMessage virtual ~FairMQMessageZMQ(); private: - zmq_msg_t* fMessage; + zmq_msg_t fMessage; }; #endif /* FAIRMQMESSAGEZMQ_H_ */ diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index 8db16590..c108197e 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -5,11 +5,15 @@ * @author: A. Rybalchenko */ +#include "zmq.h" + #include "FairMQTransportFactoryZMQ.h" FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ() { - LOG(INFO) << "Using ZeroMQ library"; + int major, minor, patch; + zmq_version (&major, &minor, &patch); + LOG(INFO) << "Using ZeroMQ library, version: " << major << "." << minor << "." << patch; } FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage() diff --git a/fairmq/zeromq/runMerger.cxx b/fairmq/zeromq/runMerger.cxx deleted file mode 100644 index 8716b3da..00000000 --- a/fairmq/zeromq/runMerger.cxx +++ /dev/null @@ -1,124 +0,0 @@ -/** - * runMerger.cxx - * - * @since 2012-12-06 - * @author D. Klein, A. Rybalchenko - */ - -#include -#include - -#include "FairMQLogger.h" -#include "FairMQMerger.h" -#include "FairMQTransportFactoryZMQ.h" - -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; - - -FairMQMerger merger; - -static void s_signal_handler (int signal) -{ - cout << endl << "Caught signal " << signal << endl; - - merger.ChangeState(FairMQMerger::STOP); - merger.ChangeState(FairMQMerger::END); - - cout << "Shutdown complete. Bye!" << endl; - exit(1); -} - -static void s_catch_signals (void) -{ - struct sigaction action; - action.sa_handler = s_signal_handler; - action.sa_flags = 0; - sigemptyset(&action.sa_mask); - sigaction(SIGINT, &action, NULL); - sigaction(SIGTERM, &action, NULL); -} - -int main(int argc, char** argv) -{ - if ( argc != 15 ) { - cout << "Usage: merger \tID numIoTreads\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; - return 1; - } - - s_catch_signals(); - - LOG(INFO) << "PID: " << getpid(); - - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); - merger.SetTransport(transportFactory); - - int i = 1; - - merger.SetProperty(FairMQMerger::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - merger.SetProperty(FairMQMerger::NumIoThreads, numIoThreads); - ++i; - - merger.SetProperty(FairMQMerger::NumInputs, 2); - merger.SetProperty(FairMQMerger::NumOutputs, 1); - - - merger.ChangeState(FairMQMerger::INIT); - - - merger.SetProperty(FairMQMerger::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - merger.SetProperty(FairMQMerger::InputMethod, argv[i], 0); - ++i; - merger.SetProperty(FairMQMerger::InputAddress, argv[i], 0); - ++i; - - merger.SetProperty(FairMQMerger::InputSocketType, argv[i], 1); - ++i; - stringstream(argv[i]) >> inputRcvBufSize; - merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, 1); - ++i; - merger.SetProperty(FairMQMerger::InputMethod, argv[i], 1); - ++i; - merger.SetProperty(FairMQMerger::InputAddress, argv[i], 1); - ++i; - - merger.SetProperty(FairMQMerger::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - merger.SetProperty(FairMQMerger::OutputSndBufSize, outputSndBufSize, 0); - ++i; - merger.SetProperty(FairMQMerger::OutputMethod, argv[i], 0); - ++i; - merger.SetProperty(FairMQMerger::OutputAddress, argv[i], 0); - ++i; - - - merger.ChangeState(FairMQMerger::SETOUTPUT); - merger.ChangeState(FairMQMerger::SETINPUT); - merger.ChangeState(FairMQMerger::RUN); - - - char ch; - cin.get(ch); - - merger.ChangeState(FairMQMerger::STOP); - merger.ChangeState(FairMQMerger::END); - - return 0; -} - diff --git a/fairmq/zeromq/runSplitter.cxx b/fairmq/zeromq/runSplitter.cxx deleted file mode 100644 index 42c77ddd..00000000 --- a/fairmq/zeromq/runSplitter.cxx +++ /dev/null @@ -1,124 +0,0 @@ -/** - * runSplitter.cxx - * - * @since 2012-12-06 - * @author D. Klein, A. Rybalchenko - */ - -#include -#include - -#include "FairMQLogger.h" -#include "FairMQSplitter.h" -#include "FairMQTransportFactoryZMQ.h" - -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; - - -FairMQSplitter splitter; - -static void s_signal_handler (int signal) -{ - cout << endl << "Caught signal " << signal << endl; - - splitter.ChangeState(FairMQSplitter::STOP); - splitter.ChangeState(FairMQSplitter::END); - - cout << "Shutdown complete. Bye!" << endl; - exit(1); -} - -static void s_catch_signals (void) -{ - struct sigaction action; - action.sa_handler = s_signal_handler; - action.sa_flags = 0; - sigemptyset(&action.sa_mask); - sigaction(SIGINT, &action, NULL); - sigaction(SIGTERM, &action, NULL); -} - -int main(int argc, char** argv) -{ - if ( argc != 15 ) { - cout << "Usage: splitter \tID numIoTreads\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; - return 1; - } - - s_catch_signals(); - - LOG(INFO) << "PID: " << getpid(); - - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); - splitter.SetTransport(transportFactory); - - int i = 1; - - splitter.SetProperty(FairMQSplitter::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - splitter.SetProperty(FairMQSplitter::NumIoThreads, numIoThreads); - ++i; - - splitter.SetProperty(FairMQSplitter::NumInputs, 1); - splitter.SetProperty(FairMQSplitter::NumOutputs, 2); - - - splitter.ChangeState(FairMQSplitter::INIT); - - - splitter.SetProperty(FairMQSplitter::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - splitter.SetProperty(FairMQSplitter::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - splitter.SetProperty(FairMQSplitter::InputMethod, argv[i], 0); - ++i; - splitter.SetProperty(FairMQSplitter::InputAddress, argv[i], 0); - ++i; - - splitter.SetProperty(FairMQSplitter::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, 0); - ++i; - splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], 0); - ++i; - splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], 0); - ++i; - - splitter.SetProperty(FairMQSplitter::OutputSocketType, argv[i], 1); - ++i; - stringstream(argv[i]) >> outputSndBufSize; - splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, 1); - ++i; - splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], 1); - ++i; - splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], 1); - ++i; - - - splitter.ChangeState(FairMQSplitter::SETOUTPUT); - splitter.ChangeState(FairMQSplitter::SETINPUT); - splitter.ChangeState(FairMQSplitter::RUN); - - - char ch; - cin.get(ch); - - splitter.ChangeState(FairMQSplitter::STOP); - splitter.ChangeState(FairMQSplitter::END); - - return 0; -} -