From 3803a3d1554b0dd70d64a62467938329fc55162e Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 17 Jan 2014 12:34:57 +0100 Subject: [PATCH] a little clean up --- fairmq/CMakeLists.txt | 11 +- fairmq/FairMQBenchmarkSampler.cxx | 10 +- fairmq/FairMQBenchmarkSampler.h | 7 +- fairmq/FairMQBuffer.cxx | 20 ++-- fairmq/FairMQBuffer.h | 7 +- fairmq/FairMQConfigurable.cxx | 6 +- fairmq/FairMQConfigurable.h | 6 +- fairmq/FairMQContext.cxx | 52 +++++---- fairmq/FairMQContext.h | 24 ++--- fairmq/FairMQDevice.cxx | 21 ++-- fairmq/FairMQDevice.h | 4 +- fairmq/FairMQLogger.cxx | 6 +- fairmq/FairMQLogger.h | 6 +- fairmq/FairMQMerger.cxx | 32 +++--- fairmq/FairMQMerger.h | 22 ++-- fairmq/FairMQMessage.cxx | 104 ++++++++++++------ fairmq/FairMQMessage.h | 35 ++++-- fairmq/FairMQProcessor.cxx | 19 +--- fairmq/FairMQProcessor.h | 6 +- fairmq/FairMQProcessorTask.cxx | 11 +- fairmq/FairMQProcessorTask.h | 7 +- fairmq/FairMQProxy.cxx | 20 ++-- fairmq/FairMQProxy.h | 6 +- fairmq/FairMQSampler.cxx | 18 +--- fairmq/FairMQSampler.h | 6 +- fairmq/FairMQSamplerTask.cxx | 10 +- fairmq/FairMQSamplerTask.h | 7 +- fairmq/FairMQSink.cxx | 19 +--- fairmq/FairMQSink.h | 10 +- fairmq/FairMQSocket.cxx | 174 ++++++++++++++++-------------- fairmq/FairMQSocket.h | 40 +++---- fairmq/FairMQSplitter.cxx | 34 +++--- fairmq/FairMQSplitter.h | 21 ++-- fairmq/FairMQStateMachine.cxx | 7 +- fairmq/FairMQStateMachine.h | 8 +- fairmq/runBenchmarkSampler.cxx | 6 +- fairmq/runBuffer.cxx | 4 +- fairmq/runMerger.cxx | 58 +++++----- fairmq/runNToOneMerger.cxx | 52 ++++----- fairmq/runOneToNSplitter.cxx | 54 +++++----- fairmq/runProxy.cxx | 4 +- fairmq/runSink.cxx | 6 +- fairmq/runSplitter.cxx | 58 +++++----- 43 files changed, 525 insertions(+), 513 deletions(-) diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 88cf1962..9a096806 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -2,6 +2,8 @@ include_directories( ${BASE_INCLUDE_DIRECTORIES} ${CMAKE_SOURCE_DIR}/fairmq ${ZMQ_INCLUDE_DIR} + ${NANOMSG_INCLUDE_DIR} + ${Boost_INCLUDE_DIR} ${ROOT_INCLUDE_DIR} ) @@ -15,9 +17,13 @@ Set(SRCS "FairMQLogger.cxx" "FairMQContext.cxx" "FairMQMessage.cxx" + "FairMQMessageZMQ.cxx" + "FairMQMessageNN.cxx" "FairMQSocket.cxx" - "FairMQBalancedStandaloneSplitter.cxx" - "FairMQStandaloneMerger.cxx" + "FairMQSocketZMQ.cxx" + "FairMQSocketNN.cxx" + "FairMQSplitter.cxx" + "FairMQMerger.cxx" "FairMQProcessor.cxx" "FairMQProcessorTask.cxx" "FairMQSink.cxx" @@ -37,6 +43,7 @@ Set(LINKDEF) Set(DEPENDENCIES ${CMAKE_THREAD_LIBS_INIT} ${ZMQ_LIBRARY_SHARED} + ${NANOMSG_LIBRARY_SHARED} Base ParBase FairTools GeoBase boost_thread boost_timer boost_system ) diff --git a/fairmq/FairMQBenchmarkSampler.cxx b/fairmq/FairMQBenchmarkSampler.cxx index 97dca4df..2fc7064d 100644 --- a/fairmq/FairMQBenchmarkSampler.cxx +++ b/fairmq/FairMQBenchmarkSampler.cxx @@ -1,9 +1,10 @@ -/* +/** * FairMQBenchmarkSampler.cpp * - * Created on: Apr 23, 2013 - * Author: dklein + * @since 2013-04-23 + * @author D. Klein, A. Rybalchenko */ + #include #include @@ -12,6 +13,7 @@ #include "FairMQBenchmarkSampler.h" #include "FairMQLogger.h" + FairMQBenchmarkSampler::FairMQBenchmarkSampler() : fEventSize(10000), fEventRate(1), @@ -37,7 +39,7 @@ void FairMQBenchmarkSampler::Run() boost::thread resetEventCounter(boost::bind(&FairMQBenchmarkSampler::ResetEventCounter, this)); void* buffer = operator new[](fEventSize); - FairMQMessage* base_event = new FairMQMessage(buffer, fEventSize, NULL); + FairMQMessage* base_event = new FairMQMessage(buffer, fEventSize); while ( fState == RUNNING ) { FairMQMessage event; diff --git a/fairmq/FairMQBenchmarkSampler.h b/fairmq/FairMQBenchmarkSampler.h index 14a67e68..f1d6fef0 100644 --- a/fairmq/FairMQBenchmarkSampler.h +++ b/fairmq/FairMQBenchmarkSampler.h @@ -1,14 +1,15 @@ -/* +/** * FairMQBenchmarkSampler.h * - * Created on: Apr 23, 2013 - * Author: dklein + * @since 2013-04-23 + * @author D. Klein, A. Rybalchenko */ #ifndef FAIRMQBENCHMARKSAMPLER_H_ #define FAIRMQBENCHMARKSAMPLER_H_ #include + #include "FairMQDevice.h" #include "TString.h" diff --git a/fairmq/FairMQBuffer.cxx b/fairmq/FairMQBuffer.cxx index 7b4f9a23..42cdaf04 100644 --- a/fairmq/FairMQBuffer.cxx +++ b/fairmq/FairMQBuffer.cxx @@ -1,8 +1,8 @@ -/* +/** * FairMQBuffer.cxx * - * Created on: Oct 25, 2012 - * Author: dklein + * @since 2012-10-25 + * @author D. Klein, A. Rybalchenko */ #include @@ -13,6 +13,7 @@ #include "FairMQBuffer.h" #include "FairMQLogger.h" + FairMQBuffer::FairMQBuffer() { } @@ -23,20 +24,11 @@ void FairMQBuffer::Run() boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); - // Initialize poll set - zmq_pollitem_t items[] = { - { *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 } - }; - - Bool_t received = false; + bool received = false; while ( fState == RUNNING ) { FairMQMessage msg; - zmq_poll(items, 1, 100); - - if (items[0].revents & ZMQ_POLLIN) { - received = fPayloadInputs->at(0)->Receive(&msg); - } + received = fPayloadInputs->at(0)->Receive(&msg); if (received) { fPayloadOutputs->at(0)->Send(&msg); diff --git a/fairmq/FairMQBuffer.h b/fairmq/FairMQBuffer.h index 76e37978..5f64062f 100644 --- a/fairmq/FairMQBuffer.h +++ b/fairmq/FairMQBuffer.h @@ -1,15 +1,14 @@ -/* +/** * FairMQBuffer.h * - * Created on: Oct 25, 2012 - * Author: dklein + * @since 2012-10-25 + * @author D. Klein, A. Rybalchenko */ #ifndef FAIRMQBUFFER_H_ #define FAIRMQBUFFER_H_ #include "FairMQDevice.h" -#include "Rtypes.h" class FairMQBuffer: public FairMQDevice diff --git a/fairmq/FairMQConfigurable.cxx b/fairmq/FairMQConfigurable.cxx index 1bc0114c..dd589eff 100644 --- a/fairmq/FairMQConfigurable.cxx +++ b/fairmq/FairMQConfigurable.cxx @@ -1,8 +1,8 @@ -/* +/** * FairMQConfigurable.cxx * - * Created on: Oct 25, 2012 - * Author: dklein + * @since 2012-10-25 + * @author D. Klein, A. Rybalchenko */ #include "FairMQConfigurable.h" diff --git a/fairmq/FairMQConfigurable.h b/fairmq/FairMQConfigurable.h index 1854f404..641086f9 100644 --- a/fairmq/FairMQConfigurable.h +++ b/fairmq/FairMQConfigurable.h @@ -1,8 +1,8 @@ -/* +/** * FairMQConfigurable.h * - * Created on: Oct 25, 2012 - * Author: dklein + * @since 2012-10-25 + * @author D. Klein, A. Rybalchenko */ #ifndef FAIRMQCONFIGURABLE_H_ diff --git a/fairmq/FairMQContext.cxx b/fairmq/FairMQContext.cxx index 95e85390..4097efda 100644 --- a/fairmq/FairMQContext.cxx +++ b/fairmq/FairMQContext.cxx @@ -1,44 +1,52 @@ -/* +/** * FairMQContext.cxx * - * Created on: Dec 5, 2012 - * Author: dklein + * @since 2012-12-05 + * @author D. Klein, A. Rybalchenko */ +#include "FairMQLogger.h" #include "FairMQContext.h" #include - -const TString FairMQContext::PAYLOAD = "payload"; -const TString FairMQContext::LOG = "log"; -const TString FairMQContext::CONFIG = "config"; -const TString FairMQContext::CONTROL = "control"; - -FairMQContext::FairMQContext(TString deviceId, TString contextId, Int_t numIoThreads) +FairMQContext::FairMQContext(int numIoThreads) { - std::stringstream id; - id << deviceId << "." << contextId; - fId = id.str(); + fContext = zmq_ctx_new (); + if (fContext == NULL){ + std::stringstream logmsg; + logmsg << "failed creating context, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } - fContext = new zmq::context_t(numIoThreads); + int rc = zmq_ctx_set (fContext, ZMQ_IO_THREADS, numIoThreads); + if (rc != 0){ + std::stringstream logmsg; + logmsg << "failed configuring context, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } } FairMQContext::~FairMQContext() { } -TString FairMQContext::GetId() -{ - return fId; -} - -zmq::context_t* FairMQContext::GetContext() +void* FairMQContext::GetContext() { return fContext; } void FairMQContext::Close() { - fContext->close(); -} + if (fContext == NULL){ + return; + } + int rc = zmq_ctx_destroy (fContext); + if (rc != 0) { + std::stringstream logmsg; + logmsg << "failed closing context, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } + + fContext = NULL; +} diff --git a/fairmq/FairMQContext.h b/fairmq/FairMQContext.h index 046ece47..0c8dc387 100644 --- a/fairmq/FairMQContext.h +++ b/fairmq/FairMQContext.h @@ -1,31 +1,25 @@ -/* +/** * FairMQContext.h * - * Created on: Dec 5, 2012 - * Author: dklein + * @since 2012-12-05 + * @author D. Klein, A. Rybalchenko */ #ifndef FAIRMQCONTEXT_H_ #define FAIRMQCONTEXT_H_ -#include -#include -#include "Rtypes.h" -#include "TString.h" - +#include class FairMQContext { - private: - TString fId; - zmq::context_t* fContext; public: - const static TString PAYLOAD, LOG, CONFIG, CONTROL; - FairMQContext(TString deviceId, TString contextId, Int_t numIoThreads); + FairMQContext(int numIoThreads); virtual ~FairMQContext(); - TString GetId(); - zmq::context_t* GetContext(); + void* GetContext(); void Close(); + + private: + void* fContext; }; #endif /* FAIRMQCONTEXT_H_ */ diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 5ca70e1e..7a9d5719 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -1,8 +1,8 @@ /** * FairMQDevice.cxx * - * @since Oct 25, 2012 - * @authors: D. Klein, A. Rybalchenko + * @since 2012-10-25 + * @author D. Klein, A. Rybalchenko */ #include @@ -30,7 +30,7 @@ void FairMQDevice::Init() logmsg << "numIoThreads: " << fNumIoThreads; FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); - fPayloadContext = new FairMQContext(fId, FairMQContext::PAYLOAD, fNumIoThreads); + fPayloadContext = new FairMQContext(fNumIoThreads); fInputAddress = new std::vector(fNumInputs); fInputMethod = new std::vector(); @@ -65,9 +65,12 @@ void FairMQDevice::InitInput() for (Int_t i = 0; i < fNumInputs; ++i) { FairMQSocket* socket = new FairMQSocket(fPayloadContext, fInputSocketType->at(i), i); - socket->GetSocket()->setsockopt(ZMQ_SNDHWM, &fInputSndBufSize->at(i), sizeof(fInputSndBufSize->at(i))); - socket->GetSocket()->setsockopt(ZMQ_RCVHWM, &fInputRcvBufSize->at(i), sizeof(fInputRcvBufSize->at(i))); + + socket->SetOption(ZMQ_SNDHWM, &fInputSndBufSize->at(i), sizeof(fInputSndBufSize->at(i))); + socket->SetOption(ZMQ_RCVHWM, &fInputRcvBufSize->at(i), sizeof(fInputRcvBufSize->at(i))); + fPayloadInputs->push_back(socket); + try { if (fInputMethod->at(i) == "bind") { fPayloadInputs->at(i)->Bind(fInputAddress->at(i)); @@ -85,8 +88,8 @@ void FairMQDevice::InitOutput() for (Int_t i = 0; i < fNumOutputs; ++i) { FairMQSocket* socket = new FairMQSocket(fPayloadContext, fOutputSocketType->at(i), i); - socket->GetSocket()->setsockopt(ZMQ_SNDHWM, &fOutputSndBufSize->at(i), sizeof(fOutputSndBufSize->at(i))); - socket->GetSocket()->setsockopt(ZMQ_RCVHWM, &fOutputRcvBufSize->at(i), sizeof(fOutputRcvBufSize->at(i))); + socket->SetOption(ZMQ_SNDHWM, &fOutputSndBufSize->at(i), sizeof(fOutputSndBufSize->at(i))); + socket->SetOption(ZMQ_RCVHWM, &fOutputRcvBufSize->at(i), sizeof(fOutputRcvBufSize->at(i))); fPayloadOutputs->push_back(socket); try { if (fOutputMethod->at(i) == "bind") { @@ -290,7 +293,7 @@ void FairMQDevice::LogSocketRates() messagesInput[i] = messagesInputNew[i]; std::stringstream logmsg; - logmsg << "#" << (*itr)->GetId() << ": " << messagesPerSecondInput[i] << " msg/s, " << megabytesPerSecondInput[i] << " MB/s"; + logmsg << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondInput[i] << " msg/s, " << megabytesPerSecondInput[i] << " MB/s"; FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str()); // Temp stuff for process termination @@ -318,7 +321,7 @@ void FairMQDevice::LogSocketRates() messagesOutput[i] = messagesOutputNew[i]; std::stringstream logmsg; - logmsg << "#" << (*itr)->GetId() << ": " << messagesPerSecondOutput[i] << " msg/s, " << megabytesPerSecondOutput[i] << " MB/s"; + logmsg << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondOutput[i] << " msg/s, " << megabytesPerSecondOutput[i] << " MB/s"; FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str()); // Temp stuff for process termination diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 6c5c8f86..a38c651c 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -1,8 +1,8 @@ /** * FairMQDevice.h * - * @since Oct 25, 2012 - * @authors: D. Klein, A. Rybalchenko + * @since 2012-10-25 + * @author D. Klein, A. Rybalchenko */ #ifndef FAIRMQDEVICE_H_ diff --git a/fairmq/FairMQLogger.cxx b/fairmq/FairMQLogger.cxx index 027a1e25..f08f62db 100644 --- a/fairmq/FairMQLogger.cxx +++ b/fairmq/FairMQLogger.cxx @@ -1,8 +1,8 @@ -/* +/** * FairMQLogger.cxx * - * Created on: Dec 4, 2012 - * Author: dklein + * @since 2012-12-04 + * @author D. Klein, A. Rybalchenko */ #include "FairMQLogger.h" diff --git a/fairmq/FairMQLogger.h b/fairmq/FairMQLogger.h index e3848054..8adbbcaa 100644 --- a/fairmq/FairMQLogger.h +++ b/fairmq/FairMQLogger.h @@ -1,8 +1,8 @@ -/* +/** * FairMQLogger.h * - * Created on: Dec 4, 2012 - * Author: dklein + * @since 2012-12-04 + * @author D. Klein, A. Rybalchenko */ #ifndef FAIRMQLOGGER_H_ diff --git a/fairmq/FairMQMerger.cxx b/fairmq/FairMQMerger.cxx index b0f445b8..acc1c7b3 100644 --- a/fairmq/FairMQMerger.cxx +++ b/fairmq/FairMQMerger.cxx @@ -1,34 +1,38 @@ -/* - * FairMQStandaloneMerger.cxx +/** + * FairMQMerger.cxx * - * Created on: Dec 6, 2012 - * Author: dklein + * @since 2012-12-06 + * @author D. Klein, A. Rybalchenko */ #include #include #include "FairMQLogger.h" -#include "FairMQStandaloneMerger.h" +#include "FairMQMerger.h" -FairMQStandaloneMerger::FairMQStandaloneMerger() + +FairMQMerger::FairMQMerger() { } -FairMQStandaloneMerger::~FairMQStandaloneMerger() +FairMQMerger::~FairMQMerger() { } -void FairMQStandaloneMerger::Run() +void FairMQMerger::Run() { FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); zmq_pollitem_t items[fNumInputs]; - for (Int_t iInput = 0; iInput < fNumInputs; iInput++) { - zmq_pollitem_t tempitem= {*(fPayloadInputs->at(iInput)->GetSocket()), 0, ZMQ_POLLIN, 0}; - items[iInput] = tempitem; + + for (int i = 0; i < fNumInputs; i++) { + items[i].socket = fPayloadInputs->at(i)->GetSocket(); + items[i].fd = 0; + items[i].events = ZMQ_POLLIN; + items[i].revents = 0; } Bool_t received = false; @@ -38,9 +42,9 @@ void FairMQStandaloneMerger::Run() zmq_poll(items, fNumInputs, 100); - for(Int_t iItem = 0; iItem < fNumInputs; iItem++) { - if (items[iItem].revents & ZMQ_POLLIN) { - received = fPayloadInputs->at(iItem)->Receive(&msg); + for(int i = 0; i < fNumInputs; i++) { + if (items[i].revents & ZMQ_POLLIN) { + received = fPayloadInputs->at(i)->Receive(&msg); } if (received) { fPayloadOutputs->at(0)->Send(&msg); diff --git a/fairmq/FairMQMerger.h b/fairmq/FairMQMerger.h index c9e855a1..9e258e9c 100644 --- a/fairmq/FairMQMerger.h +++ b/fairmq/FairMQMerger.h @@ -1,25 +1,23 @@ -/* - * FairMQStandaloneMerger.h +/** + * FairMQMerger.h * - * Created on: Dec 6, 2012 - * Author: dklein + * @since 2012-12-06 + * @author D. Klein, A. Rybalchenko */ -#ifndef FAIRMQSTANDALONEMERGER_H_ -#define FAIRMQSTANDALONEMERGER_H_ +#ifndef FAIRMQMERGER_H_ +#define FAIRMQMERGER_H_ #include "FairMQDevice.h" -#include "Rtypes.h" -#include "TString.h" -class FairMQStandaloneMerger: public FairMQDevice +class FairMQMerger: public FairMQDevice { public: - FairMQStandaloneMerger(); - virtual ~FairMQStandaloneMerger(); + FairMQMerger(); + virtual ~FairMQMerger(); protected: virtual void Run(); }; -#endif /* FAIRMQSTANDALONEMERGER_H_ */ +#endif /* FAIRMQMERGER_H_ */ diff --git a/fairmq/FairMQMessage.cxx b/fairmq/FairMQMessage.cxx index 57c6e2ee..5e76063f 100644 --- a/fairmq/FairMQMessage.cxx +++ b/fairmq/FairMQMessage.cxx @@ -1,63 +1,99 @@ -/* +/** * FairMQMessage.cxx * - * Created on: Dec 5, 2012 - * Author: dklein + * @since 2012-12-05 + * @author: D. Klein, A. Rybalchenko */ +#include + #include "FairMQMessage.h" #include "FairMQLogger.h" FairMQMessage::FairMQMessage() { - try { - fMessage = new zmq::message_t(); - } catch (zmq::error_t& e) { + int rc = zmq_msg_init (&fMessage); + if (rc != 0){ std::stringstream logmsg; - logmsg << "failed allocating new message, reason: " << e.what(); + logmsg << "failed initializing message, reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); } } -FairMQMessage::FairMQMessage(void* data_, size_t size_, zmq::free_fn* ffn_, void* hint_/*= NULL*/) +FairMQMessage::FairMQMessage(size_t size) { - try { - fMessage = new zmq::message_t(data_, size_, ffn_, hint_); - } catch (zmq::error_t& e) { + int rc = zmq_msg_init_size (&fMessage, size); + if (rc != 0){ std::stringstream logmsg; - logmsg << "failed allocating new message, reason: " << e.what(); + logmsg << "failed initializing message with size, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } +} + +FairMQMessage::FairMQMessage(void* data, size_t size) +{ + int rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL); + if (rc != 0){ + std::stringstream logmsg; + logmsg << "failed initializing message with data, reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); } } FairMQMessage::~FairMQMessage() { - delete fMessage; -} - -zmq::message_t* FairMQMessage::GetMessage() -{ - return fMessage; -} - -Int_t FairMQMessage::Size() -{ - return fMessage->size(); -} - -Bool_t FairMQMessage::Copy(FairMQMessage* msg) -{ - Bool_t result = false; - - try { - fMessage->copy(msg->GetMessage()); - } catch (zmq::error_t& e) { + int rc = zmq_msg_close (&fMessage); + if (rc != 0){ std::stringstream logmsg; - logmsg << "failed copying message, reason: " << e.what(); + logmsg << "failed closing message with data, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } +} + +void FairMQMessage::Rebuild(void* data, size_t size) +{ + int rc = zmq_msg_close (&fMessage); + if (rc != 0) { + std::stringstream logmsg; + logmsg << "failed closing message, reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); } - return result; + rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL); + if (rc != 0) { + std::stringstream logmsg2; + logmsg2 << "failed initializing message with data, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); + } } +zmq_msg_t* FairMQMessage::GetMessage() +{ + return &fMessage; +} + +void* FairMQMessage::GetData() +{ + return zmq_msg_data (&fMessage); +} + +size_t FairMQMessage::GetSize() +{ + return zmq_msg_size (&fMessage); +} + +void FairMQMessage::Copy(FairMQMessage* msg) +{ + int rc = zmq_msg_copy (&fMessage, &(msg->fMessage)); + if (rc != 0) { + std::stringstream logmsg; + logmsg << "failed copying message, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } +} + +void FairMQMessage::CleanUp(void* data, void* hint) +{ + free (data); +} diff --git a/fairmq/FairMQMessage.h b/fairmq/FairMQMessage.h index 7db45665..a071a86d 100644 --- a/fairmq/FairMQMessage.h +++ b/fairmq/FairMQMessage.h @@ -1,28 +1,41 @@ -/* +/** * FairMQMessage.h * - * Created on: Dec 5, 2012 - * Author: dklein + * @since 2012-12-05 + * @author: D. Klein, A. Rybalchenko */ #ifndef FAIRMQMESSAGE_H_ #define FAIRMQMESSAGE_H_ -#include -#include "Rtypes.h" +#include + +#include class FairMQMessage { - private: - zmq::message_t* fMessage; public: FairMQMessage(); - FairMQMessage(void* data_, size_t size_, zmq::free_fn* ffn_, void* hint_ = NULL); + FairMQMessage(size_t size); + FairMQMessage(void* data, size_t size); + + void Rebuild(); + void Rebuild(size_t size); + void Rebuild(void* data, size_t site); + + zmq_msg_t* GetMessage(); + void* GetData(); + size_t GetSize(); + + void Copy(FairMQMessage* msg); + + static void CleanUp(void* data, void* hint); + virtual ~FairMQMessage(); - zmq::message_t* GetMessage(); - Int_t Size(); - Bool_t Copy(FairMQMessage* msg); + + private: + zmq_msg_t fMessage; }; #endif /* FAIRMQMESSAGE_H_ */ diff --git a/fairmq/FairMQProcessor.cxx b/fairmq/FairMQProcessor.cxx index d623253a..a783ad5a 100644 --- a/fairmq/FairMQProcessor.cxx +++ b/fairmq/FairMQProcessor.cxx @@ -1,8 +1,8 @@ -/* +/** * FairMQProcessor.cxx * - * Created on: Dec 6, 2012 - * Author: dklein + * @since 2012-12-06 + * @author D. Klein, A. Rybalchenko */ #include @@ -39,11 +39,6 @@ void FairMQProcessor::Run() boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); - // Initialize poll set - zmq_pollitem_t items[] = { - { *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 } - }; - int receivedMsgs = 0; int sentMsgs = 0; @@ -52,12 +47,8 @@ void FairMQProcessor::Run() while ( fState == RUNNING ) { FairMQMessage msg; - zmq_poll(items, 1, 100); - - if (items[0].revents & ZMQ_POLLIN) { - received = fPayloadInputs->at(0)->Receive(&msg); - receivedMsgs++; - } + received = fPayloadInputs->at(0)->Receive(&msg); + receivedMsgs++; if (received) { fTask->Exec(&msg, NULL); diff --git a/fairmq/FairMQProcessor.h b/fairmq/FairMQProcessor.h index 7a839c32..ae793449 100644 --- a/fairmq/FairMQProcessor.h +++ b/fairmq/FairMQProcessor.h @@ -1,8 +1,8 @@ -/* +/** * FairMQProcessor.h * - * Created on: Dec 6, 2012 - * Author: dklein + * @since 2012-12-06 + * @author D. Klein, A. Rybalchenko */ #ifndef FAIRMQPROCESSOR_H_ diff --git a/fairmq/FairMQProcessorTask.cxx b/fairmq/FairMQProcessorTask.cxx index 19462a84..ca0f6e06 100644 --- a/fairmq/FairMQProcessorTask.cxx +++ b/fairmq/FairMQProcessorTask.cxx @@ -1,8 +1,8 @@ -/* +/** * FairMQProcessorTask.cxx * - * Created on: Dec 6, 2012 - * Author: dklein + * @since Dec 6, 2012-12-06 + * @author: D. Klein, A. Rybalchenko */ #include "FairMQProcessorTask.h" @@ -15,8 +15,3 @@ FairMQProcessorTask::FairMQProcessorTask() FairMQProcessorTask::~FairMQProcessorTask() { } - -void FairMQProcessorTask::ClearOutput(void* data, void* hint) -{ - free (data); -} \ No newline at end of file diff --git a/fairmq/FairMQProcessorTask.h b/fairmq/FairMQProcessorTask.h index 0fe1bbdc..97151f75 100644 --- a/fairmq/FairMQProcessorTask.h +++ b/fairmq/FairMQProcessorTask.h @@ -1,8 +1,8 @@ -/* +/** * FairMQProcessorTask.h * - * Created on: Dec 6, 2012 - * Author: dklein + * @since Dec 6, 2012-12-06 + * @author: D. Klein, A. Rybalchenko */ #ifndef FAIRMQPROCESSORTASK_H_ @@ -19,7 +19,6 @@ class FairMQProcessorTask : public FairTask FairMQProcessorTask(); virtual ~FairMQProcessorTask(); virtual void Exec(FairMQMessage* msg, Option_t* opt) = 0; - static void ClearOutput(void* data, void* hint); }; #endif /* FAIRMQPROCESSORTASK_H_ */ diff --git a/fairmq/FairMQProxy.cxx b/fairmq/FairMQProxy.cxx index 4f381580..a3e930d1 100644 --- a/fairmq/FairMQProxy.cxx +++ b/fairmq/FairMQProxy.cxx @@ -1,12 +1,10 @@ -/* +/** * FairMQProxy.cxx * - * Created on: Oct 2, 2013 - * Author: A. Rybalchenko + * @since 2013-10-02 + * @author A. Rybalchenko */ -#include - #include #include @@ -27,15 +25,13 @@ void FairMQProxy::Run() boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); - //TODO: check rateLogger output - int rc = zmq_proxy(*(fPayloadInputs->at(0)->GetSocket()), *(fPayloadOutputs->at(0)->GetSocket()), NULL); - if (rc == -1) { - std::cout << "Error: proxy failed: " << strerror(errno) << std::endl; - } + FairMQMessage msg; - //TODO: make proxy bind on both ends. + while ( fState == RUNNING ) { + fPayloadInputs->at(0)->Receive(&msg); + fPayloadOutputs->at(0)->Send(&msg); + } rateLogger.interrupt(); rateLogger.join(); } - diff --git a/fairmq/FairMQProxy.h b/fairmq/FairMQProxy.h index 854237b1..6f684992 100644 --- a/fairmq/FairMQProxy.h +++ b/fairmq/FairMQProxy.h @@ -1,8 +1,8 @@ -/* +/** * FairMQProxy.h * - * Created on: Oct 2, 2013 - * Author: A. Rybalchenko + * @since 2013-10-02 + * @author A. Rybalchenko */ #ifndef FAIRMQPROXY_H_ diff --git a/fairmq/FairMQSampler.cxx b/fairmq/FairMQSampler.cxx index ab395c17..9b1fd1ee 100644 --- a/fairmq/FairMQSampler.cxx +++ b/fairmq/FairMQSampler.cxx @@ -1,8 +1,8 @@ -/* +/** * FairMQSampler.cpp * - * Created on: Sep 27, 2012 - * Author: A. Rybalchenko, D. Klein + * @since 2012-09-27 + * @author D. Klein, A. Rybalchenko */ #include @@ -135,21 +135,13 @@ void FairMQSampler::ListenToCommands() { FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> ListenToCommands <<<<<<<"); - // Initialize poll set - zmq_pollitem_t items[] = { - { *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 } - }; + bool received = false; - Bool_t received = false; while ( true ) { try { FairMQMessage msg; - zmq_poll(items, 1, 100); - - if (items[0].revents & ZMQ_POLLIN) { - received = fPayloadInputs->at(0)->Receive(&msg); - } + received = fPayloadInputs->at(0)->Receive(&msg); if (received) { //command handling goes here. diff --git a/fairmq/FairMQSampler.h b/fairmq/FairMQSampler.h index 18149a4a..53b5084e 100644 --- a/fairmq/FairMQSampler.h +++ b/fairmq/FairMQSampler.h @@ -1,8 +1,8 @@ -/* +/** * FairMQSampler.h * - * Created on: Sep 27, 2012 - * Author: dklein + * @since 2012-09-27 + * @author D. Klein, A. Rybalchenko */ #ifndef FAIRMQSAMPLER_H_ diff --git a/fairmq/FairMQSamplerTask.cxx b/fairmq/FairMQSamplerTask.cxx index b52c4e05..a38ecbaa 100644 --- a/fairmq/FairMQSamplerTask.cxx +++ b/fairmq/FairMQSamplerTask.cxx @@ -1,8 +1,8 @@ -/* +/** * FairMQSamplerTask.cxx * - * Created on: Nov 22, 2012 - * Author: dklein + * @since 2012-11-22 + * @author D. Klein, A. Rybalchenko */ #include "FairMQSamplerTask.h" @@ -48,8 +48,4 @@ FairMQMessage* FairMQSamplerTask::GetOutput() return fOutput; } -void FairMQSamplerTask::ClearOutput(void* data, void* hint) -{ - free (data); -} diff --git a/fairmq/FairMQSamplerTask.h b/fairmq/FairMQSamplerTask.h index f3d347f0..286c9ef7 100644 --- a/fairmq/FairMQSamplerTask.h +++ b/fairmq/FairMQSamplerTask.h @@ -1,8 +1,8 @@ -/* +/** * FairMQSamplerTask.h * - * Created on: Nov 22, 2012 - * Author: dklein + * @since 2012-11-22 + * @author D. Klein, A. Rybalchenko */ #ifndef FAIRMQSAMPLERTASK_H_ @@ -26,7 +26,6 @@ class FairMQSamplerTask: public FairTask virtual void Exec(Option_t* opt) = 0; void SetBranch(TString branch); FairMQMessage* GetOutput(); - static void ClearOutput(void* data, void* hint); protected: TClonesArray* fInput; TString fBranch; diff --git a/fairmq/FairMQSink.cxx b/fairmq/FairMQSink.cxx index 2598a254..3b09b743 100644 --- a/fairmq/FairMQSink.cxx +++ b/fairmq/FairMQSink.cxx @@ -1,12 +1,10 @@ -/* +/** * FairMQSink.cxx * - * Created on: Jan 9, 2013 - * Author: dklein + * @since 2013-01-09 + * @author D. Klein, A. Rybalchenko */ -#include - #include #include @@ -24,19 +22,10 @@ void FairMQSink::Run() boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); - // Initialize poll set - zmq_pollitem_t items[] = { - { *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 } - }; - while ( fState == RUNNING ) { FairMQMessage msg; - zmq_poll(items, 1, 100); - - if (items[0].revents & ZMQ_POLLIN) { - fPayloadInputs->at(0)->Receive(&msg); - } + fPayloadInputs->at(0)->Receive(&msg); } rateLogger.interrupt(); diff --git a/fairmq/FairMQSink.h b/fairmq/FairMQSink.h index 73e576fd..21906299 100644 --- a/fairmq/FairMQSink.h +++ b/fairmq/FairMQSink.h @@ -1,20 +1,16 @@ -/* +/** * FairMQSink.h * - * Created on: Jan 9, 2013 - * Author: dklein + * @since 2013-01-09 + * @author D. Klein, A. Rybalchenko */ #ifndef FAIRMQSINK_H_ #define FAIRMQSINK_H_ -#include "Rtypes.h" -#include - #include "FairMQDevice.h" - class FairMQSink: public FairMQDevice { public: diff --git a/fairmq/FairMQSocket.cxx b/fairmq/FairMQSocket.cxx index 402c5737..f15e5bc6 100644 --- a/fairmq/FairMQSocket.cxx +++ b/fairmq/FairMQSocket.cxx @@ -1,8 +1,8 @@ -/* +/** * FairMQSocket.cxx * - * Created on: Dec 5, 2012 - * Author: dklein + * @since 2012-12-05 + * @author D. Klein, A. Rybalchenko */ #include "FairMQSocket.h" @@ -10,10 +10,6 @@ #include "FairMQLogger.h" -const TString FairMQSocket::TCP = "tcp://"; -const TString FairMQSocket::IPC = "ipc://"; -const TString FairMQSocket::INPROC = "inproc://"; - FairMQSocket::FairMQSocket(FairMQContext* context, int type, int num) : fBytesTx(0), fBytesRx(0), @@ -21,18 +17,29 @@ FairMQSocket::FairMQSocket(FairMQContext* context, int type, int num) : fMessagesRx(0) { std::stringstream id; - id << context->GetId() << "." << GetTypeString(type) << "." << num; + id << GetTypeString(type) << "." << num; fId = id.str(); - fSocket = new zmq::socket_t(*context->GetContext(), type); - fSocket->setsockopt(ZMQ_IDENTITY, &fId, fId.Length()); - if (type == ZMQ_SUB) { - fSocket->setsockopt(ZMQ_SUBSCRIBE, NULL, 0); + fSocket = zmq_socket(context->GetContext(), type); + int rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.Length()); + if (rc != 0) { + std::stringstream logmsg; + logmsg << "failed setting socket option, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); } - std::stringstream logmsg; - logmsg << "created socket #" << fId; - FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + if (type == ZMQ_SUB) { + rc = zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0); + if (rc != 0) { + std::stringstream logmsg2; + logmsg2 << "failed setting socket option, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); + } + } + + std::stringstream logmsg3; + logmsg3 << "created socket #" << fId; + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg3.str()); } FairMQSocket::~FairMQSocket() @@ -44,106 +51,111 @@ TString FairMQSocket::GetId() return fId; } -TString FairMQSocket::GetTypeString(Int_t type) +TString FairMQSocket::GetTypeString(int type) { switch (type) { - case ZMQ_SUB: - return "sub"; - case ZMQ_PUB: - return "pub"; - case ZMQ_PUSH: - return "push"; - case ZMQ_PULL: - return "pull"; - default: - return ""; + case ZMQ_SUB: + return "sub"; + case ZMQ_PUB: + return "pub"; + case ZMQ_PUSH: + return "push"; + case ZMQ_PULL: + return "pull"; + default: + return ""; } } -Bool_t FairMQSocket::Bind(TString address) +void FairMQSocket::Bind(TString address) { - Bool_t result = true; + std::stringstream logmsg; + logmsg << "bind socket #" << fId << " on " << address; + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); - try { - if ( address.Length() > 0 /*!address.empty()*/) { - std::stringstream logmsg; - logmsg << "bind socket #" << fId << " on " << address; - FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); - fSocket->bind(address.Data()); - } - } catch (zmq::error_t& e) { + int rc = zmq_bind (fSocket, address); + if (rc != 0) { std::stringstream logmsg2; - logmsg2 << "failed binding socket #" << fId << ", reason: " << e.what(); + logmsg2 << "failed binding socket #" << fId << ", reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); - result = false; } - - return result; } -Bool_t FairMQSocket::Connect(TString address) +void FairMQSocket::Connect(TString address) { - Bool_t result = true; + std::stringstream logmsg; + logmsg << "connect socket #" << fId << " on " << address; + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); - try { - if ( address.Length() > 0 /*!address.empty()*/) { - std::stringstream logmsg; - logmsg << "connect socket #" << fId << " to " << address; - FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); - fSocket->connect(address.Data()); - } - } catch (zmq::error_t& e) { + int rc = zmq_connect (fSocket, address); + if (rc != 0) { std::stringstream logmsg2; - logmsg2 << "failed connecting socket #" << fId << ", reason: " << e.what(); + logmsg2 << "failed connecting socket #" << fId << ", reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); - result = false; } - - return result; } -Bool_t FairMQSocket::Send(FairMQMessage* msg) +size_t FairMQSocket::Send(FairMQMessage* msg) { - Bool_t result = false; - - try { - fBytesTx += msg->Size(); + int nbytes = zmq_msg_send (msg->GetMessage(), fSocket, 0); + if (nbytes >= 0){ + fBytesTx += nbytes; ++fMessagesTx; - result = fSocket->send(*msg->GetMessage()); // use send(*msg->GetMessage(), ZMQ_DONTWAIT) for non-blocking call - } catch (zmq::error_t& e) { - std::stringstream logmsg; - logmsg << "failed sending on socket #" << fId << ", reason: " << e.what(); - FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); - result = false; + return nbytes; } - - return result; + if (zmq_errno() == EAGAIN){ + return false; + } + std::stringstream logmsg; + logmsg << "failed sending on socket #" << fId << ", reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + return nbytes; } -Bool_t FairMQSocket::Receive(FairMQMessage* msg) +size_t FairMQSocket::Receive(FairMQMessage* msg) { - Bool_t result = false; - - try { - result = fSocket->recv(msg->GetMessage()); - fBytesRx += msg->Size(); + int nbytes = zmq_msg_recv (msg->GetMessage(), fSocket, 0); + if (nbytes >= 0){ + fBytesRx += nbytes; ++fMessagesRx; - } catch (zmq::error_t& e) { - std::stringstream logmsg; - logmsg << "failed receiving on socket #" << fId << ", reason: " << e.what(); - FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); - result = false; + return nbytes; } + if (zmq_errno() == EAGAIN){ + return false; + } + std::stringstream logmsg; + logmsg << "failed receiving on socket #" << fId << ", reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + return nbytes; +} - return result; +void FairMQSocket::SetOption(int option, const void* value, size_t valueSize) +{ + int rc = zmq_setsockopt(fSocket, option, value, valueSize); + if (rc < 0) { + std::stringstream logmsg; + logmsg << "failed setting socket option, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } } void FairMQSocket::Close() { - fSocket->close(); + if (fSocket == NULL){ + return; + } + + int rc = zmq_close (fSocket); + if (rc != 0) { + std::stringstream logmsg; + logmsg << "failed closing socket, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } + + fSocket = NULL; } -zmq::socket_t* FairMQSocket::GetSocket() +void* FairMQSocket::GetSocket() { return fSocket; } diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index a5875b73..71d8da38 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -1,14 +1,14 @@ -/* +/** * FairMQSocket.h * - * Created on: Dec 5, 2012 - * Author: dklein + * @since 2012-12-05 + * @author D. Klein, A. Rybalchenko */ #ifndef FAIRMQSOCKET_H_ #define FAIRMQSOCKET_H_ -#include +#include #include #include "FairMQContext.h" #include "FairMQMessage.h" @@ -18,30 +18,32 @@ class FairMQSocket { - private: - zmq::socket_t* fSocket; - TString fId; - ULong_t fBytesTx; - ULong_t fBytesRx; - ULong_t fMessagesTx; - ULong_t fMessagesRx; public: - const static TString TCP, IPC, INPROC; - FairMQSocket(FairMQContext* context, Int_t type, Int_t num); + FairMQSocket(FairMQContext* context, int type, int num); virtual ~FairMQSocket(); TString GetId(); - static TString GetTypeString(Int_t type); - Bool_t Send(FairMQMessage* msg); - Bool_t Receive(FairMQMessage* msg); + static TString GetTypeString(int type); + size_t Send(FairMQMessage* msg); + size_t Receive(FairMQMessage* msg); void Close(); - Bool_t Bind(TString address); - Bool_t Connect(TString address); - zmq::socket_t* GetSocket(); + void Bind(TString address); + void Connect(TString address); + void* GetSocket(); + + void SetOption(int option, const void* value, size_t valueSize); + ULong_t GetBytesTx(); ULong_t GetBytesRx(); ULong_t GetMessagesTx(); ULong_t GetMessagesRx(); + private: + void* fSocket; + TString fId; + ULong_t fBytesTx; + ULong_t fBytesRx; + ULong_t fMessagesTx; + ULong_t fMessagesRx; }; #endif /* FAIRMQSOCKET_H_ */ diff --git a/fairmq/FairMQSplitter.cxx b/fairmq/FairMQSplitter.cxx index 667e69cf..e6cd32fb 100644 --- a/fairmq/FairMQSplitter.cxx +++ b/fairmq/FairMQSplitter.cxx @@ -1,46 +1,38 @@ -/* - * FairMQBalancedStandaloneSplitter.cxx +/** + * FairMQSplitter.cxx * - * Created on: Dec 6, 2012 - * Author: dklein + * @since 2012-12-06 + * @author D. Klein, A. Rybalchenko */ #include #include #include "FairMQLogger.h" -#include "FairMQBalancedStandaloneSplitter.h" +#include "FairMQSplitter.h" -FairMQBalancedStandaloneSplitter::FairMQBalancedStandaloneSplitter() + +FairMQSplitter::FairMQSplitter() { } -FairMQBalancedStandaloneSplitter::~FairMQBalancedStandaloneSplitter() +FairMQSplitter::~FairMQSplitter() { } -void FairMQBalancedStandaloneSplitter::Run() +void FairMQSplitter::Run() { FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); - // Initialize poll set - zmq_pollitem_t items[] = { - { *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 } - }; - - Bool_t received = false; - Int_t direction = 0; + bool received = false; + int direction = 0; while ( fState == RUNNING ) { FairMQMessage msg; - zmq_poll(items, 1, 100); - - if (items[0].revents & ZMQ_POLLIN) { - received = fPayloadInputs->at(0)->Receive(&msg); - } + received = fPayloadInputs->at(0)->Receive(&msg); if (received) { fPayloadOutputs->at(direction)->Send(&msg); @@ -49,7 +41,7 @@ void FairMQBalancedStandaloneSplitter::Run() direction = 0; } received = false; - }//if received + } } rateLogger.interrupt(); diff --git a/fairmq/FairMQSplitter.h b/fairmq/FairMQSplitter.h index 7ec35530..dce5da34 100644 --- a/fairmq/FairMQSplitter.h +++ b/fairmq/FairMQSplitter.h @@ -1,24 +1,23 @@ -/* - * FairMQBalancedStandaloneSplitter.h +/** + * FairMQSplitter.h * - * Created on: Dec 6, 2012 - * Author: dklein + * @since 2012-12-06 + * @author D. Klein, A. Rybalchenko */ -#ifndef FAIRMQBALANCEDSTANDALONESPLITTER_H_ -#define FAIRMQBALANCEDSTANDALONESPLITTER_H_ +#ifndef FAIRMQSPLITTER_H_ +#define FAIRMQSPLITTER_H_ #include "FairMQDevice.h" -#include "Rtypes.h" -class FairMQBalancedStandaloneSplitter: public FairMQDevice +class FairMQSplitter: public FairMQDevice { public: - FairMQBalancedStandaloneSplitter(); - virtual ~FairMQBalancedStandaloneSplitter(); + FairMQSplitter(); + virtual ~FairMQSplitter(); protected: virtual void Run(); }; -#endif /* FAIRMQBALANCEDSTANDALONESPLITTER_H_ */ +#endif /* FAIRMQSPLITTER_H_ */ diff --git a/fairmq/FairMQStateMachine.cxx b/fairmq/FairMQStateMachine.cxx index d4d59c2c..2c8f4460 100644 --- a/fairmq/FairMQStateMachine.cxx +++ b/fairmq/FairMQStateMachine.cxx @@ -1,11 +1,10 @@ -/* +/** * FairMQStateMachine.cxx * - * Created on: Oct 25, 2012 - * Author: dklein + * @since 2012-10-25 + * @author D. Klein, A. Rybalchenko */ -#include #include #include diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index a19cbc04..2cd015be 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -1,12 +1,10 @@ -/* +/** * FairMQStateMachine.h * - * Created on: Oct 25, 2012 - * Author: dklein + * @since 2012-10-25 + * @author D. Klein, A. Rybalchenko */ - - #ifndef FAIRMQSTATEMACHINE_H_ #define FAIRMQSTATEMACHINE_H_ diff --git a/fairmq/runBenchmarkSampler.cxx b/fairmq/runBenchmarkSampler.cxx index 80418b6c..bbcce1a1 100644 --- a/fairmq/runBenchmarkSampler.cxx +++ b/fairmq/runBenchmarkSampler.cxx @@ -1,8 +1,8 @@ -/* +/** * runBenchmarkSampler.cxx * - * Created on: Apr 23, 2013 - * Author: dklein + * @since Apr 23, 2013-04-23 + * @author D. Klein, A. Rybalchenko */ #include diff --git a/fairmq/runBuffer.cxx b/fairmq/runBuffer.cxx index 5fa8b6dc..e75a0b08 100644 --- a/fairmq/runBuffer.cxx +++ b/fairmq/runBuffer.cxx @@ -1,8 +1,8 @@ /** * runBuffer.cxx * - * @since Oct 26, 2012 - * @authors: D. Klein, A. Rybalchenko + * @since 2012-10-26 + * @author: D. Klein, A. Rybalchenko */ #include diff --git a/fairmq/runMerger.cxx b/fairmq/runMerger.cxx index 1e8af158..9f44ae6a 100644 --- a/fairmq/runMerger.cxx +++ b/fairmq/runMerger.cxx @@ -1,25 +1,25 @@ -/* +/** * runMerger.cxx * - * Created on: Dec 6, 2012 - * Author: dklein + * @since 2012-12-06 + * @author D. Klein, A. Rybalchenko */ #include #include #include "FairMQLogger.h" -#include "FairMQStandaloneMerger.h" +#include "FairMQMerger.h" -FairMQStandaloneMerger merger; +FairMQMerger merger; static void s_signal_handler (int signal) { std::cout << std::endl << "Caught signal " << signal << std::endl; - merger.ChangeState(FairMQStandaloneMerger::STOP); - merger.ChangeState(FairMQStandaloneMerger::END); + merger.ChangeState(FairMQMerger::STOP); + merger.ChangeState(FairMQMerger::END); std::cout << "Shutdown complete. Bye!" << std::endl; exit(1); @@ -53,76 +53,76 @@ int main(int argc, char** argv) int i = 1; - merger.SetProperty(FairMQStandaloneMerger::Id, argv[i]); + merger.SetProperty(FairMQMerger::Id, argv[i]); ++i; int numIoThreads; std::stringstream(argv[i]) >> numIoThreads; - merger.SetProperty(FairMQStandaloneMerger::NumIoThreads, numIoThreads); + merger.SetProperty(FairMQMerger::NumIoThreads, numIoThreads); ++i; - merger.SetProperty(FairMQStandaloneMerger::NumInputs, 2); - merger.SetProperty(FairMQStandaloneMerger::NumOutputs, 1); + merger.SetProperty(FairMQMerger::NumInputs, 2); + merger.SetProperty(FairMQMerger::NumOutputs, 1); - merger.ChangeState(FairMQStandaloneMerger::INIT); + merger.ChangeState(FairMQMerger::INIT); int inputSocketType = ZMQ_SUB; if (strcmp(argv[i], "pull") == 0) { inputSocketType = ZMQ_PULL; } - merger.SetProperty(FairMQStandaloneMerger::InputSocketType, inputSocketType, 0); + merger.SetProperty(FairMQMerger::InputSocketType, inputSocketType, 0); ++i; int inputRcvBufSize; std::stringstream(argv[i]) >> inputRcvBufSize; - merger.SetProperty(FairMQStandaloneMerger::InputRcvBufSize, inputRcvBufSize, 0); + merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, 0); ++i; - merger.SetProperty(FairMQStandaloneMerger::InputMethod, argv[i], 0); + merger.SetProperty(FairMQMerger::InputMethod, argv[i], 0); ++i; - merger.SetProperty(FairMQStandaloneMerger::InputAddress, argv[i], 0); + merger.SetProperty(FairMQMerger::InputAddress, argv[i], 0); ++i; inputSocketType = ZMQ_SUB; if (strcmp(argv[i], "pull") == 0) { inputSocketType = ZMQ_PULL; } - merger.SetProperty(FairMQStandaloneMerger::InputSocketType, inputSocketType, 1); + merger.SetProperty(FairMQMerger::InputSocketType, inputSocketType, 1); ++i; std::stringstream(argv[i]) >> inputRcvBufSize; - merger.SetProperty(FairMQStandaloneMerger::InputRcvBufSize, inputRcvBufSize, 1); + merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, 1); ++i; - merger.SetProperty(FairMQStandaloneMerger::InputMethod, argv[i], 1); + merger.SetProperty(FairMQMerger::InputMethod, argv[i], 1); ++i; - merger.SetProperty(FairMQStandaloneMerger::InputAddress, argv[i], 1); + merger.SetProperty(FairMQMerger::InputAddress, argv[i], 1); ++i; int outputSocketType = ZMQ_PUB; if (strcmp(argv[i], "push") == 0) { outputSocketType = ZMQ_PUSH; } - merger.SetProperty(FairMQStandaloneMerger::OutputSocketType, outputSocketType, 0); + merger.SetProperty(FairMQMerger::OutputSocketType, outputSocketType, 0); ++i; int outputSndBufSize; std::stringstream(argv[i]) >> outputSndBufSize; - merger.SetProperty(FairMQStandaloneMerger::OutputSndBufSize, outputSndBufSize, 0); + merger.SetProperty(FairMQMerger::OutputSndBufSize, outputSndBufSize, 0); ++i; - merger.SetProperty(FairMQStandaloneMerger::OutputMethod, argv[i], 0); + merger.SetProperty(FairMQMerger::OutputMethod, argv[i], 0); ++i; - merger.SetProperty(FairMQStandaloneMerger::OutputAddress, argv[i], 0); + merger.SetProperty(FairMQMerger::OutputAddress, argv[i], 0); ++i; - merger.ChangeState(FairMQStandaloneMerger::SETOUTPUT); - merger.ChangeState(FairMQStandaloneMerger::SETINPUT); - merger.ChangeState(FairMQStandaloneMerger::RUN); + merger.ChangeState(FairMQMerger::SETOUTPUT); + merger.ChangeState(FairMQMerger::SETINPUT); + merger.ChangeState(FairMQMerger::RUN); char ch; std::cin.get(ch); - merger.ChangeState(FairMQStandaloneMerger::STOP); - merger.ChangeState(FairMQStandaloneMerger::END); + merger.ChangeState(FairMQMerger::STOP); + merger.ChangeState(FairMQMerger::END); return 0; } diff --git a/fairmq/runNToOneMerger.cxx b/fairmq/runNToOneMerger.cxx index 96a7dcaf..6e89b39d 100644 --- a/fairmq/runNToOneMerger.cxx +++ b/fairmq/runNToOneMerger.cxx @@ -1,25 +1,25 @@ -/* - * runMerger.cxx +/** + * runNToOneMerger.cxx * - * Created on: Dec 6, 2012 - * Author: dklein + * @since 2012-12-06 + * @author D. Klein, A. Rybalchenko */ #include #include #include "FairMQLogger.h" -#include "FairMQStandaloneMerger.h" +#include "FairMQMerger.h" -FairMQStandaloneMerger merger; +FairMQMerger merger; static void s_signal_handler (int signal) { std::cout << std::endl << "Caught signal " << signal << std::endl; - merger.ChangeState(FairMQStandaloneMerger::STOP); - merger.ChangeState(FairMQStandaloneMerger::END); + merger.ChangeState(FairMQMerger::STOP); + merger.ChangeState(FairMQMerger::END); std::cout << "Shutdown complete. Bye!" << std::endl; exit(1); @@ -55,23 +55,23 @@ int main(int argc, char** argv) int i = 1; - merger.SetProperty(FairMQStandaloneMerger::Id, argv[i]); + merger.SetProperty(FairMQMerger::Id, argv[i]); ++i; int numIoThreads; std::stringstream(argv[i]) >> numIoThreads; - merger.SetProperty(FairMQStandaloneMerger::NumIoThreads, numIoThreads); + merger.SetProperty(FairMQMerger::NumIoThreads, numIoThreads); ++i; int numInputs; std::stringstream(argv[i]) >> numInputs; - merger.SetProperty(FairMQStandaloneMerger::NumInputs, numInputs); + merger.SetProperty(FairMQMerger::NumInputs, numInputs); ++i; - merger.SetProperty(FairMQStandaloneMerger::NumOutputs, 1); + merger.SetProperty(FairMQMerger::NumOutputs, 1); - merger.ChangeState(FairMQStandaloneMerger::INIT); + merger.ChangeState(FairMQMerger::INIT); int inputSocketType; @@ -80,15 +80,15 @@ int main(int argc, char** argv) if (strcmp(argv[i], "pull") == 0) { inputSocketType = ZMQ_PULL; } - merger.SetProperty(FairMQStandaloneMerger::InputSocketType, inputSocketType, iInput); + merger.SetProperty(FairMQMerger::InputSocketType, inputSocketType, iInput); ++i; int inputRcvBufSize; std::stringstream(argv[i]) >> inputRcvBufSize; - merger.SetProperty(FairMQStandaloneMerger::InputRcvBufSize, inputRcvBufSize, iInput); + merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, iInput); ++i; - merger.SetProperty(FairMQStandaloneMerger::InputMethod, argv[i], iInput); + merger.SetProperty(FairMQMerger::InputMethod, argv[i], iInput); ++i; - merger.SetProperty(FairMQStandaloneMerger::InputAddress, argv[i], iInput); + merger.SetProperty(FairMQMerger::InputAddress, argv[i], iInput); ++i; } @@ -96,28 +96,28 @@ int main(int argc, char** argv) if (strcmp(argv[i], "push") == 0) { outputSocketType = ZMQ_PUSH; } - merger.SetProperty(FairMQStandaloneMerger::OutputSocketType, outputSocketType, 0); + merger.SetProperty(FairMQMerger::OutputSocketType, outputSocketType, 0); ++i; int outputSndBufSize; std::stringstream(argv[i]) >> outputSndBufSize; - merger.SetProperty(FairMQStandaloneMerger::OutputSndBufSize, outputSndBufSize, 0); + merger.SetProperty(FairMQMerger::OutputSndBufSize, outputSndBufSize, 0); ++i; - merger.SetProperty(FairMQStandaloneMerger::OutputMethod, argv[i], 0); + merger.SetProperty(FairMQMerger::OutputMethod, argv[i], 0); ++i; - merger.SetProperty(FairMQStandaloneMerger::OutputAddress, argv[i], 0); + merger.SetProperty(FairMQMerger::OutputAddress, argv[i], 0); ++i; - merger.ChangeState(FairMQStandaloneMerger::SETOUTPUT); - merger.ChangeState(FairMQStandaloneMerger::SETINPUT); - merger.ChangeState(FairMQStandaloneMerger::RUN); + merger.ChangeState(FairMQMerger::SETOUTPUT); + merger.ChangeState(FairMQMerger::SETINPUT); + merger.ChangeState(FairMQMerger::RUN); char ch; std::cin.get(ch); - merger.ChangeState(FairMQStandaloneMerger::STOP); - merger.ChangeState(FairMQStandaloneMerger::END); + merger.ChangeState(FairMQMerger::STOP); + merger.ChangeState(FairMQMerger::END); return 0; } diff --git a/fairmq/runOneToNSplitter.cxx b/fairmq/runOneToNSplitter.cxx index 0b39d825..ba54f533 100644 --- a/fairmq/runOneToNSplitter.cxx +++ b/fairmq/runOneToNSplitter.cxx @@ -1,25 +1,25 @@ -/* - * runSplitter.cxx +/** + * runOneToNSplitter.cxx * - * Created on: Dec 6, 2012 - * Author: dklein + * @since 2012-12-06 + * @author D. Klein, A. Rybalchenko */ #include #include #include "FairMQLogger.h" -#include "FairMQBalancedStandaloneSplitter.h" +#include "FairMQSplitter.h" -FairMQBalancedStandaloneSplitter splitter; +FairMQSplitter splitter; static void s_signal_handler (int signal) { std::cout << std::endl << "Caught signal " << signal << std::endl; - splitter.ChangeState(FairMQBalancedStandaloneSplitter::STOP); - splitter.ChangeState(FairMQBalancedStandaloneSplitter::END); + splitter.ChangeState(FairMQSplitter::STOP); + splitter.ChangeState(FairMQSplitter::END); std::cout << "Shutdown complete. Bye!" << std::endl; exit(1); @@ -37,7 +37,7 @@ static void s_catch_signals (void) int main(int argc, char** argv) { - if ( argc < 16 || (argc-8)%4!=0 ) { //argc{name,id,threads,nout,insock,inbuff,inmet,inadd, ... out} + if ( argc < 16 || (argc - 8) % 4 != 0 ) { // argc{ name, id, threads, nout, insock, inbuff, inmet, inadd, ... out} std::cout << "Usage: splitter \tID numIoTreads numOutputs\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" @@ -54,38 +54,38 @@ int main(int argc, char** argv) int i = 1; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::Id, argv[i]); + splitter.SetProperty(FairMQSplitter::Id, argv[i]); ++i; int numIoThreads; std::stringstream(argv[i]) >> numIoThreads; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::NumIoThreads, numIoThreads); + splitter.SetProperty(FairMQSplitter::NumIoThreads, numIoThreads); ++i; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::NumInputs, 1); + splitter.SetProperty(FairMQSplitter::NumInputs, 1); int numOutputs; std::stringstream(argv[i]) >> numOutputs; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::NumOutputs, numOutputs); + splitter.SetProperty(FairMQSplitter::NumOutputs, numOutputs); ++i; - splitter.ChangeState(FairMQBalancedStandaloneSplitter::INIT); + splitter.ChangeState(FairMQSplitter::INIT); int inputSocketType = ZMQ_SUB; if (strcmp(argv[i], "pull") == 0) { inputSocketType = ZMQ_PULL; } - splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputSocketType, inputSocketType, 0); + splitter.SetProperty(FairMQSplitter::InputSocketType, inputSocketType, 0); ++i; int inputRcvBufSize; std::stringstream(argv[i]) >> inputRcvBufSize; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputRcvBufSize, inputRcvBufSize, 0); + splitter.SetProperty(FairMQSplitter::InputRcvBufSize, inputRcvBufSize, 0); ++i; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputMethod, argv[i], 0); + splitter.SetProperty(FairMQSplitter::InputMethod, argv[i], 0); ++i; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputAddress, argv[i], 0); + splitter.SetProperty(FairMQSplitter::InputAddress, argv[i], 0); ++i; int outputSocketType; @@ -95,27 +95,27 @@ int main(int argc, char** argv) if (strcmp(argv[i], "push") == 0) { outputSocketType = ZMQ_PUSH; } - splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputSocketType, outputSocketType, iOutput); + splitter.SetProperty(FairMQSplitter::OutputSocketType, outputSocketType, iOutput); ++i; std::stringstream(argv[i]) >> outputSndBufSize; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputSndBufSize, outputSndBufSize, iOutput); + splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, iOutput); ++i; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputMethod, argv[i], iOutput); + splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], iOutput); ++i; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputAddress, argv[i], iOutput); + splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], iOutput); ++i; } - splitter.ChangeState(FairMQBalancedStandaloneSplitter::SETOUTPUT); - splitter.ChangeState(FairMQBalancedStandaloneSplitter::SETINPUT); - splitter.ChangeState(FairMQBalancedStandaloneSplitter::RUN); + splitter.ChangeState(FairMQSplitter::SETOUTPUT); + splitter.ChangeState(FairMQSplitter::SETINPUT); + splitter.ChangeState(FairMQSplitter::RUN); char ch; std::cin.get(ch); - splitter.ChangeState(FairMQBalancedStandaloneSplitter::STOP); - splitter.ChangeState(FairMQBalancedStandaloneSplitter::END); + splitter.ChangeState(FairMQSplitter::STOP); + splitter.ChangeState(FairMQSplitter::END); return 0; } diff --git a/fairmq/runProxy.cxx b/fairmq/runProxy.cxx index 2032bb5b..ef814c66 100644 --- a/fairmq/runProxy.cxx +++ b/fairmq/runProxy.cxx @@ -1,8 +1,8 @@ /** * runProxy.cxx * - * @since: Oct 07, 2013 - * @authors: A. Rybalchenko + * @since 2013-10-07 + * @author A. Rybalchenko */ #include diff --git a/fairmq/runSink.cxx b/fairmq/runSink.cxx index f6434a85..55d2b444 100644 --- a/fairmq/runSink.cxx +++ b/fairmq/runSink.cxx @@ -1,8 +1,8 @@ -/* +/** * runSink.cxx * - * @since: Jan 21, 2013 - * @author: dklein + * @since 2013-01-21 + * @author D. Klein, A. Rybalchenko */ #include diff --git a/fairmq/runSplitter.cxx b/fairmq/runSplitter.cxx index eceb40df..858c553c 100644 --- a/fairmq/runSplitter.cxx +++ b/fairmq/runSplitter.cxx @@ -1,25 +1,25 @@ -/* +/** * runSplitter.cxx * - * Created on: Dec 6, 2012 - * Author: dklein + * @since 2012-12-06 + * @author D. Klein, A. Rybalchenko */ #include #include #include "FairMQLogger.h" -#include "FairMQBalancedStandaloneSplitter.h" +#include "FairMQSplitter.h" -FairMQBalancedStandaloneSplitter splitter; +FairMQSplitter splitter; static void s_signal_handler (int signal) { std::cout << std::endl << "Caught signal " << signal << std::endl; - splitter.ChangeState(FairMQBalancedStandaloneSplitter::STOP); - splitter.ChangeState(FairMQBalancedStandaloneSplitter::END); + splitter.ChangeState(FairMQSplitter::STOP); + splitter.ChangeState(FairMQSplitter::END); std::cout << "Shutdown complete. Bye!" << std::endl; exit(1); @@ -53,76 +53,76 @@ int main(int argc, char** argv) int i = 1; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::Id, argv[i]); + splitter.SetProperty(FairMQSplitter::Id, argv[i]); ++i; int numIoThreads; std::stringstream(argv[i]) >> numIoThreads; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::NumIoThreads, numIoThreads); + splitter.SetProperty(FairMQSplitter::NumIoThreads, numIoThreads); ++i; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::NumInputs, 1); - splitter.SetProperty(FairMQBalancedStandaloneSplitter::NumOutputs, 2); + splitter.SetProperty(FairMQSplitter::NumInputs, 1); + splitter.SetProperty(FairMQSplitter::NumOutputs, 2); - splitter.ChangeState(FairMQBalancedStandaloneSplitter::INIT); + splitter.ChangeState(FairMQSplitter::INIT); int inputSocketType = ZMQ_SUB; if (strcmp(argv[i], "pull") == 0) { inputSocketType = ZMQ_PULL; } - splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputSocketType, inputSocketType, 0); + splitter.SetProperty(FairMQSplitter::InputSocketType, inputSocketType, 0); ++i; int inputRcvBufSize; std::stringstream(argv[i]) >> inputRcvBufSize; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputRcvBufSize, inputRcvBufSize, 0); + splitter.SetProperty(FairMQSplitter::InputRcvBufSize, inputRcvBufSize, 0); ++i; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputMethod, argv[i], 0); + splitter.SetProperty(FairMQSplitter::InputMethod, argv[i], 0); ++i; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputAddress, argv[i], 0); + splitter.SetProperty(FairMQSplitter::InputAddress, argv[i], 0); ++i; int outputSocketType = ZMQ_PUB; if (strcmp(argv[i], "push") == 0) { outputSocketType = ZMQ_PUSH; } - splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputSocketType, outputSocketType, 0); + splitter.SetProperty(FairMQSplitter::OutputSocketType, outputSocketType, 0); ++i; int outputSndBufSize; std::stringstream(argv[i]) >> outputSndBufSize; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputSndBufSize, outputSndBufSize, 0); + splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, 0); ++i; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputMethod, argv[i], 0); + splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], 0); ++i; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputAddress, argv[i], 0); + splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], 0); ++i; outputSocketType = ZMQ_PUB; if (strcmp(argv[i], "push") == 0) { outputSocketType = ZMQ_PUSH; } - splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputSocketType, outputSocketType, 1); + splitter.SetProperty(FairMQSplitter::OutputSocketType, outputSocketType, 1); ++i; std::stringstream(argv[i]) >> outputSndBufSize; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputSndBufSize, outputSndBufSize, 1); + splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, 1); ++i; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputMethod, argv[i], 1); + splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], 1); ++i; - splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputAddress, argv[i], 1); + splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], 1); ++i; - splitter.ChangeState(FairMQBalancedStandaloneSplitter::SETOUTPUT); - splitter.ChangeState(FairMQBalancedStandaloneSplitter::SETINPUT); - splitter.ChangeState(FairMQBalancedStandaloneSplitter::RUN); + splitter.ChangeState(FairMQSplitter::SETOUTPUT); + splitter.ChangeState(FairMQSplitter::SETINPUT); + splitter.ChangeState(FairMQSplitter::RUN); char ch; std::cin.get(ch); - splitter.ChangeState(FairMQBalancedStandaloneSplitter::STOP); - splitter.ChangeState(FairMQBalancedStandaloneSplitter::END); + splitter.ChangeState(FairMQSplitter::STOP); + splitter.ChangeState(FairMQSplitter::END); return 0; }