From 64b9e991c33d5d498509d7e65d4162ecc2d5eadf Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 24 Jan 2014 15:54:29 +0100 Subject: [PATCH] add nanomsg implementations + use factory for nanomsg + lots of small stuff --- fairmq/CMakeLists.txt | 97 +++++++---- fairmq/FairMQBenchmarkSampler.cxx | 6 +- fairmq/FairMQBenchmarkSampler.h | 4 +- fairmq/FairMQConfigurable.cxx | 8 +- fairmq/FairMQConfigurable.h | 11 +- ...FairMQContext.cxx => FairMQContextZMQ.cxx} | 19 ++- .../{FairMQContext.h => FairMQContextZMQ.h} | 12 +- fairmq/FairMQDevice.cxx | 119 +++++++------ fairmq/FairMQDevice.h | 41 ++--- fairmq/FairMQLogger.cxx | 13 +- fairmq/FairMQLogger.h | 10 +- fairmq/FairMQMerger.cxx | 16 +- fairmq/FairMQMessage.h | 1 + fairmq/FairMQMessageNN.cxx | 135 +++++++++++++++ fairmq/FairMQMessageNN.h | 43 +++++ fairmq/FairMQMessageZMQ.cxx | 57 +++++-- fairmq/FairMQMessageZMQ.h | 3 +- fairmq/FairMQPoller.cxx | 6 + fairmq/FairMQPoller.h | 21 +++ fairmq/FairMQPollerNN.cxx | 39 +++++ fairmq/FairMQPollerNN.h | 33 ++++ fairmq/FairMQPollerZMQ.cxx | 41 +++++ fairmq/FairMQPollerZMQ.h | 33 ++++ fairmq/FairMQProcessor.cxx | 2 +- fairmq/FairMQSampler.cxx | 16 +- fairmq/FairMQSampler.h | 11 +- fairmq/FairMQSocket.h | 14 +- fairmq/FairMQSocketNN.cxx | 161 ++++++++++++++++++ fairmq/FairMQSocketNN.h | 56 ++++++ fairmq/FairMQSocketZMQ.cxx | 92 +++++----- fairmq/FairMQSocketZMQ.h | 26 +-- fairmq/FairMQTransportFactory.h | 8 +- fairmq/FairMQTransportFactoryNN.cxx | 38 +++++ fairmq/FairMQTransportFactoryNN.h | 33 ++++ fairmq/FairMQTransportFactoryZMQ.cxx | 17 +- fairmq/FairMQTransportFactoryZMQ.h | 9 +- fairmq/runBenchmarkSampler.cxx | 33 ++-- fairmq/runBuffer.cxx | 38 ++--- fairmq/runMerger.cxx | 45 +++-- fairmq/runNToOneMerger.cxx | 40 +++-- fairmq/runOneToNSplitter.cxx | 40 +++-- fairmq/runProxy.cxx | 37 ++-- fairmq/runSink.cxx | 29 ++-- fairmq/runSplitter.cxx | 45 +++-- 44 files changed, 1138 insertions(+), 420 deletions(-) rename fairmq/{FairMQContext.cxx => FairMQContextZMQ.cxx} (74%) rename fairmq/{FairMQContext.h => FairMQContextZMQ.h} (51%) create mode 100644 fairmq/FairMQPoller.cxx create mode 100644 fairmq/FairMQPoller.h create mode 100644 fairmq/FairMQPollerNN.cxx create mode 100644 fairmq/FairMQPollerNN.h create mode 100644 fairmq/FairMQPollerZMQ.cxx create mode 100644 fairmq/FairMQPollerZMQ.h create mode 100644 fairmq/FairMQTransportFactoryNN.cxx create mode 100644 fairmq/FairMQTransportFactoryNN.h diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 205406b1..00f52a9e 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -1,69 +1,98 @@ set(INCLUDE_DIRECTORIES ${BASE_INCLUDE_DIRECTORIES} ${CMAKE_SOURCE_DIR}/fairmq - ${ZMQ_INCLUDE_DIR} - ${NANOMSG_INCLUDE_DIR} ${Boost_INCLUDE_DIR} ${ROOT_INCLUDE_DIR} ) +if(NANOMSG_FOUND) + set(INCLUDE_DIRECTORIES + ${INCLUDE_DIRECTORIES} + ${NANOMSG_LIBRARY_SHARED} + ) +else(NANOMSG_FOUND) + set(INCLUDE_DIRECTORIES + ${INCLUDE_DIRECTORIES} + ${ZMQ_LIBRARY_SHARED} + ) +endif(NANOMSG_FOUND) + include_directories(${INCLUDE_DIRECTORIES}) -Set(LINK_DIRECTORIES +set(LINK_DIRECTORIES ${ROOT_LIBRARY_DIR} ${Boost_LIBRARY_DIRS} ) link_directories(${LINK_DIRECTORIES}) -Set(SRCS +set(SRCS + "FairMQLogger.cxx" + "FairMQConfigurable.cxx" + "FairMQStateMachine.cxx" + "FairMQTransportFactory.cxx" + "FairMQMessage.cxx" + "FairMQSocket.cxx" + "FairMQDevice.cxx" "FairMQSampler.cxx" "FairMQBenchmarkSampler.cxx" - "FairMQStateMachine.cxx" - "FairMQConfigurable.cxx" + "FairMQProcessor.cxx" + "FairMQSink.cxx" "FairMQBuffer.cxx" - "FairMQSamplerTask.cxx" - "FairMQLogger.cxx" - "FairMQContext.cxx" - "FairMQMessage.cxx" - "FairMQTransportFactory.cxx" - "FairMQTransportFactoryZMQ.cxx" - "FairMQMessageZMQ.cxx" - "FairMQMessageNN.cxx" - "FairMQSocket.cxx" - "FairMQSocketZMQ.cxx" - "FairMQSocketNN.cxx" + "FairMQProxy.cxx" "FairMQSplitter.cxx" "FairMQMerger.cxx" - "FairMQProcessor.cxx" + "FairMQPoller.cxx" + "FairMQSamplerTask.cxx" "FairMQProcessorTask.cxx" - "FairMQSink.cxx" - "FairMQDevice.cxx" - "FairMQProxy.cxx" ) -Set(DEPENDENCIES +if(NANOMSG_FOUND) + set(SRCS + ${SRCS} + "FairMQTransportFactoryNN.cxx" + "FairMQMessageNN.cxx" + "FairMQSocketNN.cxx" + "FairMQPollerNN.cxx" + ) + set(DEPENDENCIES + ${NANOMSG_LIBRARY_SHARED} + ) +else(NANOMSG_FOUND) + set(SRCS + ${SRCS} + "FairMQTransportFactoryZMQ.cxx" + "FairMQMessageZMQ.cxx" + "FairMQSocketZMQ.cxx" + "FairMQPollerZMQ.cxx" + "FairMQContextZMQ.cxx" + ) + set(DEPENDENCIES + ${ZMQ_LIBRARY_SHARED} + ) +endif(NANOMSG_FOUND) + +set(DEPENDENCIES + ${DEPENDENCIES} ${CMAKE_THREAD_LIBS_INIT} - ${ZMQ_LIBRARY_SHARED} - ${NANOMSG_LIBRARY_SHARED} Base ParBase FairTools GeoBase boost_thread boost_timer boost_system ) -Set(LIBRARY_NAME FairMQ) +set(LIBRARY_NAME FairMQ) GENERATE_LIBRARY() -Set(Exe_Names bsampler buffer splitter merger sink proxy n_one_merger one_n_splitter) -Set(Exe_Source runBenchmarkSampler.cxx runBuffer.cxx runSplitter.cxx runMerger.cxx runSink.cxx runProxy.cxx runNToOneMerger.cxx runOneToNSplitter.cxx) +set(Exe_Names bsampler buffer splitter merger sink proxy n_one_merger one_n_splitter) +set(Exe_Source runBenchmarkSampler.cxx runBuffer.cxx runSplitter.cxx runMerger.cxx runSink.cxx runProxy.cxx runNToOneMerger.cxx runOneToNSplitter.cxx) -List(LENGTH Exe_Names _length) -Math(EXPR _length ${_length}-1) +list(LENGTH Exe_Names _length) +math(EXPR _length ${_length}-1) ForEach(_file RANGE 0 ${_length}) - List(GET Exe_Names ${_file} _name) - List(GET Exe_Source ${_file} _src) - Set(EXE_NAME ${_name}) - Set(SRCS ${_src}) - Set(DEPENDENCIES FairMQ) + list(GET Exe_Names ${_file} _name) + list(GET Exe_Source ${_file} _src) + set(EXE_NAME ${_name}) + set(SRCS ${_src}) + set(DEPENDENCIES FairMQ) GENERATE_EXECUTABLE() EndForEach(_file RANGE 0 ${_length}) diff --git a/fairmq/FairMQBenchmarkSampler.cxx b/fairmq/FairMQBenchmarkSampler.cxx index fdbe7ba1..8dd9579c 100644 --- a/fairmq/FairMQBenchmarkSampler.cxx +++ b/fairmq/FairMQBenchmarkSampler.cxx @@ -103,7 +103,7 @@ void FairMQBenchmarkSampler::Log(int intervalInMs) megabytesPerSecond = ((double) (bytesNew - bytes) / (1024. * 1024.)) / (double) timeSinceLastLog_ms * 1000.; messagesPerSecond = (double) (messagesNew - messages) / (double) timeSinceLastLog_ms * 1000.; - std::stringstream logmsg; + stringstream logmsg; logmsg << "send " << messagesPerSecond << " msg/s, " << megabytesPerSecond << " MB/s"; FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str()); @@ -113,7 +113,7 @@ void FairMQBenchmarkSampler::Log(int intervalInMs) } } -void FairMQBenchmarkSampler::SetProperty(const int& key, const std::string& value, const int& slot/*= 0*/) +void FairMQBenchmarkSampler::SetProperty(const int& key, const string& value, const int& slot/*= 0*/) { switch (key) { default: @@ -122,7 +122,7 @@ void FairMQBenchmarkSampler::SetProperty(const int& key, const std::string& valu } } -std::string FairMQBenchmarkSampler::GetProperty(const int& key, const std::string& default_/*= ""*/, const int& slot/*= 0*/) +string FairMQBenchmarkSampler::GetProperty(const int& key, const string& default_/*= ""*/, const int& slot/*= 0*/) { switch (key) { default: diff --git a/fairmq/FairMQBenchmarkSampler.h b/fairmq/FairMQBenchmarkSampler.h index aaa93a32..04313e98 100644 --- a/fairmq/FairMQBenchmarkSampler.h +++ b/fairmq/FairMQBenchmarkSampler.h @@ -29,8 +29,8 @@ class FairMQBenchmarkSampler: public FairMQDevice virtual ~FairMQBenchmarkSampler(); void Log(int intervalInMs); void ResetEventCounter(); - virtual void SetProperty(const int& key, const std::string& value, const int& slot = 0); - virtual std::string GetProperty(const int& key, const std::string& default_ = "", const int& slot = 0); + virtual void SetProperty(const int& key, const string& value, const int& slot = 0); + virtual string GetProperty(const int& key, const string& default_ = "", const int& slot = 0); virtual void SetProperty(const int& key, const int& value, const int& slot = 0); virtual int GetProperty(const int& key, const int& default_ = 0, const int& slot = 0); protected: diff --git a/fairmq/FairMQConfigurable.cxx b/fairmq/FairMQConfigurable.cxx index 71186517..1de383fd 100644 --- a/fairmq/FairMQConfigurable.cxx +++ b/fairmq/FairMQConfigurable.cxx @@ -12,20 +12,20 @@ FairMQConfigurable::FairMQConfigurable() { } -void FairMQConfigurable::SetProperty(const int& key, const std::string& value, const int& slot/*= 0*/) +void FairMQConfigurable::SetProperty(const int key, const string& value, const int slot/*= 0*/) { } -std::string FairMQConfigurable::GetProperty(const int& key, const std::string& default_/*= ""*/, const int& slot/*= 0*/) +string FairMQConfigurable::GetProperty(const int key, const string& default_/*= ""*/, const int slot/*= 0*/) { return default_; } -void FairMQConfigurable::SetProperty(const int& key, const int& value, const int& slot/*= 0*/) +void FairMQConfigurable::SetProperty(const int key, const int value, const int slot/*= 0*/) { } -int FairMQConfigurable::GetProperty(const int& key, const int& default_/*= 0*/, const int& slot/*= 0*/) +int FairMQConfigurable::GetProperty(const int key, const int default_/*= 0*/, const int slot/*= 0*/) { return default_; } diff --git a/fairmq/FairMQConfigurable.h b/fairmq/FairMQConfigurable.h index 924cf87f..3a5db663 100644 --- a/fairmq/FairMQConfigurable.h +++ b/fairmq/FairMQConfigurable.h @@ -10,6 +10,7 @@ #include +using std::string; class FairMQConfigurable { @@ -18,11 +19,13 @@ class FairMQConfigurable Last = 1 }; FairMQConfigurable(); - virtual void SetProperty(const int& key, const std::string& value, const int& slot = 0); - virtual std::string GetProperty(const int& key, const std::string& default_ = "", const int& slot = 0); - virtual void SetProperty(const int& key, const int& value, const int& slot = 0); - virtual int GetProperty(const int& key, const int& default_ = 0, const int& slot = 0); + virtual void SetProperty(const int key, const string& value, const int slot = 0); + virtual string GetProperty(const int key, const string& default_ = "", const int slot = 0); + virtual void SetProperty(const int key, const int value, const int slot = 0); + virtual int GetProperty(const int key, const int default_ = 0, const int slot = 0); virtual ~FairMQConfigurable(); + + // TODO: by value for integers }; #endif /* FAIRMQCONFIGURABLE_H_ */ diff --git a/fairmq/FairMQContext.cxx b/fairmq/FairMQContextZMQ.cxx similarity index 74% rename from fairmq/FairMQContext.cxx rename to fairmq/FairMQContextZMQ.cxx index 4097efda..bfd7ec86 100644 --- a/fairmq/FairMQContext.cxx +++ b/fairmq/FairMQContextZMQ.cxx @@ -1,41 +1,42 @@ /** - * FairMQContext.cxx + * FairMQContextZMQ.cxx * * @since 2012-12-05 * @author D. Klein, A. Rybalchenko */ #include "FairMQLogger.h" -#include "FairMQContext.h" +#include "FairMQContextZMQ.h" #include -FairMQContext::FairMQContext(int numIoThreads) +FairMQContextZMQ::FairMQContextZMQ(int numIoThreads) { fContext = zmq_ctx_new (); if (fContext == NULL){ - std::stringstream logmsg; + stringstream logmsg; logmsg << "failed creating context, reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); } int rc = zmq_ctx_set (fContext, ZMQ_IO_THREADS, numIoThreads); if (rc != 0){ - std::stringstream logmsg; + stringstream logmsg; logmsg << "failed configuring context, reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); } } -FairMQContext::~FairMQContext() +FairMQContextZMQ::~FairMQContextZMQ() { + Close(); } -void* FairMQContext::GetContext() +void* FairMQContextZMQ::GetContext() { return fContext; } -void FairMQContext::Close() +void FairMQContextZMQ::Close() { if (fContext == NULL){ return; @@ -43,7 +44,7 @@ void FairMQContext::Close() int rc = zmq_ctx_destroy (fContext); if (rc != 0) { - std::stringstream logmsg; + stringstream logmsg; logmsg << "failed closing context, reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); } diff --git a/fairmq/FairMQContext.h b/fairmq/FairMQContextZMQ.h similarity index 51% rename from fairmq/FairMQContext.h rename to fairmq/FairMQContextZMQ.h index 0c8dc387..be860b7e 100644 --- a/fairmq/FairMQContext.h +++ b/fairmq/FairMQContextZMQ.h @@ -5,16 +5,16 @@ * @author D. Klein, A. Rybalchenko */ -#ifndef FAIRMQCONTEXT_H_ -#define FAIRMQCONTEXT_H_ +#ifndef FAIRMQCONTEXTZMQ_H_ +#define FAIRMQCONTEXTZMQ_H_ #include -class FairMQContext +class FairMQContextZMQ { public: - FairMQContext(int numIoThreads); - virtual ~FairMQContext(); + FairMQContextZMQ(int numIoThreads); + virtual ~FairMQContextZMQ(); void* GetContext(); void Close(); @@ -22,4 +22,4 @@ class FairMQContext void* fContext; }; -#endif /* FAIRMQCONTEXT_H_ */ +#endif /* FAIRMQCONTEXTZMQ_H_ */ diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index e2a7f93c..d1b05f91 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -5,20 +5,17 @@ * @author D. Klein, A. Rybalchenko */ -#include - #include -#include "FairMQSocketZMQ.h" +#include "FairMQSocket.h" #include "FairMQDevice.h" #include "FairMQLogger.h" FairMQDevice::FairMQDevice() : - fId(""), fNumIoThreads(1), - fPayloadContext(NULL), - fPayloadInputs(new std::vector()), - fPayloadOutputs(new std::vector()), + //fPayloadContext(NULL), + fPayloadInputs(new vector()), + fPayloadOutputs(new vector()), fLogIntervalInMs(1000) { } @@ -26,34 +23,35 @@ FairMQDevice::FairMQDevice() : void FairMQDevice::Init() { FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Init <<<<<<<"); - std::stringstream logmsg; + stringstream logmsg; logmsg << "numIoThreads: " << fNumIoThreads; FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); - fPayloadContext = new FairMQContext(fNumIoThreads); + // fPayloadContext = new FairMQContextZMQ(fNumIoThreads); - fInputAddress = new std::vector(fNumInputs); - fInputMethod = new std::vector(); - fInputSocketType = new std::vector(); - fInputSndBufSize = new std::vector(); - fInputRcvBufSize = new std::vector(); + // TODO: nafiga? + fInputAddress = new vector(fNumInputs); + fInputMethod = new vector(); + fInputSocketType = new vector(); + fInputSndBufSize = new vector(); + fInputRcvBufSize = new vector(); for (int i = 0; i < fNumInputs; ++i) { fInputMethod->push_back("connect"); // default value, can be overwritten in configuration - fInputSocketType->push_back(ZMQ_SUB); // default value, can be overwritten in configuration + fInputSocketType->push_back("sub"); // default value, can be overwritten in configuration fInputSndBufSize->push_back(10000); // default value, can be overwritten in configuration fInputRcvBufSize->push_back(10000); // default value, can be overwritten in configuration } - fOutputAddress = new std::vector(fNumOutputs); - fOutputMethod = new std::vector(); - fOutputSocketType = new std::vector(); - fOutputSndBufSize = new std::vector(); - fOutputRcvBufSize = new std::vector(); + fOutputAddress = new vector(fNumOutputs); + fOutputMethod = new vector(); + fOutputSocketType = new vector(); + fOutputSndBufSize = new vector(); + fOutputRcvBufSize = new vector(); for (int i = 0; i < fNumOutputs; ++i) { fOutputMethod->push_back("bind"); // default value, can be overwritten in configuration - fOutputSocketType->push_back(ZMQ_PUB); // default value, can be overwritten in configuration + fOutputSocketType->push_back("pub"); // default value, can be overwritten in configuration fOutputSndBufSize->push_back(10000); // default value, can be overwritten in configuration fOutputRcvBufSize->push_back(10000); // default value, can be overwritten in configuration } @@ -64,11 +62,10 @@ void FairMQDevice::InitInput() FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> InitInput <<<<<<<"); for (int i = 0; i < fNumInputs; ++i) { - //FairMQSocket* socket = new FairMQSocketZMQ(fPayloadContext, fInputSocketType->at(i), i); - FairMQSocket* socket = fTransportFactory->CreateSocket(fPayloadContext, fInputSocketType->at(i), i); + FairMQSocket* socket = fTransportFactory->CreateSocket(fInputSocketType->at(i), i); - socket->SetOption(ZMQ_SNDHWM, &fInputSndBufSize->at(i), sizeof(fInputSndBufSize->at(i))); - socket->SetOption(ZMQ_RCVHWM, &fInputRcvBufSize->at(i), sizeof(fInputRcvBufSize->at(i))); + socket->SetOption("snd-hwm", &fInputSndBufSize->at(i), sizeof(fInputSndBufSize->at(i))); + socket->SetOption("rcv-hwm", &fInputRcvBufSize->at(i), sizeof(fInputRcvBufSize->at(i))); fPayloadInputs->push_back(socket); @@ -88,10 +85,10 @@ void FairMQDevice::InitOutput() FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> InitOutput <<<<<<<"); for (int i = 0; i < fNumOutputs; ++i) { - FairMQSocket* socket = fTransportFactory->CreateSocket(fPayloadContext, fOutputSocketType->at(i), i); + FairMQSocket* socket = fTransportFactory->CreateSocket(fOutputSocketType->at(i), i); - socket->SetOption(ZMQ_SNDHWM, &fOutputSndBufSize->at(i), sizeof(fOutputSndBufSize->at(i))); - socket->SetOption(ZMQ_RCVHWM, &fOutputRcvBufSize->at(i), sizeof(fOutputRcvBufSize->at(i))); + socket->SetOption("snd-hwm", &fOutputSndBufSize->at(i), sizeof(fOutputSndBufSize->at(i))); + socket->SetOption("rcv-hwm", &fOutputRcvBufSize->at(i), sizeof(fOutputRcvBufSize->at(i))); fPayloadOutputs->push_back(socket); @@ -115,7 +112,7 @@ void FairMQDevice::Pause() } // Method for setting properties represented as a string. -void FairMQDevice::SetProperty(const int& key, const std::string& value, const int& slot/*= 0*/) +void FairMQDevice::SetProperty(const int key, const string& value, const int slot/*= 0*/) { switch (key) { case Id: @@ -137,6 +134,14 @@ void FairMQDevice::SetProperty(const int& key, const std::string& value, const i fOutputMethod->erase(fOutputMethod->begin() + slot); fOutputMethod->insert(fOutputMethod->begin() + slot, value); break; + case InputSocketType: + fInputSocketType->erase(fInputSocketType->begin() + slot); + fInputSocketType->insert(fInputSocketType->begin() + slot, value); + break; + case OutputSocketType: + fOutputSocketType->erase(fOutputSocketType->begin() + slot); + fOutputSocketType->insert(fOutputSocketType->begin() + slot, value); + break; default: FairMQConfigurable::SetProperty(key, value, slot); break; @@ -144,7 +149,7 @@ void FairMQDevice::SetProperty(const int& key, const std::string& value, const i } // Method for setting properties represented as an integer. -void FairMQDevice::SetProperty(const int& key, const int& value, const int& slot/*= 0*/) +void FairMQDevice::SetProperty(const int key, const int value, const int slot/*= 0*/) { switch (key) { case NumIoThreads: @@ -159,10 +164,6 @@ void FairMQDevice::SetProperty(const int& key, const int& value, const int& slot case LogIntervalInMs: fLogIntervalInMs = value; break; - case InputSocketType: - fInputSocketType->erase(fInputSocketType->begin() + slot); - fInputSocketType->insert(fInputSocketType->begin() + slot, value); - break; case InputSndBufSize: fInputSndBufSize->erase(fInputSndBufSize->begin() + slot); fInputSndBufSize->insert(fInputSndBufSize->begin() + slot, value); @@ -171,10 +172,6 @@ void FairMQDevice::SetProperty(const int& key, const int& value, const int& slot fInputRcvBufSize->erase(fInputRcvBufSize->begin() + slot); fInputRcvBufSize->insert(fInputRcvBufSize->begin() + slot, value); break; - case OutputSocketType: - fOutputSocketType->erase(fOutputSocketType->begin() + slot); - fOutputSocketType->insert(fOutputSocketType->begin() + slot, value); - break; case OutputSndBufSize: fOutputSndBufSize->erase(fOutputSndBufSize->begin() + slot); fOutputSndBufSize->insert(fOutputSndBufSize->begin() + slot, value); @@ -190,7 +187,7 @@ void FairMQDevice::SetProperty(const int& key, const int& value, const int& slot } // Method for getting properties represented as an string. -std::string FairMQDevice::GetProperty(const int& key, const std::string& default_/*= ""*/, const int& slot/*= 0*/) +string FairMQDevice::GetProperty(const int key, const string& default_/*= ""*/, const int slot/*= 0*/) { switch (key) { case Id: @@ -203,27 +200,27 @@ std::string FairMQDevice::GetProperty(const int& key, const std::string& default return fInputMethod->at(slot); case OutputMethod: return fOutputMethod->at(slot); + case InputSocketType: + return fInputSocketType->at(slot); + case OutputSocketType: + return fOutputSocketType->at(slot); default: return FairMQConfigurable::GetProperty(key, default_, slot); } } // Method for getting properties represented as an integer. -int FairMQDevice::GetProperty(const int& key, const int& default_/*= 0*/, const int& slot/*= 0*/) +int FairMQDevice::GetProperty(const int key, const int default_/*= 0*/, const int slot/*= 0*/) { switch (key) { case NumIoThreads: return fNumIoThreads; case LogIntervalInMs: return fLogIntervalInMs; - case InputSocketType: - return fInputSocketType->at(slot); case InputSndBufSize: return fInputSndBufSize->at(slot); case InputRcvBufSize: return fInputRcvBufSize->at(slot); - case OutputSocketType: - return fOutputSocketType->at(slot); case OutputSndBufSize: return fOutputSndBufSize->at(slot); case OutputRcvBufSize: @@ -268,14 +265,14 @@ void FairMQDevice::LogSocketRates() // End of temp stuff int i = 0; - for ( std::vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { + for ( vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { bytesInput[i] = (*itr)->GetBytesRx(); messagesInput[i] = (*itr)->GetMessagesRx(); ++i; } i = 0; - for ( std::vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { + for ( vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { bytesOutput[i] = (*itr)->GetBytesTx(); messagesOutput[i] = (*itr)->GetMessagesTx(); ++i; @@ -293,7 +290,7 @@ void FairMQDevice::LogSocketRates() i = 0; - for ( std::vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { + for ( vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { bytesInputNew[i] = (*itr)->GetBytesRx(); megabytesPerSecondInput[i] = ((double) (bytesInputNew[i] - bytesInput[i]) / (1024. * 1024.)) / (double) timeSinceLastLog_ms * 1000.; bytesInput[i] = bytesInputNew[i]; @@ -301,7 +298,7 @@ void FairMQDevice::LogSocketRates() messagesPerSecondInput[i] = (double) (messagesInputNew[i] - messagesInput[i]) / (double) timeSinceLastLog_ms * 1000.; messagesInput[i] = messagesInputNew[i]; - std::stringstream logmsg; + stringstream logmsg; logmsg << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondInput[i] << " msg/s, " << megabytesPerSecondInput[i] << " MB/s"; FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str()); @@ -310,7 +307,7 @@ void FairMQDevice::LogSocketRates() receivedSomething = true; } if ( receivedSomething && messagesPerSecondInput[i] == 0 ) { - std::cout << "Did not receive anything on socket " << i << " for " << didNotReceiveFor++ << " seconds." << std::endl; + cout << "Did not receive anything on socket " << i << " for " << didNotReceiveFor++ << " seconds." << endl; } else { didNotReceiveFor = 0; } @@ -321,7 +318,7 @@ void FairMQDevice::LogSocketRates() i = 0; - for ( std::vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { + for ( vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { bytesOutputNew[i] = (*itr)->GetBytesTx(); megabytesPerSecondOutput[i] = ((double) (bytesOutputNew[i] - bytesOutput[i]) / (1024. * 1024.)) / (double) timeSinceLastLog_ms * 1000.; bytesOutput[i] = bytesOutputNew[i]; @@ -329,7 +326,7 @@ void FairMQDevice::LogSocketRates() messagesPerSecondOutput[i] = (double) (messagesOutputNew[i] - messagesOutput[i]) / (double) timeSinceLastLog_ms * 1000.; messagesOutput[i] = messagesOutputNew[i]; - std::stringstream logmsg; + stringstream logmsg; logmsg << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondOutput[i] << " msg/s, " << megabytesPerSecondOutput[i] << " MB/s"; FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str()); @@ -338,7 +335,7 @@ void FairMQDevice::LogSocketRates() sentSomething = true; } if ( sentSomething && messagesPerSecondOutput[i] == 0 ) { - std::cout << "Did not send anything on socket " << i << " for " << didNotSendFor++ << " seconds." << std::endl; + cout << "Did not send anything on socket " << i << " for " << didNotSendFor++ << " seconds." << endl; } else { didNotSendFor = 0; } @@ -349,18 +346,18 @@ void FairMQDevice::LogSocketRates() // Temp stuff for process termination if (receivedSomething && didNotReceiveFor > 5) { - std::cout << "stopping because nothing was received for 5 seconds." << std::endl; + cout << "stopping because nothing was received for 5 seconds." << endl; ChangeState(STOP); } if (sentSomething && didNotSendFor > 5) { - std::cout << "stopping because nothing was sent for 5 seconds." << std::endl; + cout << "stopping because nothing was sent for 5 seconds." << endl; ChangeState(STOP); } // End of temp stuff t0 = t1; } catch (boost::thread_interrupted&) { - std::cout << "rateLogger interrupted" << std::endl; + cout << "rateLogger interrupted" << endl; break; } } @@ -390,26 +387,26 @@ void FairMQDevice::ListenToCommands() void FairMQDevice::Shutdown() { FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> closing inputs <<<<<<<"); - for( std::vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { + for( vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { (*itr)->Close(); } FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> closing outputs <<<<<<<"); - for( std::vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { + for( vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { (*itr)->Close(); } - FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> closing context <<<<<<<"); - fPayloadContext->Close(); + //FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> closing context <<<<<<<"); + //fPayloadContext->Close(); } FairMQDevice::~FairMQDevice() { - for( std::vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { + for( vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { delete (*itr); } - for( std::vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { + for( vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { delete (*itr); } diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index bcd6602e..b7f3ae8c 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -10,13 +10,17 @@ #include #include +#include #include "FairMQConfigurable.h" #include "FairMQStateMachine.h" #include "FairMQTransportFactory.h" -#include "FairMQContext.h" #include "FairMQSocket.h" +using std::vector; +using std::cin; +using std::cout; +using std::endl; class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable { @@ -45,38 +49,37 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable virtual void LogSocketRates(); virtual void ListenToCommands(); - virtual void SetProperty(const int& key, const std::string& value, const int& slot = 0); - virtual std::string GetProperty(const int& key, const std::string& default_ = "", const int& slot = 0); - virtual void SetProperty(const int& key, const int& value, const int& slot = 0); - virtual int GetProperty(const int& key, const int& default_ = 0, const int& slot = 0); + virtual void SetProperty(const int key, const string& value, const int slot = 0); + virtual string GetProperty(const int key, const string& default_ = "", const int slot = 0); + virtual void SetProperty(const int key, const int value, const int slot = 0); + virtual int GetProperty(const int key, const int default_ = 0, const int slot = 0); virtual void SetTransport(FairMQTransportFactory* factory); virtual ~FairMQDevice(); protected: - std::string fId; + string fId; int fNumIoThreads; - FairMQContext* fPayloadContext; FairMQTransportFactory* fTransportFactory; int fNumInputs; int fNumOutputs; - std::vector *fInputAddress; - std::vector *fInputMethod; - std::vector *fInputSocketType; - std::vector *fInputSndBufSize; - std::vector *fInputRcvBufSize; + vector *fInputAddress; + vector *fInputMethod; + vector *fInputSocketType; + vector *fInputSndBufSize; + vector *fInputRcvBufSize; - std::vector *fOutputAddress; - std::vector *fOutputMethod; - std::vector *fOutputSocketType; - std::vector *fOutputSndBufSize; - std::vector *fOutputRcvBufSize; + vector *fOutputAddress; + vector *fOutputMethod; + vector *fOutputSocketType; + vector *fOutputSndBufSize; + vector *fOutputRcvBufSize; - std::vector *fPayloadInputs; - std::vector *fPayloadOutputs; + vector *fPayloadInputs; + vector *fPayloadOutputs; int fLogIntervalInMs; diff --git a/fairmq/FairMQLogger.cxx b/fairmq/FairMQLogger.cxx index 28c6c606..e8c25f73 100644 --- a/fairmq/FairMQLogger.cxx +++ b/fairmq/FairMQLogger.cxx @@ -11,6 +11,9 @@ #include "FairMQLogger.h" +using std::cin; +using std::cout; +using std::endl; FairMQLogger* FairMQLogger::instance = NULL; @@ -22,7 +25,7 @@ FairMQLogger* FairMQLogger::GetInstance() return instance; } -FairMQLogger* FairMQLogger::InitInstance(std::string bindAddress) +FairMQLogger* FairMQLogger::InitInstance(const string& bindAddress) { instance = new FairMQLogger(bindAddress); return instance; @@ -33,7 +36,7 @@ FairMQLogger::FairMQLogger() : { } -FairMQLogger::FairMQLogger(std::string bindAddress) : +FairMQLogger::FairMQLogger(const string& bindAddress) : fBindAddress(bindAddress) { } @@ -42,7 +45,7 @@ FairMQLogger::~FairMQLogger() { } -void FairMQLogger::Log(int type, std::string logmsg) +void FairMQLogger::Log(int type, const string& logmsg) { timestamp_t tm = get_timestamp(); timestamp_t ms = tm / 1000.0L; @@ -52,7 +55,7 @@ void FairMQLogger::Log(int type, std::string logmsg) char mbstr[100]; std::strftime(mbstr, 100, "%H:%M:%S:", std::localtime(&t)); - std::string type_str; + string type_str; switch (type) { case DEBUG: type_str = "\033[01;34mDEBUG\033[0m"; @@ -69,7 +72,7 @@ void FairMQLogger::Log(int type, std::string logmsg) break; } - std::cout << "[\033[01;36m" << mbstr << fractional_seconds << "\033[0m]" << "[" << type_str << "]" << " " << logmsg << std::endl; + cout << "[\033[01;36m" << mbstr << fractional_seconds << "\033[0m]" << "[" << type_str << "]" << " " << logmsg << endl; } timestamp_t get_timestamp () diff --git a/fairmq/FairMQLogger.h b/fairmq/FairMQLogger.h index 1dc44b24..9725c70f 100644 --- a/fairmq/FairMQLogger.h +++ b/fairmq/FairMQLogger.h @@ -12,22 +12,24 @@ #include #include +using std::string; +using std::stringstream; class FairMQLogger { private: static FairMQLogger* instance; - std::string fBindAddress; + string fBindAddress; public: enum { DEBUG, INFO, ERROR, STATE }; FairMQLogger(); - FairMQLogger(std::string bindAdress); + FairMQLogger(const string& bindAdress); // TODO: check this for const ref virtual ~FairMQLogger(); - void Log(int type, std::string logmsg); + void Log(int type, const string& logmsg); static FairMQLogger* GetInstance(); - static FairMQLogger* InitInstance(std::string bindAddress); + static FairMQLogger* InitInstance(const string& bindAddress); // TODO: check this for const ref }; typedef unsigned long long timestamp_t; diff --git a/fairmq/FairMQMerger.cxx b/fairmq/FairMQMerger.cxx index e09dc882..6a9d7590 100644 --- a/fairmq/FairMQMerger.cxx +++ b/fairmq/FairMQMerger.cxx @@ -10,6 +10,7 @@ #include "FairMQLogger.h" #include "FairMQMerger.h" +#include "FairMQPoller.h" FairMQMerger::FairMQMerger() @@ -26,24 +27,17 @@ void FairMQMerger::Run() boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); - zmq_pollitem_t items[fNumInputs]; - - for (int i = 0; i < fNumInputs; i++) { - items[i].socket = fPayloadInputs->at(i)->GetSocket(); - items[i].fd = 0; - items[i].events = ZMQ_POLLIN; - items[i].revents = 0; - } + FairMQPoller* poller = fTransportFactory->CreatePoller(*fPayloadInputs); bool received = false; while ( fState == RUNNING ) { FairMQMessage* msg = fTransportFactory->CreateMessage(); - zmq_poll(items, fNumInputs, 100); + poller->Poll(100); for(int i = 0; i < fNumInputs; i++) { - if (items[i].revents & ZMQ_POLLIN) { + if (poller->CheckInput(i)){ received = fPayloadInputs->at(i)->Receive(msg); } if (received) { @@ -55,6 +49,8 @@ void FairMQMerger::Run() delete msg; } + delete poller; + rateLogger.interrupt(); rateLogger.join(); } diff --git a/fairmq/FairMQMessage.h b/fairmq/FairMQMessage.h index 0ce27b4f..247e8670 100644 --- a/fairmq/FairMQMessage.h +++ b/fairmq/FairMQMessage.h @@ -21,6 +21,7 @@ class FairMQMessage virtual void* GetMessage() = 0; virtual void* GetData() = 0; virtual size_t GetSize() = 0; + virtual void SetMessage(void* data, size_t size) = 0; virtual void Copy(FairMQMessage* msg) = 0; diff --git a/fairmq/FairMQMessageNN.cxx b/fairmq/FairMQMessageNN.cxx index e69de29b..3a8cee80 100644 --- a/fairmq/FairMQMessageNN.cxx +++ b/fairmq/FairMQMessageNN.cxx @@ -0,0 +1,135 @@ +/** + * FairMQMessageNN.cxx + * + * @since 2013-12-05 + * @author: A. Rybalchenko + */ + +#include + +#include + +#include "FairMQMessageNN.h" +#include "FairMQLogger.h" + + +FairMQMessageNN::FairMQMessageNN() : + fSize(0), + fMessage(NULL) +{ +} + +FairMQMessageNN::FairMQMessageNN(size_t size) +{ + fMessage = nn_allocmsg(size, 0); + if(!fMessage){ + stringstream logmsg; + logmsg << "failed allocating message, reason: " << nn_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } + fSize = size; +} + +FairMQMessageNN::FairMQMessageNN(void* data, size_t size) +{ + fMessage = nn_allocmsg(size, 0); + if(!fMessage){ + stringstream logmsg; + logmsg << "failed allocating message, reason: " << nn_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } + memcpy (fMessage, data, size); + fSize = size; +} + +void FairMQMessageNN::Rebuild() +{ + Clear(); + fSize = 0; + fMessage = NULL; +} + +void FairMQMessageNN::Rebuild(size_t size) +{ + Clear(); + fMessage = nn_allocmsg(size, 0); + if(!fMessage){ + stringstream logmsg; + logmsg << "failed allocating message, reason: " << nn_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } + fSize = size; +} + +void FairMQMessageNN::Rebuild(void* data, size_t size) +{ + Clear(); + fMessage = nn_allocmsg(size, 0); + if(!fMessage){ + stringstream logmsg; + logmsg << "failed allocating message, reason: " << nn_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } + fSize = size; +} + +void* FairMQMessageNN::GetMessage() +{ + return fMessage; +} + +void* FairMQMessageNN::GetData() +{ + return fMessage; +} + +size_t FairMQMessageNN::GetSize() +{ + return fSize; +} + +void FairMQMessageNN::SetMessage(void* data, size_t size) +{ + fMessage = data; + fSize = size; +} + +void FairMQMessageNN::Copy(FairMQMessage* msg) +{ + if(fMessage){ + int rc = nn_freemsg(fMessage); + if( rc < 0 ){ + stringstream logmsg; + logmsg << "failed freeing message, reason: " << nn_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } + } + + size_t size = msg->GetSize(); + + fMessage = nn_allocmsg(size, 0); + if(!fMessage){ + stringstream logmsg; + logmsg << "failed allocating message, reason: " << nn_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } + std::memcpy (fMessage, msg->GetMessage(), size); + fSize = size; +} + +void FairMQMessageNN::Clear() +{ + int rc = nn_freemsg(fMessage); + if (rc < 0) { + stringstream logmsg; + logmsg << "failed freeing message, reason: " << nn_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } else { + fMessage = NULL; + fSize = 0; + } +} + +FairMQMessageNN::~FairMQMessageNN() +{ +} diff --git a/fairmq/FairMQMessageNN.h b/fairmq/FairMQMessageNN.h index e69de29b..0a354ef3 100644 --- a/fairmq/FairMQMessageNN.h +++ b/fairmq/FairMQMessageNN.h @@ -0,0 +1,43 @@ +/** + * FairMQMessageNN.h + * + * @since 2013-12-05 + * @author: A. Rybalchenko + */ + +#ifndef FAIRMQMESSAGENN_H_ +#define FAIRMQMESSAGENN_H_ + +#include + +#include "FairMQMessage.h" + + +class FairMQMessageNN : public FairMQMessage +{ + public: + FairMQMessageNN(); + FairMQMessageNN(size_t size); + FairMQMessageNN(void* data, size_t size); + + virtual void Rebuild(); + virtual void Rebuild(size_t size); + virtual void Rebuild(void* data, size_t site); + + virtual void* GetMessage(); + virtual void* GetData(); + virtual size_t GetSize(); + + virtual void Copy(FairMQMessage* msg); + + void SetMessage(void* data, size_t size); + void Clear(); + + virtual ~FairMQMessageNN(); + + private: + void* fMessage; + size_t fSize; +}; + +#endif /* FAIRMQMESSAGENN_H_ */ diff --git a/fairmq/FairMQMessageZMQ.cxx b/fairmq/FairMQMessageZMQ.cxx index a6fdcdf6..39cdb38d 100644 --- a/fairmq/FairMQMessageZMQ.cxx +++ b/fairmq/FairMQMessageZMQ.cxx @@ -14,8 +14,8 @@ FairMQMessageZMQ::FairMQMessageZMQ() { int rc = zmq_msg_init (&fMessage); - if (rc != 0){ - std::stringstream logmsg; + if (rc != 0) { + stringstream logmsg; logmsg << "failed initializing message, reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); } @@ -24,8 +24,8 @@ FairMQMessageZMQ::FairMQMessageZMQ() FairMQMessageZMQ::FairMQMessageZMQ(size_t size) { int rc = zmq_msg_init_size (&fMessage, size); - if (rc != 0){ - std::stringstream logmsg; + if (rc != 0) { + stringstream logmsg; logmsg << "failed initializing message with size, reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); } @@ -34,8 +34,8 @@ FairMQMessageZMQ::FairMQMessageZMQ(size_t size) FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size) { int rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL); - if (rc != 0){ - std::stringstream logmsg; + if (rc != 0) { + stringstream logmsg; logmsg << "failed initializing message with data, reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); } @@ -43,28 +43,50 @@ FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size) void FairMQMessageZMQ::Rebuild() { - //TODO + int rc = zmq_msg_close (&fMessage); + if (rc != 0) { + stringstream logmsg; + logmsg << "failed closing message, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } + + rc = zmq_msg_init (&fMessage); + if (rc != 0) { + stringstream logmsg; + logmsg << "failed initializing message, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } } void FairMQMessageZMQ::Rebuild(size_t size) { - //TODO + int rc = zmq_msg_close (&fMessage); + if (rc != 0) { + stringstream logmsg; + logmsg << "failed closing message, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } + + rc = zmq_msg_init_size (&fMessage, size); + if (rc != 0) { + stringstream logmsg; + logmsg << "failed initializing message with size, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } } - - void FairMQMessageZMQ::Rebuild(void* data, size_t size) { int rc = zmq_msg_close (&fMessage); if (rc != 0) { - std::stringstream logmsg; + stringstream logmsg; logmsg << "failed closing message, reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); } rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL); if (rc != 0) { - std::stringstream logmsg2; + stringstream logmsg2; logmsg2 << "failed initializing message with data, reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); } @@ -85,11 +107,16 @@ size_t FairMQMessageZMQ::GetSize() return zmq_msg_size (&fMessage); } +void FairMQMessageZMQ::SetMessage(void* data, size_t size) +{ + // dummy method to comply with the interface. functionality not allowed in zeromq. +} + void FairMQMessageZMQ::Copy(FairMQMessage* msg) { int rc = zmq_msg_copy (&fMessage, &(static_cast(msg)->fMessage)); if (rc != 0) { - std::stringstream logmsg; + stringstream logmsg; logmsg << "failed copying message, reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); } @@ -103,8 +130,8 @@ void FairMQMessageZMQ::CleanUp(void* data, void* hint) FairMQMessageZMQ::~FairMQMessageZMQ() { int rc = zmq_msg_close (&fMessage); - if (rc != 0){ - std::stringstream logmsg; + if (rc != 0) { + stringstream logmsg; logmsg << "failed closing message with data, reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); } diff --git a/fairmq/FairMQMessageZMQ.h b/fairmq/FairMQMessageZMQ.h index 84fec86d..792bab9e 100644 --- a/fairmq/FairMQMessageZMQ.h +++ b/fairmq/FairMQMessageZMQ.h @@ -15,7 +15,7 @@ #include "FairMQMessage.h" -class FairMQMessageZMQ: public FairMQMessage +class FairMQMessageZMQ : public FairMQMessage { public: FairMQMessageZMQ(); @@ -29,6 +29,7 @@ class FairMQMessageZMQ: public FairMQMessage virtual void* GetMessage(); virtual void* GetData(); virtual size_t GetSize(); + virtual void SetMessage(void* data, size_t size); virtual void Copy(FairMQMessage* msg); diff --git a/fairmq/FairMQPoller.cxx b/fairmq/FairMQPoller.cxx new file mode 100644 index 00000000..d46b9ea6 --- /dev/null +++ b/fairmq/FairMQPoller.cxx @@ -0,0 +1,6 @@ +/** + * FairMQPoller.cxx + * + * @since 2014-01-23 + * @author A. Rybalchenko + */ diff --git a/fairmq/FairMQPoller.h b/fairmq/FairMQPoller.h new file mode 100644 index 00000000..d4781304 --- /dev/null +++ b/fairmq/FairMQPoller.h @@ -0,0 +1,21 @@ +/** + * FairMQPoller.h + * + * @since 2014-01-23 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQPOLLER_H_ +#define FAIRMQPOLLER_H_ + +class FairMQPoller +{ + public: + virtual void Poll(int timeout) = 0; + virtual bool CheckInput(int index) = 0; + + virtual ~FairMQPoller() {}; + +}; + +#endif /* FAIRMQPOLLER_H_ */ \ No newline at end of file diff --git a/fairmq/FairMQPollerNN.cxx b/fairmq/FairMQPollerNN.cxx new file mode 100644 index 00000000..6918ac45 --- /dev/null +++ b/fairmq/FairMQPollerNN.cxx @@ -0,0 +1,39 @@ +/** + * FairMQPollerNN.cxx + * + * @since 2014-01-23 + * @author A. Rybalchenko + */ + +#include + +#include "FairMQPollerNN.h" + +FairMQPollerNN::FairMQPollerNN(const vector& inputs) +{ + fNumItems = inputs.size(); + items = new nn_pollfd[fNumItems]; + + for (int i = 0; i < fNumItems; i++) { + items[i].fd = inputs.at(i)->GetSocket(1); + items[i].events = NN_POLLIN; + } +} + +void FairMQPollerNN::Poll(int timeout) +{ + nn_poll(items, fNumItems, timeout); +} + +bool FairMQPollerNN::CheckInput(int index) +{ + if (items[index].revents & NN_POLLIN) + return true; + + return false; +} + +FairMQPollerNN::~FairMQPollerNN() +{ + if (items != NULL) delete [] items; +} diff --git a/fairmq/FairMQPollerNN.h b/fairmq/FairMQPollerNN.h new file mode 100644 index 00000000..c3401c45 --- /dev/null +++ b/fairmq/FairMQPollerNN.h @@ -0,0 +1,33 @@ +/** + * FairMQPollerNN.h + * + * @since 2014-01-23 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQPOLLERNN_H_ +#define FAIRMQPOLLERNN_H_ + +#include + +#include "FairMQPoller.h" +#include "FairMQSocket.h" + +using std::vector; + +class FairMQPollerNN : public FairMQPoller +{ + public: + FairMQPollerNN(const vector& inputs); + + virtual void Poll(int timeout); + virtual bool CheckInput(int index); + + virtual ~FairMQPollerNN(); + + private: + nn_pollfd* items; + int fNumItems; +}; + +#endif /* FAIRMQPOLLERNN_H_ */ \ No newline at end of file diff --git a/fairmq/FairMQPollerZMQ.cxx b/fairmq/FairMQPollerZMQ.cxx new file mode 100644 index 00000000..c9b7577e --- /dev/null +++ b/fairmq/FairMQPollerZMQ.cxx @@ -0,0 +1,41 @@ +/** + * FairMQPollerZMQ.cxx + * + * @since 2014-01-23 + * @author A. Rybalchenko + */ + +#include + +#include "FairMQPollerZMQ.h" + +FairMQPollerZMQ::FairMQPollerZMQ(const vector& inputs) +{ + fNumItems = inputs.size(); + items = new zmq_pollitem_t[fNumItems]; + + for (int i = 0; i < fNumItems; i++) { + items[i].socket = inputs.at(i)->GetSocket(); + items[i].fd = 0; + items[i].events = ZMQ_POLLIN; + items[i].revents = 0; + } +} + +void FairMQPollerZMQ::Poll(int timeout) +{ + zmq_poll(items, fNumItems, timeout); +} + +bool FairMQPollerZMQ::CheckInput(int index) +{ + if (items[index].revents & ZMQ_POLLIN) + return true; + + return false; +} + +FairMQPollerZMQ::~FairMQPollerZMQ() +{ + if (items != NULL) delete [] items; +} diff --git a/fairmq/FairMQPollerZMQ.h b/fairmq/FairMQPollerZMQ.h new file mode 100644 index 00000000..8d0e1a5e --- /dev/null +++ b/fairmq/FairMQPollerZMQ.h @@ -0,0 +1,33 @@ +/** + * FairMQPollerZMQ.h + * + * @since 2014-01-23 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQPOLLERZMQ_H_ +#define FAIRMQPOLLERZMQ_H_ + +#include + +#include "FairMQPoller.h" +#include "FairMQSocket.h" + +using std::vector; + +class FairMQPollerZMQ : public FairMQPoller +{ + public: + FairMQPollerZMQ(const vector& inputs); + + virtual void Poll(int timeout); + virtual bool CheckInput(int index); + + virtual ~FairMQPollerZMQ(); + + private: + zmq_pollitem_t* items; + int fNumItems; +}; + +#endif /* FAIRMQPOLLERZMQ_H_ */ \ No newline at end of file diff --git a/fairmq/FairMQProcessor.cxx b/fairmq/FairMQProcessor.cxx index 1374f416..2aa614e1 100644 --- a/fairmq/FairMQProcessor.cxx +++ b/fairmq/FairMQProcessor.cxx @@ -61,7 +61,7 @@ void FairMQProcessor::Run() delete msg; } - std::cout << "I've received " << receivedMsgs << " and sent " << sentMsgs << " messages!" << std::endl; + cout << "I've received " << receivedMsgs << " and sent " << sentMsgs << " messages!" << endl; boost::this_thread::sleep(boost::posix_time::milliseconds(5000)); diff --git a/fairmq/FairMQSampler.cxx b/fairmq/FairMQSampler.cxx index 275ac57f..a85b6a82 100644 --- a/fairmq/FairMQSampler.cxx +++ b/fairmq/FairMQSampler.cxx @@ -45,7 +45,7 @@ void FairMQSampler::Init() FairMQDevice::Init(); fSamplerTask->SetBranch(fBranch); - fSamplerTask->SetTransport(fTransportFactory); // TODO: simplify message creation for sampler task? + fSamplerTask->SetTransport(fTransportFactory); fFairRunAna->SetInputFile(TString(fInputFile)); TString output = fInputFile; @@ -79,7 +79,7 @@ void FairMQSampler::Run() boost::timer::auto_cpu_timer timer; - std::cout << "Number of events to process: " << fNumEvents << std::endl; + cout << "Number of events to process: " << fNumEvents << endl; Long64_t eventNr = 0; @@ -105,8 +105,8 @@ void FairMQSampler::Run() boost::timer::cpu_times const elapsed_time(timer.elapsed()); - std::cout << "Sent everything in:\n" << boost::timer::format(elapsed_time, 2) << std::endl; - std::cout << "Sent " << sentMsgs << " messages!" << std::endl; + cout << "Sent everything in:\n" << boost::timer::format(elapsed_time, 2) << endl; + cout << "Sent " << sentMsgs << " messages!" << endl; //boost::this_thread::sleep(boost::posix_time::milliseconds(5000)); @@ -125,7 +125,7 @@ void FairMQSampler::ResetEventCounter() fEventCounter = fEventRate / 100; boost::this_thread::sleep(boost::posix_time::milliseconds(10)); } catch (boost::thread_interrupted&) { - std::cout << "resetEventCounter interrupted" << std::endl; + cout << "resetEventCounter interrupted" << endl; break; } } @@ -154,14 +154,14 @@ void FairMQSampler::ListenToCommands() boost::this_thread::interruption_point(); } catch (boost::thread_interrupted&) { - std::cout << "commandListener interrupted" << std::endl; + cout << "commandListener interrupted" << endl; break; } } FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, ">>>>>>> stopping commandListener <<<<<<<"); } -void FairMQSampler::SetProperty(const int& key, const std::string& value, const int& slot/*= 0*/) +void FairMQSampler::SetProperty(const int& key, const string& value, const int& slot/*= 0*/) { switch (key) { case InputFile: @@ -179,7 +179,7 @@ void FairMQSampler::SetProperty(const int& key, const std::string& value, const } } -std::string FairMQSampler::GetProperty(const int& key, const std::string& default_/*= ""*/, const int& slot/*= 0*/) +string FairMQSampler::GetProperty(const int& key, const string& default_/*= ""*/, const int& slot/*= 0*/) { switch (key) { case InputFile: diff --git a/fairmq/FairMQSampler.h b/fairmq/FairMQSampler.h index 695bb362..8c6b8f16 100644 --- a/fairmq/FairMQSampler.h +++ b/fairmq/FairMQSampler.h @@ -8,7 +8,6 @@ #ifndef FAIRMQSAMPLER_H_ #define FAIRMQSAMPLER_H_ -#include #include "FairRunAna.h" #include "FairTask.h" #include "FairMQDevice.h" @@ -40,17 +39,17 @@ class FairMQSampler: public FairMQDevice void ResetEventCounter(); virtual void ListenToCommands(); - virtual void SetProperty(const int& key, const std::string& value, const int& slot = 0); - virtual std::string GetProperty(const int& key, const std::string& default_ = "", const int& slot = 0); + virtual void SetProperty(const int& key, const string& value, const int& slot = 0); + virtual string GetProperty(const int& key, const string& default_ = "", const int& slot = 0); virtual void SetProperty(const int& key, const int& value, const int& slot = 0); virtual int GetProperty(const int& key, const int& default_ = 0, const int& slot = 0); protected: FairRunAna* fFairRunAna; int fNumEvents; FairMQSamplerTask* fSamplerTask; - std::string fInputFile; // Filename of a root file containing the simulated digis. - std::string fParFile; - std::string fBranch; // The name of the sub-detector branch to stream the digis from. + string fInputFile; // Filename of a root file containing the simulated digis. + string fParFile; + string fBranch; // The name of the sub-detector branch to stream the digis from. int fEventRate; int fEventCounter; virtual void Init(); diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index 73b3f2bd..5eae0d45 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -9,25 +9,27 @@ #define FAIRMQSOCKET_H_ #include -#include "FairMQContext.h" #include "FairMQMessage.h" +using std::string; +using std::stringstream; class FairMQSocket { public: - virtual std::string GetId() = 0; + virtual string GetId() = 0; - virtual void Bind(std::string address) = 0; - virtual void Connect(std::string address) = 0; + virtual void Bind(const string& address) = 0; + virtual void Connect(const string& address) = 0; virtual size_t Send(FairMQMessage* msg) = 0; virtual size_t Receive(FairMQMessage* msg) = 0; - virtual void Close() = 0; virtual void* GetSocket() = 0; + virtual int GetSocket(int nothing) = 0; + virtual void Close() = 0; - virtual void SetOption(int option, const void* value, size_t valueSize) = 0; + virtual void SetOption(const string& option, const void* value, size_t valueSize) = 0; virtual unsigned long GetBytesTx() = 0; virtual unsigned long GetBytesRx() = 0; diff --git a/fairmq/FairMQSocketNN.cxx b/fairmq/FairMQSocketNN.cxx index e69de29b..2d0a43f6 100644 --- a/fairmq/FairMQSocketNN.cxx +++ b/fairmq/FairMQSocketNN.cxx @@ -0,0 +1,161 @@ +/** + * FairMQSocketNN.cxx + * + * @since 2012-12-05 + * @author A. Rybalchenko + */ + +#include + +#include "FairMQSocketNN.h" +#include "FairMQLogger.h" + +FairMQSocketNN::FairMQSocketNN(const string& type, int num) : + fBytesTx(0), + fBytesRx(0), + fMessagesTx(0), + fMessagesRx(0) +{ + stringstream id; + id << type << "." << num; + fId = id.str(); + + fSocket = nn_socket (AF_SP, GetConstant(type)); + if (type == "sub") { + nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, NULL, 0); + } + + stringstream logmsg; + logmsg << "created socket #" << fId; + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); +} + +string FairMQSocketNN::GetId() +{ + return fId; +} + +void FairMQSocketNN::Bind(const string& address) +{ + stringstream logmsg; + logmsg << "bind socket #" << fId << " on " << address; + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + + int eid = nn_bind(fSocket, address.c_str()); + if (eid < 0) { + stringstream logmsg2; + logmsg2 << "failed binding socket #" << fId << ", reason: " << nn_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); + } +} + +void FairMQSocketNN::Connect(const string& address) +{ + stringstream logmsg; + logmsg << "connect socket #" << fId << " to " << address; + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + + int eid = nn_connect(fSocket, address.c_str()); + if (eid < 0) { + stringstream logmsg2; + logmsg2 << "failed connecting socket #" << fId << ", reason: " << nn_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); + } +} + +size_t FairMQSocketNN::Send(FairMQMessage* msg) +{ + void* ptr = msg->GetMessage(); + int rc = nn_send(fSocket, &ptr, NN_MSG, 0); + if (rc < 0) { + stringstream logmsg; + logmsg << "failed sending on socket #" << fId << ", reason: " << nn_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } else { + fBytesTx += rc; + ++fMessagesTx; + } + + return rc; +} + +size_t FairMQSocketNN::Receive(FairMQMessage* msg) +{ + void* ptr = msg->GetMessage(); + int rc = nn_recv(fSocket, &ptr, NN_MSG, 0); + if (rc < 0) { + stringstream logmsg; + logmsg << "failed receiving on socket #" << fId << ", reason: " << nn_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } else { + fBytesRx += rc; + ++fMessagesRx; + msg->SetMessage(ptr, rc); + } + + return rc; +} + +void* FairMQSocketNN::GetSocket() +{ + return NULL;// dummy method to compy with the interface. functionality not possible in zeromq. +} + +int FairMQSocketNN::GetSocket(int nothing) +{ + return fSocket; +} + +void FairMQSocketNN::Close() +{ + nn_close(fSocket); +} + +void FairMQSocketNN::SetOption(const string& option, const void* value, size_t valueSize) +{ + int rc = nn_setsockopt(fSocket, NN_SOL_SOCKET, GetConstant(option), value, valueSize); + if (rc < 0) { + stringstream logmsg; + logmsg << "failed setting socket option, reason: " << nn_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } +} + +unsigned long FairMQSocketNN::GetBytesTx() +{ + return fBytesTx; +} + +unsigned long FairMQSocketNN::GetBytesRx() +{ + return fBytesRx; +} + +unsigned long FairMQSocketNN::GetMessagesTx() +{ + return fMessagesTx; +} + +unsigned long FairMQSocketNN::GetMessagesRx() +{ + return fMessagesRx; +} + +int FairMQSocketNN::GetConstant(const string& constant) +{ + if (constant == "sub") return NN_SUB; + if (constant == "pub") return NN_PUB; + if (constant == "xsub") return NN_SUB; // TODO: is there XPUB, XSUB for nanomsg? + if (constant == "xpub") return NN_PUB; + if (constant == "push") return NN_PUSH; + if (constant == "pull") return NN_PULL; + if (constant == "snd-hwm") return NN_SNDBUF; + if (constant == "rcv-hwm") return NN_RCVBUF; + + return -1; +} + +FairMQSocketNN::~FairMQSocketNN() +{ + Close(); +} diff --git a/fairmq/FairMQSocketNN.h b/fairmq/FairMQSocketNN.h index e69de29b..052d5535 100644 --- a/fairmq/FairMQSocketNN.h +++ b/fairmq/FairMQSocketNN.h @@ -0,0 +1,56 @@ +/** + * FairMQSocketNN.h + * + * @since 2013-12-05 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQSOCKETNN_H_ +#define FAIRMQSOCKETNN_H_ + +#include +#include +#include + +#include "FairMQSocket.h" + + +class FairMQSocketNN : public FairMQSocket +{ + public: + FairMQSocketNN(const string& type, int num); + + virtual string GetId(); + + virtual void Bind(const string& address); + virtual void Connect(const string& address); + + virtual size_t Send(FairMQMessage* msg); + virtual size_t Receive(FairMQMessage* msg); + + virtual void* GetSocket(); + virtual int GetSocket(int nothing); + virtual void Close(); + + virtual void SetOption(const string& option, const void* value, size_t valueSize); + + unsigned long GetBytesTx(); + unsigned long GetBytesRx(); + unsigned long GetMessagesTx(); + unsigned long GetMessagesRx(); + + static int GetConstant(const string& constant); + + + virtual ~FairMQSocketNN(); + + private: + int fSocket; + string fId; + unsigned long fBytesTx; + unsigned long fBytesRx; + unsigned long fMessagesTx; + unsigned long fMessagesRx; +}; + +#endif /* FAIRMQSOCKETNN_H_ */ diff --git a/fairmq/FairMQSocketZMQ.cxx b/fairmq/FairMQSocketZMQ.cxx index afbea8c9..eec6cea2 100644 --- a/fairmq/FairMQSocketZMQ.cxx +++ b/fairmq/FairMQSocketZMQ.cxx @@ -10,83 +10,68 @@ #include "FairMQSocketZMQ.h" #include "FairMQLogger.h" +boost::shared_ptr FairMQSocketZMQ::fContext = boost::shared_ptr(new FairMQContextZMQ(1)); // TODO: numIoThreads! -FairMQSocketZMQ::FairMQSocketZMQ(FairMQContext* context, int type, int num) : +FairMQSocketZMQ::FairMQSocketZMQ(const string& type, int num) : fBytesTx(0), fBytesRx(0), fMessagesTx(0), fMessagesRx(0) { - std::stringstream id; - id << GetTypeString(type) << "." << num; + stringstream id; // TODO + id << type << "." << num; fId = id.str(); - fSocket = zmq_socket(context->GetContext(), type); + fSocket = zmq_socket(fContext->GetContext(), GetConstant(type)); int rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length()); if (rc != 0) { - std::stringstream logmsg; + stringstream logmsg; logmsg << "failed setting socket option, reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); } - if (type == ZMQ_SUB) { + if (type == "sub") { rc = zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0); if (rc != 0) { - std::stringstream logmsg2; + stringstream logmsg2; logmsg2 << "failed setting socket option, reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); } } - std::stringstream logmsg3; + stringstream logmsg3; logmsg3 << "created socket #" << fId; FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg3.str()); } -std::string FairMQSocketZMQ::GetId() +string FairMQSocketZMQ::GetId() { return fId; } -std::string FairMQSocketZMQ::GetTypeString(int type) +void FairMQSocketZMQ::Bind(const string& address) { - switch (type) { - case ZMQ_SUB: - return "sub"; - case ZMQ_PUB: - return "pub"; - case ZMQ_PUSH: - return "push"; - case ZMQ_PULL: - return "pull"; - default: - return ""; - } -} - -void FairMQSocketZMQ::Bind(std::string address) -{ - std::stringstream logmsg; + stringstream logmsg; logmsg << "bind socket #" << fId << " on " << address; FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); int rc = zmq_bind (fSocket, address.c_str()); if (rc != 0) { - std::stringstream logmsg2; + stringstream logmsg2; logmsg2 << "failed binding socket #" << fId << ", reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); } } -void FairMQSocketZMQ::Connect(std::string address) +void FairMQSocketZMQ::Connect(const string& address) { - std::stringstream logmsg; + stringstream logmsg; logmsg << "connect socket #" << fId << " on " << address; FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); int rc = zmq_connect (fSocket, address.c_str()); if (rc != 0) { - std::stringstream logmsg2; + stringstream logmsg2; logmsg2 << "failed connecting socket #" << fId << ", reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); } @@ -103,7 +88,7 @@ size_t FairMQSocketZMQ::Send(FairMQMessage* msg) if (zmq_errno() == EAGAIN){ return false; } - std::stringstream logmsg; + stringstream logmsg; logmsg << "failed sending on socket #" << fId << ", reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); return nbytes; @@ -120,22 +105,12 @@ size_t FairMQSocketZMQ::Receive(FairMQMessage* msg) if (zmq_errno() == EAGAIN){ return false; } - std::stringstream logmsg; + stringstream logmsg; logmsg << "failed receiving on socket #" << fId << ", reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); return nbytes; } -void FairMQSocketZMQ::SetOption(int option, const void* value, size_t valueSize) -{ - int rc = zmq_setsockopt(fSocket, option, value, valueSize); - if (rc < 0) { - std::stringstream logmsg; - logmsg << "failed setting socket option, reason: " << zmq_strerror(errno); - FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); - } -} - void FairMQSocketZMQ::Close() { if (fSocket == NULL){ @@ -144,7 +119,7 @@ void FairMQSocketZMQ::Close() int rc = zmq_close (fSocket); if (rc != 0) { - std::stringstream logmsg; + stringstream logmsg; logmsg << "failed closing socket, reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); } @@ -157,6 +132,21 @@ void* FairMQSocketZMQ::GetSocket() return fSocket; } +int FairMQSocketZMQ::GetSocket(int nothing) +{ + // dummy method to compy with the interface. functionality not possible in zeromq. +} + +void FairMQSocketZMQ::SetOption(const string& option, const void* value, size_t valueSize) +{ + int rc = zmq_setsockopt(fSocket, GetConstant(option), value, valueSize); + if (rc < 0) { + stringstream logmsg; + logmsg << "failed setting socket option, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } +} + unsigned long FairMQSocketZMQ::GetBytesTx() { return fBytesTx; @@ -177,6 +167,20 @@ unsigned long FairMQSocketZMQ::GetMessagesRx() return fMessagesRx; } +int FairMQSocketZMQ::GetConstant(const string& constant) +{ + if (constant == "sub") return ZMQ_SUB; + if (constant == "pub") return ZMQ_PUB; + if (constant == "xsub") return ZMQ_XSUB; + if (constant == "xpub") return ZMQ_XPUB; + if (constant == "push") return ZMQ_PUSH; + if (constant == "pull") return ZMQ_PULL; + if (constant == "snd-hwm") return ZMQ_SNDHWM; + if (constant == "rcv-hwm") return ZMQ_RCVHWM; + + return -1; +} + FairMQSocketZMQ::~FairMQSocketZMQ() { } \ No newline at end of file diff --git a/fairmq/FairMQSocketZMQ.h b/fairmq/FairMQSocketZMQ.h index 389e264a..56cdd043 100644 --- a/fairmq/FairMQSocketZMQ.h +++ b/fairmq/FairMQSocketZMQ.h @@ -8,45 +8,51 @@ #ifndef FAIRMQSOCKETZMQ_H_ #define FAIRMQSOCKETZMQ_H_ +#include + #include #include "FairMQSocket.h" -#include "FairMQContext.h" +#include "FairMQContextZMQ.h" class FairMQSocketZMQ : public FairMQSocket { public: - FairMQSocketZMQ(FairMQContext* context, int type, int num); + FairMQSocketZMQ(const string& type, int num); - virtual std::string GetId(); + virtual string GetId(); - virtual void Bind(std::string address); - virtual void Connect(std::string address); + virtual void Bind(const string& address); + virtual void Connect(const string& address); virtual size_t Send(FairMQMessage* msg); virtual size_t Receive(FairMQMessage* msg); - virtual void Close(); - virtual void* GetSocket(); - virtual void SetOption(int option, const void* value, size_t valueSize); + virtual void* GetSocket(); + virtual int GetSocket(int nothing); + virtual void Close(); + + virtual void SetOption(const string& option, const void* value, size_t valueSize); virtual unsigned long GetBytesTx(); virtual unsigned long GetBytesRx(); virtual unsigned long GetMessagesTx(); virtual unsigned long GetMessagesRx(); - static std::string GetTypeString(int type); + static int GetConstant(const string& constant); virtual ~FairMQSocketZMQ(); private: void* fSocket; - std::string fId; + string fId; unsigned long fBytesTx; unsigned long fBytesRx; unsigned long fMessagesTx; unsigned long fMessagesRx; + + static boost::shared_ptr fContext; }; #endif /* FAIRMQSOCKETZMQ_H_ */ diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index dbfe8045..ac30b737 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -8,8 +8,13 @@ #ifndef FAIRMQTRANSPORTFACTORY_H_ #define FAIRMQTRANSPORTFACTORY_H_ +#include + #include "FairMQMessage.h" #include "FairMQSocket.h" +#include "FairMQPoller.h" + +using std::vector; class FairMQTransportFactory { @@ -17,7 +22,8 @@ class FairMQTransportFactory virtual FairMQMessage* CreateMessage() = 0; virtual FairMQMessage* CreateMessage(size_t size) = 0; virtual FairMQMessage* CreateMessage(void* data, size_t size) = 0; - virtual FairMQSocket* CreateSocket(FairMQContext* context, int type, int num) = 0; + virtual FairMQSocket* CreateSocket(string type, int num) = 0; + virtual FairMQPoller* CreatePoller(const vector& inputs) = 0; virtual ~FairMQTransportFactory() {}; }; diff --git a/fairmq/FairMQTransportFactoryNN.cxx b/fairmq/FairMQTransportFactoryNN.cxx new file mode 100644 index 00000000..039be997 --- /dev/null +++ b/fairmq/FairMQTransportFactoryNN.cxx @@ -0,0 +1,38 @@ +/** + * FairMQTransportFactoryNN.cxx + * + * @since 2014-01-20 + * @author: A. Rybalchenko + */ + +#include "FairMQTransportFactoryNN.h" + +FairMQTransportFactoryNN::FairMQTransportFactoryNN() +{ + +} + +FairMQMessage* FairMQTransportFactoryNN::CreateMessage() +{ + return new FairMQMessageNN(); +} + +FairMQMessage* FairMQTransportFactoryNN::CreateMessage(size_t size) +{ + return new FairMQMessageNN(size); +} + +FairMQMessage* FairMQTransportFactoryNN::CreateMessage(void* data, size_t size) +{ + return new FairMQMessageNN(data, size); +} + +FairMQSocket* FairMQTransportFactoryNN::CreateSocket(string type, int num) +{ + return new FairMQSocketNN(type, num); +} + +FairMQPoller* FairMQTransportFactoryNN::CreatePoller(const vector& inputs) +{ + return new FairMQPollerNN(inputs); +} diff --git a/fairmq/FairMQTransportFactoryNN.h b/fairmq/FairMQTransportFactoryNN.h new file mode 100644 index 00000000..21456fe4 --- /dev/null +++ b/fairmq/FairMQTransportFactoryNN.h @@ -0,0 +1,33 @@ +/** + * FairMQTransportFactoryNN.h + * + * @since 2014-01-20 + * @author: A. Rybalchenko + */ + +#ifndef FAIRMQTRANSPORTFACTORYNN_H_ +#define FAIRMQTRANSPORTFACTORYNN_H_ + +#include + +#include "FairMQTransportFactory.h" +#include "FairMQMessageNN.h" +#include "FairMQSocketNN.h" +#include "FairMQPollerNN.h" + +class FairMQTransportFactoryNN : public FairMQTransportFactory +{ + public: + FairMQTransportFactoryNN(); + + virtual FairMQMessage* CreateMessage(); + virtual FairMQMessage* CreateMessage(size_t size); + virtual FairMQMessage* CreateMessage(void* data, size_t size); + virtual FairMQSocket* CreateSocket(string type, int num); + virtual FairMQPoller* CreatePoller(const vector& inputs); + + + virtual ~FairMQTransportFactoryNN() {}; +}; + +#endif /* FAIRMQTRANSPORTFACTORYNN_H_ */ diff --git a/fairmq/FairMQTransportFactoryZMQ.cxx b/fairmq/FairMQTransportFactoryZMQ.cxx index 8fbf2a84..3661115d 100644 --- a/fairmq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/FairMQTransportFactoryZMQ.cxx @@ -9,25 +9,30 @@ FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ() { - + } FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage() { - return new FairMQMessageZMQ(); + return new FairMQMessageZMQ(); } FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(size_t size) { - return new FairMQMessageZMQ(size); + return new FairMQMessageZMQ(size); } FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, size_t size) { - return new FairMQMessageZMQ(data, size); + return new FairMQMessageZMQ(data, size); } -FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(FairMQContext* context, int type, int num) +FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(string type, int num) { - return new FairMQSocketZMQ(context, type, num); + return new FairMQSocketZMQ(type, num); +} + +FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(const vector& inputs) +{ + return new FairMQPollerZMQ(inputs); } diff --git a/fairmq/FairMQTransportFactoryZMQ.h b/fairmq/FairMQTransportFactoryZMQ.h index f41c9f86..92b94848 100644 --- a/fairmq/FairMQTransportFactoryZMQ.h +++ b/fairmq/FairMQTransportFactoryZMQ.h @@ -8,10 +8,13 @@ #ifndef FAIRMQTRANSPORTFACTORYZMQ_H_ #define FAIRMQTRANSPORTFACTORYZMQ_H_ +#include + #include "FairMQTransportFactory.h" -#include "FairMQContext.h" +#include "FairMQContextZMQ.h" #include "FairMQMessageZMQ.h" #include "FairMQSocketZMQ.h" +#include "FairMQPollerZMQ.h" class FairMQTransportFactoryZMQ : public FairMQTransportFactory { @@ -21,7 +24,9 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory virtual FairMQMessage* CreateMessage(); virtual FairMQMessage* CreateMessage(size_t size); virtual FairMQMessage* CreateMessage(void* data, size_t size); - virtual FairMQSocket* CreateSocket(FairMQContext* context, int type, int num); + virtual FairMQSocket* CreateSocket(string type, int num); + virtual FairMQPoller* CreatePoller(const vector& inputs); + virtual ~FairMQTransportFactoryZMQ() {}; }; diff --git a/fairmq/runBenchmarkSampler.cxx b/fairmq/runBenchmarkSampler.cxx index 3c860424..a5ce4988 100644 --- a/fairmq/runBenchmarkSampler.cxx +++ b/fairmq/runBenchmarkSampler.cxx @@ -11,18 +11,24 @@ #include "FairMQLogger.h" #include "FairMQBenchmarkSampler.h" #include "FairMQTransportFactoryZMQ.h" +// #include "FairMQTransportFactoryNN.h" + +using std::cout; +using std::cin; +using std::endl; +using std::stringstream; FairMQBenchmarkSampler sampler; static void s_signal_handler (int signal) { - std::cout << std::endl << "Caught signal " << signal << std::endl; + cout << endl << "Caught signal " << signal << endl; sampler.ChangeState(FairMQBenchmarkSampler::STOP); sampler.ChangeState(FairMQBenchmarkSampler::END); - std::cout << "Shutdown complete. Bye!" << std::endl; + cout << "Shutdown complete. Bye!" << endl; exit(1); } @@ -39,19 +45,20 @@ static void s_catch_signals (void) int main(int argc, char** argv) { if ( argc != 9 ) { - std::cout << "Usage: bsampler ID eventSize eventRate numIoTreads\n" + cout << "Usage: bsampler ID eventSize eventRate numIoTreads\n" << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" - << std::endl; + << endl; return 1; } s_catch_signals(); - std::stringstream logmsg; + stringstream logmsg; logmsg << "PID: " << getpid(); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); + // FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); sampler.SetTransport(transportFactory); int i = 1; @@ -60,17 +67,17 @@ int main(int argc, char** argv) ++i; int eventSize; - std::stringstream(argv[i]) >> eventSize; + stringstream(argv[i]) >> eventSize; sampler.SetProperty(FairMQBenchmarkSampler::EventSize, eventSize); ++i; int eventRate; - std::stringstream(argv[i]) >> eventRate; + stringstream(argv[i]) >> eventRate; sampler.SetProperty(FairMQBenchmarkSampler::EventRate, eventRate); ++i; int numIoThreads; - std::stringstream(argv[i]) >> numIoThreads; + stringstream(argv[i]) >> numIoThreads; sampler.SetProperty(FairMQBenchmarkSampler::NumIoThreads, numIoThreads); ++i; @@ -81,14 +88,10 @@ int main(int argc, char** argv) sampler.ChangeState(FairMQBenchmarkSampler::INIT); - int outputSocketType = ZMQ_PUB; - if (strcmp(argv[i], "push") == 0) { - outputSocketType = ZMQ_PUSH; - } - sampler.SetProperty(FairMQBenchmarkSampler::OutputSocketType, outputSocketType, 0); + sampler.SetProperty(FairMQBenchmarkSampler::OutputSocketType, argv[i], 0); ++i; int outputSndBufSize; - std::stringstream(argv[i]) >> outputSndBufSize; + stringstream(argv[i]) >> outputSndBufSize; sampler.SetProperty(FairMQBenchmarkSampler::OutputSndBufSize, outputSndBufSize, 0); ++i; sampler.SetProperty(FairMQBenchmarkSampler::OutputMethod, argv[i], 0); @@ -104,7 +107,7 @@ int main(int argc, char** argv) char ch; - std::cin.get(ch); + cin.get(ch); sampler.ChangeState(FairMQBenchmarkSampler::STOP); sampler.ChangeState(FairMQBenchmarkSampler::END); diff --git a/fairmq/runBuffer.cxx b/fairmq/runBuffer.cxx index 8f57afdb..e13bdcb6 100644 --- a/fairmq/runBuffer.cxx +++ b/fairmq/runBuffer.cxx @@ -11,18 +11,24 @@ #include "FairMQLogger.h" #include "FairMQBuffer.h" #include "FairMQTransportFactoryZMQ.h" +// #include "FairMQTransportFactoryNN.h" + +using std::cout; +using std::cin; +using std::endl; +using std::stringstream; FairMQBuffer buffer; static void s_signal_handler (int signal) { - std::cout << std::endl << "Caught signal " << signal << std::endl; + cout << endl << "Caught signal " << signal << endl; buffer.ChangeState(FairMQBuffer::STOP); buffer.ChangeState(FairMQBuffer::END); - std::cout << "Shutdown complete. Bye!" << std::endl; + cout << "Shutdown complete. Bye!" << endl; exit(1); } @@ -39,19 +45,20 @@ static void s_catch_signals (void) int main(int argc, char** argv) { if ( argc != 11 ) { - std::cout << "Usage: buffer \tID numIoTreads\n" + cout << "Usage: buffer \tID numIoTreads\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << std::endl; + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; return 1; } s_catch_signals(); - std::stringstream logmsg; + stringstream logmsg; logmsg << "PID: " << getpid(); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); + // FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); buffer.SetTransport(transportFactory); int i = 1; @@ -60,7 +67,7 @@ int main(int argc, char** argv) ++i; int numIoThreads; - std::stringstream(argv[i]) >> numIoThreads; + stringstream(argv[i]) >> numIoThreads; buffer.SetProperty(FairMQBuffer::NumIoThreads, numIoThreads); ++i; buffer.SetProperty(FairMQBuffer::NumInputs, 1); @@ -70,14 +77,10 @@ int main(int argc, char** argv) buffer.ChangeState(FairMQBuffer::INIT); - int inputSocketType = ZMQ_SUB; - if (strcmp(argv[i], "pull") == 0) { - inputSocketType = ZMQ_PULL; - } - buffer.SetProperty(FairMQBuffer::InputSocketType, inputSocketType, 0); + buffer.SetProperty(FairMQBuffer::InputSocketType, argv[i], 0); ++i; int inputRcvBufSize; - std::stringstream(argv[i]) >> inputRcvBufSize; + stringstream(argv[i]) >> inputRcvBufSize; buffer.SetProperty(FairMQBuffer::InputRcvBufSize, inputRcvBufSize, 0); ++i; buffer.SetProperty(FairMQBuffer::InputMethod, argv[i], 0); @@ -85,15 +88,10 @@ int main(int argc, char** argv) buffer.SetProperty(FairMQBuffer::InputAddress, argv[i], 0); ++i; - - int outputSocketType = ZMQ_PUB; - if (strcmp(argv[i], "push") == 0) { - outputSocketType = ZMQ_PUSH; - } - buffer.SetProperty(FairMQBuffer::OutputSocketType, outputSocketType, 0); + buffer.SetProperty(FairMQBuffer::OutputSocketType, argv[i], 0); ++i; int outputSndBufSize; - std::stringstream(argv[i]) >> outputSndBufSize; + stringstream(argv[i]) >> outputSndBufSize; buffer.SetProperty(FairMQBuffer::OutputSndBufSize, outputSndBufSize, 0); ++i; buffer.SetProperty(FairMQBuffer::OutputMethod, argv[i], 0); @@ -109,7 +107,7 @@ int main(int argc, char** argv) char ch; - std::cin.get(ch); + cin.get(ch); buffer.ChangeState(FairMQBuffer::STOP); buffer.ChangeState(FairMQBuffer::END); diff --git a/fairmq/runMerger.cxx b/fairmq/runMerger.cxx index a7b1f98b..43e6b246 100644 --- a/fairmq/runMerger.cxx +++ b/fairmq/runMerger.cxx @@ -11,18 +11,24 @@ #include "FairMQLogger.h" #include "FairMQMerger.h" #include "FairMQTransportFactoryZMQ.h" +// #include "FairMQTransportFactoryNN.h" + +using std::cout; +using std::cin; +using std::endl; +using std::stringstream; FairMQMerger merger; static void s_signal_handler (int signal) { - std::cout << std::endl << "Caught signal " << signal << std::endl; + cout << endl << "Caught signal " << signal << endl; merger.ChangeState(FairMQMerger::STOP); merger.ChangeState(FairMQMerger::END); - std::cout << "Shutdown complete. Bye!" << std::endl; + cout << "Shutdown complete. Bye!" << endl; exit(1); } @@ -39,20 +45,21 @@ static void s_catch_signals (void) int main(int argc, char** argv) { if ( argc != 15 ) { - std::cout << "Usage: merger \tID numIoTreads\n" + cout << "Usage: merger \tID numIoTreads\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << std::endl; + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; return 1; } s_catch_signals(); - std::stringstream logmsg; + stringstream logmsg; logmsg << "PID: " << getpid(); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); + // FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); merger.SetTransport(transportFactory); int i = 1; @@ -61,7 +68,7 @@ int main(int argc, char** argv) ++i; int numIoThreads; - std::stringstream(argv[i]) >> numIoThreads; + stringstream(argv[i]) >> numIoThreads; merger.SetProperty(FairMQMerger::NumIoThreads, numIoThreads); ++i; @@ -72,14 +79,10 @@ int main(int argc, char** argv) merger.ChangeState(FairMQMerger::INIT); - int inputSocketType = ZMQ_SUB; - if (strcmp(argv[i], "pull") == 0) { - inputSocketType = ZMQ_PULL; - } - merger.SetProperty(FairMQMerger::InputSocketType, inputSocketType, 0); + merger.SetProperty(FairMQMerger::InputSocketType, argv[i], 0); ++i; int inputRcvBufSize; - std::stringstream(argv[i]) >> inputRcvBufSize; + stringstream(argv[i]) >> inputRcvBufSize; merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, 0); ++i; merger.SetProperty(FairMQMerger::InputMethod, argv[i], 0); @@ -87,13 +90,9 @@ int main(int argc, char** argv) merger.SetProperty(FairMQMerger::InputAddress, argv[i], 0); ++i; - inputSocketType = ZMQ_SUB; - if (strcmp(argv[i], "pull") == 0) { - inputSocketType = ZMQ_PULL; - } - merger.SetProperty(FairMQMerger::InputSocketType, inputSocketType, 1); + merger.SetProperty(FairMQMerger::InputSocketType, argv[i], 1); ++i; - std::stringstream(argv[i]) >> inputRcvBufSize; + stringstream(argv[i]) >> inputRcvBufSize; merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, 1); ++i; merger.SetProperty(FairMQMerger::InputMethod, argv[i], 1); @@ -101,14 +100,10 @@ int main(int argc, char** argv) merger.SetProperty(FairMQMerger::InputAddress, argv[i], 1); ++i; - int outputSocketType = ZMQ_PUB; - if (strcmp(argv[i], "push") == 0) { - outputSocketType = ZMQ_PUSH; - } - merger.SetProperty(FairMQMerger::OutputSocketType, outputSocketType, 0); + merger.SetProperty(FairMQMerger::OutputSocketType, argv[i], 0); ++i; int outputSndBufSize; - std::stringstream(argv[i]) >> outputSndBufSize; + stringstream(argv[i]) >> outputSndBufSize; merger.SetProperty(FairMQMerger::OutputSndBufSize, outputSndBufSize, 0); ++i; merger.SetProperty(FairMQMerger::OutputMethod, argv[i], 0); @@ -123,7 +118,7 @@ int main(int argc, char** argv) char ch; - std::cin.get(ch); + cin.get(ch); merger.ChangeState(FairMQMerger::STOP); merger.ChangeState(FairMQMerger::END); diff --git a/fairmq/runNToOneMerger.cxx b/fairmq/runNToOneMerger.cxx index cd7c38f6..abc84158 100644 --- a/fairmq/runNToOneMerger.cxx +++ b/fairmq/runNToOneMerger.cxx @@ -11,18 +11,24 @@ #include "FairMQLogger.h" #include "FairMQMerger.h" #include "FairMQTransportFactoryZMQ.h" +// #include "FairMQTransportFactoryNN.h" + +using std::cout; +using std::cin; +using std::endl; +using std::stringstream; FairMQMerger merger; static void s_signal_handler (int signal) { - std::cout << std::endl << "Caught signal " << signal << std::endl; + cout << endl << "Caught signal " << signal << endl; merger.ChangeState(FairMQMerger::STOP); merger.ChangeState(FairMQMerger::END); - std::cout << "Shutdown complete. Bye!" << std::endl; + cout << "Shutdown complete. Bye!" << endl; exit(1); } @@ -39,22 +45,23 @@ static void s_catch_signals (void) int main(int argc, char** argv) { if ( argc < 16 || (argc-8)%4!=0 ) { - std::cout << "Usage: merger \tID numIoTreads numInputs\n" + cout << "Usage: merger \tID numIoTreads numInputs\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << "\t\t...\n" << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" - << argc << std::endl; + << argc << endl; return 1; } s_catch_signals(); - std::stringstream logmsg; + stringstream logmsg; logmsg << "PID: " << getpid(); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); + // FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); merger.SetTransport(transportFactory); int i = 1; @@ -63,12 +70,12 @@ int main(int argc, char** argv) ++i; int numIoThreads; - std::stringstream(argv[i]) >> numIoThreads; + stringstream(argv[i]) >> numIoThreads; merger.SetProperty(FairMQMerger::NumIoThreads, numIoThreads); ++i; int numInputs; - std::stringstream(argv[i]) >> numInputs; + stringstream(argv[i]) >> numInputs; merger.SetProperty(FairMQMerger::NumInputs, numInputs); ++i; @@ -78,16 +85,11 @@ int main(int argc, char** argv) merger.ChangeState(FairMQMerger::INIT); - int inputSocketType; for (int iInput = 0; iInput < numInputs; iInput++ ) { - inputSocketType = ZMQ_SUB; - if (strcmp(argv[i], "pull") == 0) { - inputSocketType = ZMQ_PULL; - } - merger.SetProperty(FairMQMerger::InputSocketType, inputSocketType, iInput); + merger.SetProperty(FairMQMerger::InputSocketType, argv[i], iInput); ++i; int inputRcvBufSize; - std::stringstream(argv[i]) >> inputRcvBufSize; + stringstream(argv[i]) >> inputRcvBufSize; merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, iInput); ++i; merger.SetProperty(FairMQMerger::InputMethod, argv[i], iInput); @@ -96,14 +98,10 @@ int main(int argc, char** argv) ++i; } - int outputSocketType = ZMQ_PUB; - if (strcmp(argv[i], "push") == 0) { - outputSocketType = ZMQ_PUSH; - } - merger.SetProperty(FairMQMerger::OutputSocketType, outputSocketType, 0); + merger.SetProperty(FairMQMerger::OutputSocketType, argv[i], 0); ++i; int outputSndBufSize; - std::stringstream(argv[i]) >> outputSndBufSize; + stringstream(argv[i]) >> outputSndBufSize; merger.SetProperty(FairMQMerger::OutputSndBufSize, outputSndBufSize, 0); ++i; merger.SetProperty(FairMQMerger::OutputMethod, argv[i], 0); @@ -118,7 +116,7 @@ int main(int argc, char** argv) char ch; - std::cin.get(ch); + cin.get(ch); merger.ChangeState(FairMQMerger::STOP); merger.ChangeState(FairMQMerger::END); diff --git a/fairmq/runOneToNSplitter.cxx b/fairmq/runOneToNSplitter.cxx index cba07895..a2694fe2 100644 --- a/fairmq/runOneToNSplitter.cxx +++ b/fairmq/runOneToNSplitter.cxx @@ -11,18 +11,24 @@ #include "FairMQLogger.h" #include "FairMQSplitter.h" #include "FairMQTransportFactoryZMQ.h" +// #include "FairMQTransportFactoryNN.h" + +using std::cout; +using std::cin; +using std::endl; +using std::stringstream; FairMQSplitter splitter; static void s_signal_handler (int signal) { - std::cout << std::endl << "Caught signal " << signal << std::endl; + cout << endl << "Caught signal " << signal << endl; splitter.ChangeState(FairMQSplitter::STOP); splitter.ChangeState(FairMQSplitter::END); - std::cout << "Shutdown complete. Bye!" << std::endl; + cout << "Shutdown complete. Bye!" << endl; exit(1); } @@ -39,21 +45,22 @@ static void s_catch_signals (void) int main(int argc, char** argv) { if ( argc < 16 || (argc - 8) % 4 != 0 ) { // argc{ name, id, threads, nout, insock, inbuff, inmet, inadd, ... out} - std::cout << "Usage: splitter \tID numIoTreads numOutputs\n" + cout << "Usage: splitter \tID numIoTreads numOutputs\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" - << "\t\t..." << argc << " arguments provided" << std::endl; + << "\t\t..." << argc << " arguments provided" << endl; return 1; } s_catch_signals(); - std::stringstream logmsg; + stringstream logmsg; logmsg << "PID: " << getpid(); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); + // FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); splitter.SetTransport(transportFactory); int i = 1; @@ -62,14 +69,14 @@ int main(int argc, char** argv) ++i; int numIoThreads; - std::stringstream(argv[i]) >> numIoThreads; + stringstream(argv[i]) >> numIoThreads; splitter.SetProperty(FairMQSplitter::NumIoThreads, numIoThreads); ++i; splitter.SetProperty(FairMQSplitter::NumInputs, 1); int numOutputs; - std::stringstream(argv[i]) >> numOutputs; + stringstream(argv[i]) >> numOutputs; splitter.SetProperty(FairMQSplitter::NumOutputs, numOutputs); ++i; @@ -77,14 +84,10 @@ int main(int argc, char** argv) splitter.ChangeState(FairMQSplitter::INIT); - int inputSocketType = ZMQ_SUB; - if (strcmp(argv[i], "pull") == 0) { - inputSocketType = ZMQ_PULL; - } - splitter.SetProperty(FairMQSplitter::InputSocketType, inputSocketType, 0); + splitter.SetProperty(FairMQSplitter::InputSocketType, argv[i], 0); ++i; int inputRcvBufSize; - std::stringstream(argv[i]) >> inputRcvBufSize; + stringstream(argv[i]) >> inputRcvBufSize; splitter.SetProperty(FairMQSplitter::InputRcvBufSize, inputRcvBufSize, 0); ++i; splitter.SetProperty(FairMQSplitter::InputMethod, argv[i], 0); @@ -92,16 +95,11 @@ int main(int argc, char** argv) splitter.SetProperty(FairMQSplitter::InputAddress, argv[i], 0); ++i; - int outputSocketType; int outputSndBufSize; for (int iOutput = 0; iOutput < numOutputs; iOutput++) { - outputSocketType = ZMQ_PUB; - if (strcmp(argv[i], "push") == 0) { - outputSocketType = ZMQ_PUSH; - } - splitter.SetProperty(FairMQSplitter::OutputSocketType, outputSocketType, iOutput); + splitter.SetProperty(FairMQSplitter::OutputSocketType, argv[i], iOutput); ++i; - std::stringstream(argv[i]) >> outputSndBufSize; + stringstream(argv[i]) >> outputSndBufSize; splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, iOutput); ++i; splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], iOutput); @@ -116,7 +114,7 @@ int main(int argc, char** argv) char ch; - std::cin.get(ch); + cin.get(ch); splitter.ChangeState(FairMQSplitter::STOP); splitter.ChangeState(FairMQSplitter::END); diff --git a/fairmq/runProxy.cxx b/fairmq/runProxy.cxx index e66918da..06984c93 100644 --- a/fairmq/runProxy.cxx +++ b/fairmq/runProxy.cxx @@ -11,18 +11,24 @@ #include "FairMQLogger.h" #include "FairMQProxy.h" #include "FairMQTransportFactoryZMQ.h" +// #include "FairMQTransportFactoryNN.h" + +using std::cout; +using std::cin; +using std::endl; +using std::stringstream; FairMQProxy proxy; static void s_signal_handler (int signal) { - std::cout << std::endl << "Caught signal " << signal << std::endl; + cout << endl << "Caught signal " << signal << endl; proxy.ChangeState(FairMQProxy::STOP); proxy.ChangeState(FairMQProxy::END); - std::cout << "Shutdown complete. Bye!" << std::endl; + cout << "Shutdown complete. Bye!" << endl; exit(1); } @@ -39,19 +45,20 @@ static void s_catch_signals (void) int main(int argc, char** argv) { if ( argc != 11 ) { - std::cout << "Usage: proxy \tID numIoTreads\n" + cout << "Usage: proxy \tID numIoTreads\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << std::endl; + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; return 1; } s_catch_signals(); - std::stringstream logmsg; + stringstream logmsg; logmsg << "PID: " << getpid(); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); + // FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); proxy.SetTransport(transportFactory); int i = 1; @@ -60,7 +67,7 @@ int main(int argc, char** argv) ++i; int numIoThreads; - std::stringstream(argv[i]) >> numIoThreads; + stringstream(argv[i]) >> numIoThreads; proxy.SetProperty(FairMQProxy::NumIoThreads, numIoThreads); ++i; @@ -71,14 +78,10 @@ int main(int argc, char** argv) proxy.ChangeState(FairMQProxy::INIT); - int inputSocketType = ZMQ_XSUB; - if (strcmp(argv[i], "pull") == 0) { - inputSocketType = ZMQ_PULL; - } - proxy.SetProperty(FairMQProxy::InputSocketType, inputSocketType, 0); + proxy.SetProperty(FairMQProxy::InputSocketType, argv[i], 0); ++i; int inputRcvBufSize; - std::stringstream(argv[i]) >> inputRcvBufSize; + stringstream(argv[i]) >> inputRcvBufSize; proxy.SetProperty(FairMQProxy::InputRcvBufSize, inputRcvBufSize, 0); ++i; proxy.SetProperty(FairMQProxy::InputMethod, argv[i], 0); @@ -86,14 +89,10 @@ int main(int argc, char** argv) proxy.SetProperty(FairMQProxy::InputAddress, argv[i], 0); ++i; - int outputSocketType = ZMQ_XPUB; - if (strcmp(argv[i], "push") == 0) { - outputSocketType = ZMQ_PUSH; - } - proxy.SetProperty(FairMQProxy::OutputSocketType, outputSocketType, 0); + proxy.SetProperty(FairMQProxy::OutputSocketType, argv[i], 0); ++i; int outputSndBufSize; - std::stringstream(argv[i]) >> outputSndBufSize; + stringstream(argv[i]) >> outputSndBufSize; proxy.SetProperty(FairMQProxy::OutputSndBufSize, outputSndBufSize, 0); ++i; proxy.SetProperty(FairMQProxy::OutputMethod, argv[i], 0); @@ -108,7 +107,7 @@ int main(int argc, char** argv) char ch; - std::cin.get(ch); + cin.get(ch); proxy.ChangeState(FairMQProxy::STOP); proxy.ChangeState(FairMQProxy::END); diff --git a/fairmq/runSink.cxx b/fairmq/runSink.cxx index c055eb34..664cf1c3 100644 --- a/fairmq/runSink.cxx +++ b/fairmq/runSink.cxx @@ -11,18 +11,24 @@ #include "FairMQLogger.h" #include "FairMQSink.h" #include "FairMQTransportFactoryZMQ.h" +// #include "FairMQTransportFactoryNN.h" + +using std::cout; +using std::cin; +using std::endl; +using std::stringstream; FairMQSink sink; static void s_signal_handler (int signal) { - std::cout << std::endl << "Caught signal " << signal << std::endl; + cout << endl << "Caught signal " << signal << endl; sink.ChangeState(FairMQSink::STOP); sink.ChangeState(FairMQSink::END); - std::cout << "Shutdown complete. Bye!" << std::endl; + cout << "Shutdown complete. Bye!" << endl; exit(1); } @@ -39,19 +45,20 @@ static void s_catch_signals (void) int main(int argc, char** argv) { if ( argc != 7 ) { - std::cout << "Usage: sink \tID numIoTreads\n" + cout << "Usage: sink \tID numIoTreads\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << std::endl; + << endl; return 1; } s_catch_signals(); - std::stringstream logmsg; + stringstream logmsg; logmsg << "PID: " << getpid(); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); + // FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); sink.SetTransport(transportFactory); int i = 1; @@ -60,7 +67,7 @@ int main(int argc, char** argv) ++i; int numIoThreads; - std::stringstream(argv[i]) >> numIoThreads; + stringstream(argv[i]) >> numIoThreads; sink.SetProperty(FairMQSink::NumIoThreads, numIoThreads); ++i; @@ -71,14 +78,10 @@ int main(int argc, char** argv) sink.ChangeState(FairMQSink::INIT); - int inputSocketType = ZMQ_SUB; - if (strcmp(argv[i], "pull") == 0) { - inputSocketType = ZMQ_PULL; - } - sink.SetProperty(FairMQSink::InputSocketType, inputSocketType, 0); + sink.SetProperty(FairMQSink::InputSocketType, argv[i], 0); ++i; int inputRcvBufSize; - std::stringstream(argv[i]) >> inputRcvBufSize; + stringstream(argv[i]) >> inputRcvBufSize; sink.SetProperty(FairMQSink::InputRcvBufSize, inputRcvBufSize, 0); ++i; sink.SetProperty(FairMQSink::InputMethod, argv[i], 0); @@ -93,7 +96,7 @@ int main(int argc, char** argv) char ch; - std::cin.get(ch); + cin.get(ch); sink.ChangeState(FairMQSink::STOP); sink.ChangeState(FairMQSink::END); diff --git a/fairmq/runSplitter.cxx b/fairmq/runSplitter.cxx index da33cafb..f1080242 100644 --- a/fairmq/runSplitter.cxx +++ b/fairmq/runSplitter.cxx @@ -11,18 +11,24 @@ #include "FairMQLogger.h" #include "FairMQSplitter.h" #include "FairMQTransportFactoryZMQ.h" +// #include "FairMQTransportFactoryNN.h" + +using std::cout; +using std::cin; +using std::endl; +using std::stringstream; FairMQSplitter splitter; static void s_signal_handler (int signal) { - std::cout << std::endl << "Caught signal " << signal << std::endl; + cout << endl << "Caught signal " << signal << endl; splitter.ChangeState(FairMQSplitter::STOP); splitter.ChangeState(FairMQSplitter::END); - std::cout << "Shutdown complete. Bye!" << std::endl; + cout << "Shutdown complete. Bye!" << endl; exit(1); } @@ -39,20 +45,21 @@ static void s_catch_signals (void) int main(int argc, char** argv) { if ( argc != 15 ) { - std::cout << "Usage: splitter \tID numIoTreads\n" + cout << "Usage: splitter \tID numIoTreads\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << std::endl; + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; return 1; } s_catch_signals(); - std::stringstream logmsg; + stringstream logmsg; logmsg << "PID: " << getpid(); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); + // FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); splitter.SetTransport(transportFactory); int i = 1; @@ -61,7 +68,7 @@ int main(int argc, char** argv) ++i; int numIoThreads; - std::stringstream(argv[i]) >> numIoThreads; + stringstream(argv[i]) >> numIoThreads; splitter.SetProperty(FairMQSplitter::NumIoThreads, numIoThreads); ++i; @@ -72,14 +79,10 @@ int main(int argc, char** argv) splitter.ChangeState(FairMQSplitter::INIT); - int inputSocketType = ZMQ_SUB; - if (strcmp(argv[i], "pull") == 0) { - inputSocketType = ZMQ_PULL; - } - splitter.SetProperty(FairMQSplitter::InputSocketType, inputSocketType, 0); + splitter.SetProperty(FairMQSplitter::InputSocketType, argv[i], 0); ++i; int inputRcvBufSize; - std::stringstream(argv[i]) >> inputRcvBufSize; + stringstream(argv[i]) >> inputRcvBufSize; splitter.SetProperty(FairMQSplitter::InputRcvBufSize, inputRcvBufSize, 0); ++i; splitter.SetProperty(FairMQSplitter::InputMethod, argv[i], 0); @@ -87,14 +90,10 @@ int main(int argc, char** argv) splitter.SetProperty(FairMQSplitter::InputAddress, argv[i], 0); ++i; - int outputSocketType = ZMQ_PUB; - if (strcmp(argv[i], "push") == 0) { - outputSocketType = ZMQ_PUSH; - } - splitter.SetProperty(FairMQSplitter::OutputSocketType, outputSocketType, 0); + splitter.SetProperty(FairMQSplitter::OutputSocketType, argv[i], 0); ++i; int outputSndBufSize; - std::stringstream(argv[i]) >> outputSndBufSize; + stringstream(argv[i]) >> outputSndBufSize; splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, 0); ++i; splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], 0); @@ -102,13 +101,9 @@ int main(int argc, char** argv) splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], 0); ++i; - outputSocketType = ZMQ_PUB; - if (strcmp(argv[i], "push") == 0) { - outputSocketType = ZMQ_PUSH; - } - splitter.SetProperty(FairMQSplitter::OutputSocketType, outputSocketType, 1); + splitter.SetProperty(FairMQSplitter::OutputSocketType, argv[i], 1); ++i; - std::stringstream(argv[i]) >> outputSndBufSize; + stringstream(argv[i]) >> outputSndBufSize; splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, 1); ++i; splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], 1); @@ -123,7 +118,7 @@ int main(int argc, char** argv) char ch; - std::cin.get(ch); + cin.get(ch); splitter.ChangeState(FairMQSplitter::STOP); splitter.ChangeState(FairMQSplitter::END);