a little clean up

This commit is contained in:
Alexey Rybalchenko 2014-01-17 12:34:57 +01:00
parent 31d10170f7
commit 3803a3d155
43 changed files with 525 additions and 513 deletions

View File

@ -2,6 +2,8 @@ include_directories(
${BASE_INCLUDE_DIRECTORIES} ${BASE_INCLUDE_DIRECTORIES}
${CMAKE_SOURCE_DIR}/fairmq ${CMAKE_SOURCE_DIR}/fairmq
${ZMQ_INCLUDE_DIR} ${ZMQ_INCLUDE_DIR}
${NANOMSG_INCLUDE_DIR}
${Boost_INCLUDE_DIR}
${ROOT_INCLUDE_DIR} ${ROOT_INCLUDE_DIR}
) )
@ -15,9 +17,13 @@ Set(SRCS
"FairMQLogger.cxx" "FairMQLogger.cxx"
"FairMQContext.cxx" "FairMQContext.cxx"
"FairMQMessage.cxx" "FairMQMessage.cxx"
"FairMQMessageZMQ.cxx"
"FairMQMessageNN.cxx"
"FairMQSocket.cxx" "FairMQSocket.cxx"
"FairMQBalancedStandaloneSplitter.cxx" "FairMQSocketZMQ.cxx"
"FairMQStandaloneMerger.cxx" "FairMQSocketNN.cxx"
"FairMQSplitter.cxx"
"FairMQMerger.cxx"
"FairMQProcessor.cxx" "FairMQProcessor.cxx"
"FairMQProcessorTask.cxx" "FairMQProcessorTask.cxx"
"FairMQSink.cxx" "FairMQSink.cxx"
@ -37,6 +43,7 @@ Set(LINKDEF)
Set(DEPENDENCIES Set(DEPENDENCIES
${CMAKE_THREAD_LIBS_INIT} ${CMAKE_THREAD_LIBS_INIT}
${ZMQ_LIBRARY_SHARED} ${ZMQ_LIBRARY_SHARED}
${NANOMSG_LIBRARY_SHARED}
Base ParBase FairTools GeoBase boost_thread boost_timer boost_system Base ParBase FairTools GeoBase boost_thread boost_timer boost_system
) )

View File

@ -1,9 +1,10 @@
/* /**
* FairMQBenchmarkSampler.cpp * FairMQBenchmarkSampler.cpp
* *
* Created on: Apr 23, 2013 * @since 2013-04-23
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#include <vector> #include <vector>
#include <boost/thread.hpp> #include <boost/thread.hpp>
@ -12,6 +13,7 @@
#include "FairMQBenchmarkSampler.h" #include "FairMQBenchmarkSampler.h"
#include "FairMQLogger.h" #include "FairMQLogger.h"
FairMQBenchmarkSampler::FairMQBenchmarkSampler() : FairMQBenchmarkSampler::FairMQBenchmarkSampler() :
fEventSize(10000), fEventSize(10000),
fEventRate(1), fEventRate(1),
@ -37,7 +39,7 @@ void FairMQBenchmarkSampler::Run()
boost::thread resetEventCounter(boost::bind(&FairMQBenchmarkSampler::ResetEventCounter, this)); boost::thread resetEventCounter(boost::bind(&FairMQBenchmarkSampler::ResetEventCounter, this));
void* buffer = operator new[](fEventSize); void* buffer = operator new[](fEventSize);
FairMQMessage* base_event = new FairMQMessage(buffer, fEventSize, NULL); FairMQMessage* base_event = new FairMQMessage(buffer, fEventSize);
while ( fState == RUNNING ) { while ( fState == RUNNING ) {
FairMQMessage event; FairMQMessage event;

View File

@ -1,14 +1,15 @@
/* /**
* FairMQBenchmarkSampler.h * FairMQBenchmarkSampler.h
* *
* Created on: Apr 23, 2013 * @since 2013-04-23
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#ifndef FAIRMQBENCHMARKSAMPLER_H_ #ifndef FAIRMQBENCHMARKSAMPLER_H_
#define FAIRMQBENCHMARKSAMPLER_H_ #define FAIRMQBENCHMARKSAMPLER_H_
#include <string> #include <string>
#include "FairMQDevice.h" #include "FairMQDevice.h"
#include "TString.h" #include "TString.h"

View File

@ -1,8 +1,8 @@
/* /**
* FairMQBuffer.cxx * FairMQBuffer.cxx
* *
* Created on: Oct 25, 2012 * @since 2012-10-25
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#include <iostream> #include <iostream>
@ -13,6 +13,7 @@
#include "FairMQBuffer.h" #include "FairMQBuffer.h"
#include "FairMQLogger.h" #include "FairMQLogger.h"
FairMQBuffer::FairMQBuffer() FairMQBuffer::FairMQBuffer()
{ {
} }
@ -23,20 +24,11 @@ void FairMQBuffer::Run()
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
// Initialize poll set bool received = false;
zmq_pollitem_t items[] = {
{ *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 }
};
Bool_t received = false;
while ( fState == RUNNING ) { while ( fState == RUNNING ) {
FairMQMessage msg; FairMQMessage msg;
zmq_poll(items, 1, 100);
if (items[0].revents & ZMQ_POLLIN) {
received = fPayloadInputs->at(0)->Receive(&msg); received = fPayloadInputs->at(0)->Receive(&msg);
}
if (received) { if (received) {
fPayloadOutputs->at(0)->Send(&msg); fPayloadOutputs->at(0)->Send(&msg);

View File

@ -1,15 +1,14 @@
/* /**
* FairMQBuffer.h * FairMQBuffer.h
* *
* Created on: Oct 25, 2012 * @since 2012-10-25
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#ifndef FAIRMQBUFFER_H_ #ifndef FAIRMQBUFFER_H_
#define FAIRMQBUFFER_H_ #define FAIRMQBUFFER_H_
#include "FairMQDevice.h" #include "FairMQDevice.h"
#include "Rtypes.h"
class FairMQBuffer: public FairMQDevice class FairMQBuffer: public FairMQDevice

View File

@ -1,8 +1,8 @@
/* /**
* FairMQConfigurable.cxx * FairMQConfigurable.cxx
* *
* Created on: Oct 25, 2012 * @since 2012-10-25
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#include "FairMQConfigurable.h" #include "FairMQConfigurable.h"

View File

@ -1,8 +1,8 @@
/* /**
* FairMQConfigurable.h * FairMQConfigurable.h
* *
* Created on: Oct 25, 2012 * @since 2012-10-25
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#ifndef FAIRMQCONFIGURABLE_H_ #ifndef FAIRMQCONFIGURABLE_H_

View File

@ -1,44 +1,52 @@
/* /**
* FairMQContext.cxx * FairMQContext.cxx
* *
* Created on: Dec 5, 2012 * @since 2012-12-05
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#include "FairMQLogger.h"
#include "FairMQContext.h" #include "FairMQContext.h"
#include <sstream> #include <sstream>
FairMQContext::FairMQContext(int numIoThreads)
const TString FairMQContext::PAYLOAD = "payload";
const TString FairMQContext::LOG = "log";
const TString FairMQContext::CONFIG = "config";
const TString FairMQContext::CONTROL = "control";
FairMQContext::FairMQContext(TString deviceId, TString contextId, Int_t numIoThreads)
{ {
std::stringstream id; fContext = zmq_ctx_new ();
id << deviceId << "." << contextId; if (fContext == NULL){
fId = id.str(); std::stringstream logmsg;
logmsg << "failed creating context, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
fContext = new zmq::context_t(numIoThreads); int rc = zmq_ctx_set (fContext, ZMQ_IO_THREADS, numIoThreads);
if (rc != 0){
std::stringstream logmsg;
logmsg << "failed configuring context, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
} }
FairMQContext::~FairMQContext() FairMQContext::~FairMQContext()
{ {
} }
TString FairMQContext::GetId() void* FairMQContext::GetContext()
{
return fId;
}
zmq::context_t* FairMQContext::GetContext()
{ {
return fContext; return fContext;
} }
void FairMQContext::Close() void FairMQContext::Close()
{ {
fContext->close(); if (fContext == NULL){
} return;
}
int rc = zmq_ctx_destroy (fContext);
if (rc != 0) {
std::stringstream logmsg;
logmsg << "failed closing context, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
fContext = NULL;
}

View File

@ -1,31 +1,25 @@
/* /**
* FairMQContext.h * FairMQContext.h
* *
* Created on: Dec 5, 2012 * @since 2012-12-05
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#ifndef FAIRMQCONTEXT_H_ #ifndef FAIRMQCONTEXT_H_
#define FAIRMQCONTEXT_H_ #define FAIRMQCONTEXT_H_
#include <string> #include <zmq.h>
#include <zmq.hpp>
#include "Rtypes.h"
#include "TString.h"
class FairMQContext class FairMQContext
{ {
private:
TString fId;
zmq::context_t* fContext;
public: public:
const static TString PAYLOAD, LOG, CONFIG, CONTROL; FairMQContext(int numIoThreads);
FairMQContext(TString deviceId, TString contextId, Int_t numIoThreads);
virtual ~FairMQContext(); virtual ~FairMQContext();
TString GetId(); void* GetContext();
zmq::context_t* GetContext();
void Close(); void Close();
private:
void* fContext;
}; };
#endif /* FAIRMQCONTEXT_H_ */ #endif /* FAIRMQCONTEXT_H_ */

View File

@ -1,8 +1,8 @@
/** /**
* FairMQDevice.cxx * FairMQDevice.cxx
* *
* @since Oct 25, 2012 * @since 2012-10-25
* @authors: D. Klein, A. Rybalchenko * @author D. Klein, A. Rybalchenko
*/ */
#include <iostream> #include <iostream>
@ -30,7 +30,7 @@ void FairMQDevice::Init()
logmsg << "numIoThreads: " << fNumIoThreads; logmsg << "numIoThreads: " << fNumIoThreads;
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
fPayloadContext = new FairMQContext(fId, FairMQContext::PAYLOAD, fNumIoThreads); fPayloadContext = new FairMQContext(fNumIoThreads);
fInputAddress = new std::vector<TString>(fNumInputs); fInputAddress = new std::vector<TString>(fNumInputs);
fInputMethod = new std::vector<TString>(); fInputMethod = new std::vector<TString>();
@ -65,9 +65,12 @@ void FairMQDevice::InitInput()
for (Int_t i = 0; i < fNumInputs; ++i) { for (Int_t i = 0; i < fNumInputs; ++i) {
FairMQSocket* socket = new FairMQSocket(fPayloadContext, fInputSocketType->at(i), i); FairMQSocket* socket = new FairMQSocket(fPayloadContext, fInputSocketType->at(i), i);
socket->GetSocket()->setsockopt(ZMQ_SNDHWM, &fInputSndBufSize->at(i), sizeof(fInputSndBufSize->at(i)));
socket->GetSocket()->setsockopt(ZMQ_RCVHWM, &fInputRcvBufSize->at(i), sizeof(fInputRcvBufSize->at(i))); socket->SetOption(ZMQ_SNDHWM, &fInputSndBufSize->at(i), sizeof(fInputSndBufSize->at(i)));
socket->SetOption(ZMQ_RCVHWM, &fInputRcvBufSize->at(i), sizeof(fInputRcvBufSize->at(i)));
fPayloadInputs->push_back(socket); fPayloadInputs->push_back(socket);
try { try {
if (fInputMethod->at(i) == "bind") { if (fInputMethod->at(i) == "bind") {
fPayloadInputs->at(i)->Bind(fInputAddress->at(i)); fPayloadInputs->at(i)->Bind(fInputAddress->at(i));
@ -85,8 +88,8 @@ void FairMQDevice::InitOutput()
for (Int_t i = 0; i < fNumOutputs; ++i) { for (Int_t i = 0; i < fNumOutputs; ++i) {
FairMQSocket* socket = new FairMQSocket(fPayloadContext, fOutputSocketType->at(i), i); FairMQSocket* socket = new FairMQSocket(fPayloadContext, fOutputSocketType->at(i), i);
socket->GetSocket()->setsockopt(ZMQ_SNDHWM, &fOutputSndBufSize->at(i), sizeof(fOutputSndBufSize->at(i))); socket->SetOption(ZMQ_SNDHWM, &fOutputSndBufSize->at(i), sizeof(fOutputSndBufSize->at(i)));
socket->GetSocket()->setsockopt(ZMQ_RCVHWM, &fOutputRcvBufSize->at(i), sizeof(fOutputRcvBufSize->at(i))); socket->SetOption(ZMQ_RCVHWM, &fOutputRcvBufSize->at(i), sizeof(fOutputRcvBufSize->at(i)));
fPayloadOutputs->push_back(socket); fPayloadOutputs->push_back(socket);
try { try {
if (fOutputMethod->at(i) == "bind") { if (fOutputMethod->at(i) == "bind") {
@ -290,7 +293,7 @@ void FairMQDevice::LogSocketRates()
messagesInput[i] = messagesInputNew[i]; messagesInput[i] = messagesInputNew[i];
std::stringstream logmsg; std::stringstream logmsg;
logmsg << "#" << (*itr)->GetId() << ": " << messagesPerSecondInput[i] << " msg/s, " << megabytesPerSecondInput[i] << " MB/s"; logmsg << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondInput[i] << " msg/s, " << megabytesPerSecondInput[i] << " MB/s";
FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str());
// Temp stuff for process termination // Temp stuff for process termination
@ -318,7 +321,7 @@ void FairMQDevice::LogSocketRates()
messagesOutput[i] = messagesOutputNew[i]; messagesOutput[i] = messagesOutputNew[i];
std::stringstream logmsg; std::stringstream logmsg;
logmsg << "#" << (*itr)->GetId() << ": " << messagesPerSecondOutput[i] << " msg/s, " << megabytesPerSecondOutput[i] << " MB/s"; logmsg << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondOutput[i] << " msg/s, " << megabytesPerSecondOutput[i] << " MB/s";
FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str());
// Temp stuff for process termination // Temp stuff for process termination

View File

@ -1,8 +1,8 @@
/** /**
* FairMQDevice.h * FairMQDevice.h
* *
* @since Oct 25, 2012 * @since 2012-10-25
* @authors: D. Klein, A. Rybalchenko * @author D. Klein, A. Rybalchenko
*/ */
#ifndef FAIRMQDEVICE_H_ #ifndef FAIRMQDEVICE_H_

View File

@ -1,8 +1,8 @@
/* /**
* FairMQLogger.cxx * FairMQLogger.cxx
* *
* Created on: Dec 4, 2012 * @since 2012-12-04
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#include "FairMQLogger.h" #include "FairMQLogger.h"

View File

@ -1,8 +1,8 @@
/* /**
* FairMQLogger.h * FairMQLogger.h
* *
* Created on: Dec 4, 2012 * @since 2012-12-04
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#ifndef FAIRMQLOGGER_H_ #ifndef FAIRMQLOGGER_H_

View File

@ -1,34 +1,38 @@
/* /**
* FairMQStandaloneMerger.cxx * FairMQMerger.cxx
* *
* Created on: Dec 6, 2012 * @since 2012-12-06
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQStandaloneMerger.h" #include "FairMQMerger.h"
FairMQStandaloneMerger::FairMQStandaloneMerger()
FairMQMerger::FairMQMerger()
{ {
} }
FairMQStandaloneMerger::~FairMQStandaloneMerger() FairMQMerger::~FairMQMerger()
{ {
} }
void FairMQStandaloneMerger::Run() void FairMQMerger::Run()
{ {
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<");
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
zmq_pollitem_t items[fNumInputs]; zmq_pollitem_t items[fNumInputs];
for (Int_t iInput = 0; iInput < fNumInputs; iInput++) {
zmq_pollitem_t tempitem= {*(fPayloadInputs->at(iInput)->GetSocket()), 0, ZMQ_POLLIN, 0}; for (int i = 0; i < fNumInputs; i++) {
items[iInput] = tempitem; items[i].socket = fPayloadInputs->at(i)->GetSocket();
items[i].fd = 0;
items[i].events = ZMQ_POLLIN;
items[i].revents = 0;
} }
Bool_t received = false; Bool_t received = false;
@ -38,9 +42,9 @@ void FairMQStandaloneMerger::Run()
zmq_poll(items, fNumInputs, 100); zmq_poll(items, fNumInputs, 100);
for(Int_t iItem = 0; iItem < fNumInputs; iItem++) { for(int i = 0; i < fNumInputs; i++) {
if (items[iItem].revents & ZMQ_POLLIN) { if (items[i].revents & ZMQ_POLLIN) {
received = fPayloadInputs->at(iItem)->Receive(&msg); received = fPayloadInputs->at(i)->Receive(&msg);
} }
if (received) { if (received) {
fPayloadOutputs->at(0)->Send(&msg); fPayloadOutputs->at(0)->Send(&msg);

View File

@ -1,25 +1,23 @@
/* /**
* FairMQStandaloneMerger.h * FairMQMerger.h
* *
* Created on: Dec 6, 2012 * @since 2012-12-06
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#ifndef FAIRMQSTANDALONEMERGER_H_ #ifndef FAIRMQMERGER_H_
#define FAIRMQSTANDALONEMERGER_H_ #define FAIRMQMERGER_H_
#include "FairMQDevice.h" #include "FairMQDevice.h"
#include "Rtypes.h"
#include "TString.h"
class FairMQStandaloneMerger: public FairMQDevice class FairMQMerger: public FairMQDevice
{ {
public: public:
FairMQStandaloneMerger(); FairMQMerger();
virtual ~FairMQStandaloneMerger(); virtual ~FairMQMerger();
protected: protected:
virtual void Run(); virtual void Run();
}; };
#endif /* FAIRMQSTANDALONEMERGER_H_ */ #endif /* FAIRMQMERGER_H_ */

View File

@ -1,63 +1,99 @@
/* /**
* FairMQMessage.cxx * FairMQMessage.cxx
* *
* Created on: Dec 5, 2012 * @since 2012-12-05
* Author: dklein * @author: D. Klein, A. Rybalchenko
*/ */
#include <cstdlib>
#include "FairMQMessage.h" #include "FairMQMessage.h"
#include "FairMQLogger.h" #include "FairMQLogger.h"
FairMQMessage::FairMQMessage() FairMQMessage::FairMQMessage()
{ {
try { int rc = zmq_msg_init (&fMessage);
fMessage = new zmq::message_t(); if (rc != 0){
} catch (zmq::error_t& e) {
std::stringstream logmsg; std::stringstream logmsg;
logmsg << "failed allocating new message, reason: " << e.what(); logmsg << "failed initializing message, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
} }
} }
FairMQMessage::FairMQMessage(void* data_, size_t size_, zmq::free_fn* ffn_, void* hint_/*= NULL*/) FairMQMessage::FairMQMessage(size_t size)
{ {
try { int rc = zmq_msg_init_size (&fMessage, size);
fMessage = new zmq::message_t(data_, size_, ffn_, hint_); if (rc != 0){
} catch (zmq::error_t& e) {
std::stringstream logmsg; std::stringstream logmsg;
logmsg << "failed allocating new message, reason: " << e.what(); logmsg << "failed initializing message with size, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
}
FairMQMessage::FairMQMessage(void* data, size_t size)
{
int rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL);
if (rc != 0){
std::stringstream logmsg;
logmsg << "failed initializing message with data, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
} }
} }
FairMQMessage::~FairMQMessage() FairMQMessage::~FairMQMessage()
{ {
delete fMessage; int rc = zmq_msg_close (&fMessage);
} if (rc != 0){
zmq::message_t* FairMQMessage::GetMessage()
{
return fMessage;
}
Int_t FairMQMessage::Size()
{
return fMessage->size();
}
Bool_t FairMQMessage::Copy(FairMQMessage* msg)
{
Bool_t result = false;
try {
fMessage->copy(msg->GetMessage());
} catch (zmq::error_t& e) {
std::stringstream logmsg; std::stringstream logmsg;
logmsg << "failed copying message, reason: " << e.what(); logmsg << "failed closing message with data, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
}
void FairMQMessage::Rebuild(void* data, size_t size)
{
int rc = zmq_msg_close (&fMessage);
if (rc != 0) {
std::stringstream logmsg;
logmsg << "failed closing message, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
} }
return result; rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL);
if (rc != 0) {
std::stringstream logmsg2;
logmsg2 << "failed initializing message with data, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
}
} }
zmq_msg_t* FairMQMessage::GetMessage()
{
return &fMessage;
}
void* FairMQMessage::GetData()
{
return zmq_msg_data (&fMessage);
}
size_t FairMQMessage::GetSize()
{
return zmq_msg_size (&fMessage);
}
void FairMQMessage::Copy(FairMQMessage* msg)
{
int rc = zmq_msg_copy (&fMessage, &(msg->fMessage));
if (rc != 0) {
std::stringstream logmsg;
logmsg << "failed copying message, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
}
void FairMQMessage::CleanUp(void* data, void* hint)
{
free (data);
}

View File

@ -1,28 +1,41 @@
/* /**
* FairMQMessage.h * FairMQMessage.h
* *
* Created on: Dec 5, 2012 * @since 2012-12-05
* Author: dklein * @author: D. Klein, A. Rybalchenko
*/ */
#ifndef FAIRMQMESSAGE_H_ #ifndef FAIRMQMESSAGE_H_
#define FAIRMQMESSAGE_H_ #define FAIRMQMESSAGE_H_
#include <zmq.hpp> #include <cstddef>
#include "Rtypes.h"
#include <zmq.h>
class FairMQMessage class FairMQMessage
{ {
private:
zmq::message_t* fMessage;
public: public:
FairMQMessage(); FairMQMessage();
FairMQMessage(void* data_, size_t size_, zmq::free_fn* ffn_, void* hint_ = NULL); FairMQMessage(size_t size);
FairMQMessage(void* data, size_t size);
void Rebuild();
void Rebuild(size_t size);
void Rebuild(void* data, size_t site);
zmq_msg_t* GetMessage();
void* GetData();
size_t GetSize();
void Copy(FairMQMessage* msg);
static void CleanUp(void* data, void* hint);
virtual ~FairMQMessage(); virtual ~FairMQMessage();
zmq::message_t* GetMessage();
Int_t Size(); private:
Bool_t Copy(FairMQMessage* msg); zmq_msg_t fMessage;
}; };
#endif /* FAIRMQMESSAGE_H_ */ #endif /* FAIRMQMESSAGE_H_ */

View File

@ -1,8 +1,8 @@
/* /**
* FairMQProcessor.cxx * FairMQProcessor.cxx
* *
* Created on: Dec 6, 2012 * @since 2012-12-06
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#include <boost/thread.hpp> #include <boost/thread.hpp>
@ -39,11 +39,6 @@ void FairMQProcessor::Run()
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
// Initialize poll set
zmq_pollitem_t items[] = {
{ *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 }
};
int receivedMsgs = 0; int receivedMsgs = 0;
int sentMsgs = 0; int sentMsgs = 0;
@ -52,12 +47,8 @@ void FairMQProcessor::Run()
while ( fState == RUNNING ) { while ( fState == RUNNING ) {
FairMQMessage msg; FairMQMessage msg;
zmq_poll(items, 1, 100);
if (items[0].revents & ZMQ_POLLIN) {
received = fPayloadInputs->at(0)->Receive(&msg); received = fPayloadInputs->at(0)->Receive(&msg);
receivedMsgs++; receivedMsgs++;
}
if (received) { if (received) {
fTask->Exec(&msg, NULL); fTask->Exec(&msg, NULL);

View File

@ -1,8 +1,8 @@
/* /**
* FairMQProcessor.h * FairMQProcessor.h
* *
* Created on: Dec 6, 2012 * @since 2012-12-06
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#ifndef FAIRMQPROCESSOR_H_ #ifndef FAIRMQPROCESSOR_H_

View File

@ -1,8 +1,8 @@
/* /**
* FairMQProcessorTask.cxx * FairMQProcessorTask.cxx
* *
* Created on: Dec 6, 2012 * @since Dec 6, 2012-12-06
* Author: dklein * @author: D. Klein, A. Rybalchenko
*/ */
#include "FairMQProcessorTask.h" #include "FairMQProcessorTask.h"
@ -15,8 +15,3 @@ FairMQProcessorTask::FairMQProcessorTask()
FairMQProcessorTask::~FairMQProcessorTask() FairMQProcessorTask::~FairMQProcessorTask()
{ {
} }
void FairMQProcessorTask::ClearOutput(void* data, void* hint)
{
free (data);
}

View File

@ -1,8 +1,8 @@
/* /**
* FairMQProcessorTask.h * FairMQProcessorTask.h
* *
* Created on: Dec 6, 2012 * @since Dec 6, 2012-12-06
* Author: dklein * @author: D. Klein, A. Rybalchenko
*/ */
#ifndef FAIRMQPROCESSORTASK_H_ #ifndef FAIRMQPROCESSORTASK_H_
@ -19,7 +19,6 @@ class FairMQProcessorTask : public FairTask
FairMQProcessorTask(); FairMQProcessorTask();
virtual ~FairMQProcessorTask(); virtual ~FairMQProcessorTask();
virtual void Exec(FairMQMessage* msg, Option_t* opt) = 0; virtual void Exec(FairMQMessage* msg, Option_t* opt) = 0;
static void ClearOutput(void* data, void* hint);
}; };
#endif /* FAIRMQPROCESSORTASK_H_ */ #endif /* FAIRMQPROCESSORTASK_H_ */

View File

@ -1,12 +1,10 @@
/* /**
* FairMQProxy.cxx * FairMQProxy.cxx
* *
* Created on: Oct 2, 2013 * @since 2013-10-02
* Author: A. Rybalchenko * @author A. Rybalchenko
*/ */
#include <iostream>
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <boost/bind.hpp> #include <boost/bind.hpp>
@ -27,15 +25,13 @@ void FairMQProxy::Run()
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
//TODO: check rateLogger output FairMQMessage msg;
int rc = zmq_proxy(*(fPayloadInputs->at(0)->GetSocket()), *(fPayloadOutputs->at(0)->GetSocket()), NULL);
if (rc == -1) {
std::cout << "Error: proxy failed: " << strerror(errno) << std::endl;
}
//TODO: make proxy bind on both ends. while ( fState == RUNNING ) {
fPayloadInputs->at(0)->Receive(&msg);
fPayloadOutputs->at(0)->Send(&msg);
}
rateLogger.interrupt(); rateLogger.interrupt();
rateLogger.join(); rateLogger.join();
} }

View File

@ -1,8 +1,8 @@
/* /**
* FairMQProxy.h * FairMQProxy.h
* *
* Created on: Oct 2, 2013 * @since 2013-10-02
* Author: A. Rybalchenko * @author A. Rybalchenko
*/ */
#ifndef FAIRMQPROXY_H_ #ifndef FAIRMQPROXY_H_

View File

@ -1,8 +1,8 @@
/* /**
* FairMQSampler.cpp * FairMQSampler.cpp
* *
* Created on: Sep 27, 2012 * @since 2012-09-27
* Author: A. Rybalchenko, D. Klein * @author D. Klein, A. Rybalchenko
*/ */
#include <vector> #include <vector>
@ -135,21 +135,13 @@ void FairMQSampler::ListenToCommands()
{ {
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> ListenToCommands <<<<<<<"); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> ListenToCommands <<<<<<<");
// Initialize poll set bool received = false;
zmq_pollitem_t items[] = {
{ *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 }
};
Bool_t received = false;
while ( true ) { while ( true ) {
try { try {
FairMQMessage msg; FairMQMessage msg;
zmq_poll(items, 1, 100);
if (items[0].revents & ZMQ_POLLIN) {
received = fPayloadInputs->at(0)->Receive(&msg); received = fPayloadInputs->at(0)->Receive(&msg);
}
if (received) { if (received) {
//command handling goes here. //command handling goes here.

View File

@ -1,8 +1,8 @@
/* /**
* FairMQSampler.h * FairMQSampler.h
* *
* Created on: Sep 27, 2012 * @since 2012-09-27
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#ifndef FAIRMQSAMPLER_H_ #ifndef FAIRMQSAMPLER_H_

View File

@ -1,8 +1,8 @@
/* /**
* FairMQSamplerTask.cxx * FairMQSamplerTask.cxx
* *
* Created on: Nov 22, 2012 * @since 2012-11-22
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#include "FairMQSamplerTask.h" #include "FairMQSamplerTask.h"
@ -48,8 +48,4 @@ FairMQMessage* FairMQSamplerTask::GetOutput()
return fOutput; return fOutput;
} }
void FairMQSamplerTask::ClearOutput(void* data, void* hint)
{
free (data);
}

View File

@ -1,8 +1,8 @@
/* /**
* FairMQSamplerTask.h * FairMQSamplerTask.h
* *
* Created on: Nov 22, 2012 * @since 2012-11-22
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#ifndef FAIRMQSAMPLERTASK_H_ #ifndef FAIRMQSAMPLERTASK_H_
@ -26,7 +26,6 @@ class FairMQSamplerTask: public FairTask
virtual void Exec(Option_t* opt) = 0; virtual void Exec(Option_t* opt) = 0;
void SetBranch(TString branch); void SetBranch(TString branch);
FairMQMessage* GetOutput(); FairMQMessage* GetOutput();
static void ClearOutput(void* data, void* hint);
protected: protected:
TClonesArray* fInput; TClonesArray* fInput;
TString fBranch; TString fBranch;

View File

@ -1,12 +1,10 @@
/* /**
* FairMQSink.cxx * FairMQSink.cxx
* *
* Created on: Jan 9, 2013 * @since 2013-01-09
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#include <iostream>
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <boost/bind.hpp> #include <boost/bind.hpp>
@ -24,20 +22,11 @@ void FairMQSink::Run()
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
// Initialize poll set
zmq_pollitem_t items[] = {
{ *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 }
};
while ( fState == RUNNING ) { while ( fState == RUNNING ) {
FairMQMessage msg; FairMQMessage msg;
zmq_poll(items, 1, 100);
if (items[0].revents & ZMQ_POLLIN) {
fPayloadInputs->at(0)->Receive(&msg); fPayloadInputs->at(0)->Receive(&msg);
} }
}
rateLogger.interrupt(); rateLogger.interrupt();
rateLogger.join(); rateLogger.join();

View File

@ -1,20 +1,16 @@
/* /**
* FairMQSink.h * FairMQSink.h
* *
* Created on: Jan 9, 2013 * @since 2013-01-09
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#ifndef FAIRMQSINK_H_ #ifndef FAIRMQSINK_H_
#define FAIRMQSINK_H_ #define FAIRMQSINK_H_
#include "Rtypes.h"
#include <pthread.h>
#include "FairMQDevice.h" #include "FairMQDevice.h"
class FairMQSink: public FairMQDevice class FairMQSink: public FairMQDevice
{ {
public: public:

View File

@ -1,8 +1,8 @@
/* /**
* FairMQSocket.cxx * FairMQSocket.cxx
* *
* Created on: Dec 5, 2012 * @since 2012-12-05
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#include "FairMQSocket.h" #include "FairMQSocket.h"
@ -10,10 +10,6 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
const TString FairMQSocket::TCP = "tcp://";
const TString FairMQSocket::IPC = "ipc://";
const TString FairMQSocket::INPROC = "inproc://";
FairMQSocket::FairMQSocket(FairMQContext* context, int type, int num) : FairMQSocket::FairMQSocket(FairMQContext* context, int type, int num) :
fBytesTx(0), fBytesTx(0),
fBytesRx(0), fBytesRx(0),
@ -21,18 +17,29 @@ FairMQSocket::FairMQSocket(FairMQContext* context, int type, int num) :
fMessagesRx(0) fMessagesRx(0)
{ {
std::stringstream id; std::stringstream id;
id << context->GetId() << "." << GetTypeString(type) << "." << num; id << GetTypeString(type) << "." << num;
fId = id.str(); fId = id.str();
fSocket = new zmq::socket_t(*context->GetContext(), type); fSocket = zmq_socket(context->GetContext(), type);
fSocket->setsockopt(ZMQ_IDENTITY, &fId, fId.Length()); int rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.Length());
if (type == ZMQ_SUB) { if (rc != 0) {
fSocket->setsockopt(ZMQ_SUBSCRIBE, NULL, 0); std::stringstream logmsg;
logmsg << "failed setting socket option, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
} }
std::stringstream logmsg; if (type == ZMQ_SUB) {
logmsg << "created socket #" << fId; rc = zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0);
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); if (rc != 0) {
std::stringstream logmsg2;
logmsg2 << "failed setting socket option, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
}
}
std::stringstream logmsg3;
logmsg3 << "created socket #" << fId;
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg3.str());
} }
FairMQSocket::~FairMQSocket() FairMQSocket::~FairMQSocket()
@ -44,7 +51,7 @@ TString FairMQSocket::GetId()
return fId; return fId;
} }
TString FairMQSocket::GetTypeString(Int_t type) TString FairMQSocket::GetTypeString(int type)
{ {
switch (type) { switch (type) {
case ZMQ_SUB: case ZMQ_SUB:
@ -60,90 +67,95 @@ TString FairMQSocket::GetTypeString(Int_t type)
} }
} }
Bool_t FairMQSocket::Bind(TString address) void FairMQSocket::Bind(TString address)
{ {
Bool_t result = true;
try {
if ( address.Length() > 0 /*!address.empty()*/) {
std::stringstream logmsg; std::stringstream logmsg;
logmsg << "bind socket #" << fId << " on " << address; logmsg << "bind socket #" << fId << " on " << address;
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
fSocket->bind(address.Data());
}
} catch (zmq::error_t& e) {
std::stringstream logmsg2;
logmsg2 << "failed binding socket #" << fId << ", reason: " << e.what();
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
result = false;
}
return result; int rc = zmq_bind (fSocket, address);
if (rc != 0) {
std::stringstream logmsg2;
logmsg2 << "failed binding socket #" << fId << ", reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
}
} }
Bool_t FairMQSocket::Connect(TString address) void FairMQSocket::Connect(TString address)
{ {
Bool_t result = true;
try {
if ( address.Length() > 0 /*!address.empty()*/) {
std::stringstream logmsg; std::stringstream logmsg;
logmsg << "connect socket #" << fId << " to " << address; logmsg << "connect socket #" << fId << " on " << address;
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
fSocket->connect(address.Data());
} int rc = zmq_connect (fSocket, address);
} catch (zmq::error_t& e) { if (rc != 0) {
std::stringstream logmsg2; std::stringstream logmsg2;
logmsg2 << "failed connecting socket #" << fId << ", reason: " << e.what(); logmsg2 << "failed connecting socket #" << fId << ", reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
result = false;
} }
return result;
} }
Bool_t FairMQSocket::Send(FairMQMessage* msg) size_t FairMQSocket::Send(FairMQMessage* msg)
{ {
Bool_t result = false; int nbytes = zmq_msg_send (msg->GetMessage(), fSocket, 0);
if (nbytes >= 0){
try { fBytesTx += nbytes;
fBytesTx += msg->Size();
++fMessagesTx; ++fMessagesTx;
result = fSocket->send(*msg->GetMessage()); // use send(*msg->GetMessage(), ZMQ_DONTWAIT) for non-blocking call return nbytes;
} catch (zmq::error_t& e) {
std::stringstream logmsg;
logmsg << "failed sending on socket #" << fId << ", reason: " << e.what();
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
result = false;
} }
if (zmq_errno() == EAGAIN){
return result; return false;
}
std::stringstream logmsg;
logmsg << "failed sending on socket #" << fId << ", reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
return nbytes;
} }
Bool_t FairMQSocket::Receive(FairMQMessage* msg) size_t FairMQSocket::Receive(FairMQMessage* msg)
{ {
Bool_t result = false; int nbytes = zmq_msg_recv (msg->GetMessage(), fSocket, 0);
if (nbytes >= 0){
try { fBytesRx += nbytes;
result = fSocket->recv(msg->GetMessage());
fBytesRx += msg->Size();
++fMessagesRx; ++fMessagesRx;
} catch (zmq::error_t& e) { return nbytes;
std::stringstream logmsg;
logmsg << "failed receiving on socket #" << fId << ", reason: " << e.what();
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
result = false;
} }
if (zmq_errno() == EAGAIN){
return false;
}
std::stringstream logmsg;
logmsg << "failed receiving on socket #" << fId << ", reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
return nbytes;
}
return result; void FairMQSocket::SetOption(int option, const void* value, size_t valueSize)
{
int rc = zmq_setsockopt(fSocket, option, value, valueSize);
if (rc < 0) {
std::stringstream logmsg;
logmsg << "failed setting socket option, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
} }
void FairMQSocket::Close() void FairMQSocket::Close()
{ {
fSocket->close(); if (fSocket == NULL){
return;
}
int rc = zmq_close (fSocket);
if (rc != 0) {
std::stringstream logmsg;
logmsg << "failed closing socket, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
fSocket = NULL;
} }
zmq::socket_t* FairMQSocket::GetSocket() void* FairMQSocket::GetSocket()
{ {
return fSocket; return fSocket;
} }

View File

@ -1,14 +1,14 @@
/* /**
* FairMQSocket.h * FairMQSocket.h
* *
* Created on: Dec 5, 2012 * @since 2012-12-05
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#ifndef FAIRMQSOCKET_H_ #ifndef FAIRMQSOCKET_H_
#define FAIRMQSOCKET_H_ #define FAIRMQSOCKET_H_
#include <zmq.hpp> #include <zmq.h>
#include <string> #include <string>
#include "FairMQContext.h" #include "FairMQContext.h"
#include "FairMQMessage.h" #include "FairMQMessage.h"
@ -18,30 +18,32 @@
class FairMQSocket class FairMQSocket
{ {
private:
zmq::socket_t* fSocket;
TString fId;
ULong_t fBytesTx;
ULong_t fBytesRx;
ULong_t fMessagesTx;
ULong_t fMessagesRx;
public: public:
const static TString TCP, IPC, INPROC; FairMQSocket(FairMQContext* context, int type, int num);
FairMQSocket(FairMQContext* context, Int_t type, Int_t num);
virtual ~FairMQSocket(); virtual ~FairMQSocket();
TString GetId(); TString GetId();
static TString GetTypeString(Int_t type); static TString GetTypeString(int type);
Bool_t Send(FairMQMessage* msg); size_t Send(FairMQMessage* msg);
Bool_t Receive(FairMQMessage* msg); size_t Receive(FairMQMessage* msg);
void Close(); void Close();
Bool_t Bind(TString address); void Bind(TString address);
Bool_t Connect(TString address); void Connect(TString address);
zmq::socket_t* GetSocket(); void* GetSocket();
void SetOption(int option, const void* value, size_t valueSize);
ULong_t GetBytesTx(); ULong_t GetBytesTx();
ULong_t GetBytesRx(); ULong_t GetBytesRx();
ULong_t GetMessagesTx(); ULong_t GetMessagesTx();
ULong_t GetMessagesRx(); ULong_t GetMessagesRx();
private:
void* fSocket;
TString fId;
ULong_t fBytesTx;
ULong_t fBytesRx;
ULong_t fMessagesTx;
ULong_t fMessagesRx;
}; };
#endif /* FAIRMQSOCKET_H_ */ #endif /* FAIRMQSOCKET_H_ */

View File

@ -1,46 +1,38 @@
/* /**
* FairMQBalancedStandaloneSplitter.cxx * FairMQSplitter.cxx
* *
* Created on: Dec 6, 2012 * @since 2012-12-06
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQBalancedStandaloneSplitter.h" #include "FairMQSplitter.h"
FairMQBalancedStandaloneSplitter::FairMQBalancedStandaloneSplitter()
FairMQSplitter::FairMQSplitter()
{ {
} }
FairMQBalancedStandaloneSplitter::~FairMQBalancedStandaloneSplitter() FairMQSplitter::~FairMQSplitter()
{ {
} }
void FairMQBalancedStandaloneSplitter::Run() void FairMQSplitter::Run()
{ {
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<");
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
// Initialize poll set bool received = false;
zmq_pollitem_t items[] = { int direction = 0;
{ *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 }
};
Bool_t received = false;
Int_t direction = 0;
while ( fState == RUNNING ) { while ( fState == RUNNING ) {
FairMQMessage msg; FairMQMessage msg;
zmq_poll(items, 1, 100);
if (items[0].revents & ZMQ_POLLIN) {
received = fPayloadInputs->at(0)->Receive(&msg); received = fPayloadInputs->at(0)->Receive(&msg);
}
if (received) { if (received) {
fPayloadOutputs->at(direction)->Send(&msg); fPayloadOutputs->at(direction)->Send(&msg);
@ -49,7 +41,7 @@ void FairMQBalancedStandaloneSplitter::Run()
direction = 0; direction = 0;
} }
received = false; received = false;
}//if received }
} }
rateLogger.interrupt(); rateLogger.interrupt();

View File

@ -1,24 +1,23 @@
/* /**
* FairMQBalancedStandaloneSplitter.h * FairMQSplitter.h
* *
* Created on: Dec 6, 2012 * @since 2012-12-06
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#ifndef FAIRMQBALANCEDSTANDALONESPLITTER_H_ #ifndef FAIRMQSPLITTER_H_
#define FAIRMQBALANCEDSTANDALONESPLITTER_H_ #define FAIRMQSPLITTER_H_
#include "FairMQDevice.h" #include "FairMQDevice.h"
#include "Rtypes.h"
class FairMQBalancedStandaloneSplitter: public FairMQDevice class FairMQSplitter: public FairMQDevice
{ {
public: public:
FairMQBalancedStandaloneSplitter(); FairMQSplitter();
virtual ~FairMQBalancedStandaloneSplitter(); virtual ~FairMQSplitter();
protected: protected:
virtual void Run(); virtual void Run();
}; };
#endif /* FAIRMQBALANCEDSTANDALONESPLITTER_H_ */ #endif /* FAIRMQSPLITTER_H_ */

View File

@ -1,11 +1,10 @@
/* /**
* FairMQStateMachine.cxx * FairMQStateMachine.cxx
* *
* Created on: Oct 25, 2012 * @since 2012-10-25
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#include <iostream>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <boost/thread.hpp> #include <boost/thread.hpp>

View File

@ -1,12 +1,10 @@
/* /**
* FairMQStateMachine.h * FairMQStateMachine.h
* *
* Created on: Oct 25, 2012 * @since 2012-10-25
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#ifndef FAIRMQSTATEMACHINE_H_ #ifndef FAIRMQSTATEMACHINE_H_
#define FAIRMQSTATEMACHINE_H_ #define FAIRMQSTATEMACHINE_H_

View File

@ -1,8 +1,8 @@
/* /**
* runBenchmarkSampler.cxx * runBenchmarkSampler.cxx
* *
* Created on: Apr 23, 2013 * @since Apr 23, 2013-04-23
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#include <iostream> #include <iostream>

View File

@ -1,8 +1,8 @@
/** /**
* runBuffer.cxx * runBuffer.cxx
* *
* @since Oct 26, 2012 * @since 2012-10-26
* @authors: D. Klein, A. Rybalchenko * @author: D. Klein, A. Rybalchenko
*/ */
#include <iostream> #include <iostream>

View File

@ -1,25 +1,25 @@
/* /**
* runMerger.cxx * runMerger.cxx
* *
* Created on: Dec 6, 2012 * @since 2012-12-06
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#include <iostream> #include <iostream>
#include <csignal> #include <csignal>
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQStandaloneMerger.h" #include "FairMQMerger.h"
FairMQStandaloneMerger merger; FairMQMerger merger;
static void s_signal_handler (int signal) static void s_signal_handler (int signal)
{ {
std::cout << std::endl << "Caught signal " << signal << std::endl; std::cout << std::endl << "Caught signal " << signal << std::endl;
merger.ChangeState(FairMQStandaloneMerger::STOP); merger.ChangeState(FairMQMerger::STOP);
merger.ChangeState(FairMQStandaloneMerger::END); merger.ChangeState(FairMQMerger::END);
std::cout << "Shutdown complete. Bye!" << std::endl; std::cout << "Shutdown complete. Bye!" << std::endl;
exit(1); exit(1);
@ -53,76 +53,76 @@ int main(int argc, char** argv)
int i = 1; int i = 1;
merger.SetProperty(FairMQStandaloneMerger::Id, argv[i]); merger.SetProperty(FairMQMerger::Id, argv[i]);
++i; ++i;
int numIoThreads; int numIoThreads;
std::stringstream(argv[i]) >> numIoThreads; std::stringstream(argv[i]) >> numIoThreads;
merger.SetProperty(FairMQStandaloneMerger::NumIoThreads, numIoThreads); merger.SetProperty(FairMQMerger::NumIoThreads, numIoThreads);
++i; ++i;
merger.SetProperty(FairMQStandaloneMerger::NumInputs, 2); merger.SetProperty(FairMQMerger::NumInputs, 2);
merger.SetProperty(FairMQStandaloneMerger::NumOutputs, 1); merger.SetProperty(FairMQMerger::NumOutputs, 1);
merger.ChangeState(FairMQStandaloneMerger::INIT); merger.ChangeState(FairMQMerger::INIT);
int inputSocketType = ZMQ_SUB; int inputSocketType = ZMQ_SUB;
if (strcmp(argv[i], "pull") == 0) { if (strcmp(argv[i], "pull") == 0) {
inputSocketType = ZMQ_PULL; inputSocketType = ZMQ_PULL;
} }
merger.SetProperty(FairMQStandaloneMerger::InputSocketType, inputSocketType, 0); merger.SetProperty(FairMQMerger::InputSocketType, inputSocketType, 0);
++i; ++i;
int inputRcvBufSize; int inputRcvBufSize;
std::stringstream(argv[i]) >> inputRcvBufSize; std::stringstream(argv[i]) >> inputRcvBufSize;
merger.SetProperty(FairMQStandaloneMerger::InputRcvBufSize, inputRcvBufSize, 0); merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, 0);
++i; ++i;
merger.SetProperty(FairMQStandaloneMerger::InputMethod, argv[i], 0); merger.SetProperty(FairMQMerger::InputMethod, argv[i], 0);
++i; ++i;
merger.SetProperty(FairMQStandaloneMerger::InputAddress, argv[i], 0); merger.SetProperty(FairMQMerger::InputAddress, argv[i], 0);
++i; ++i;
inputSocketType = ZMQ_SUB; inputSocketType = ZMQ_SUB;
if (strcmp(argv[i], "pull") == 0) { if (strcmp(argv[i], "pull") == 0) {
inputSocketType = ZMQ_PULL; inputSocketType = ZMQ_PULL;
} }
merger.SetProperty(FairMQStandaloneMerger::InputSocketType, inputSocketType, 1); merger.SetProperty(FairMQMerger::InputSocketType, inputSocketType, 1);
++i; ++i;
std::stringstream(argv[i]) >> inputRcvBufSize; std::stringstream(argv[i]) >> inputRcvBufSize;
merger.SetProperty(FairMQStandaloneMerger::InputRcvBufSize, inputRcvBufSize, 1); merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, 1);
++i; ++i;
merger.SetProperty(FairMQStandaloneMerger::InputMethod, argv[i], 1); merger.SetProperty(FairMQMerger::InputMethod, argv[i], 1);
++i; ++i;
merger.SetProperty(FairMQStandaloneMerger::InputAddress, argv[i], 1); merger.SetProperty(FairMQMerger::InputAddress, argv[i], 1);
++i; ++i;
int outputSocketType = ZMQ_PUB; int outputSocketType = ZMQ_PUB;
if (strcmp(argv[i], "push") == 0) { if (strcmp(argv[i], "push") == 0) {
outputSocketType = ZMQ_PUSH; outputSocketType = ZMQ_PUSH;
} }
merger.SetProperty(FairMQStandaloneMerger::OutputSocketType, outputSocketType, 0); merger.SetProperty(FairMQMerger::OutputSocketType, outputSocketType, 0);
++i; ++i;
int outputSndBufSize; int outputSndBufSize;
std::stringstream(argv[i]) >> outputSndBufSize; std::stringstream(argv[i]) >> outputSndBufSize;
merger.SetProperty(FairMQStandaloneMerger::OutputSndBufSize, outputSndBufSize, 0); merger.SetProperty(FairMQMerger::OutputSndBufSize, outputSndBufSize, 0);
++i; ++i;
merger.SetProperty(FairMQStandaloneMerger::OutputMethod, argv[i], 0); merger.SetProperty(FairMQMerger::OutputMethod, argv[i], 0);
++i; ++i;
merger.SetProperty(FairMQStandaloneMerger::OutputAddress, argv[i], 0); merger.SetProperty(FairMQMerger::OutputAddress, argv[i], 0);
++i; ++i;
merger.ChangeState(FairMQStandaloneMerger::SETOUTPUT); merger.ChangeState(FairMQMerger::SETOUTPUT);
merger.ChangeState(FairMQStandaloneMerger::SETINPUT); merger.ChangeState(FairMQMerger::SETINPUT);
merger.ChangeState(FairMQStandaloneMerger::RUN); merger.ChangeState(FairMQMerger::RUN);
char ch; char ch;
std::cin.get(ch); std::cin.get(ch);
merger.ChangeState(FairMQStandaloneMerger::STOP); merger.ChangeState(FairMQMerger::STOP);
merger.ChangeState(FairMQStandaloneMerger::END); merger.ChangeState(FairMQMerger::END);
return 0; return 0;
} }

View File

@ -1,25 +1,25 @@
/* /**
* runMerger.cxx * runNToOneMerger.cxx
* *
* Created on: Dec 6, 2012 * @since 2012-12-06
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#include <iostream> #include <iostream>
#include <csignal> #include <csignal>
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQStandaloneMerger.h" #include "FairMQMerger.h"
FairMQStandaloneMerger merger; FairMQMerger merger;
static void s_signal_handler (int signal) static void s_signal_handler (int signal)
{ {
std::cout << std::endl << "Caught signal " << signal << std::endl; std::cout << std::endl << "Caught signal " << signal << std::endl;
merger.ChangeState(FairMQStandaloneMerger::STOP); merger.ChangeState(FairMQMerger::STOP);
merger.ChangeState(FairMQStandaloneMerger::END); merger.ChangeState(FairMQMerger::END);
std::cout << "Shutdown complete. Bye!" << std::endl; std::cout << "Shutdown complete. Bye!" << std::endl;
exit(1); exit(1);
@ -55,23 +55,23 @@ int main(int argc, char** argv)
int i = 1; int i = 1;
merger.SetProperty(FairMQStandaloneMerger::Id, argv[i]); merger.SetProperty(FairMQMerger::Id, argv[i]);
++i; ++i;
int numIoThreads; int numIoThreads;
std::stringstream(argv[i]) >> numIoThreads; std::stringstream(argv[i]) >> numIoThreads;
merger.SetProperty(FairMQStandaloneMerger::NumIoThreads, numIoThreads); merger.SetProperty(FairMQMerger::NumIoThreads, numIoThreads);
++i; ++i;
int numInputs; int numInputs;
std::stringstream(argv[i]) >> numInputs; std::stringstream(argv[i]) >> numInputs;
merger.SetProperty(FairMQStandaloneMerger::NumInputs, numInputs); merger.SetProperty(FairMQMerger::NumInputs, numInputs);
++i; ++i;
merger.SetProperty(FairMQStandaloneMerger::NumOutputs, 1); merger.SetProperty(FairMQMerger::NumOutputs, 1);
merger.ChangeState(FairMQStandaloneMerger::INIT); merger.ChangeState(FairMQMerger::INIT);
int inputSocketType; int inputSocketType;
@ -80,15 +80,15 @@ int main(int argc, char** argv)
if (strcmp(argv[i], "pull") == 0) { if (strcmp(argv[i], "pull") == 0) {
inputSocketType = ZMQ_PULL; inputSocketType = ZMQ_PULL;
} }
merger.SetProperty(FairMQStandaloneMerger::InputSocketType, inputSocketType, iInput); merger.SetProperty(FairMQMerger::InputSocketType, inputSocketType, iInput);
++i; ++i;
int inputRcvBufSize; int inputRcvBufSize;
std::stringstream(argv[i]) >> inputRcvBufSize; std::stringstream(argv[i]) >> inputRcvBufSize;
merger.SetProperty(FairMQStandaloneMerger::InputRcvBufSize, inputRcvBufSize, iInput); merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, iInput);
++i; ++i;
merger.SetProperty(FairMQStandaloneMerger::InputMethod, argv[i], iInput); merger.SetProperty(FairMQMerger::InputMethod, argv[i], iInput);
++i; ++i;
merger.SetProperty(FairMQStandaloneMerger::InputAddress, argv[i], iInput); merger.SetProperty(FairMQMerger::InputAddress, argv[i], iInput);
++i; ++i;
} }
@ -96,28 +96,28 @@ int main(int argc, char** argv)
if (strcmp(argv[i], "push") == 0) { if (strcmp(argv[i], "push") == 0) {
outputSocketType = ZMQ_PUSH; outputSocketType = ZMQ_PUSH;
} }
merger.SetProperty(FairMQStandaloneMerger::OutputSocketType, outputSocketType, 0); merger.SetProperty(FairMQMerger::OutputSocketType, outputSocketType, 0);
++i; ++i;
int outputSndBufSize; int outputSndBufSize;
std::stringstream(argv[i]) >> outputSndBufSize; std::stringstream(argv[i]) >> outputSndBufSize;
merger.SetProperty(FairMQStandaloneMerger::OutputSndBufSize, outputSndBufSize, 0); merger.SetProperty(FairMQMerger::OutputSndBufSize, outputSndBufSize, 0);
++i; ++i;
merger.SetProperty(FairMQStandaloneMerger::OutputMethod, argv[i], 0); merger.SetProperty(FairMQMerger::OutputMethod, argv[i], 0);
++i; ++i;
merger.SetProperty(FairMQStandaloneMerger::OutputAddress, argv[i], 0); merger.SetProperty(FairMQMerger::OutputAddress, argv[i], 0);
++i; ++i;
merger.ChangeState(FairMQStandaloneMerger::SETOUTPUT); merger.ChangeState(FairMQMerger::SETOUTPUT);
merger.ChangeState(FairMQStandaloneMerger::SETINPUT); merger.ChangeState(FairMQMerger::SETINPUT);
merger.ChangeState(FairMQStandaloneMerger::RUN); merger.ChangeState(FairMQMerger::RUN);
char ch; char ch;
std::cin.get(ch); std::cin.get(ch);
merger.ChangeState(FairMQStandaloneMerger::STOP); merger.ChangeState(FairMQMerger::STOP);
merger.ChangeState(FairMQStandaloneMerger::END); merger.ChangeState(FairMQMerger::END);
return 0; return 0;
} }

View File

@ -1,25 +1,25 @@
/* /**
* runSplitter.cxx * runOneToNSplitter.cxx
* *
* Created on: Dec 6, 2012 * @since 2012-12-06
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#include <iostream> #include <iostream>
#include <csignal> #include <csignal>
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQBalancedStandaloneSplitter.h" #include "FairMQSplitter.h"
FairMQBalancedStandaloneSplitter splitter; FairMQSplitter splitter;
static void s_signal_handler (int signal) static void s_signal_handler (int signal)
{ {
std::cout << std::endl << "Caught signal " << signal << std::endl; std::cout << std::endl << "Caught signal " << signal << std::endl;
splitter.ChangeState(FairMQBalancedStandaloneSplitter::STOP); splitter.ChangeState(FairMQSplitter::STOP);
splitter.ChangeState(FairMQBalancedStandaloneSplitter::END); splitter.ChangeState(FairMQSplitter::END);
std::cout << "Shutdown complete. Bye!" << std::endl; std::cout << "Shutdown complete. Bye!" << std::endl;
exit(1); exit(1);
@ -37,7 +37,7 @@ static void s_catch_signals (void)
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
if ( argc < 16 || (argc-8)%4!=0 ) { //argc{name,id,threads,nout,insock,inbuff,inmet,inadd, ... out} if ( argc < 16 || (argc - 8) % 4 != 0 ) { // argc{ name, id, threads, nout, insock, inbuff, inmet, inadd, ... out}
std::cout << "Usage: splitter \tID numIoTreads numOutputs\n" std::cout << "Usage: splitter \tID numIoTreads numOutputs\n"
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n"
@ -54,38 +54,38 @@ int main(int argc, char** argv)
int i = 1; int i = 1;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::Id, argv[i]); splitter.SetProperty(FairMQSplitter::Id, argv[i]);
++i; ++i;
int numIoThreads; int numIoThreads;
std::stringstream(argv[i]) >> numIoThreads; std::stringstream(argv[i]) >> numIoThreads;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::NumIoThreads, numIoThreads); splitter.SetProperty(FairMQSplitter::NumIoThreads, numIoThreads);
++i; ++i;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::NumInputs, 1); splitter.SetProperty(FairMQSplitter::NumInputs, 1);
int numOutputs; int numOutputs;
std::stringstream(argv[i]) >> numOutputs; std::stringstream(argv[i]) >> numOutputs;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::NumOutputs, numOutputs); splitter.SetProperty(FairMQSplitter::NumOutputs, numOutputs);
++i; ++i;
splitter.ChangeState(FairMQBalancedStandaloneSplitter::INIT); splitter.ChangeState(FairMQSplitter::INIT);
int inputSocketType = ZMQ_SUB; int inputSocketType = ZMQ_SUB;
if (strcmp(argv[i], "pull") == 0) { if (strcmp(argv[i], "pull") == 0) {
inputSocketType = ZMQ_PULL; inputSocketType = ZMQ_PULL;
} }
splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputSocketType, inputSocketType, 0); splitter.SetProperty(FairMQSplitter::InputSocketType, inputSocketType, 0);
++i; ++i;
int inputRcvBufSize; int inputRcvBufSize;
std::stringstream(argv[i]) >> inputRcvBufSize; std::stringstream(argv[i]) >> inputRcvBufSize;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputRcvBufSize, inputRcvBufSize, 0); splitter.SetProperty(FairMQSplitter::InputRcvBufSize, inputRcvBufSize, 0);
++i; ++i;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputMethod, argv[i], 0); splitter.SetProperty(FairMQSplitter::InputMethod, argv[i], 0);
++i; ++i;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputAddress, argv[i], 0); splitter.SetProperty(FairMQSplitter::InputAddress, argv[i], 0);
++i; ++i;
int outputSocketType; int outputSocketType;
@ -95,27 +95,27 @@ int main(int argc, char** argv)
if (strcmp(argv[i], "push") == 0) { if (strcmp(argv[i], "push") == 0) {
outputSocketType = ZMQ_PUSH; outputSocketType = ZMQ_PUSH;
} }
splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputSocketType, outputSocketType, iOutput); splitter.SetProperty(FairMQSplitter::OutputSocketType, outputSocketType, iOutput);
++i; ++i;
std::stringstream(argv[i]) >> outputSndBufSize; std::stringstream(argv[i]) >> outputSndBufSize;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputSndBufSize, outputSndBufSize, iOutput); splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, iOutput);
++i; ++i;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputMethod, argv[i], iOutput); splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], iOutput);
++i; ++i;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputAddress, argv[i], iOutput); splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], iOutput);
++i; ++i;
} }
splitter.ChangeState(FairMQBalancedStandaloneSplitter::SETOUTPUT); splitter.ChangeState(FairMQSplitter::SETOUTPUT);
splitter.ChangeState(FairMQBalancedStandaloneSplitter::SETINPUT); splitter.ChangeState(FairMQSplitter::SETINPUT);
splitter.ChangeState(FairMQBalancedStandaloneSplitter::RUN); splitter.ChangeState(FairMQSplitter::RUN);
char ch; char ch;
std::cin.get(ch); std::cin.get(ch);
splitter.ChangeState(FairMQBalancedStandaloneSplitter::STOP); splitter.ChangeState(FairMQSplitter::STOP);
splitter.ChangeState(FairMQBalancedStandaloneSplitter::END); splitter.ChangeState(FairMQSplitter::END);
return 0; return 0;
} }

View File

@ -1,8 +1,8 @@
/** /**
* runProxy.cxx * runProxy.cxx
* *
* @since: Oct 07, 2013 * @since 2013-10-07
* @authors: A. Rybalchenko * @author A. Rybalchenko
*/ */
#include <iostream> #include <iostream>

View File

@ -1,8 +1,8 @@
/* /**
* runSink.cxx * runSink.cxx
* *
* @since: Jan 21, 2013 * @since 2013-01-21
* @author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#include <iostream> #include <iostream>

View File

@ -1,25 +1,25 @@
/* /**
* runSplitter.cxx * runSplitter.cxx
* *
* Created on: Dec 6, 2012 * @since 2012-12-06
* Author: dklein * @author D. Klein, A. Rybalchenko
*/ */
#include <iostream> #include <iostream>
#include <csignal> #include <csignal>
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQBalancedStandaloneSplitter.h" #include "FairMQSplitter.h"
FairMQBalancedStandaloneSplitter splitter; FairMQSplitter splitter;
static void s_signal_handler (int signal) static void s_signal_handler (int signal)
{ {
std::cout << std::endl << "Caught signal " << signal << std::endl; std::cout << std::endl << "Caught signal " << signal << std::endl;
splitter.ChangeState(FairMQBalancedStandaloneSplitter::STOP); splitter.ChangeState(FairMQSplitter::STOP);
splitter.ChangeState(FairMQBalancedStandaloneSplitter::END); splitter.ChangeState(FairMQSplitter::END);
std::cout << "Shutdown complete. Bye!" << std::endl; std::cout << "Shutdown complete. Bye!" << std::endl;
exit(1); exit(1);
@ -53,76 +53,76 @@ int main(int argc, char** argv)
int i = 1; int i = 1;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::Id, argv[i]); splitter.SetProperty(FairMQSplitter::Id, argv[i]);
++i; ++i;
int numIoThreads; int numIoThreads;
std::stringstream(argv[i]) >> numIoThreads; std::stringstream(argv[i]) >> numIoThreads;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::NumIoThreads, numIoThreads); splitter.SetProperty(FairMQSplitter::NumIoThreads, numIoThreads);
++i; ++i;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::NumInputs, 1); splitter.SetProperty(FairMQSplitter::NumInputs, 1);
splitter.SetProperty(FairMQBalancedStandaloneSplitter::NumOutputs, 2); splitter.SetProperty(FairMQSplitter::NumOutputs, 2);
splitter.ChangeState(FairMQBalancedStandaloneSplitter::INIT); splitter.ChangeState(FairMQSplitter::INIT);
int inputSocketType = ZMQ_SUB; int inputSocketType = ZMQ_SUB;
if (strcmp(argv[i], "pull") == 0) { if (strcmp(argv[i], "pull") == 0) {
inputSocketType = ZMQ_PULL; inputSocketType = ZMQ_PULL;
} }
splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputSocketType, inputSocketType, 0); splitter.SetProperty(FairMQSplitter::InputSocketType, inputSocketType, 0);
++i; ++i;
int inputRcvBufSize; int inputRcvBufSize;
std::stringstream(argv[i]) >> inputRcvBufSize; std::stringstream(argv[i]) >> inputRcvBufSize;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputRcvBufSize, inputRcvBufSize, 0); splitter.SetProperty(FairMQSplitter::InputRcvBufSize, inputRcvBufSize, 0);
++i; ++i;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputMethod, argv[i], 0); splitter.SetProperty(FairMQSplitter::InputMethod, argv[i], 0);
++i; ++i;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputAddress, argv[i], 0); splitter.SetProperty(FairMQSplitter::InputAddress, argv[i], 0);
++i; ++i;
int outputSocketType = ZMQ_PUB; int outputSocketType = ZMQ_PUB;
if (strcmp(argv[i], "push") == 0) { if (strcmp(argv[i], "push") == 0) {
outputSocketType = ZMQ_PUSH; outputSocketType = ZMQ_PUSH;
} }
splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputSocketType, outputSocketType, 0); splitter.SetProperty(FairMQSplitter::OutputSocketType, outputSocketType, 0);
++i; ++i;
int outputSndBufSize; int outputSndBufSize;
std::stringstream(argv[i]) >> outputSndBufSize; std::stringstream(argv[i]) >> outputSndBufSize;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputSndBufSize, outputSndBufSize, 0); splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, 0);
++i; ++i;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputMethod, argv[i], 0); splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], 0);
++i; ++i;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputAddress, argv[i], 0); splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], 0);
++i; ++i;
outputSocketType = ZMQ_PUB; outputSocketType = ZMQ_PUB;
if (strcmp(argv[i], "push") == 0) { if (strcmp(argv[i], "push") == 0) {
outputSocketType = ZMQ_PUSH; outputSocketType = ZMQ_PUSH;
} }
splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputSocketType, outputSocketType, 1); splitter.SetProperty(FairMQSplitter::OutputSocketType, outputSocketType, 1);
++i; ++i;
std::stringstream(argv[i]) >> outputSndBufSize; std::stringstream(argv[i]) >> outputSndBufSize;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputSndBufSize, outputSndBufSize, 1); splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, 1);
++i; ++i;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputMethod, argv[i], 1); splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], 1);
++i; ++i;
splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputAddress, argv[i], 1); splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], 1);
++i; ++i;
splitter.ChangeState(FairMQBalancedStandaloneSplitter::SETOUTPUT); splitter.ChangeState(FairMQSplitter::SETOUTPUT);
splitter.ChangeState(FairMQBalancedStandaloneSplitter::SETINPUT); splitter.ChangeState(FairMQSplitter::SETINPUT);
splitter.ChangeState(FairMQBalancedStandaloneSplitter::RUN); splitter.ChangeState(FairMQSplitter::RUN);
char ch; char ch;
std::cin.get(ch); std::cin.get(ch);
splitter.ChangeState(FairMQBalancedStandaloneSplitter::STOP); splitter.ChangeState(FairMQSplitter::STOP);
splitter.ChangeState(FairMQBalancedStandaloneSplitter::END); splitter.ChangeState(FairMQSplitter::END);
return 0; return 0;
} }