mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
Zero MQ implimentation and example (Tutorial3)
git-svn-id: https://subversion.gsi.de/fairroot/fairbase/trunk@20162 0381ead4-6506-0410-b988-94b70fbc4730
This commit is contained in:
commit
231c7c8f7e
94
fairmq/CMakeLists.txt
Normal file
94
fairmq/CMakeLists.txt
Normal file
|
@ -0,0 +1,94 @@
|
||||||
|
INCLUDE_DIRECTORIES(
|
||||||
|
${BASE_INCLUDE_DIRECTORIES}
|
||||||
|
${CMAKE_SOURCE_DIR}/fairmq
|
||||||
|
${ZMQ_INCLUDE_DIR}
|
||||||
|
${ROOT_INCLUDE_DIR}
|
||||||
|
)
|
||||||
|
|
||||||
|
include_directories(${INCLUDE_DIRECTORIES})
|
||||||
|
|
||||||
|
SET(HEADERS
|
||||||
|
"FairMQSampler.h"
|
||||||
|
"FairMQBenchmarkSampler.h"
|
||||||
|
#"FairMQStateMachine.h"
|
||||||
|
"FairMQConfigurable.h"
|
||||||
|
"FairMQDevice.h"
|
||||||
|
"FairMQBuffer.h"
|
||||||
|
"FairMQSamplerTask.h"
|
||||||
|
"FairMQLogger.h"
|
||||||
|
"FairMQContext.h"
|
||||||
|
"FairMQMessage.h"
|
||||||
|
"FairMQSocket.h"
|
||||||
|
"FairMQBalancedStandaloneSplitter.h"
|
||||||
|
"FairMQStandaloneMerger.h"
|
||||||
|
"FairMQProcessor.h"
|
||||||
|
"FairMQProcessorTask.h"
|
||||||
|
"FairMQSink.h"
|
||||||
|
)
|
||||||
|
|
||||||
|
SET(SOURCES
|
||||||
|
"FairMQSampler.cxx"
|
||||||
|
"FairMQBenchmarkSampler.cxx"
|
||||||
|
#"FairMQStateMachine.cxx"
|
||||||
|
"FairMQConfigurable.cxx"
|
||||||
|
"FairMQBuffer.cxx"
|
||||||
|
"FairMQSamplerTask.cxx"
|
||||||
|
"FairMQLogger.cxx"
|
||||||
|
"FairMQContext.cxx"
|
||||||
|
"FairMQMessage.cxx"
|
||||||
|
"FairMQSocket.cxx"
|
||||||
|
"FairMQBalancedStandaloneSplitter.cxx"
|
||||||
|
"FairMQStandaloneMerger.cxx"
|
||||||
|
"FairMQProcessor.cxx"
|
||||||
|
"FairMQProcessorTask.cxx"
|
||||||
|
"FairMQSink.cxx"
|
||||||
|
"FairMQDevice.cxx"
|
||||||
|
)
|
||||||
|
|
||||||
|
set(LINK_DIRECTORIES
|
||||||
|
${ROOT_LIBRARY_DIR}
|
||||||
|
)
|
||||||
|
|
||||||
|
link_directories( ${LINK_DIRECTORIES})
|
||||||
|
|
||||||
|
SET(LINK_LIBRARIES
|
||||||
|
${CMAKE_THREAD_LIBS_INIT}
|
||||||
|
${ZMQ_LIBRARY_SHARED}
|
||||||
|
${ROOT_LIBRARIES}
|
||||||
|
Base ParBase FairTools GeoBase
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
ADD_LIBRARY(FairMQ SHARED ${HEADERS} ${SOURCES})
|
||||||
|
SET_TARGET_PROPERTIES(FairMQ PROPERTIES ${FAIRROOT_LIBRARY_PROPERTIES})
|
||||||
|
TARGET_LINK_LIBRARIES(FairMQ ${LINK_LIBRARIES})
|
||||||
|
|
||||||
|
#Set(LIBRARY_NAME FairMQ)
|
||||||
|
|
||||||
|
#GENERATE_LIBRARY()
|
||||||
|
|
||||||
|
ADD_LIBRARY(FairMQStatic STATIC ${HEADERS} ${SOURCES})
|
||||||
|
#SET_TARGET_PROPERTIES(FairMQStatic PROPERTIES OUTPUT_NAME FairMQ)
|
||||||
|
TARGET_LINK_LIBRARIES(FairMQStatic ${LINK_LIBRARIES})
|
||||||
|
|
||||||
|
ADD_EXECUTABLE(RunBenchmarkSampler runBenchmarkSampler.cxx)
|
||||||
|
SET_TARGET_PROPERTIES(RunBenchmarkSampler PROPERTIES OUTPUT_NAME bsampler)
|
||||||
|
TARGET_LINK_LIBRARIES(RunBenchmarkSampler FairMQ)
|
||||||
|
|
||||||
|
ADD_EXECUTABLE(RunBuffer runBuffer.cxx)
|
||||||
|
SET_TARGET_PROPERTIES(RunBuffer PROPERTIES OUTPUT_NAME buffer)
|
||||||
|
TARGET_LINK_LIBRARIES(RunBuffer FairMQ)
|
||||||
|
|
||||||
|
ADD_EXECUTABLE(RunSplitter runSplitter.cxx)
|
||||||
|
SET_TARGET_PROPERTIES(RunSplitter PROPERTIES OUTPUT_NAME splitter)
|
||||||
|
TARGET_LINK_LIBRARIES(RunSplitter FairMQ)
|
||||||
|
|
||||||
|
ADD_EXECUTABLE(RunMerger runMerger.cxx)
|
||||||
|
SET_TARGET_PROPERTIES(RunMerger PROPERTIES OUTPUT_NAME merger)
|
||||||
|
TARGET_LINK_LIBRARIES(RunMerger FairMQ)
|
||||||
|
|
||||||
|
ADD_EXECUTABLE(RunSink runSink.cxx)
|
||||||
|
SET_TARGET_PROPERTIES(RunSink PROPERTIES OUTPUT_NAME sink)
|
||||||
|
TARGET_LINK_LIBRARIES(RunSink FairMQ)
|
||||||
|
|
||||||
|
|
57
fairmq/FairMQBalancedStandaloneSplitter.cxx
Normal file
57
fairmq/FairMQBalancedStandaloneSplitter.cxx
Normal file
|
@ -0,0 +1,57 @@
|
||||||
|
/*
|
||||||
|
* FairMQBalancedStandaloneSplitter.cxx
|
||||||
|
*
|
||||||
|
* Created on: Dec 6, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "FairMQBalancedStandaloneSplitter.h"
|
||||||
|
|
||||||
|
#include "FairMQLogger.h"
|
||||||
|
|
||||||
|
FairMQBalancedStandaloneSplitter::FairMQBalancedStandaloneSplitter()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQBalancedStandaloneSplitter::~FairMQBalancedStandaloneSplitter()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQBalancedStandaloneSplitter::Run()
|
||||||
|
{
|
||||||
|
void* status; //necessary for pthread_join
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<");
|
||||||
|
|
||||||
|
pthread_t logger;
|
||||||
|
pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this);
|
||||||
|
|
||||||
|
// Initialize poll set
|
||||||
|
zmq_pollitem_t items[] = {
|
||||||
|
{ *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 }
|
||||||
|
};
|
||||||
|
|
||||||
|
Bool_t received = false;
|
||||||
|
Bool_t direction = false;
|
||||||
|
while (true) {
|
||||||
|
FairMQMessage msg;
|
||||||
|
|
||||||
|
zmq_poll(items, 1, -1);
|
||||||
|
|
||||||
|
if (items[0].revents & ZMQ_POLLIN) {
|
||||||
|
received = fPayloadInputs->at(0)->Receive(&msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (received) {
|
||||||
|
if (direction) {
|
||||||
|
fPayloadOutputs->at(0)->Send(&msg);
|
||||||
|
} else {
|
||||||
|
fPayloadOutputs->at(1)->Send(&msg);
|
||||||
|
}
|
||||||
|
direction = !direction;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_join(logger, &status);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
23
fairmq/FairMQBalancedStandaloneSplitter.h
Normal file
23
fairmq/FairMQBalancedStandaloneSplitter.h
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
/*
|
||||||
|
* FairMQBalancedStandaloneSplitter.h
|
||||||
|
*
|
||||||
|
* Created on: Dec 6, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef FAIRMQBALANCEDSTANDALONESPLITTER_H_
|
||||||
|
#define FAIRMQBALANCEDSTANDALONESPLITTER_H_
|
||||||
|
|
||||||
|
#include "FairMQDevice.h"
|
||||||
|
#include "Rtypes.h"
|
||||||
|
|
||||||
|
|
||||||
|
class FairMQBalancedStandaloneSplitter: public FairMQDevice
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
FairMQBalancedStandaloneSplitter();
|
||||||
|
virtual ~FairMQBalancedStandaloneSplitter();
|
||||||
|
virtual void Run();
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif /* FAIRMQBALANCEDSTANDALONESPLITTER_H_ */
|
151
fairmq/FairMQBenchmarkSampler.cxx
Normal file
151
fairmq/FairMQBenchmarkSampler.cxx
Normal file
|
@ -0,0 +1,151 @@
|
||||||
|
/*
|
||||||
|
* FairMQBenchmarkSampler.cpp
|
||||||
|
*
|
||||||
|
* Created on: Apr 23, 2013
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
#include <vector>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include "FairMQBenchmarkSampler.h"
|
||||||
|
#include "FairMQLogger.h"
|
||||||
|
|
||||||
|
FairMQBenchmarkSampler::FairMQBenchmarkSampler() :
|
||||||
|
fEventSize(10000),
|
||||||
|
fEventRate(1),
|
||||||
|
fEventCounter(0)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQBenchmarkSampler::~FairMQBenchmarkSampler()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQBenchmarkSampler::Init()
|
||||||
|
{
|
||||||
|
FairMQDevice::Init();
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQBenchmarkSampler::Run()
|
||||||
|
{
|
||||||
|
void* status; //necessary for pthread_join
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<");
|
||||||
|
usleep(1000000);
|
||||||
|
|
||||||
|
pthread_t logger;
|
||||||
|
pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this);
|
||||||
|
|
||||||
|
pthread_t resetEventCounter;
|
||||||
|
pthread_create(&resetEventCounter, NULL, &FairMQBenchmarkSampler::callResetEventCounter, this);
|
||||||
|
|
||||||
|
void* buffer = operator new[](fEventSize);
|
||||||
|
FairMQMessage* base_event = new FairMQMessage(buffer, fEventSize, NULL);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
FairMQMessage event;
|
||||||
|
event.Copy(base_event);
|
||||||
|
|
||||||
|
fPayloadOutputs->at(0)->Send(&event);
|
||||||
|
|
||||||
|
--fEventCounter;
|
||||||
|
|
||||||
|
while (fEventCounter == 0) {
|
||||||
|
usleep(1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
delete base_event;
|
||||||
|
|
||||||
|
pthread_join(logger, &status);
|
||||||
|
pthread_join(resetEventCounter, &status);
|
||||||
|
}
|
||||||
|
|
||||||
|
void* FairMQBenchmarkSampler::ResetEventCounter()
|
||||||
|
{
|
||||||
|
while (true) {
|
||||||
|
fEventCounter = fEventRate / 100;
|
||||||
|
|
||||||
|
usleep(10000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQBenchmarkSampler::Log(Int_t intervalInMs)
|
||||||
|
{
|
||||||
|
timestamp_t t0;
|
||||||
|
timestamp_t t1;
|
||||||
|
ULong_t bytes = fPayloadOutputs->at(0)->GetBytesTx();
|
||||||
|
ULong_t messages = fPayloadOutputs->at(0)->GetMessagesTx();
|
||||||
|
ULong_t bytesNew;
|
||||||
|
ULong_t messagesNew;
|
||||||
|
Double_t megabytesPerSecond = (bytesNew - bytes) / (1024 * 1024);
|
||||||
|
Double_t messagesPerSecond = (messagesNew - messages);
|
||||||
|
|
||||||
|
t0 = get_timestamp();
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
usleep(intervalInMs * 1000);
|
||||||
|
|
||||||
|
t1 = get_timestamp();
|
||||||
|
|
||||||
|
bytesNew = fPayloadOutputs->at(0)->GetBytesTx();
|
||||||
|
messagesNew = fPayloadOutputs->at(0)->GetMessagesTx();
|
||||||
|
|
||||||
|
timestamp_t timeSinceLastLog_ms = (t1 - t0) / 1000.0L;
|
||||||
|
|
||||||
|
megabytesPerSecond = ((Double_t) (bytesNew - bytes) / (1024. * 1024.)) / (Double_t) timeSinceLastLog_ms * 1000.;
|
||||||
|
messagesPerSecond = (Double_t) (messagesNew - messages) / (Double_t) timeSinceLastLog_ms * 1000.;
|
||||||
|
|
||||||
|
std::stringstream logmsg;
|
||||||
|
logmsg << "send " << messagesPerSecond << " msg/s, " << megabytesPerSecond << " MB/s";
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str());
|
||||||
|
|
||||||
|
bytes = bytesNew;
|
||||||
|
messages = messagesNew;
|
||||||
|
t0 = t1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQBenchmarkSampler::SetProperty(Int_t key, TString value, Int_t slot/*= 0*/)
|
||||||
|
{
|
||||||
|
switch (key) {
|
||||||
|
default:
|
||||||
|
FairMQDevice::SetProperty(key, value, slot);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TString FairMQBenchmarkSampler::GetProperty(Int_t key, TString default_/*= ""*/, Int_t slot/*= 0*/)
|
||||||
|
{
|
||||||
|
switch (key) {
|
||||||
|
default:
|
||||||
|
return FairMQDevice::GetProperty(key, default_, slot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQBenchmarkSampler::SetProperty(Int_t key, Int_t value, Int_t slot/*= 0*/)
|
||||||
|
{
|
||||||
|
switch (key) {
|
||||||
|
case EventSize:
|
||||||
|
fEventSize = value;
|
||||||
|
break;
|
||||||
|
case EventRate:
|
||||||
|
fEventRate = value;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
FairMQDevice::SetProperty(key, value, slot);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Int_t FairMQBenchmarkSampler::GetProperty(Int_t key, Int_t default_/*= 0*/, Int_t slot/*= 0*/)
|
||||||
|
{
|
||||||
|
switch (key) {
|
||||||
|
case EventSize:
|
||||||
|
return fEventSize;
|
||||||
|
case EventRate:
|
||||||
|
return fEventRate;
|
||||||
|
default:
|
||||||
|
return FairMQDevice::GetProperty(key, default_, slot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
45
fairmq/FairMQBenchmarkSampler.h
Normal file
45
fairmq/FairMQBenchmarkSampler.h
Normal file
|
@ -0,0 +1,45 @@
|
||||||
|
/*
|
||||||
|
* FairMQBenchmarkSampler.h
|
||||||
|
*
|
||||||
|
* Created on: Apr 23, 2013
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef FAIRMQBENCHMARKSAMPLER_H_
|
||||||
|
#define FAIRMQBENCHMARKSAMPLER_H_
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
#include "FairMQDevice.h"
|
||||||
|
#include "TString.h"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sampler to generate traffic for benchmarking.
|
||||||
|
*/
|
||||||
|
|
||||||
|
class FairMQBenchmarkSampler: public FairMQDevice
|
||||||
|
{
|
||||||
|
protected:
|
||||||
|
Int_t fEventSize;
|
||||||
|
Int_t fEventRate;
|
||||||
|
Int_t fEventCounter;
|
||||||
|
public:
|
||||||
|
enum {
|
||||||
|
InputFile = FairMQDevice::Last,
|
||||||
|
EventRate,
|
||||||
|
EventSize,
|
||||||
|
Last
|
||||||
|
};
|
||||||
|
FairMQBenchmarkSampler();
|
||||||
|
virtual ~FairMQBenchmarkSampler();
|
||||||
|
virtual void Init();
|
||||||
|
virtual void Run();
|
||||||
|
void Log(Int_t intervalInMs);
|
||||||
|
void* ResetEventCounter();
|
||||||
|
static void* callResetEventCounter(void* arg) { return ((FairMQBenchmarkSampler*)arg)->ResetEventCounter(); }
|
||||||
|
virtual void SetProperty(Int_t key, TString value, Int_t slot = 0);
|
||||||
|
virtual TString GetProperty(Int_t key, TString default_ = "", Int_t slot = 0);
|
||||||
|
virtual void SetProperty(Int_t key, Int_t value, Int_t slot = 0);
|
||||||
|
virtual Int_t GetProperty(Int_t key, Int_t default_ = 0, Int_t slot = 0);
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif /* FAIRMQBENCHMARKSAMPLER_H_ */
|
51
fairmq/FairMQBuffer.cxx
Normal file
51
fairmq/FairMQBuffer.cxx
Normal file
|
@ -0,0 +1,51 @@
|
||||||
|
/*
|
||||||
|
* FairMQBuffer.cxx
|
||||||
|
*
|
||||||
|
* Created on: Oct 25, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "FairMQBuffer.h"
|
||||||
|
#include <iostream>
|
||||||
|
#include "FairMQLogger.h"
|
||||||
|
|
||||||
|
|
||||||
|
FairMQBuffer::FairMQBuffer()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQBuffer::Run()
|
||||||
|
{
|
||||||
|
void* status; //necessary for pthread_join
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<");
|
||||||
|
|
||||||
|
pthread_t logger;
|
||||||
|
pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this);
|
||||||
|
|
||||||
|
// Initialize poll set
|
||||||
|
zmq_pollitem_t items[] = {
|
||||||
|
{ *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 }
|
||||||
|
};
|
||||||
|
|
||||||
|
Bool_t received = false;
|
||||||
|
while (true) {
|
||||||
|
FairMQMessage msg;
|
||||||
|
|
||||||
|
zmq_poll(items, 1, -1);
|
||||||
|
|
||||||
|
if (items[0].revents & ZMQ_POLLIN) {
|
||||||
|
received = fPayloadInputs->at(0)->Receive(&msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (received) {
|
||||||
|
fPayloadOutputs->at(0)->Send(&msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_join(logger, &status);
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQBuffer::~FairMQBuffer()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
24
fairmq/FairMQBuffer.h
Normal file
24
fairmq/FairMQBuffer.h
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
/*
|
||||||
|
* FairMQBuffer.h
|
||||||
|
*
|
||||||
|
* Created on: Oct 25, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef FAIRMQBUFFER_H_
|
||||||
|
#define FAIRMQBUFFER_H_
|
||||||
|
|
||||||
|
#include "FairMQDevice.h"
|
||||||
|
#include "Rtypes.h"
|
||||||
|
|
||||||
|
|
||||||
|
class FairMQBuffer: public FairMQDevice
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
public:
|
||||||
|
FairMQBuffer();
|
||||||
|
virtual void Run();
|
||||||
|
virtual ~FairMQBuffer();
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif /* FAIRMQBUFFER_H_ */
|
37
fairmq/FairMQConfigurable.cxx
Normal file
37
fairmq/FairMQConfigurable.cxx
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
/*
|
||||||
|
* FairMQConfigurable.cxx
|
||||||
|
*
|
||||||
|
* Created on: Oct 25, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "FairMQConfigurable.h"
|
||||||
|
|
||||||
|
|
||||||
|
FairMQConfigurable::FairMQConfigurable()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQConfigurable::SetProperty(Int_t key, TString value, Int_t slot/*= 0*/)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
TString FairMQConfigurable::GetProperty(Int_t key, TString default_/*= ""*/, Int_t slot/*= 0*/)
|
||||||
|
{
|
||||||
|
return default_;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQConfigurable::SetProperty(Int_t key, Int_t value, Int_t slot/*= 0*/)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
Int_t FairMQConfigurable::GetProperty(Int_t key, Int_t default_/*= 0*/, Int_t slot/*= 0*/)
|
||||||
|
{
|
||||||
|
return default_;
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQConfigurable::~FairMQConfigurable()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
|
29
fairmq/FairMQConfigurable.h
Normal file
29
fairmq/FairMQConfigurable.h
Normal file
|
@ -0,0 +1,29 @@
|
||||||
|
/*
|
||||||
|
* FairMQConfigurable.h
|
||||||
|
*
|
||||||
|
* Created on: Oct 25, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef FAIRMQCONFIGURABLE_H_
|
||||||
|
#define FAIRMQCONFIGURABLE_H_
|
||||||
|
|
||||||
|
#include "Rtypes.h"
|
||||||
|
#include "TString.h"
|
||||||
|
|
||||||
|
|
||||||
|
class FairMQConfigurable
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
enum {
|
||||||
|
Last = 1
|
||||||
|
};
|
||||||
|
FairMQConfigurable();
|
||||||
|
virtual void SetProperty(Int_t key, TString value, Int_t slot = 0);
|
||||||
|
virtual TString GetProperty(Int_t key, TString default_ = "", Int_t slot = 0);
|
||||||
|
virtual void SetProperty(Int_t key, Int_t value, Int_t slot = 0);
|
||||||
|
virtual Int_t GetProperty(Int_t key, Int_t default_ = 0, Int_t slot = 0);
|
||||||
|
virtual ~FairMQConfigurable();
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif /* FAIRMQCONFIGURABLE_H_ */
|
44
fairmq/FairMQContext.cxx
Normal file
44
fairmq/FairMQContext.cxx
Normal file
|
@ -0,0 +1,44 @@
|
||||||
|
/*
|
||||||
|
* FairMQContext.cxx
|
||||||
|
*
|
||||||
|
* Created on: Dec 5, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "FairMQContext.h"
|
||||||
|
#include <sstream>
|
||||||
|
|
||||||
|
|
||||||
|
const TString FairMQContext::PAYLOAD = "payload";
|
||||||
|
const TString FairMQContext::LOG = "log";
|
||||||
|
const TString FairMQContext::CONFIG = "config";
|
||||||
|
const TString FairMQContext::CONTROL = "control";
|
||||||
|
|
||||||
|
FairMQContext::FairMQContext(TString deviceId, TString contextId, Int_t numIoThreads)
|
||||||
|
{
|
||||||
|
std::stringstream id;
|
||||||
|
id << deviceId << "." << contextId;
|
||||||
|
fId = id.str();
|
||||||
|
|
||||||
|
fContext = new zmq::context_t(numIoThreads);
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQContext::~FairMQContext()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
TString FairMQContext::GetId()
|
||||||
|
{
|
||||||
|
return fId;
|
||||||
|
}
|
||||||
|
|
||||||
|
zmq::context_t* FairMQContext::GetContext()
|
||||||
|
{
|
||||||
|
return fContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQContext::Close()
|
||||||
|
{
|
||||||
|
fContext->close();
|
||||||
|
}
|
||||||
|
|
31
fairmq/FairMQContext.h
Normal file
31
fairmq/FairMQContext.h
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
/*
|
||||||
|
* FairMQContext.h
|
||||||
|
*
|
||||||
|
* Created on: Dec 5, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef FAIRMQCONTEXT_H_
|
||||||
|
#define FAIRMQCONTEXT_H_
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
#include <zmq.hpp>
|
||||||
|
#include "Rtypes.h"
|
||||||
|
#include "TString.h"
|
||||||
|
|
||||||
|
|
||||||
|
class FairMQContext
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
TString fId;
|
||||||
|
zmq::context_t* fContext;
|
||||||
|
public:
|
||||||
|
const static TString PAYLOAD, LOG, CONFIG, CONTROL;
|
||||||
|
FairMQContext(TString deviceId, TString contextId, Int_t numIoThreads);
|
||||||
|
virtual ~FairMQContext();
|
||||||
|
TString GetId();
|
||||||
|
zmq::context_t* GetContext();
|
||||||
|
void Close();
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif /* FAIRMQCONTEXT_H_ */
|
341
fairmq/FairMQDevice.cxx
Normal file
341
fairmq/FairMQDevice.cxx
Normal file
|
@ -0,0 +1,341 @@
|
||||||
|
/**
|
||||||
|
* FairMQDevice.cxx
|
||||||
|
*
|
||||||
|
* @since Oct 25, 2012
|
||||||
|
* @authors: D. Klein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "FairMQSocket.h"
|
||||||
|
#include "FairMQDevice.h"
|
||||||
|
|
||||||
|
#include "FairMQLogger.h"
|
||||||
|
|
||||||
|
FairMQDevice::FairMQDevice() :
|
||||||
|
fId(""),
|
||||||
|
fNumIoThreads(1),
|
||||||
|
fPayloadContext(NULL),
|
||||||
|
fPayloadInputs(new std::vector<FairMQSocket*>()),
|
||||||
|
fPayloadOutputs(new std::vector<FairMQSocket*>()),
|
||||||
|
fLogIntervalInMs(1000)
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQDevice::Init()
|
||||||
|
{
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Init <<<<<<<");
|
||||||
|
std::stringstream logmsg;
|
||||||
|
logmsg << "numIoThreads: " << fNumIoThreads;
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
||||||
|
|
||||||
|
fPayloadContext = new FairMQContext(fId, FairMQContext::PAYLOAD, fNumIoThreads);
|
||||||
|
|
||||||
|
fBindAddress = new std::vector<TString>(fNumOutputs);
|
||||||
|
fBindSocketType = new std::vector<Int_t>();
|
||||||
|
fBindSndBufferSize = new std::vector<Int_t>();
|
||||||
|
fBindRcvBufferSize = new std::vector<Int_t>();
|
||||||
|
|
||||||
|
for (Int_t i = 0; i < fNumOutputs; ++i) {
|
||||||
|
fBindSocketType->push_back(ZMQ_PUB);
|
||||||
|
fBindSndBufferSize->push_back(10000);
|
||||||
|
fBindRcvBufferSize->push_back(10000);
|
||||||
|
}
|
||||||
|
|
||||||
|
fConnectAddress = new std::vector<TString>(fNumInputs);
|
||||||
|
fConnectSocketType = new std::vector<Int_t>();
|
||||||
|
fConnectSndBufferSize = new std::vector<Int_t>();
|
||||||
|
fConnectRcvBufferSize = new std::vector<Int_t>();
|
||||||
|
|
||||||
|
for (Int_t i = 0; i < fNumInputs; ++i) {
|
||||||
|
fConnectSocketType->push_back(ZMQ_SUB);
|
||||||
|
fConnectSndBufferSize->push_back(10000);
|
||||||
|
fConnectRcvBufferSize->push_back(10000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQDevice::Bind()
|
||||||
|
{
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Bind <<<<<<<");
|
||||||
|
|
||||||
|
for (Int_t i = 0; i < fNumOutputs; ++i) {
|
||||||
|
FairMQSocket* socket = new FairMQSocket(fPayloadContext, fBindSocketType->at(i), i);
|
||||||
|
socket->GetSocket()->setsockopt(ZMQ_SNDHWM, &fBindSndBufferSize->at(i), sizeof(fBindSndBufferSize->at(i)));
|
||||||
|
socket->GetSocket()->setsockopt(ZMQ_RCVHWM, &fBindRcvBufferSize->at(i), sizeof(fBindRcvBufferSize->at(i)));
|
||||||
|
fPayloadOutputs->push_back(socket);
|
||||||
|
}
|
||||||
|
|
||||||
|
Int_t i = 0;
|
||||||
|
|
||||||
|
for( std::vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) {
|
||||||
|
try {
|
||||||
|
(*itr)->Bind(fBindAddress->at(i));
|
||||||
|
} catch (std::out_of_range& e) {
|
||||||
|
}
|
||||||
|
|
||||||
|
++i;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQDevice::Connect()
|
||||||
|
{
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Connect <<<<<<<");
|
||||||
|
|
||||||
|
for (Int_t i = 0; i < fNumInputs; ++i) {
|
||||||
|
FairMQSocket* socket = new FairMQSocket(fPayloadContext, fConnectSocketType->at(i), i);
|
||||||
|
socket->GetSocket()->setsockopt(ZMQ_SNDHWM, &fConnectSndBufferSize->at(i), sizeof(fConnectSndBufferSize->at(i)));
|
||||||
|
socket->GetSocket()->setsockopt(ZMQ_RCVHWM, &fConnectRcvBufferSize->at(i), sizeof(fConnectRcvBufferSize->at(i)));
|
||||||
|
fPayloadInputs->push_back(socket);
|
||||||
|
}
|
||||||
|
|
||||||
|
Int_t i = 0;
|
||||||
|
|
||||||
|
for( std::vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) {
|
||||||
|
try {
|
||||||
|
(*itr)->Connect(fConnectAddress->at(i));
|
||||||
|
} catch (std::out_of_range& e) {
|
||||||
|
}
|
||||||
|
|
||||||
|
++i;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQDevice::Run()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQDevice::Pause()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQDevice::Shutdown()
|
||||||
|
{
|
||||||
|
for( std::vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) {
|
||||||
|
(*itr)->Close();
|
||||||
|
}
|
||||||
|
|
||||||
|
for( std::vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) {
|
||||||
|
(*itr)->Close();
|
||||||
|
}
|
||||||
|
|
||||||
|
fPayloadContext->Close();
|
||||||
|
}
|
||||||
|
|
||||||
|
void* FairMQDevice::LogSocketRates()
|
||||||
|
{
|
||||||
|
timestamp_t t0;
|
||||||
|
timestamp_t t1;
|
||||||
|
|
||||||
|
timestamp_t timeSinceLastLog_ms;
|
||||||
|
|
||||||
|
ULong_t* bytesInput = new ULong_t[fNumInputs];
|
||||||
|
ULong_t* messagesInput = new ULong_t[fNumInputs];
|
||||||
|
ULong_t* bytesOutput = new ULong_t[fNumOutputs];
|
||||||
|
ULong_t* messagesOutput = new ULong_t[fNumOutputs];
|
||||||
|
|
||||||
|
ULong_t* bytesInputNew = new ULong_t[fNumInputs];
|
||||||
|
ULong_t* messagesInputNew = new ULong_t[fNumInputs];
|
||||||
|
ULong_t* bytesOutputNew = new ULong_t[fNumOutputs];
|
||||||
|
ULong_t* messagesOutputNew = new ULong_t[fNumOutputs];
|
||||||
|
|
||||||
|
Double_t* megabytesPerSecondInput = new Double_t[fNumInputs];
|
||||||
|
Double_t* messagesPerSecondInput = new Double_t[fNumInputs];
|
||||||
|
Double_t* megabytesPerSecondOutput = new Double_t[fNumOutputs];
|
||||||
|
Double_t* messagesPerSecondOutput = new Double_t[fNumOutputs];
|
||||||
|
|
||||||
|
Int_t i = 0;
|
||||||
|
for( std::vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) {
|
||||||
|
bytesInput[i] = (*itr)->GetBytesRx();
|
||||||
|
messagesInput[i] = (*itr)->GetMessagesRx();
|
||||||
|
++i;
|
||||||
|
}
|
||||||
|
|
||||||
|
i = 0;
|
||||||
|
for( std::vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) {
|
||||||
|
bytesOutput[i] = (*itr)->GetBytesTx();
|
||||||
|
messagesOutput[i] = (*itr)->GetMessagesTx();
|
||||||
|
++i;
|
||||||
|
}
|
||||||
|
|
||||||
|
t0 = get_timestamp();
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
usleep(fLogIntervalInMs * 1000);
|
||||||
|
|
||||||
|
t1 = get_timestamp();
|
||||||
|
|
||||||
|
timeSinceLastLog_ms = (t1 - t0) / 1000.0L;
|
||||||
|
|
||||||
|
i = 0;
|
||||||
|
|
||||||
|
for( std::vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) {
|
||||||
|
bytesInputNew[i] = (*itr)->GetBytesRx();
|
||||||
|
megabytesPerSecondInput[i] = ((Double_t) (bytesInputNew[i] - bytesInput[i]) / (1024. * 1024.)) / (Double_t) timeSinceLastLog_ms * 1000.;
|
||||||
|
bytesInput[i] = bytesInputNew[i];
|
||||||
|
messagesInputNew[i] = (*itr)->GetMessagesRx();
|
||||||
|
messagesPerSecondInput[i] = (Double_t) (messagesInputNew[i] - messagesInput[i]) / (Double_t) timeSinceLastLog_ms * 1000.;
|
||||||
|
messagesInput[i] = messagesInputNew[i];
|
||||||
|
|
||||||
|
std::stringstream logmsg;
|
||||||
|
logmsg << "#" << (*itr)->GetId() << ": " << messagesPerSecondInput[i] << " msg/s, " << megabytesPerSecondInput[i] << " MB/s";
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str());
|
||||||
|
|
||||||
|
++i;
|
||||||
|
}
|
||||||
|
|
||||||
|
i = 0;
|
||||||
|
|
||||||
|
for( std::vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) {
|
||||||
|
bytesOutputNew[i] = (*itr)->GetBytesTx();
|
||||||
|
megabytesPerSecondOutput[i] = ((Double_t) (bytesOutputNew[i] - bytesOutput[i]) / (1024. * 1024.)) / (Double_t) timeSinceLastLog_ms * 1000.;
|
||||||
|
bytesOutput[i] = bytesOutputNew[i];
|
||||||
|
messagesOutputNew[i] = (*itr)->GetMessagesTx();
|
||||||
|
messagesPerSecondOutput[i] = (Double_t) (messagesOutputNew[i] - messagesOutput[i]) / (Double_t) timeSinceLastLog_ms * 1000.;
|
||||||
|
messagesOutput[i] = messagesOutputNew[i];
|
||||||
|
|
||||||
|
std::stringstream logmsg;
|
||||||
|
logmsg << "#" << (*itr)->GetId() << ": " << messagesPerSecondOutput[i] << " msg/s, " << megabytesPerSecondOutput[i] << " MB/s";
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str());
|
||||||
|
|
||||||
|
++i;
|
||||||
|
}
|
||||||
|
|
||||||
|
t0 = t1;
|
||||||
|
}
|
||||||
|
|
||||||
|
delete[] bytesInput;
|
||||||
|
delete[] messagesInput;
|
||||||
|
delete[] bytesOutput;
|
||||||
|
delete[] messagesOutput;
|
||||||
|
|
||||||
|
delete[] bytesInputNew;
|
||||||
|
delete[] messagesInputNew;
|
||||||
|
delete[] bytesOutputNew;
|
||||||
|
delete[] messagesOutputNew;
|
||||||
|
|
||||||
|
delete[] megabytesPerSecondInput;
|
||||||
|
delete[] messagesPerSecondInput;
|
||||||
|
delete[] megabytesPerSecondOutput;
|
||||||
|
delete[] messagesPerSecondOutput;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQDevice::SetProperty(Int_t key, TString value, Int_t slot/*= 0*/)
|
||||||
|
{
|
||||||
|
switch (key) {
|
||||||
|
case Id:
|
||||||
|
fId = value;
|
||||||
|
break;
|
||||||
|
case BindAddress:
|
||||||
|
fBindAddress->erase(fBindAddress->begin() + slot);
|
||||||
|
fBindAddress->insert(fBindAddress->begin() + slot, value);
|
||||||
|
break;
|
||||||
|
case ConnectAddress:
|
||||||
|
fConnectAddress->erase(fConnectAddress->begin() + slot);
|
||||||
|
fConnectAddress->insert(fConnectAddress->begin() + slot, value);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
FairMQConfigurable::SetProperty(key, value, slot);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TString FairMQDevice::GetProperty(Int_t key, TString default_/*= ""*/, Int_t slot/*= 0*/)
|
||||||
|
{
|
||||||
|
switch (key) {
|
||||||
|
case Id:
|
||||||
|
return fId;
|
||||||
|
case BindAddress:
|
||||||
|
return fBindAddress->at(slot);
|
||||||
|
case ConnectAddress:
|
||||||
|
return fConnectAddress->at(slot);
|
||||||
|
default:
|
||||||
|
return FairMQConfigurable::GetProperty(key, default_, slot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQDevice::SetProperty(Int_t key, Int_t value, Int_t slot/*= 0*/)
|
||||||
|
{
|
||||||
|
switch (key) {
|
||||||
|
case NumIoThreads:
|
||||||
|
fNumIoThreads = value;
|
||||||
|
break;
|
||||||
|
case NumInputs:
|
||||||
|
fNumInputs = value;
|
||||||
|
break;
|
||||||
|
case NumOutputs:
|
||||||
|
fNumOutputs = value;
|
||||||
|
break;
|
||||||
|
case LogIntervalInMs:
|
||||||
|
fLogIntervalInMs = value;
|
||||||
|
break;
|
||||||
|
case BindSocketType:
|
||||||
|
fBindSocketType->erase(fBindSocketType->begin() + slot);
|
||||||
|
fBindSocketType->insert(fBindSocketType->begin() + slot, value);
|
||||||
|
break;
|
||||||
|
case BindSndBufferSize:
|
||||||
|
fBindSndBufferSize->erase(fBindSndBufferSize->begin() + slot);
|
||||||
|
fBindSndBufferSize->insert(fBindSndBufferSize->begin() + slot, value);
|
||||||
|
break;
|
||||||
|
case BindRcvBufferSize:
|
||||||
|
fBindRcvBufferSize->erase(fBindRcvBufferSize->begin() + slot);
|
||||||
|
fBindRcvBufferSize->insert(fBindRcvBufferSize->begin() + slot, value);
|
||||||
|
break;
|
||||||
|
case ConnectSocketType:
|
||||||
|
fConnectSocketType->erase(fConnectSocketType->begin() + slot);
|
||||||
|
fConnectSocketType->insert(fConnectSocketType->begin() + slot, value);
|
||||||
|
break;
|
||||||
|
case ConnectSndBufferSize:
|
||||||
|
fConnectSndBufferSize->erase(fConnectSndBufferSize->begin() + slot);
|
||||||
|
fConnectSndBufferSize->insert(fConnectSndBufferSize->begin() + slot, value);
|
||||||
|
break;
|
||||||
|
case ConnectRcvBufferSize:
|
||||||
|
fConnectRcvBufferSize->erase(fConnectRcvBufferSize->begin() + slot);
|
||||||
|
fConnectRcvBufferSize->insert(fConnectRcvBufferSize->begin() + slot, value);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
FairMQConfigurable::SetProperty(key, value, slot);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Int_t FairMQDevice::GetProperty(Int_t key, Int_t default_/*= 0*/, Int_t slot/*= 0*/)
|
||||||
|
{
|
||||||
|
switch (key) {
|
||||||
|
case NumIoThreads:
|
||||||
|
return fNumIoThreads;
|
||||||
|
case LogIntervalInMs:
|
||||||
|
return fLogIntervalInMs;
|
||||||
|
case BindSocketType:
|
||||||
|
return fBindSocketType->at(slot);
|
||||||
|
case ConnectSocketType:
|
||||||
|
return fConnectSocketType->at(slot);
|
||||||
|
case ConnectSndBufferSize:
|
||||||
|
return fConnectSndBufferSize->at(slot);
|
||||||
|
case ConnectRcvBufferSize:
|
||||||
|
return fConnectRcvBufferSize->at(slot);
|
||||||
|
case BindSndBufferSize:
|
||||||
|
return fBindSndBufferSize->at(slot);
|
||||||
|
case BindRcvBufferSize:
|
||||||
|
return fBindRcvBufferSize->at(slot);
|
||||||
|
default:
|
||||||
|
return FairMQConfigurable::GetProperty(key, default_, slot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQDevice::~FairMQDevice()
|
||||||
|
{
|
||||||
|
for( std::vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) {
|
||||||
|
delete (*itr);
|
||||||
|
}
|
||||||
|
|
||||||
|
for( std::vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) {
|
||||||
|
delete (*itr);
|
||||||
|
}
|
||||||
|
|
||||||
|
delete fBindAddress;
|
||||||
|
delete fConnectAddress;
|
||||||
|
delete fPayloadInputs;
|
||||||
|
delete fPayloadOutputs;
|
||||||
|
}
|
||||||
|
|
74
fairmq/FairMQDevice.h
Normal file
74
fairmq/FairMQDevice.h
Normal file
|
@ -0,0 +1,74 @@
|
||||||
|
/**
|
||||||
|
* FairMQDevice.h
|
||||||
|
*
|
||||||
|
* @since Oct 25, 2012
|
||||||
|
* @authors: D. Klein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef FAIRMQDEVICE_H_
|
||||||
|
#define FAIRMQDEVICE_H_
|
||||||
|
|
||||||
|
#include "FairMQConfigurable.h"
|
||||||
|
#include "FairMQStateMachine.h"
|
||||||
|
#include <vector>
|
||||||
|
#include "FairMQContext.h"
|
||||||
|
#include "FairMQSocket.h"
|
||||||
|
#include <stdexcept>
|
||||||
|
//#include "FairMQLogger.h"
|
||||||
|
#include "Rtypes.h"
|
||||||
|
#include "TString.h"
|
||||||
|
|
||||||
|
|
||||||
|
class FairMQDevice : /*public FairMQStateMachine,*/ public FairMQConfigurable
|
||||||
|
{
|
||||||
|
protected:
|
||||||
|
TString fId;
|
||||||
|
Int_t fNumIoThreads;
|
||||||
|
FairMQContext* fPayloadContext;
|
||||||
|
std::vector<TString> *fBindAddress;
|
||||||
|
std::vector<Int_t> *fBindSocketType;
|
||||||
|
std::vector<Int_t> *fBindSndBufferSize;
|
||||||
|
std::vector<Int_t> *fBindRcvBufferSize;
|
||||||
|
std::vector<TString> *fConnectAddress;
|
||||||
|
std::vector<Int_t> *fConnectSocketType;
|
||||||
|
std::vector<Int_t> *fConnectSndBufferSize;
|
||||||
|
std::vector<Int_t> *fConnectRcvBufferSize;
|
||||||
|
std::vector<FairMQSocket*> *fPayloadInputs;
|
||||||
|
std::vector<FairMQSocket*> *fPayloadOutputs;
|
||||||
|
Int_t fLogIntervalInMs;
|
||||||
|
Int_t fNumInputs;
|
||||||
|
Int_t fNumOutputs;
|
||||||
|
public:
|
||||||
|
enum {
|
||||||
|
Id = FairMQConfigurable::Last,
|
||||||
|
NumIoThreads,
|
||||||
|
NumInputs,
|
||||||
|
NumOutputs,
|
||||||
|
BindAddress,
|
||||||
|
BindSocketType,
|
||||||
|
BindSndBufferSize,
|
||||||
|
BindRcvBufferSize,
|
||||||
|
ConnectAddress,
|
||||||
|
ConnectSocketType,
|
||||||
|
ConnectSndBufferSize,
|
||||||
|
ConnectRcvBufferSize,
|
||||||
|
LogIntervalInMs,
|
||||||
|
Last
|
||||||
|
};
|
||||||
|
FairMQDevice();
|
||||||
|
virtual void Init();
|
||||||
|
virtual void Bind();
|
||||||
|
virtual void Connect();
|
||||||
|
virtual void Run();
|
||||||
|
virtual void Pause();
|
||||||
|
virtual void Shutdown();
|
||||||
|
virtual void* LogSocketRates();
|
||||||
|
static void* callLogSocketRates(void* arg) { return ((FairMQDevice*)arg)->LogSocketRates(); }
|
||||||
|
virtual void SetProperty(Int_t key, TString value, Int_t slot = 0);
|
||||||
|
virtual TString GetProperty(Int_t key, TString default_ = "", Int_t slot = 0);
|
||||||
|
virtual void SetProperty(Int_t key, Int_t value, Int_t slot = 0);
|
||||||
|
virtual Int_t GetProperty(Int_t key, Int_t default_ = 0, Int_t slot = 0);
|
||||||
|
virtual ~FairMQDevice();
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif /* FAIRMQDEVICE_H_ */
|
71
fairmq/FairMQLogger.cxx
Normal file
71
fairmq/FairMQLogger.cxx
Normal file
|
@ -0,0 +1,71 @@
|
||||||
|
/*
|
||||||
|
* FairMQLogger.cxx
|
||||||
|
*
|
||||||
|
* Created on: Dec 4, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "FairMQLogger.h"
|
||||||
|
#include <iostream>
|
||||||
|
#include <ctime>
|
||||||
|
#include <iomanip>
|
||||||
|
|
||||||
|
|
||||||
|
FairMQLogger* FairMQLogger::instance = NULL;
|
||||||
|
|
||||||
|
FairMQLogger* FairMQLogger::GetInstance()
|
||||||
|
{
|
||||||
|
if (instance == NULL) {
|
||||||
|
instance = new FairMQLogger();
|
||||||
|
}
|
||||||
|
return instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQLogger* FairMQLogger::InitInstance(TString bindAddress)
|
||||||
|
{
|
||||||
|
instance = new FairMQLogger(bindAddress);
|
||||||
|
return instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQLogger::FairMQLogger() :
|
||||||
|
fBindAddress("")
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQLogger::FairMQLogger(TString bindAddress) :
|
||||||
|
fBindAddress(bindAddress)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQLogger::~FairMQLogger()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQLogger::Log(Int_t type, TString logmsg)
|
||||||
|
{
|
||||||
|
timestamp_t tm = get_timestamp();
|
||||||
|
timestamp_t ms = tm / 1000.0L;
|
||||||
|
timestamp_t s = ms / 1000.0L;
|
||||||
|
std::time_t t = s;
|
||||||
|
std::size_t fractional_seconds = ms % 1000;
|
||||||
|
Text_t mbstr[100];
|
||||||
|
std::strftime(mbstr, 100, "%H:%M:%S:", std::localtime(&t));
|
||||||
|
|
||||||
|
TString type_str;
|
||||||
|
switch (type) {
|
||||||
|
case DEBUG:
|
||||||
|
type_str = "\033[01;34mDEBUG\033[0m";
|
||||||
|
break;
|
||||||
|
case INFO:
|
||||||
|
type_str = "\033[01;32mINFO\033[0m";
|
||||||
|
break;
|
||||||
|
case ERROR:
|
||||||
|
type_str = "\033[01;31mERROR\033[0m";
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::cout << "[\033[01;36m" << mbstr << fractional_seconds << "\033[0m]" << "[" << type_str << "]" << " " << logmsg << std::endl;
|
||||||
|
}
|
||||||
|
|
45
fairmq/FairMQLogger.h
Normal file
45
fairmq/FairMQLogger.h
Normal file
|
@ -0,0 +1,45 @@
|
||||||
|
/*
|
||||||
|
* FairMQLogger.h
|
||||||
|
*
|
||||||
|
* Created on: Dec 4, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef FAIRMQLOGGER_H_
|
||||||
|
#define FAIRMQLOGGER_H_
|
||||||
|
#include <string>
|
||||||
|
#include <sstream>
|
||||||
|
//#ifndef _MAKECINT_
|
||||||
|
#include <sys/time.h>
|
||||||
|
//#endif
|
||||||
|
#include "Rtypes.h"
|
||||||
|
#include "TString.h"
|
||||||
|
|
||||||
|
|
||||||
|
class FairMQLogger
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
static FairMQLogger* instance;
|
||||||
|
TString fBindAddress;
|
||||||
|
public:
|
||||||
|
enum {
|
||||||
|
DEBUG, INFO, ERROR
|
||||||
|
};
|
||||||
|
FairMQLogger();
|
||||||
|
FairMQLogger(TString bindAdress);
|
||||||
|
virtual ~FairMQLogger();
|
||||||
|
void Log(Int_t type, TString logmsg);
|
||||||
|
static FairMQLogger* GetInstance();
|
||||||
|
static FairMQLogger* InitInstance(TString bindAddress);
|
||||||
|
};
|
||||||
|
|
||||||
|
typedef unsigned long long timestamp_t;
|
||||||
|
|
||||||
|
static timestamp_t get_timestamp ()
|
||||||
|
{
|
||||||
|
struct timeval now;
|
||||||
|
gettimeofday (&now, NULL);
|
||||||
|
return now.tv_usec + (timestamp_t)now.tv_sec * 1000000;
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif /* FAIRMQLOGGER_H_ */
|
63
fairmq/FairMQMessage.cxx
Normal file
63
fairmq/FairMQMessage.cxx
Normal file
|
@ -0,0 +1,63 @@
|
||||||
|
/*
|
||||||
|
* FairMQMessage.cxx
|
||||||
|
*
|
||||||
|
* Created on: Dec 5, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "FairMQMessage.h"
|
||||||
|
#include "FairMQLogger.h"
|
||||||
|
|
||||||
|
|
||||||
|
FairMQMessage::FairMQMessage()
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
fMessage = new zmq::message_t();
|
||||||
|
} catch (zmq::error_t& e) {
|
||||||
|
std::stringstream logmsg;
|
||||||
|
logmsg << "failed allocating new message, reason: " << e.what();
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQMessage::FairMQMessage(void* data_, size_t size_, zmq::free_fn* ffn_, void* hint_/*= NULL*/)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
fMessage = new zmq::message_t(data_, size_, ffn_, hint_);
|
||||||
|
} catch (zmq::error_t& e) {
|
||||||
|
std::stringstream logmsg;
|
||||||
|
logmsg << "failed allocating new message, reason: " << e.what();
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQMessage::~FairMQMessage()
|
||||||
|
{
|
||||||
|
delete fMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
zmq::message_t* FairMQMessage::GetMessage()
|
||||||
|
{
|
||||||
|
return fMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
Int_t FairMQMessage::Size()
|
||||||
|
{
|
||||||
|
return fMessage->size();
|
||||||
|
}
|
||||||
|
|
||||||
|
Bool_t FairMQMessage::Copy(FairMQMessage* msg)
|
||||||
|
{
|
||||||
|
Bool_t result = false;
|
||||||
|
|
||||||
|
try {
|
||||||
|
fMessage->copy(msg->GetMessage());
|
||||||
|
} catch (zmq::error_t& e) {
|
||||||
|
std::stringstream logmsg;
|
||||||
|
logmsg << "failed copying message, reason: " << e.what();
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
28
fairmq/FairMQMessage.h
Normal file
28
fairmq/FairMQMessage.h
Normal file
|
@ -0,0 +1,28 @@
|
||||||
|
/*
|
||||||
|
* FairMQMessage.h
|
||||||
|
*
|
||||||
|
* Created on: Dec 5, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef FAIRMQMESSAGE_H_
|
||||||
|
#define FAIRMQMESSAGE_H_
|
||||||
|
|
||||||
|
#include <zmq.hpp>
|
||||||
|
#include "Rtypes.h"
|
||||||
|
|
||||||
|
|
||||||
|
class FairMQMessage
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
zmq::message_t* fMessage;
|
||||||
|
public:
|
||||||
|
FairMQMessage();
|
||||||
|
FairMQMessage(void* data_, size_t size_, zmq::free_fn* ffn_, void* hint_ = NULL);
|
||||||
|
virtual ~FairMQMessage();
|
||||||
|
zmq::message_t* GetMessage();
|
||||||
|
Int_t Size();
|
||||||
|
Bool_t Copy(FairMQMessage* msg);
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif /* FAIRMQMESSAGE_H_ */
|
65
fairmq/FairMQProcessor.cxx
Normal file
65
fairmq/FairMQProcessor.cxx
Normal file
|
@ -0,0 +1,65 @@
|
||||||
|
/*
|
||||||
|
* FairMQProcessor.cxx
|
||||||
|
*
|
||||||
|
* Created on: Dec 6, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "FairMQProcessor.h"
|
||||||
|
#include "FairMQLogger.h"
|
||||||
|
|
||||||
|
FairMQProcessor::FairMQProcessor() :
|
||||||
|
fTask(NULL)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQProcessor::~FairMQProcessor()
|
||||||
|
{
|
||||||
|
delete fTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQProcessor::SetTask(FairMQProcessorTask* task)
|
||||||
|
{
|
||||||
|
fTask = task;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQProcessor::Init()
|
||||||
|
{
|
||||||
|
FairMQDevice::Init();
|
||||||
|
|
||||||
|
fTask->InitTask();
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQProcessor::Run()
|
||||||
|
{
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<");
|
||||||
|
|
||||||
|
void* status; //necessary for pthread_join
|
||||||
|
pthread_t logger;
|
||||||
|
pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this);
|
||||||
|
|
||||||
|
// Initialize poll set
|
||||||
|
zmq_pollitem_t items[] = {
|
||||||
|
{ *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 }
|
||||||
|
};
|
||||||
|
|
||||||
|
Bool_t received = false;
|
||||||
|
while (true) {
|
||||||
|
FairMQMessage msg;
|
||||||
|
|
||||||
|
zmq_poll(items, 1, -1);
|
||||||
|
|
||||||
|
if (items[0].revents & ZMQ_POLLIN) {
|
||||||
|
received = fPayloadInputs->at(0)->Receive(&msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (received) {
|
||||||
|
fTask->Exec(&msg, NULL);
|
||||||
|
|
||||||
|
fPayloadOutputs->at(0)->Send(&msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_join(logger, &status);
|
||||||
|
}
|
||||||
|
|
28
fairmq/FairMQProcessor.h
Normal file
28
fairmq/FairMQProcessor.h
Normal file
|
@ -0,0 +1,28 @@
|
||||||
|
/*
|
||||||
|
* FairMQProcessor.h
|
||||||
|
*
|
||||||
|
* Created on: Dec 6, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef FAIRMQPROCESSOR_H_
|
||||||
|
#define FAIRMQPROCESSOR_H_
|
||||||
|
|
||||||
|
#include "FairMQDevice.h"
|
||||||
|
#include "FairMQProcessorTask.h"
|
||||||
|
#include "Rtypes.h"
|
||||||
|
|
||||||
|
|
||||||
|
class FairMQProcessor: public FairMQDevice
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
FairMQProcessor();
|
||||||
|
virtual ~FairMQProcessor();
|
||||||
|
void SetTask(FairMQProcessorTask* task);
|
||||||
|
virtual void Init();
|
||||||
|
virtual void Run();
|
||||||
|
private:
|
||||||
|
FairMQProcessorTask* fTask;
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif /* FAIRMQPROCESSOR_H_ */
|
18
fairmq/FairMQProcessorTask.cxx
Normal file
18
fairmq/FairMQProcessorTask.cxx
Normal file
|
@ -0,0 +1,18 @@
|
||||||
|
/*
|
||||||
|
* FairMQProcessorTask.cxx
|
||||||
|
*
|
||||||
|
* Created on: Dec 6, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "FairMQProcessorTask.h"
|
||||||
|
|
||||||
|
|
||||||
|
FairMQProcessorTask::FairMQProcessorTask()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQProcessorTask::~FairMQProcessorTask()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
24
fairmq/FairMQProcessorTask.h
Normal file
24
fairmq/FairMQProcessorTask.h
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
/*
|
||||||
|
* FairMQProcessorTask.h
|
||||||
|
*
|
||||||
|
* Created on: Dec 6, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef FAIRMQPROCESSORTASK_H_
|
||||||
|
#define FAIRMQPROCESSORTASK_H_
|
||||||
|
|
||||||
|
#include <vector>
|
||||||
|
#include "FairMQMessage.h"
|
||||||
|
#include "FairTask.h"
|
||||||
|
|
||||||
|
|
||||||
|
class FairMQProcessorTask : public FairTask
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
FairMQProcessorTask();
|
||||||
|
virtual ~FairMQProcessorTask();
|
||||||
|
virtual void Exec(FairMQMessage* msg, Option_t* opt) = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif /* FAIRMQPROCESSORTASK_H_ */
|
193
fairmq/FairMQSampler.cxx
Normal file
193
fairmq/FairMQSampler.cxx
Normal file
|
@ -0,0 +1,193 @@
|
||||||
|
/*
|
||||||
|
* FairMQSampler.cpp
|
||||||
|
*
|
||||||
|
* Created on: Sep 27, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
#include <vector>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include "TList.h"
|
||||||
|
#include "TObjString.h"
|
||||||
|
#include "TClonesArray.h"
|
||||||
|
#include "FairParRootFileIo.h"
|
||||||
|
#include "FairRuntimeDb.h"
|
||||||
|
#include "TROOT.h"
|
||||||
|
|
||||||
|
#include "FairMQSampler.h"
|
||||||
|
#include "FairMQLogger.h"
|
||||||
|
|
||||||
|
|
||||||
|
FairMQSampler::FairMQSampler() :
|
||||||
|
fFairRunAna(new FairRunAna()),
|
||||||
|
fSamplerTask(NULL),
|
||||||
|
fInputFile(""),
|
||||||
|
fBranch(""),
|
||||||
|
fParFile(""),
|
||||||
|
fEventRate(1)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQSampler::~FairMQSampler()
|
||||||
|
{
|
||||||
|
delete fSamplerTask;
|
||||||
|
delete fFairRunAna;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQSampler::Init()
|
||||||
|
{
|
||||||
|
FairMQDevice::Init();
|
||||||
|
|
||||||
|
fSamplerTask->SetBranch(fBranch);
|
||||||
|
|
||||||
|
TString rootlogon_macro = TString(getenv("VMCWORKDIR")) + "/gconfig/rootlogon.C";
|
||||||
|
gROOT->LoadMacro(rootlogon_macro.Data());
|
||||||
|
gROOT->ProcessLine("rootlogon()");
|
||||||
|
|
||||||
|
fFairRunAna->SetInputFile(TString(fInputFile));
|
||||||
|
fFairRunAna->SetOutputFile("dummy.out");
|
||||||
|
|
||||||
|
fFairRunAna->AddTask(fSamplerTask);
|
||||||
|
|
||||||
|
FairRuntimeDb* rtdb = fFairRunAna->GetRuntimeDb();
|
||||||
|
FairParRootFileIo* parInput1 = new FairParRootFileIo();
|
||||||
|
parInput1->open(TString(fParFile).Data());
|
||||||
|
rtdb->setFirstInput(parInput1);
|
||||||
|
rtdb->print();
|
||||||
|
|
||||||
|
// read complete file and extract digis.
|
||||||
|
fFairRunAna->Init();
|
||||||
|
fFairRunAna->Run(0, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQSampler::Run()
|
||||||
|
{
|
||||||
|
void* status; //necessary for pthread_join
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<");
|
||||||
|
usleep(1000000);
|
||||||
|
|
||||||
|
pthread_t logger;
|
||||||
|
pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this);
|
||||||
|
|
||||||
|
pthread_t resetEventCounter;
|
||||||
|
pthread_create(&resetEventCounter, NULL, &FairMQSampler::callResetEventCounter, this);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
for( std::vector<FairMQMessage*>::iterator itr = fSamplerTask->GetOutput()->begin(); itr != fSamplerTask->GetOutput()->end(); itr++ ) {
|
||||||
|
FairMQMessage event;
|
||||||
|
event.Copy(*itr);
|
||||||
|
|
||||||
|
fPayloadOutputs->at(0)->Send(&event);
|
||||||
|
|
||||||
|
--fEventCounter;
|
||||||
|
|
||||||
|
while (fEventCounter == 0) {
|
||||||
|
usleep(1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_join(logger, &status);
|
||||||
|
pthread_join(resetEventCounter, &status);
|
||||||
|
}
|
||||||
|
|
||||||
|
void* FairMQSampler::ResetEventCounter()
|
||||||
|
{
|
||||||
|
while (true) {
|
||||||
|
fEventCounter = fEventRate / 100;
|
||||||
|
|
||||||
|
usleep(10000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQSampler::Log(Int_t intervalInMs)
|
||||||
|
{
|
||||||
|
timestamp_t t0;
|
||||||
|
timestamp_t t1;
|
||||||
|
ULong_t bytes = fPayloadOutputs->at(0)->GetBytesTx();
|
||||||
|
ULong_t messages = fPayloadOutputs->at(0)->GetMessagesTx();
|
||||||
|
ULong_t bytesNew;
|
||||||
|
ULong_t messagesNew;
|
||||||
|
Double_t megabytesPerSecond = (bytesNew - bytes) / (1024 * 1024);
|
||||||
|
Double_t messagesPerSecond = (messagesNew - messages);
|
||||||
|
|
||||||
|
t0 = get_timestamp();
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
usleep(intervalInMs * 1000);
|
||||||
|
|
||||||
|
t1 = get_timestamp();
|
||||||
|
|
||||||
|
bytesNew = fPayloadOutputs->at(0)->GetBytesTx();
|
||||||
|
messagesNew = fPayloadOutputs->at(0)->GetMessagesTx();
|
||||||
|
|
||||||
|
timestamp_t timeSinceLastLog_ms = (t1 - t0) / 1000.0L;
|
||||||
|
|
||||||
|
megabytesPerSecond = ((Double_t) (bytesNew - bytes) / (1024. * 1024.)) / (Double_t) timeSinceLastLog_ms * 1000.;
|
||||||
|
messagesPerSecond = (Double_t) (messagesNew - messages) / (Double_t) timeSinceLastLog_ms * 1000.;
|
||||||
|
|
||||||
|
std::stringstream logmsg;
|
||||||
|
logmsg << "send " << messagesPerSecond << " msg/s, " << megabytesPerSecond << " MB/s";
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str());
|
||||||
|
|
||||||
|
bytes = bytesNew;
|
||||||
|
messages = messagesNew;
|
||||||
|
t0 = t1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQSampler::SetProperty(Int_t key, TString value, Int_t slot/*= 0*/)
|
||||||
|
{
|
||||||
|
switch (key) {
|
||||||
|
case InputFile:
|
||||||
|
fInputFile = value;
|
||||||
|
break;
|
||||||
|
case ParFile:
|
||||||
|
fParFile = value;
|
||||||
|
break;
|
||||||
|
case Branch:
|
||||||
|
fBranch = value;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
FairMQDevice::SetProperty(key, value, slot);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TString FairMQSampler::GetProperty(Int_t key, TString default_/*= ""*/, Int_t slot/*= 0*/)
|
||||||
|
{
|
||||||
|
switch (key) {
|
||||||
|
case InputFile:
|
||||||
|
return fInputFile;
|
||||||
|
case ParFile:
|
||||||
|
return fParFile;
|
||||||
|
case Branch:
|
||||||
|
return fBranch;
|
||||||
|
default:
|
||||||
|
return FairMQDevice::GetProperty(key, default_, slot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQSampler::SetProperty(Int_t key, Int_t value, Int_t slot/*= 0*/)
|
||||||
|
{
|
||||||
|
switch (key) {
|
||||||
|
case EventRate:
|
||||||
|
fEventRate = value;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
FairMQDevice::SetProperty(key, value, slot);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Int_t FairMQSampler::GetProperty(Int_t key, Int_t default_/*= 0*/, Int_t slot/*= 0*/)
|
||||||
|
{
|
||||||
|
switch (key) {
|
||||||
|
case EventRate:
|
||||||
|
return fEventRate;
|
||||||
|
default:
|
||||||
|
return FairMQDevice::GetProperty(key, default_, slot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
60
fairmq/FairMQSampler.h
Normal file
60
fairmq/FairMQSampler.h
Normal file
|
@ -0,0 +1,60 @@
|
||||||
|
/*
|
||||||
|
* FairMQSampler.h
|
||||||
|
*
|
||||||
|
* Created on: Sep 27, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef FAIRMQSAMPLER_H_
|
||||||
|
#define FAIRMQSAMPLER_H_
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
#include "FairRunAna.h"
|
||||||
|
#include "FairTask.h"
|
||||||
|
#include "FairMQDevice.h"
|
||||||
|
#include "FairMQSamplerTask.h"
|
||||||
|
#include "TString.h"
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads simulated digis from a root file and samples the digi as a time-series UDP stream.
|
||||||
|
* Must be initialized with the filename to the root file and the name of the sub-detector
|
||||||
|
* branch, whose digis should be streamed.
|
||||||
|
*
|
||||||
|
* The purpose of this class is to provide a data source of digis very similar to the
|
||||||
|
* future detector output at the point where the detector is connected to the online
|
||||||
|
* computing farm. For the development of online analysis algorithms, it is very important
|
||||||
|
* to simulate the future detector output as realistic as possible to evaluate the
|
||||||
|
* feasibility and quality of the various possible online analysis features.
|
||||||
|
*/
|
||||||
|
class FairMQSampler: public FairMQDevice
|
||||||
|
{
|
||||||
|
protected:
|
||||||
|
FairRunAna* fFairRunAna;
|
||||||
|
FairMQSamplerTask* fSamplerTask;
|
||||||
|
TString fInputFile; // Filename of a root file containing the simulated digis.
|
||||||
|
TString fParFile;
|
||||||
|
TString fBranch; // The name of the sub-detector branch to stream the digis from.
|
||||||
|
Int_t fEventRate;
|
||||||
|
Int_t fEventCounter;
|
||||||
|
public:
|
||||||
|
enum {
|
||||||
|
InputFile = FairMQDevice::Last,
|
||||||
|
Branch,
|
||||||
|
ParFile,
|
||||||
|
EventRate
|
||||||
|
};
|
||||||
|
FairMQSampler();
|
||||||
|
virtual ~FairMQSampler();
|
||||||
|
virtual void Init();
|
||||||
|
virtual void Run();
|
||||||
|
void Log(Int_t intervalInMs);
|
||||||
|
void* ResetEventCounter();
|
||||||
|
static void* callResetEventCounter(void* arg) { return ((FairMQSampler*)arg)->ResetEventCounter(); }
|
||||||
|
virtual void SetProperty(Int_t key, TString value, Int_t slot = 0);
|
||||||
|
virtual TString GetProperty(Int_t key, TString default_ = "", Int_t slot = 0);
|
||||||
|
virtual void SetProperty(Int_t key, Int_t value, Int_t slot = 0);
|
||||||
|
virtual Int_t GetProperty(Int_t key, Int_t default_ = 0, Int_t slot = 0);
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif /* FAIRMQSAMPLER_H_ */
|
58
fairmq/FairMQSamplerTask.cxx
Normal file
58
fairmq/FairMQSamplerTask.cxx
Normal file
|
@ -0,0 +1,58 @@
|
||||||
|
/*
|
||||||
|
* FairMQSamplerTask.cxx
|
||||||
|
*
|
||||||
|
* Created on: Nov 22, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "FairMQSamplerTask.h"
|
||||||
|
|
||||||
|
|
||||||
|
FairMQSamplerTask::FairMQSamplerTask(const Text_t* name, Int_t iVerbose) :
|
||||||
|
FairTask(name, iVerbose),
|
||||||
|
fInput(NULL),
|
||||||
|
fBranch(""),
|
||||||
|
fMessageSize(32768),
|
||||||
|
fOutput(new std::vector<FairMQMessage*>)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQSamplerTask::FairMQSamplerTask() :
|
||||||
|
FairTask( "Abstract base task used for loading a branch from a root file into memory"),
|
||||||
|
fInput(NULL),
|
||||||
|
fBranch(""),
|
||||||
|
fMessageSize(32768),
|
||||||
|
fOutput(new std::vector<FairMQMessage*>)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQSamplerTask::~FairMQSamplerTask()
|
||||||
|
{
|
||||||
|
delete fInput;
|
||||||
|
|
||||||
|
// leave fOutput in memory, because it is needed even after FairMQSamplerTask is terminated.
|
||||||
|
}
|
||||||
|
|
||||||
|
InitStatus FairMQSamplerTask::Init()
|
||||||
|
{
|
||||||
|
FairRootManager* ioman = FairRootManager::Instance();
|
||||||
|
fInput = (TClonesArray*) ioman->GetObject(fBranch.Data());
|
||||||
|
|
||||||
|
return kSUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQSamplerTask::SetBranch(TString branch)
|
||||||
|
{
|
||||||
|
fBranch = branch;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQSamplerTask::SetMessageSize(int size)
|
||||||
|
{
|
||||||
|
fMessageSize = size;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<FairMQMessage*> *FairMQSamplerTask::GetOutput()
|
||||||
|
{
|
||||||
|
return fOutput;
|
||||||
|
}
|
||||||
|
|
37
fairmq/FairMQSamplerTask.h
Normal file
37
fairmq/FairMQSamplerTask.h
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
/*
|
||||||
|
* FairMQSamplerTask.h
|
||||||
|
*
|
||||||
|
* Created on: Nov 22, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef FAIRMQSAMPLERTASK_H_
|
||||||
|
#define FAIRMQSAMPLERTASK_H_
|
||||||
|
|
||||||
|
#include "FairTask.h"
|
||||||
|
#include <vector>
|
||||||
|
#include "TClonesArray.h"
|
||||||
|
#include <string>
|
||||||
|
#include "FairMQMessage.h"
|
||||||
|
#include "TString.h"
|
||||||
|
|
||||||
|
|
||||||
|
class FairMQSamplerTask: public FairTask
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
FairMQSamplerTask();
|
||||||
|
FairMQSamplerTask(const Text_t* name, Int_t iVerbose=1);
|
||||||
|
virtual ~FairMQSamplerTask();
|
||||||
|
virtual InitStatus Init();
|
||||||
|
virtual void Exec(Option_t* opt) = 0;
|
||||||
|
void SetBranch(TString branch);
|
||||||
|
void SetMessageSize(Int_t size);
|
||||||
|
std::vector<FairMQMessage*> *GetOutput();
|
||||||
|
protected:
|
||||||
|
TClonesArray* fInput;
|
||||||
|
TString fBranch;
|
||||||
|
Int_t fMessageSize;
|
||||||
|
std::vector<FairMQMessage*> *fOutput;
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif /* FAIRMQSAMPLERTASK_H_ */
|
44
fairmq/FairMQSink.cxx
Normal file
44
fairmq/FairMQSink.cxx
Normal file
|
@ -0,0 +1,44 @@
|
||||||
|
/*
|
||||||
|
* FairMQSink.cxx
|
||||||
|
*
|
||||||
|
* Created on: Jan 9, 2013
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "FairMQSink.h"
|
||||||
|
#include "FairMQLogger.h"
|
||||||
|
|
||||||
|
FairMQSink::FairMQSink()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQSink::Run()
|
||||||
|
{
|
||||||
|
void* status; //necessary for pthread_join
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<");
|
||||||
|
|
||||||
|
pthread_t logger;
|
||||||
|
pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this);
|
||||||
|
|
||||||
|
// Initialize poll set
|
||||||
|
zmq_pollitem_t items[] = {
|
||||||
|
{ *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 }
|
||||||
|
};
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
FairMQMessage msg;
|
||||||
|
|
||||||
|
zmq_poll(items, 1, -1);
|
||||||
|
|
||||||
|
if (items[0].revents & ZMQ_POLLIN) {
|
||||||
|
fPayloadInputs->at(0)->Receive(&msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_join(logger, &status);
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQSink::~FairMQSink()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
26
fairmq/FairMQSink.h
Normal file
26
fairmq/FairMQSink.h
Normal file
|
@ -0,0 +1,26 @@
|
||||||
|
/*
|
||||||
|
* FairMQSink.h
|
||||||
|
*
|
||||||
|
* Created on: Jan 9, 2013
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef FAIRMQSINK_H_
|
||||||
|
#define FAIRMQSINK_H_
|
||||||
|
|
||||||
|
#include "Rtypes.h"
|
||||||
|
#include <pthread.h>
|
||||||
|
|
||||||
|
#include "FairMQDevice.h"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class FairMQSink: public FairMQDevice
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
FairMQSink();
|
||||||
|
virtual void Run();
|
||||||
|
virtual ~FairMQSink();
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif /* FAIRMQSINK_H_ */
|
169
fairmq/FairMQSocket.cxx
Normal file
169
fairmq/FairMQSocket.cxx
Normal file
|
@ -0,0 +1,169 @@
|
||||||
|
/*
|
||||||
|
* FairMQSocket.cxx
|
||||||
|
*
|
||||||
|
* Created on: Dec 5, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "FairMQSocket.h"
|
||||||
|
#include <sstream>
|
||||||
|
#include "FairMQLogger.h"
|
||||||
|
|
||||||
|
|
||||||
|
const TString FairMQSocket::TCP = "tcp://";
|
||||||
|
const TString FairMQSocket::IPC = "ipc://";
|
||||||
|
const TString FairMQSocket::INPROC = "inproc://";
|
||||||
|
|
||||||
|
FairMQSocket::FairMQSocket(FairMQContext* context, int type, int num) :
|
||||||
|
fBytesTx(0),
|
||||||
|
fBytesRx(0),
|
||||||
|
fMessagesTx(0),
|
||||||
|
fMessagesRx(0)
|
||||||
|
{
|
||||||
|
std::stringstream id;
|
||||||
|
id << context->GetId() << "." << GetTypeString(type) << "." << num;
|
||||||
|
fId = id.str();
|
||||||
|
|
||||||
|
fSocket = new zmq::socket_t(*context->GetContext(), type);
|
||||||
|
fSocket->setsockopt(ZMQ_IDENTITY, &fId, fId.Length());
|
||||||
|
if (type == ZMQ_SUB) {
|
||||||
|
fSocket->setsockopt(ZMQ_SUBSCRIBE, NULL, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::stringstream logmsg;
|
||||||
|
logmsg << "created socket #" << fId;
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQSocket::~FairMQSocket()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
TString FairMQSocket::GetId()
|
||||||
|
{
|
||||||
|
return fId;
|
||||||
|
}
|
||||||
|
|
||||||
|
TString FairMQSocket::GetTypeString(Int_t type)
|
||||||
|
{
|
||||||
|
switch (type) {
|
||||||
|
case ZMQ_SUB:
|
||||||
|
return "sub";
|
||||||
|
case ZMQ_PUB:
|
||||||
|
return "pub";
|
||||||
|
case ZMQ_PUSH:
|
||||||
|
return "push";
|
||||||
|
case ZMQ_PULL:
|
||||||
|
return "pull";
|
||||||
|
default:
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Bool_t FairMQSocket::Bind(TString address)
|
||||||
|
{
|
||||||
|
Bool_t result = true;
|
||||||
|
|
||||||
|
try {
|
||||||
|
if ( address.Length() > 0 /*!address.empty()*/) {
|
||||||
|
std::stringstream logmsg;
|
||||||
|
logmsg << "bind socket #" << fId << " on " << address;
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
||||||
|
fSocket->bind(address.Data());
|
||||||
|
}
|
||||||
|
} catch (zmq::error_t& e) {
|
||||||
|
std::stringstream logmsg2;
|
||||||
|
logmsg2 << "failed binding socket #" << fId << ", reason: " << e.what();
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
|
||||||
|
result = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
Bool_t FairMQSocket::Connect(TString address)
|
||||||
|
{
|
||||||
|
Bool_t result = true;
|
||||||
|
|
||||||
|
try {
|
||||||
|
if ( address.Length() > 0 /*!address.empty()*/) {
|
||||||
|
std::stringstream logmsg;
|
||||||
|
logmsg << "connect socket #" << fId << " to " << address;
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
||||||
|
fSocket->connect(address.Data());
|
||||||
|
}
|
||||||
|
} catch (zmq::error_t& e) {
|
||||||
|
std::stringstream logmsg2;
|
||||||
|
logmsg2 << "failed connecting socket #" << fId << ", reason: " << e.what();
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
|
||||||
|
result = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
Bool_t FairMQSocket::Send(FairMQMessage* msg)
|
||||||
|
{
|
||||||
|
Bool_t result = false;
|
||||||
|
|
||||||
|
try {
|
||||||
|
fBytesTx += msg->Size();
|
||||||
|
++fMessagesTx;
|
||||||
|
result = fSocket->send(*msg->GetMessage());
|
||||||
|
} catch (zmq::error_t& e) {
|
||||||
|
std::stringstream logmsg;
|
||||||
|
logmsg << "failed sending on socket #" << fId << ", reason: " << e.what();
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
||||||
|
result = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
Bool_t FairMQSocket::Receive(FairMQMessage* msg)
|
||||||
|
{
|
||||||
|
Bool_t result = false;
|
||||||
|
|
||||||
|
try {
|
||||||
|
result = fSocket->recv(msg->GetMessage());
|
||||||
|
fBytesRx += msg->Size();
|
||||||
|
++fMessagesRx;
|
||||||
|
} catch (zmq::error_t& e) {
|
||||||
|
std::stringstream logmsg;
|
||||||
|
logmsg << "failed receiving on socket #" << fId << ", reason: " << e.what();
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
||||||
|
result = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQSocket::Close()
|
||||||
|
{
|
||||||
|
fSocket->close();
|
||||||
|
}
|
||||||
|
|
||||||
|
zmq::socket_t* FairMQSocket::GetSocket()
|
||||||
|
{
|
||||||
|
return fSocket;
|
||||||
|
}
|
||||||
|
|
||||||
|
ULong_t FairMQSocket::GetBytesTx()
|
||||||
|
{
|
||||||
|
return fBytesTx;
|
||||||
|
}
|
||||||
|
|
||||||
|
ULong_t FairMQSocket::GetBytesRx()
|
||||||
|
{
|
||||||
|
return fBytesRx;
|
||||||
|
}
|
||||||
|
|
||||||
|
ULong_t FairMQSocket::GetMessagesTx()
|
||||||
|
{
|
||||||
|
return fMessagesTx;
|
||||||
|
}
|
||||||
|
|
||||||
|
ULong_t FairMQSocket::GetMessagesRx()
|
||||||
|
{
|
||||||
|
return fMessagesRx;
|
||||||
|
}
|
47
fairmq/FairMQSocket.h
Normal file
47
fairmq/FairMQSocket.h
Normal file
|
@ -0,0 +1,47 @@
|
||||||
|
/*
|
||||||
|
* FairMQSocket.h
|
||||||
|
*
|
||||||
|
* Created on: Dec 5, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef FAIRMQSOCKET_H_
|
||||||
|
#define FAIRMQSOCKET_H_
|
||||||
|
|
||||||
|
#include <zmq.hpp>
|
||||||
|
#include <string>
|
||||||
|
#include "FairMQContext.h"
|
||||||
|
#include "FairMQMessage.h"
|
||||||
|
#include "Rtypes.h"
|
||||||
|
#include "TString.h"
|
||||||
|
|
||||||
|
|
||||||
|
class FairMQSocket
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
zmq::socket_t* fSocket;
|
||||||
|
TString fId;
|
||||||
|
ULong_t fBytesTx;
|
||||||
|
ULong_t fBytesRx;
|
||||||
|
ULong_t fMessagesTx;
|
||||||
|
ULong_t fMessagesRx;
|
||||||
|
public:
|
||||||
|
const static TString TCP, IPC, INPROC;
|
||||||
|
FairMQSocket(FairMQContext* context, Int_t type, Int_t num);
|
||||||
|
virtual ~FairMQSocket();
|
||||||
|
TString GetId();
|
||||||
|
static TString GetTypeString(Int_t type);
|
||||||
|
Bool_t Send(FairMQMessage* msg);
|
||||||
|
Bool_t Receive(FairMQMessage* msg);
|
||||||
|
void Close();
|
||||||
|
Bool_t Bind(TString address);
|
||||||
|
Bool_t Connect(TString address);
|
||||||
|
zmq::socket_t* GetSocket();
|
||||||
|
ULong_t GetBytesTx();
|
||||||
|
ULong_t GetBytesRx();
|
||||||
|
ULong_t GetMessagesTx();
|
||||||
|
ULong_t GetMessagesRx();
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif /* FAIRMQSOCKET_H_ */
|
76
fairmq/FairMQStandaloneMerger.cxx
Normal file
76
fairmq/FairMQStandaloneMerger.cxx
Normal file
|
@ -0,0 +1,76 @@
|
||||||
|
/*
|
||||||
|
* FairMQStandaloneMerger.cxx
|
||||||
|
*
|
||||||
|
* Created on: Dec 6, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "FairMQStandaloneMerger.h"
|
||||||
|
#include "FairMQLogger.h"
|
||||||
|
|
||||||
|
FairMQStandaloneMerger::FairMQStandaloneMerger()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQStandaloneMerger::~FairMQStandaloneMerger()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQStandaloneMerger::Run()
|
||||||
|
{
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<");
|
||||||
|
|
||||||
|
Bool_t received0 = false;
|
||||||
|
Bool_t received1 = false;
|
||||||
|
FairMQMessage* msg0 = NULL;
|
||||||
|
FairMQMessage* msg1 = NULL;
|
||||||
|
TString source0 = fPayloadInputs->at(0)->GetId();
|
||||||
|
TString source1 = fPayloadInputs->at(1)->GetId();
|
||||||
|
Int_t size0 = 0;
|
||||||
|
Int_t size1 = 0;
|
||||||
|
|
||||||
|
// Initialize poll set
|
||||||
|
zmq_pollitem_t items[] = {
|
||||||
|
{ *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 },
|
||||||
|
{ *(fPayloadInputs->at(1)->GetSocket()), 0, ZMQ_POLLIN, 0 }
|
||||||
|
};
|
||||||
|
|
||||||
|
void* status; //necessary for pthread_join
|
||||||
|
pthread_t logger;
|
||||||
|
pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
msg0 = new FairMQMessage();
|
||||||
|
msg1 = new FairMQMessage();
|
||||||
|
|
||||||
|
zmq_poll(items, 2, -1);
|
||||||
|
|
||||||
|
if (items[0].revents & ZMQ_POLLIN) {
|
||||||
|
received0 = fPayloadInputs->at(0)->Receive(msg0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (items[1].revents & ZMQ_POLLIN) {
|
||||||
|
received1 = fPayloadInputs->at(1)->Receive(msg1);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (received0) {
|
||||||
|
size0 = msg0->Size();
|
||||||
|
fPayloadOutputs->at(0)->Send(msg0);
|
||||||
|
|
||||||
|
received0 = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (received1) {
|
||||||
|
size1 = msg1->Size();
|
||||||
|
fPayloadOutputs->at(0)->Send(msg1);
|
||||||
|
|
||||||
|
received1 = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
delete msg0;
|
||||||
|
delete msg1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_join(logger, &status);
|
||||||
|
}
|
||||||
|
|
24
fairmq/FairMQStandaloneMerger.h
Normal file
24
fairmq/FairMQStandaloneMerger.h
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
/*
|
||||||
|
* FairMQStandaloneMerger.h
|
||||||
|
*
|
||||||
|
* Created on: Dec 6, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef FAIRMQSTANDALONEMERGER_H_
|
||||||
|
#define FAIRMQSTANDALONEMERGER_H_
|
||||||
|
|
||||||
|
#include "FairMQDevice.h"
|
||||||
|
#include "Rtypes.h"
|
||||||
|
#include "TString.h"
|
||||||
|
|
||||||
|
|
||||||
|
class FairMQStandaloneMerger: public FairMQDevice
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
FairMQStandaloneMerger();
|
||||||
|
virtual ~FairMQStandaloneMerger();
|
||||||
|
virtual void Run();
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif /* FAIRMQSTANDALONEMERGER_H_ */
|
42
fairmq/FairMQStateMachine.cxx
Normal file
42
fairmq/FairMQStateMachine.cxx
Normal file
|
@ -0,0 +1,42 @@
|
||||||
|
/*
|
||||||
|
* FairMQStateMachine.cxx
|
||||||
|
*
|
||||||
|
* Created on: Oct 25, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "FairMQStateMachine.h"
|
||||||
|
|
||||||
|
|
||||||
|
FairMQStateMachine::FairMQStateMachine() :
|
||||||
|
fState(START)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQStateMachine::RunStateMachine()
|
||||||
|
{
|
||||||
|
void* status; //necessary for pthread_join
|
||||||
|
pthread_t state;
|
||||||
|
|
||||||
|
changeState(INIT);
|
||||||
|
|
||||||
|
while(fState != END) {
|
||||||
|
switch(fState) {
|
||||||
|
case INIT:
|
||||||
|
pthread_create(&state, NULL, &FairMQStateMachine::Init, this);
|
||||||
|
break;
|
||||||
|
|
||||||
|
}
|
||||||
|
pthread_join(state, &status);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
FairMQStateMachine::~FairMQStateMachine()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
32
fairmq/FairMQStateMachine.h
Normal file
32
fairmq/FairMQStateMachine.h
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
/*
|
||||||
|
* FairMQStateMachine.h
|
||||||
|
*
|
||||||
|
* Created on: Oct 25, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef FAIRMQSTATEMACHINE_H_
|
||||||
|
#define FAIRMQSTATEMACHINE_H_
|
||||||
|
|
||||||
|
|
||||||
|
class FairMQStateMachine
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
int fState;
|
||||||
|
public:
|
||||||
|
enum {
|
||||||
|
START, INIT, BIND, CONNECT, RUN, PAUSE, SHUTDOWN, END
|
||||||
|
};
|
||||||
|
FairMQStateMachine();
|
||||||
|
virtual void Init() = 0;
|
||||||
|
virtual void Bind() = 0;
|
||||||
|
virtual void Connect() = 0;
|
||||||
|
virtual void Run() = 0;
|
||||||
|
virtual void Pause() = 0;
|
||||||
|
virtual void Shutdown() = 0;
|
||||||
|
bool ChangeState(int new_state);
|
||||||
|
void RunStateMachine();
|
||||||
|
virtual ~FairMQStateMachine();
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif /* FAIRMQSTATEMACHINE_H_ */
|
83
fairmq/runBenchmarkSampler.cxx
Normal file
83
fairmq/runBenchmarkSampler.cxx
Normal file
|
@ -0,0 +1,83 @@
|
||||||
|
/*
|
||||||
|
* runBenchmarkSampler.cxx
|
||||||
|
*
|
||||||
|
* Created on: Apr 23, 2013
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <sstream>
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include "FairMQLogger.h"
|
||||||
|
#include <zmq.hpp>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <iostream>
|
||||||
|
#include "FairMQBenchmarkSampler.h"
|
||||||
|
|
||||||
|
|
||||||
|
int main(int argc, char** argv)
|
||||||
|
{
|
||||||
|
if( argc != 8 ) {
|
||||||
|
std::cout << "Usage: bsampler ID eventSize eventRate numIoTreads\n" <<
|
||||||
|
"\t\tbindSocketType bindSndBufferSize BindAddress\n" << std::endl;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pid_t pid = getpid();
|
||||||
|
std::stringstream logmsg;
|
||||||
|
logmsg << "PID: " << pid;
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
||||||
|
|
||||||
|
int i = 1;
|
||||||
|
|
||||||
|
FairMQBenchmarkSampler* sampler = new FairMQBenchmarkSampler();
|
||||||
|
|
||||||
|
sampler->SetProperty(FairMQBenchmarkSampler::Id, argv[i]);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int eventSize;
|
||||||
|
std::stringstream(argv[i]) >> eventSize;
|
||||||
|
sampler->SetProperty(FairMQBenchmarkSampler::EventSize, eventSize);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int eventRate;
|
||||||
|
std::stringstream(argv[i]) >> eventRate;
|
||||||
|
sampler->SetProperty(FairMQBenchmarkSampler::EventRate, eventRate);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int numIoThreads;
|
||||||
|
std::stringstream(argv[i]) >> numIoThreads;
|
||||||
|
sampler->SetProperty(FairMQBenchmarkSampler::NumIoThreads, numIoThreads);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int numInputs = 0;
|
||||||
|
sampler->SetProperty(FairMQBenchmarkSampler::NumInputs, numInputs);
|
||||||
|
|
||||||
|
int numOutputs = 1;
|
||||||
|
sampler->SetProperty(FairMQBenchmarkSampler::NumOutputs, numOutputs);
|
||||||
|
|
||||||
|
sampler->Init();
|
||||||
|
|
||||||
|
int bindSocketType = ZMQ_PUB;
|
||||||
|
if (strcmp(argv[i], "push") == 0) {
|
||||||
|
bindSocketType = ZMQ_PUSH;
|
||||||
|
}
|
||||||
|
sampler->SetProperty(FairMQBenchmarkSampler::BindSocketType, bindSocketType, 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int bindSndBufferSize;
|
||||||
|
std::stringstream(argv[i]) >> bindSndBufferSize;
|
||||||
|
sampler->SetProperty(FairMQBenchmarkSampler::BindSndBufferSize, bindSndBufferSize, 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
sampler->SetProperty(FairMQBenchmarkSampler::BindAddress, argv[i], 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
|
||||||
|
sampler->Bind();
|
||||||
|
sampler->Connect();
|
||||||
|
sampler->Run();
|
||||||
|
|
||||||
|
exit(0);
|
||||||
|
}
|
||||||
|
|
87
fairmq/runBuffer.cxx
Normal file
87
fairmq/runBuffer.cxx
Normal file
|
@ -0,0 +1,87 @@
|
||||||
|
/*
|
||||||
|
* runBuffer.cxx
|
||||||
|
*
|
||||||
|
* Created on: Oct 26, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "FairMQBuffer.h"
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include "FairMQLogger.h"
|
||||||
|
#include <zmq.hpp>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
|
||||||
|
int main(int argc, char** argv)
|
||||||
|
{
|
||||||
|
if( argc != 9 ) {
|
||||||
|
std::cout << "Usage: buffer \tID numIoTreads\n" <<
|
||||||
|
"\t\tconnectSocketType connectRcvBufferSize ConnectAddress\n" <<
|
||||||
|
"\t\tbindSocketType bindSndBufferSize BindAddress\n" << std::endl;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pid_t pid = getpid();
|
||||||
|
std::stringstream logmsg;
|
||||||
|
logmsg << "PID: " << pid;
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
||||||
|
|
||||||
|
int i = 1;
|
||||||
|
|
||||||
|
FairMQBuffer* buffer = new FairMQBuffer();
|
||||||
|
buffer->SetProperty(FairMQBuffer::Id, argv[i]);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int numIoThreads;
|
||||||
|
std::stringstream(argv[i]) >> numIoThreads;
|
||||||
|
buffer->SetProperty(FairMQBuffer::NumIoThreads, numIoThreads);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int numInputs = 1;
|
||||||
|
buffer->SetProperty(FairMQBuffer::NumInputs, numInputs);
|
||||||
|
|
||||||
|
int numOutputs = 1;
|
||||||
|
buffer->SetProperty(FairMQBuffer::NumOutputs, numOutputs);
|
||||||
|
|
||||||
|
buffer->Init();
|
||||||
|
|
||||||
|
int connectSocketType = ZMQ_SUB;
|
||||||
|
if (strcmp(argv[i], "pull") == 0) {
|
||||||
|
connectSocketType = ZMQ_PULL;
|
||||||
|
}
|
||||||
|
buffer->SetProperty(FairMQBuffer::ConnectSocketType, connectSocketType, 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int connectRcvBufferSize;
|
||||||
|
std::stringstream(argv[i]) >> connectRcvBufferSize;
|
||||||
|
buffer->SetProperty(FairMQBuffer::ConnectRcvBufferSize, connectRcvBufferSize, 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
buffer->SetProperty(FairMQBuffer::ConnectAddress, argv[i], 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int bindSocketType = ZMQ_PUB;
|
||||||
|
if (strcmp(argv[i], "push") == 0) {
|
||||||
|
bindSocketType = ZMQ_PUSH;
|
||||||
|
}
|
||||||
|
buffer->SetProperty(FairMQBuffer::BindSocketType, bindSocketType, 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int bindSndBufferSize;
|
||||||
|
std::stringstream(argv[i]) >> bindSndBufferSize;
|
||||||
|
buffer->SetProperty(FairMQBuffer::BindSndBufferSize, bindSndBufferSize, 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
buffer->SetProperty(FairMQBuffer::BindAddress, argv[i], 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
|
||||||
|
buffer->Bind();
|
||||||
|
buffer->Connect();
|
||||||
|
buffer->Run();
|
||||||
|
|
||||||
|
exit(0);
|
||||||
|
}
|
||||||
|
|
102
fairmq/runMerger.cxx
Normal file
102
fairmq/runMerger.cxx
Normal file
|
@ -0,0 +1,102 @@
|
||||||
|
/*
|
||||||
|
* runMerger.cxx
|
||||||
|
*
|
||||||
|
* Created on: Dec 6, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "FairMQStandaloneMerger.h"
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include "FairMQLogger.h"
|
||||||
|
#include <zmq.hpp>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
|
||||||
|
int main(int argc, char** argv)
|
||||||
|
{
|
||||||
|
if( argc != 12 ) {
|
||||||
|
std::cout << "Usage: merger \tID numIoTreads\n" <<
|
||||||
|
"\t\tconnectSocketType connectRcvBufferSize ConnectAddress\n" <<
|
||||||
|
"\t\tconnectSocketType connectRcvBufferSize ConnectAddress\n" <<
|
||||||
|
"\t\tbindSocketType bindSndBufferSize BindAddress\n" << std::endl;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pid_t pid = getpid();
|
||||||
|
std::stringstream logmsg;
|
||||||
|
logmsg << "PID: " << pid;
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
||||||
|
|
||||||
|
int i = 1;
|
||||||
|
|
||||||
|
FairMQStandaloneMerger* merger = new FairMQStandaloneMerger();
|
||||||
|
merger->SetProperty(FairMQStandaloneMerger::Id, argv[i]);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int numIoThreads;
|
||||||
|
std::stringstream(argv[i]) >> numIoThreads;
|
||||||
|
merger->SetProperty(FairMQStandaloneMerger::NumIoThreads, numIoThreads);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int numInputs = 2;
|
||||||
|
merger->SetProperty(FairMQStandaloneMerger::NumInputs, numInputs);
|
||||||
|
|
||||||
|
int numOutputs = 1;
|
||||||
|
merger->SetProperty(FairMQStandaloneMerger::NumOutputs, numOutputs);
|
||||||
|
|
||||||
|
merger->Init();
|
||||||
|
|
||||||
|
int connectSocketType = ZMQ_SUB;
|
||||||
|
if (strcmp(argv[i], "pull") == 0) {
|
||||||
|
connectSocketType = ZMQ_PULL;
|
||||||
|
}
|
||||||
|
merger->SetProperty(FairMQStandaloneMerger::ConnectSocketType, connectSocketType, 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int connectRcvBufferSize;
|
||||||
|
std::stringstream(argv[i]) >> connectRcvBufferSize;
|
||||||
|
merger->SetProperty(FairMQStandaloneMerger::ConnectRcvBufferSize, connectRcvBufferSize, 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
merger->SetProperty(FairMQStandaloneMerger::ConnectAddress, argv[i], 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
connectSocketType = ZMQ_SUB;
|
||||||
|
if (strcmp(argv[i], "pull") == 0) {
|
||||||
|
connectSocketType = ZMQ_PULL;
|
||||||
|
}
|
||||||
|
merger->SetProperty(FairMQStandaloneMerger::ConnectSocketType, connectSocketType, 1);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
std::stringstream(argv[i]) >> connectRcvBufferSize;
|
||||||
|
merger->SetProperty(FairMQStandaloneMerger::ConnectRcvBufferSize, connectRcvBufferSize, 1);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
merger->SetProperty(FairMQStandaloneMerger::ConnectAddress, argv[i], 1);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int bindSocketType = ZMQ_PUB;
|
||||||
|
if (strcmp(argv[i], "push") == 0) {
|
||||||
|
bindSocketType = ZMQ_PUSH;
|
||||||
|
}
|
||||||
|
merger->SetProperty(FairMQStandaloneMerger::BindSocketType, bindSocketType, 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int bindSndBufferSize;
|
||||||
|
std::stringstream(argv[i]) >> bindSndBufferSize;
|
||||||
|
merger->SetProperty(FairMQStandaloneMerger::BindSndBufferSize, bindSndBufferSize, 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
merger->SetProperty(FairMQStandaloneMerger::BindAddress, argv[i], 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
|
||||||
|
merger->Bind();
|
||||||
|
merger->Connect();
|
||||||
|
merger->Run();
|
||||||
|
|
||||||
|
exit(0);
|
||||||
|
}
|
||||||
|
|
71
fairmq/runSink.cxx
Normal file
71
fairmq/runSink.cxx
Normal file
|
@ -0,0 +1,71 @@
|
||||||
|
/*
|
||||||
|
* runSink.cxx
|
||||||
|
*
|
||||||
|
* Created on: Jan 21, 2013
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "FairMQSink.h"
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include "FairMQLogger.h"
|
||||||
|
#include <zmq.hpp>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
|
||||||
|
int main(int argc, char** argv)
|
||||||
|
{
|
||||||
|
if( argc != 6 ) {
|
||||||
|
std::cout << "Usage: sink \tID numIoTreads\n" <<
|
||||||
|
"\t\tconnectSocketType connectRcvBufferSize ConnectAddress\n" << std::endl;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pid_t pid = getpid();
|
||||||
|
std::stringstream logmsg;
|
||||||
|
logmsg << "PID: " << pid;
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
||||||
|
|
||||||
|
int i = 1;
|
||||||
|
|
||||||
|
FairMQSink* sink = new FairMQSink();
|
||||||
|
sink->SetProperty(FairMQSink::Id, argv[i]);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int numIoThreads;
|
||||||
|
std::stringstream(argv[i]) >> numIoThreads;
|
||||||
|
sink->SetProperty(FairMQSink::NumIoThreads, numIoThreads);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int numInputs = 1;
|
||||||
|
sink->SetProperty(FairMQSink::NumInputs, numInputs);
|
||||||
|
|
||||||
|
int numOutputs = 0;
|
||||||
|
sink->SetProperty(FairMQSink::NumOutputs, numOutputs);
|
||||||
|
|
||||||
|
sink->Init();
|
||||||
|
|
||||||
|
int connectSocketType = ZMQ_SUB;
|
||||||
|
if (strcmp(argv[i], "pull") == 0) {
|
||||||
|
connectSocketType = ZMQ_PULL;
|
||||||
|
}
|
||||||
|
sink->SetProperty(FairMQSink::ConnectSocketType, connectSocketType, 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int connectRcvBufferSize;
|
||||||
|
std::stringstream(argv[i]) >> connectRcvBufferSize;
|
||||||
|
sink->SetProperty(FairMQSink::ConnectRcvBufferSize, connectRcvBufferSize, 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
sink->SetProperty(FairMQSink::ConnectAddress, argv[i], 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
|
||||||
|
sink->Bind();
|
||||||
|
sink->Connect();
|
||||||
|
sink->Run();
|
||||||
|
|
||||||
|
exit(0);
|
||||||
|
}
|
||||||
|
|
102
fairmq/runSplitter.cxx
Normal file
102
fairmq/runSplitter.cxx
Normal file
|
@ -0,0 +1,102 @@
|
||||||
|
/*
|
||||||
|
* runSplitter.cxx
|
||||||
|
*
|
||||||
|
* Created on: Dec 6, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "FairMQBalancedStandaloneSplitter.h"
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include "FairMQLogger.h"
|
||||||
|
#include <zmq.hpp>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
|
||||||
|
int main(int argc, char** argv)
|
||||||
|
{
|
||||||
|
if( argc != 12 ) {
|
||||||
|
std::cout << "Usage: splitter \tID numIoTreads\n" <<
|
||||||
|
"\t\tconnectSocketType connectRcvBufferSize ConnectAddress\n" <<
|
||||||
|
"\t\tbindSocketType bindSndBufferSize BindAddress\n" <<
|
||||||
|
"\t\tbindSocketType bindSndBufferSize BindAddress\n" << std::endl;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pid_t pid = getpid();
|
||||||
|
std::stringstream logmsg;
|
||||||
|
logmsg << "PID: " << pid;
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
||||||
|
|
||||||
|
int i = 1;
|
||||||
|
|
||||||
|
FairMQBalancedStandaloneSplitter* splitter = new FairMQBalancedStandaloneSplitter();
|
||||||
|
splitter->SetProperty(FairMQBalancedStandaloneSplitter::Id, argv[i]);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int numIoThreads;
|
||||||
|
std::stringstream(argv[i]) >> numIoThreads;
|
||||||
|
splitter->SetProperty(FairMQBalancedStandaloneSplitter::NumIoThreads, numIoThreads);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int numInputs = 1;
|
||||||
|
splitter->SetProperty(FairMQBalancedStandaloneSplitter::NumInputs, numInputs);
|
||||||
|
|
||||||
|
int numOutputs = 2;
|
||||||
|
splitter->SetProperty(FairMQBalancedStandaloneSplitter::NumOutputs, numOutputs);
|
||||||
|
|
||||||
|
splitter->Init();
|
||||||
|
|
||||||
|
int connectSocketType = ZMQ_SUB;
|
||||||
|
if (strcmp(argv[i], "pull") == 0) {
|
||||||
|
connectSocketType = ZMQ_PULL;
|
||||||
|
}
|
||||||
|
splitter->SetProperty(FairMQBalancedStandaloneSplitter::ConnectSocketType, connectSocketType, 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int connectRcvBufferSize;
|
||||||
|
std::stringstream(argv[i]) >> connectRcvBufferSize;
|
||||||
|
splitter->SetProperty(FairMQBalancedStandaloneSplitter::ConnectRcvBufferSize, connectRcvBufferSize, 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
splitter->SetProperty(FairMQBalancedStandaloneSplitter::ConnectAddress, argv[i], 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int bindSocketType = ZMQ_PUB;
|
||||||
|
if (strcmp(argv[i], "push") == 0) {
|
||||||
|
bindSocketType = ZMQ_PUSH;
|
||||||
|
}
|
||||||
|
splitter->SetProperty(FairMQBalancedStandaloneSplitter::BindSocketType, bindSocketType, 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int bindSndBufferSize;
|
||||||
|
std::stringstream(argv[i]) >> bindSndBufferSize;
|
||||||
|
splitter->SetProperty(FairMQBalancedStandaloneSplitter::BindSndBufferSize, bindSndBufferSize, 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
splitter->SetProperty(FairMQBalancedStandaloneSplitter::BindAddress, argv[i], 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
bindSocketType = ZMQ_PUB;
|
||||||
|
if (strcmp(argv[i], "push") == 0) {
|
||||||
|
bindSocketType = ZMQ_PUSH;
|
||||||
|
}
|
||||||
|
splitter->SetProperty(FairMQBalancedStandaloneSplitter::BindSocketType, bindSocketType, 1);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
std::stringstream(argv[i]) >> bindSndBufferSize;
|
||||||
|
splitter->SetProperty(FairMQBalancedStandaloneSplitter::BindSndBufferSize, bindSndBufferSize, 1);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
splitter->SetProperty(FairMQBalancedStandaloneSplitter::BindAddress, argv[i], 1);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
|
||||||
|
splitter->Bind();
|
||||||
|
splitter->Connect();
|
||||||
|
splitter->Run();
|
||||||
|
|
||||||
|
exit(0);
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user