Remove compile time transport interface switch

- Remove the compile time check of the transport implementation.
  The transport (zeromq/nanomsg) can be chosen at run time with:
  `device.SetTransport("zeromq"); // possible values are "zeromq" and "nanomsg"`.

  For devices that use FairMQProgOptions, the transport can be configured via cmd option:
  `--transport zeromq` or `--transport nanomsg`. Default values is "zeromq".
  The device receives the configured value with:
  `device.SetTransport(config.GetValue<std::string>("transport"));`

  Old method of setting transport still works. But the NANOMSG constant is not defined.

- Remove old `fairmq/prototest` directory. It was only used as a test for protobuf.
  The protobuf part of Tutorial3 does the same (with different values).

- Fix a bug in FairMQPollerNN, where the `revents` value was not initialized.
  This caused the `poller->CheckOutput()` to trigger when it should not.
This commit is contained in:
Alexey Rybalchenko 2016-01-13 17:21:24 +01:00
parent 0e1a1ad552
commit f1abb9ecdd
41 changed files with 162 additions and 1634 deletions

View File

@ -8,9 +8,6 @@
configure_file(${CMAKE_SOURCE_DIR}/fairmq/run/startBenchmark.sh.in ${CMAKE_BINARY_DIR}/bin/startBenchmark.sh) configure_file(${CMAKE_SOURCE_DIR}/fairmq/run/startBenchmark.sh.in ${CMAKE_BINARY_DIR}/bin/startBenchmark.sh)
configure_file(${CMAKE_SOURCE_DIR}/fairmq/run/benchmark.json ${CMAKE_BINARY_DIR}/bin/config/benchmark.json) configure_file(${CMAKE_SOURCE_DIR}/fairmq/run/benchmark.json ${CMAKE_BINARY_DIR}/bin/config/benchmark.json)
# following scripts are only for protobuf tests and are not essential part of FairMQ
# configure_file(${CMAKE_SOURCE_DIR}/examples/advanced/Tutorial3/MQ/run/startBin.sh.in ${CMAKE_BINARY_DIR}/bin/startBin.sh)
# configure_file(${CMAKE_SOURCE_DIR}/examples/advanced/Tutorial3/MQ/run/startProto.sh.in ${CMAKE_BINARY_DIR}/bin/startProto.sh)
add_subdirectory(logger) add_subdirectory(logger)
add_subdirectory(test) add_subdirectory(test)
@ -30,19 +27,8 @@ Set(SYSTEM_INCLUDE_DIRECTORIES
${ZMQ_INCLUDE_DIR} ${ZMQ_INCLUDE_DIR}
) )
If(PROTOBUF_FOUND)
Set(INCLUDE_DIRECTORIES
${INCLUDE_DIRECTORIES}
# following directory is only for protobuf tests and is not essential part of FairMQ
# ${CMAKE_SOURCE_DIR}/fairmq/prototest
)
Set(SYSTEM_INCLUDE_DIRECTORIES
${SYSTEM_INCLUDE_DIRECTORIES}
${PROTOBUF_INCLUDE_DIR}
)
EndIf(PROTOBUF_FOUND)
If(NANOMSG_FOUND) If(NANOMSG_FOUND)
add_definitions(-DNANOMSG_FOUND)
Set(INCLUDE_DIRECTORIES Set(INCLUDE_DIRECTORIES
${INCLUDE_DIRECTORIES} ${INCLUDE_DIRECTORIES}
${CMAKE_SOURCE_DIR}/fairmq/nanomsg ${CMAKE_SOURCE_DIR}/fairmq/nanomsg
@ -91,31 +77,6 @@ Set(SRCS
"options/FairMQParser.cxx" "options/FairMQParser.cxx"
) )
If(PROTOBUF_FOUND)
# following source files are only for protobuf tests and are not essential part of FairMQ
# add_custom_command(
# OUTPUT
# ${CMAKE_CURRENT_BINARY_DIR}/payload.pb.h
# ${CMAKE_CURRENT_BINARY_DIR}/payload.pb.cc
# COMMAND
# ${SIMPATH}/bin/protoc -I=. --cpp_out=${CMAKE_CURRENT_BINARY_DIR} payload.proto
# WORKING_DIRECTORY
# ${CMAKE_SOURCE_DIR}/fairmq/prototest
# )
# set(SRCS
# ${SRCS}
# ${CMAKE_CURRENT_BINARY_DIR}/payload.pb.cc
# "prototest/FairMQProtoSampler.cxx"
# "prototest/FairMQBinSampler.cxx"
# "prototest/FairMQBinSink.cxx"
# "prototest/FairMQProtoSink.cxx"
# )
Set(DEPENDENCIES
${DEPENDENCIES}
${PROTOBUF_LIBRARY}
)
Endif(PROTOBUF_FOUND)
If(NANOMSG_FOUND) If(NANOMSG_FOUND)
Set(SRCS Set(SRCS
${SRCS} ${SRCS}
@ -124,12 +85,9 @@ If(NANOMSG_FOUND)
"nanomsg/FairMQSocketNN.cxx" "nanomsg/FairMQSocketNN.cxx"
"nanomsg/FairMQPollerNN.cxx" "nanomsg/FairMQPollerNN.cxx"
) )
Set(DEPENDENCIES
${DEPENDENCIES}
${NANOMSG_LIBRARY_SHARED}
)
EndIf(NANOMSG_FOUND) EndIf(NANOMSG_FOUND)
# to copy src that are header-only files (e.g. c++ template) for FairRoot external installation # to copy src that are header-only files (e.g. c++ template) for FairRoot external installation
# manual install (globbing add not recommended) # manual install (globbing add not recommended)
Set(FAIRMQHEADERS Set(FAIRMQHEADERS
@ -163,6 +121,13 @@ Set(DEPENDENCIES
boost_regex boost_regex
) )
If(NANOMSG_FOUND)
Set(DEPENDENCIES
${DEPENDENCIES}
${NANOMSG_LIBRARY_SHARED}
)
EndIf(NANOMSG_FOUND)
Set(LIBRARY_NAME FairMQ) Set(LIBRARY_NAME FairMQ)
GENERATE_LIBRARY() GENERATE_LIBRARY()
@ -176,17 +141,6 @@ Set(Exe_Names
proxy proxy
) )
# following executables are only for protobuf tests and are not essential part of FairMQ
# if(PROTOBUF_FOUND)
# set(Exe_Names
# ${Exe_Names}
# binsampler
# protosampler
# binsink
# protosink
# )
# endif(PROTOBUF_FOUND)
Set(Exe_Source Set(Exe_Source
run/runBenchmarkSampler.cxx run/runBenchmarkSampler.cxx
run/runSink.cxx run/runSink.cxx
@ -196,17 +150,6 @@ Set(Exe_Source
run/runProxy.cxx run/runProxy.cxx
) )
# following source files are only for protobuf tests and are not essential part of FairMQ
# if(PROTOBUF_FOUND)
# set(Exe_Source
# ${Exe_Source}
# prototest/runBinSampler.cxx
# prototest/runProtoSampler.cxx
# prototest/runBinSink.cxx
# prototest/runProtoSink.cxx
# )
# endif(PROTOBUF_FOUND)
list(LENGTH Exe_Names _length) list(LENGTH Exe_Names _length)
math(EXPR _length ${_length}-1) math(EXPR _length ${_length}-1)

View File

@ -27,6 +27,11 @@
#include "FairMQDevice.h" #include "FairMQDevice.h"
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQTransportFactoryZMQ.h"
#ifdef NANOMSG_FOUND
#include "FairMQTransportFactoryNN.h"
#endif
using namespace std; using namespace std;
// boost::function and a wrapper to catch the signals // boost::function and a wrapper to catch the signals
@ -85,6 +90,12 @@ void FairMQDevice::SignalHandler(int signal)
void FairMQDevice::InitWrapper() void FairMQDevice::InitWrapper()
{ {
if (!fTransportFactory)
{
LOG(ERROR) << "Transport not initialized. Did you call SetTransport()?";
exit(EXIT_FAILURE);
}
if (!fCmdSocket) if (!fCmdSocket)
{ {
fCmdSocket = fTransportFactory->CreateSocket("pub", "device-commands", fNumIoThreads); fCmdSocket = fTransportFactory->CreateSocket("pub", "device-commands", fNumIoThreads);
@ -454,6 +465,32 @@ void FairMQDevice::SetTransport(FairMQTransportFactory* factory)
fTransportFactory = factory; fTransportFactory = factory;
} }
void FairMQDevice::SetTransport(const string& transport)
{
if (transport == "zeromq")
{
fTransportFactory = new FairMQTransportFactoryZMQ();
}
#ifdef NANOMSG_FOUND
else if (transport == "nanomsg")
{
fTransportFactory = new FairMQTransportFactoryNN();
}
#endif
else
{
LOG(ERROR) << "Unknown transport implementation requested: "
<< transport
<< ". Supported are "
<< "\"zeromq\""
#ifdef NANOMSG_FOUND
<< ", \"nanomsg\""
#endif
<< ". Exiting.";
exit(EXIT_FAILURE);
}
}
void FairMQDevice::LogSocketRates() void FairMQDevice::LogSocketRates()
{ {
timestamp_t t0; timestamp_t t0;

View File

@ -102,9 +102,12 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
/// Print all properties of this and the parent class to LOG(INFO) /// Print all properties of this and the parent class to LOG(INFO)
virtual void ListProperties(); virtual void ListProperties();
/// Configures the device with a transport factory /// Configures the device with a transport factory (DEPRECATED)
/// @param factory Pointer to the transport factory object /// @param factory Pointer to the transport factory object
virtual void SetTransport(FairMQTransportFactory* factory); void SetTransport(FairMQTransportFactory* factory);
/// Configures the device with a transport factory
/// @param transport Transport string ("zeromq"/"nanomsg")
void SetTransport(const std::string& transport = "zeromq");
/// Implements the sort algorithm used in SortChannel() /// Implements the sort algorithm used in SortChannel()
/// @param lhs Right hand side value for comparison /// @param lhs Right hand side value for comparison

View File

@ -53,9 +53,10 @@ class GenericFileSink : public FairMQDevice, public T, public U
virtual ~GenericFileSink() virtual ~GenericFileSink()
{} {}
void SetTransport(FairMQTransportFactory* transport) template <typename... Args>
void SetTransport(Args... args)
{ {
FairMQDevice::SetTransport(transport); FairMQDevice::SetTransport(std::forward<Args>(args)...);
} }
template <typename... Args> template <typename... Args>

View File

@ -28,9 +28,10 @@ class GenericMerger : public FairMQDevice, public MergerPolicy, public InputPoli
virtual ~GenericMerger() virtual ~GenericMerger()
{} {}
void SetTransport(FairMQTransportFactory* transport) template <typename... Args>
void SetTransport(Args... args)
{ {
FairMQDevice::SetTransport(transport); FairMQDevice::SetTransport(std::forward<Args>(args)...);
} }
protected: protected:

View File

@ -60,9 +60,10 @@ class GenericProcessor : public FairMQDevice, public T, public U, public V
// the four following methods ensure // the four following methods ensure
// that the correct policy method is called // that the correct policy method is called
void SetTransport(FairMQTransportFactory* transport) template <typename... Args>
void SetTransport(Args... args)
{ {
FairMQDevice::SetTransport(transport); FairMQDevice::SetTransport(std::forward<Args>(args)...);
} }
template <typename... Args> template <typename... Args>

View File

@ -84,7 +84,12 @@ class base_GenericSampler : public FairMQDevice, public T, public U
}; };
*/ */
virtual void SetTransport(FairMQTransportFactory* factory); template <typename... Args>
void SetTransport(Args... args)
{
FairMQDevice::SetTransport(std::forward<Args>(args)...);
}
void ResetEventCounter(); void ResetEventCounter();
template <typename... Args> template <typename... Args>

View File

@ -22,12 +22,6 @@ base_GenericSampler<T,U,K,L>::~base_GenericSampler()
{ {
} }
template <typename T, typename U, typename K, typename L>
void base_GenericSampler<T,U,K,L>::SetTransport(FairMQTransportFactory* factory)
{
FairMQDevice::SetTransport(factory);
}
template <typename T, typename U, typename K, typename L> template <typename T, typename U, typename K, typename L>
void base_GenericSampler<T,U,K,L>::InitTask() void base_GenericSampler<T,U,K,L>::InitTask()
{ {

View File

@ -127,13 +127,15 @@ FairMQPollerNN::FairMQPollerNN(FairMQSocket& cmdSocket, FairMQSocket& dataSocket
items[0].fd = cmdSocket.GetSocket(1); items[0].fd = cmdSocket.GetSocket(1);
items[0].events = NN_POLLIN; items[0].events = NN_POLLIN;
items[0].revents = 0;
items[1].fd = dataSocket.GetSocket(1);
items[1].revents = 0;
int type = 0; int type = 0;
size_t sz = sizeof(type); size_t sz = sizeof(type);
nn_getsockopt(dataSocket.GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz); nn_getsockopt(dataSocket.GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz);
items[1].fd = dataSocket.GetSocket(1);
if (type == NN_REQ || type == NN_REP || type == NN_PAIR) if (type == NN_REQ || type == NN_REP || type == NN_PAIR)
{ {
items[1].events = NN_POLLIN|NN_POLLOUT; items[1].events = NN_POLLIN|NN_POLLOUT;
@ -153,7 +155,7 @@ FairMQPollerNN::FairMQPollerNN(FairMQSocket& cmdSocket, FairMQSocket& dataSocket
} }
} }
void FairMQPollerNN::Poll(int timeout) void FairMQPollerNN::Poll(const int timeout)
{ {
if (nn_poll(items, fNumItems, timeout) < 0) if (nn_poll(items, fNumItems, timeout) < 0)
{ {
@ -192,7 +194,7 @@ bool FairMQPollerNN::CheckInput(const string channelKey, const int index)
{ {
try try
{ {
if (items[fOffsetMap.at(channelKey) + index].revents & NN_POLLIN) if (items[fOffsetMap.at(channelKey) + index].revents & (NN_POLLIN | NN_POLLOUT))
{ {
return true; return true;
} }

View File

@ -132,17 +132,20 @@ void FairMQProgOptions::InitOptionDescription()
{ {
fMQOptionsInCmd.add_options() fMQOptionsInCmd.add_options()
("id", po::value<string>(), "Device ID (required argument).") ("id", po::value<string>(), "Device ID (required argument).")
("io-threads", po::value<int>()->default_value(1), "Number of I/O threads."); ("io-threads", po::value<int>()->default_value(1), "Number of I/O threads.")
("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').");
fMQOptionsInCfg.add_options() fMQOptionsInCfg.add_options()
("id", po::value<string>()->required(), "Device ID (required argument).") ("id", po::value<string>()->required(), "Device ID (required argument).")
("io-threads", po::value<int>()->default_value(1), "Number of I/O threads."); ("io-threads", po::value<int>()->default_value(1), "Number of I/O threads.")
("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').");
} }
else else
{ {
fMQOptionsInCmd.add_options() fMQOptionsInCmd.add_options()
("id", po::value<string>()->required(), "Device ID (required argument)") ("id", po::value<string>()->required(), "Device ID (required argument)")
("io-threads", po::value<int>()->default_value(1), "Number of I/O threads"); ("io-threads", po::value<int>()->default_value(1), "Number of I/O threads")
("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').");
} }
fMQParserOptions.add_options() fMQParserOptions.add_options()

View File

@ -1,144 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQBinSampler.cpp
*
* @since 2013-04-23
* @author D. Klein, A. Rybalchenko
*/
#include <vector>
#include <stdlib.h> /* srand, rand */
#include <time.h> /* time */
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include "FairMQBinSampler.h"
#include "FairMQLogger.h"
using namespace std;
FairMQBinSampler::FairMQBinSampler()
: fEventSize(10000)
, fEventRate(1)
, fEventCounter(0)
{
}
FairMQBinSampler::~FairMQBinSampler()
{
}
void FairMQBinSampler::Run()
{
boost::thread resetEventCounter(boost::bind(&FairMQBinSampler::ResetEventCounter, this));
srand(time(NULL));
LOG(DEBUG) << "Message size: " << fEventSize * sizeof(Content) << " bytes.";
const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0);
while (CheckCurrentState(RUNNING))
{
Content* payload = new Content[fEventSize];
for (int i = 0; i < fEventSize; ++i)
{
(&payload[i])->x = rand() % 100 + 1;
(&payload[i])->y = rand() % 100 + 1;
(&payload[i])->z = rand() % 100 + 1;
(&payload[i])->a = (rand() % 100 + 1) / (rand() % 100 + 1);
(&payload[i])->b = (rand() % 100 + 1) / (rand() % 100 + 1);
// LOG(INFO) << (&payload[i])->x << " " << (&payload[i])->y << " " << (&payload[i])->z << " " << (&payload[i])->a << " " << (&payload[i])->b;
}
FairMQMessage* msg = fTransportFactory->CreateMessage(fEventSize * sizeof(Content));
memcpy(msg->GetData(), payload, fEventSize * sizeof(Content));
dataOutChannel.Send(msg);
--fEventCounter;
while (fEventCounter == 0)
{
boost::this_thread::sleep(boost::posix_time::milliseconds(1));
}
delete[] payload;
delete msg;
}
resetEventCounter.interrupt();
resetEventCounter.join();
}
void FairMQBinSampler::ResetEventCounter()
{
while (GetCurrentState() == RUNNING)
{
try
{
fEventCounter = fEventRate / 100;
boost::this_thread::sleep(boost::posix_time::milliseconds(10));
}
catch (boost::thread_interrupted&)
{
break;
}
}
}
void FairMQBinSampler::SetProperty(const int key, const string& value)
{
switch (key)
{
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
string FairMQBinSampler::GetProperty(const int key, const string& default_ /*= ""*/)
{
switch (key)
{
default:
return FairMQDevice::GetProperty(key, default_);
}
}
void FairMQBinSampler::SetProperty(const int key, const int value)
{
switch (key)
{
case EventSize:
fEventSize = value;
break;
case EventRate:
fEventRate = value;
break;
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
int FairMQBinSampler::GetProperty(const int key, const int default_ /*= 0*/)
{
switch (key)
{
case EventSize:
return fEventSize;
case EventRate:
return fEventRate;
default:
return FairMQDevice::GetProperty(key, default_);
}
}

View File

@ -1,58 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQBinSampler.h
*
* @since 2014-02-24
* @author A. Rybalchenko
*/
#ifndef FAIRMQBINSAMPLER_H_
#define FAIRMQBINSAMPLER_H_
#include <string>
#include "FairMQDevice.h"
struct Content
{
double a;
double b;
int x;
int y;
int z;
};
class FairMQBinSampler : public FairMQDevice
{
public:
enum
{
EventRate = FairMQDevice::Last,
EventSize,
Last
};
FairMQBinSampler();
virtual ~FairMQBinSampler();
void ResetEventCounter();
virtual void SetProperty(const int key, const std::string& value);
virtual std::string GetProperty(const int key, const std::string& default_ = "");
virtual void SetProperty(const int key, const int value);
virtual int GetProperty(const int key, const int default_ = 0);
protected:
int fEventSize;
int fEventRate;
int fEventCounter;
virtual void Run();
};
#endif /* FAIRMQBINSAMPLER_H_ */

View File

@ -1,49 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQBinSink.cxx
*
* @since 2013-01-09
* @author D. Klein, A. Rybalchenko
*/
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include "FairMQBinSink.h"
#include "FairMQLogger.h"
FairMQBinSink::FairMQBinSink()
{
}
void FairMQBinSink::Run()
{
const FairMQChannel& dataInChannel = fChannels.at("data-in").at(0);
while (CheckCurrentState(RUNNING))
{
FairMQMessage* msg = fTransportFactory->CreateMessage();
dataInChannel.Receive(msg);
int inputSize = msg->GetSize();
// int numInput = inputSize / sizeof(Content);
// Content* input = reinterpret_cast<Content*>(msg->GetData());
// for (int i = 0; i < numInput; ++i) {
// LOG(INFO) << (&input[i])->x << " " << (&input[i])->y << " " << (&input[i])->z << " " << (&input[i])->a << " " << (&input[i])->b;
// }
delete msg;
}
}
FairMQBinSink::~FairMQBinSink()
{
}

View File

@ -1,39 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQBinSink.h
*
* @since 2013-01-09
* @author D. Klein, A. Rybalchenko
*/
#ifndef FAIRMQPROTOSINK_H_
#define FAIRMQPROTOSINK_H_
#include "FairMQDevice.h"
struct Content
{
double a;
double b;
int x;
int y;
int z;
};
class FairMQBinSink : public FairMQDevice
{
public:
FairMQBinSink();
virtual ~FairMQBinSink();
protected:
virtual void Run();
};
#endif /* FAIRMQPROTOSINK_H_ */

View File

@ -1,147 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQProtoSampler.cpp
*
* @since 2013-04-23
* @author D. Klein, A. Rybalchenko
*/
#include <string>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include "FairMQProtoSampler.h"
#include "FairMQLogger.h"
#include "payload.pb.h"
using namespace std;
FairMQProtoSampler::FairMQProtoSampler()
: fEventSize(10000)
, fEventRate(1)
, fEventCounter(0)
{
}
FairMQProtoSampler::~FairMQProtoSampler()
{
}
void FairMQProtoSampler::Run()
{
boost::thread resetEventCounter(boost::bind(&FairMQProtoSampler::ResetEventCounter, this));
srand(time(NULL));
const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0);
while (CheckCurrentState(RUNNING))
{
sampler::Payload p;
for (int i = 0; i < fEventSize; ++i)
{
sampler::Content* content = p.add_data();
content->set_x(rand() % 100 + 1);
content->set_y(rand() % 100 + 1);
content->set_z(rand() % 100 + 1);
content->set_a((rand() % 100 + 1) / (rand() % 100 + 1));
content->set_b((rand() % 100 + 1) / (rand() % 100 + 1));
// LOG(INFO) << content->x() << " " << content->y() << " " << content->z() << " " << content->a() << " " << content->b();
}
string str;
p.SerializeToString(&str);
size_t size = str.length();
FairMQMessage* msg = fTransportFactory->CreateMessage(size);
memcpy(msg->GetData(), str.c_str(), size);
dataOutChannel.Send(msg);
--fEventCounter;
while (fEventCounter == 0)
{
boost::this_thread::sleep(boost::posix_time::milliseconds(1));
}
delete msg;
}
resetEventCounter.interrupt();
resetEventCounter.join();
}
void FairMQProtoSampler::ResetEventCounter()
{
while (true)
{
try
{
fEventCounter = fEventRate / 100;
boost::this_thread::sleep(boost::posix_time::milliseconds(10));
}
catch (boost::thread_interrupted&)
{
break;
}
}
}
void FairMQProtoSampler::SetProperty(const int key, const string& value)
{
switch (key)
{
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
string FairMQProtoSampler::GetProperty(const int key, const string& default_ /*= ""*/)
{
switch (key)
{
default:
return FairMQDevice::GetProperty(key, default_);
}
}
void FairMQProtoSampler::SetProperty(const int key, const int value)
{
switch (key)
{
case EventSize:
fEventSize = value;
break;
case EventRate:
fEventRate = value;
break;
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
int FairMQProtoSampler::GetProperty(const int key, const int default_ /*= 0*/)
{
switch (key)
{
case EventSize:
return fEventSize;
case EventRate:
return fEventRate;
default:
return FairMQDevice::GetProperty(key, default_);
}
}

View File

@ -1,49 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQProtoSampler.h
*
* @since 2013-04-23
* @author D. Klein, A. Rybalchenko
*/
#ifndef FAIRMQPROTOSAMPLER_H_
#define FAIRMQPROTOSAMPLER_H_
#include <string>
#include "FairMQDevice.h"
class FairMQProtoSampler : public FairMQDevice
{
public:
enum
{
EventRate = FairMQDevice::Last,
EventSize,
Last
};
FairMQProtoSampler();
virtual ~FairMQProtoSampler();
void ResetEventCounter();
virtual void SetProperty(const int key, const std::string& value);
virtual std::string GetProperty(const int key, const std::string& default_ = "");
virtual void SetProperty(const int key, const int value);
virtual int GetProperty(const int key, const int default_ = 0);
protected:
int fEventSize;
int fEventRate;
int fEventCounter;
virtual void Run();
};
#endif /* FAIRMQPROTOSAMPLER_H_ */

View File

@ -1,52 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQProtoSink.cxx
*
* @since 2013-01-09
* @author D. Klein, A. Rybalchenko
*/
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include "FairMQProtoSink.h"
#include "FairMQLogger.h"
#include "payload.pb.h"
FairMQProtoSink::FairMQProtoSink()
{
}
void FairMQProtoSink::Run()
{
const FairMQChannel& dataInChannel = fChannels.at("data-in").at(0);
while (CheckCurrentState(RUNNING))
{
FairMQMessage* msg = fTransportFactory->CreateMessage();
dataInChannel.Receive(msg);
sampler::Payload p;
p.ParseFromArray(msg->GetData(), msg->GetSize());
// for (int i = 0; i < p.data_size(); ++i) {
// const sampler::Payload::Content& content = p.data(i);
// LOG(INFO) << content.x() << " " << content.y() << " " << content.z() << " " << content.a() << " " << content.b();
// }
delete msg;
}
}
FairMQProtoSink::~FairMQProtoSink()
{
}

View File

@ -1,39 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQProtoSink.h
*
* @since 2013-01-09
* @author D. Klein, A. Rybalchenko
*/
#ifndef FAIRMQPROTOSINK_H_
#define FAIRMQPROTOSINK_H_
#include "FairMQDevice.h"
struct Content
{
double a;
double b;
int x;
int y;
int z;
};
class FairMQProtoSink : public FairMQDevice
{
public:
FairMQProtoSink();
virtual ~FairMQProtoSink();
protected:
virtual void Run();
};
#endif /* FAIRMQPROTOSINK_H_ */

View File

@ -1,13 +0,0 @@
package sampler;
message Content {
optional double a = 1;
optional double b = 2;
optional int32 x = 3;
optional int32 y = 4;
optional int32 z = 5;
}
message Payload {
repeated Content data = 1;
}

View File

@ -1,153 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runBenchmarkSampler.cxx
*
* @since 2013-04-23
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include "boost/program_options.hpp"
#include "FairMQLogger.h"
#include "FairMQBinSampler.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
using namespace std;
typedef struct DeviceOptions
{
DeviceOptions() :
id(), eventSize(0), eventRate(0), ioThreads(0),
outputSocketType(), outputBufSize(0), outputMethod(), outputAddress() {}
string id;
int eventSize;
int eventRate;
int ioThreads;
string outputSocketType;
int outputBufSize;
string outputMethod;
string outputAddress;
} DeviceOptions_t;
inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options)
{
if (_options == NULL)
throw runtime_error("Internal error: options' container is empty.");
namespace bpo = boost::program_options;
bpo::options_description desc("Options");
desc.add_options()
("id", bpo::value<string>()->required(), "Device ID")
("event-size", bpo::value<int>()->default_value(1000), "Event size in bytes")
("event-rate", bpo::value<int>()->default_value(0), "Event rate limit in maximum number of events per second")
("io-threads", bpo::value<int>()->default_value(1), "Number of I/O threads")
("output-socket-type", bpo::value<string>()->required(), "Output socket type: pub/push")
("output-buff-size", bpo::value<int>()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)")
("output-method", bpo::value<string>()->required(), "Output method: bind/connect")
("output-address", bpo::value<string>()->required(), "Output address, e.g.: \"tcp://*:5555\"")
("help", "Print help messages");
bpo::variables_map vm;
bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm);
if ( vm.count("help") )
{
LOG(INFO) << "FairMQ Bin Sampler" << endl << desc;
return false;
}
bpo::notify(vm);
if ( vm.count("id") )
_options->id = vm["id"].as<string>();
if ( vm.count("event-size") )
_options->eventSize = vm["event-size"].as<int>();
if ( vm.count("event-rate") )
_options->eventRate = vm["event-rate"].as<int>();
if ( vm.count("io-threads") )
_options->ioThreads = vm["io-threads"].as<int>();
if ( vm.count("output-socket-type") )
_options->outputSocketType = vm["output-socket-type"].as<string>();
if ( vm.count("output-buff-size") )
_options->outputBufSize = vm["output-buff-size"].as<int>();
if ( vm.count("output-method") )
_options->outputMethod = vm["output-method"].as<string>();
if ( vm.count("output-address") )
_options->outputAddress = vm["output-address"].as<string>();
return true;
}
int main(int argc, char** argv)
{
FairMQBinSampler sampler;
sampler.CatchSignals();
DeviceOptions_t options;
try
{
if (!parse_cmd_line(argc, argv, &options))
return 0;
}
catch (exception& e)
{
LOG(ERROR) << e.what();
return 1;
}
LOG(INFO) << "PID: " << getpid();
LOG(INFO) << "CONFIG: " << "id: " << options.id << ", event size: " << options.eventSize << ", event rate: " << options.eventRate << ", I/O threads: " << options.ioThreads;
LOG(INFO) << "OUTPUT: " << options.outputSocketType << " " << options.outputBufSize << " " << options.outputMethod << " " << options.outputAddress;
#ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
sampler.SetTransport(transportFactory);
FairMQChannel outputChannel(options.outputSocketType, options.outputMethod, options.outputAddress);
outputChannel.UpdateSndBufSize(options.outputBufSize);
outputChannel.UpdateRcvBufSize(options.outputBufSize);
outputChannel.UpdateRateLogging(1);
sampler.fChannels["data-out"].push_back(outputChannel);
sampler.SetProperty(FairMQBinSampler::Id, options.id);
sampler.SetProperty(FairMQBinSampler::EventSize, options.eventSize);
sampler.SetProperty(FairMQBinSampler::EventRate, options.eventRate);
sampler.SetProperty(FairMQBinSampler::NumIoThreads, options.ioThreads);
sampler.ChangeState("INIT_DEVICE");
sampler.WaitForEndOfState("INIT_DEVICE");
sampler.ChangeState("INIT_TASK");
sampler.WaitForEndOfState("INIT_TASK");
sampler.ChangeState("RUN");
sampler.InteractiveStateLoop();
return 0;
}

View File

@ -1,139 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runSink.cxx
*
* @since 2013-01-21
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include "boost/program_options.hpp"
#include "FairMQLogger.h"
#include "FairMQBinSink.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
using namespace std;
typedef struct DeviceOptions
{
DeviceOptions() :
id(), ioThreads(0),
inputSocketType(), inputBufSize(0), inputMethod(), inputAddress() {}
string id;
int ioThreads;
string inputSocketType;
int inputBufSize;
string inputMethod;
string inputAddress;
} DeviceOptions_t;
inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options)
{
if (_options == NULL)
throw runtime_error("Internal error: options' container is empty.");
namespace bpo = boost::program_options;
bpo::options_description desc("Options");
desc.add_options()
("id", bpo::value<string>()->required(), "Device ID")
("io-threads", bpo::value<int>()->default_value(1), "Number of I/O threads")
("input-socket-type", bpo::value<string>()->required(), "Input socket type: sub/pull")
("input-buff-size", bpo::value<int>()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)")
("input-method", bpo::value<string>()->required(), "Input method: bind/connect")
("input-address", bpo::value<string>()->required(), "Input address, e.g.: \"tcp://*:5555\"")
("help", "Print help messages");
bpo::variables_map vm;
bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm);
if ( vm.count("help") )
{
LOG(INFO) << "FairMQ Bin Sink" << endl << desc;
return false;
}
bpo::notify(vm);
if ( vm.count("id") )
_options->id = vm["id"].as<string>();
if ( vm.count("io-threads") )
_options->ioThreads = vm["io-threads"].as<int>();
if ( vm.count("input-socket-type") )
_options->inputSocketType = vm["input-socket-type"].as<string>();
if ( vm.count("input-buff-size") )
_options->inputBufSize = vm["input-buff-size"].as<int>();
if ( vm.count("input-method") )
_options->inputMethod = vm["input-method"].as<string>();
if ( vm.count("input-address") )
_options->inputAddress = vm["input-address"].as<string>();
return true;
}
int main(int argc, char** argv)
{
FairMQBinSink sink;
sink.CatchSignals();
DeviceOptions_t options;
try
{
if (!parse_cmd_line(argc, argv, &options))
return 0;
}
catch (exception& e)
{
LOG(ERROR) << e.what();
return 1;
}
LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
sink.SetTransport(transportFactory);
FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress);
inputChannel.UpdateSndBufSize(options.inputBufSize);
inputChannel.UpdateRcvBufSize(options.inputBufSize);
inputChannel.UpdateRateLogging(1);
sink.fChannels["data-in"].push_back(inputChannel);
sink.SetProperty(FairMQBinSink::Id, options.id);
sink.SetProperty(FairMQBinSink::NumIoThreads, options.ioThreads);
sink.ChangeState("INIT_DEVICE");
sink.WaitForEndOfState("INIT_DEVICE");
sink.ChangeState("INIT_TASK");
sink.WaitForEndOfState("INIT_TASK");
sink.ChangeState("RUN");
sink.InteractiveStateLoop();
return 0;
}

View File

@ -1,153 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runBenchmarkSampler.cxx
*
* @since 2013-04-23
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include "boost/program_options.hpp"
#include "FairMQLogger.h"
#include "FairMQProtoSampler.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
using namespace std;
typedef struct DeviceOptions
{
DeviceOptions() :
id(), eventSize(0), eventRate(0), ioThreads(0),
outputSocketType(), outputBufSize(0), outputMethod(), outputAddress() {}
string id;
int eventSize;
int eventRate;
int ioThreads;
string outputSocketType;
int outputBufSize;
string outputMethod;
string outputAddress;
} DeviceOptions_t;
inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options)
{
if (_options == NULL)
throw runtime_error("Internal error: options' container is empty.");
namespace bpo = boost::program_options;
bpo::options_description desc("Options");
desc.add_options()
("id", bpo::value<string>()->required(), "Device ID")
("event-size", bpo::value<int>()->default_value(1000), "Event size in bytes")
("event-rate", bpo::value<int>()->default_value(0), "Event rate limit in maximum number of events per second")
("io-threads", bpo::value<int>()->default_value(1), "Number of I/O threads")
("output-socket-type", bpo::value<string>()->required(), "Output socket type: pub/push")
("output-buff-size", bpo::value<int>()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)")
("output-method", bpo::value<string>()->required(), "Output method: bind/connect")
("output-address", bpo::value<string>()->required(), "Output address, e.g.: \"tcp://*:5555\"")
("help", "Print help messages");
bpo::variables_map vm;
bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm);
if ( vm.count("help") )
{
LOG(INFO) << "FairMQ Proto Sampler" << endl << desc;
return false;
}
bpo::notify(vm);
if ( vm.count("id") )
_options->id = vm["id"].as<string>();
if ( vm.count("event-size") )
_options->eventSize = vm["event-size"].as<int>();
if ( vm.count("event-rate") )
_options->eventRate = vm["event-rate"].as<int>();
if ( vm.count("io-threads") )
_options->ioThreads = vm["io-threads"].as<int>();
if ( vm.count("output-socket-type") )
_options->outputSocketType = vm["output-socket-type"].as<string>();
if ( vm.count("output-buff-size") )
_options->outputBufSize = vm["output-buff-size"].as<int>();
if ( vm.count("output-method") )
_options->outputMethod = vm["output-method"].as<string>();
if ( vm.count("output-address") )
_options->outputAddress = vm["output-address"].as<string>();
return true;
}
int main(int argc, char** argv)
{
FairMQProtoSampler sampler;
sampler.CatchSignals();
DeviceOptions_t options;
try
{
if (!parse_cmd_line(argc, argv, &options))
return 0;
}
catch (exception& e)
{
LOG(ERROR) << e.what();
return 1;
}
LOG(INFO) << "PID: " << getpid();
LOG(INFO) << "CONFIG: " << "id: " << options.id << ", event size: " << options.eventSize << ", event rate: " << options.eventRate << ", I/O threads: " << options.ioThreads;
LOG(INFO) << "OUTPUT: " << options.outputSocketType << " " << options.outputBufSize << " " << options.outputMethod << " " << options.outputAddress;
#ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
sampler.SetTransport(transportFactory);
FairMQChannel outputChannel(options.outputSocketType, options.outputMethod, options.outputAddress);
outputChannel.UpdateSndBufSize(options.outputBufSize);
outputChannel.UpdateRcvBufSize(options.outputBufSize);
outputChannel.UpdateRateLogging(1);
sampler.fChannels["data-out"].push_back(outputChannel);
sampler.SetProperty(FairMQProtoSampler::Id, options.id);
sampler.SetProperty(FairMQProtoSampler::EventSize, options.eventSize);
sampler.SetProperty(FairMQProtoSampler::EventRate, options.eventRate);
sampler.SetProperty(FairMQProtoSampler::NumIoThreads, options.ioThreads);
sampler.ChangeState("INIT_DEVICE");
sampler.WaitForEndOfState("INIT_DEVICE");
sampler.ChangeState("INIT_TASK");
sampler.WaitForEndOfState("INIT_TASK");
sampler.ChangeState("RUN");
sampler.InteractiveStateLoop();
return 0;
}

View File

@ -1,139 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runSink.cxx
*
* @since 2013-01-21
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include "boost/program_options.hpp"
#include "FairMQLogger.h"
#include "FairMQProtoSink.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
using namespace std;
typedef struct DeviceOptions
{
DeviceOptions() :
id(), ioThreads(0),
inputSocketType(), inputBufSize(0), inputMethod(), inputAddress() {}
string id;
int ioThreads;
string inputSocketType;
int inputBufSize;
string inputMethod;
string inputAddress;
} DeviceOptions_t;
inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options)
{
if (_options == NULL)
throw runtime_error("Internal error: options' container is empty.");
namespace bpo = boost::program_options;
bpo::options_description desc("Options");
desc.add_options()
("id", bpo::value<string>()->required(), "Device ID")
("io-threads", bpo::value<int>()->default_value(1), "Number of I/O threads")
("input-socket-type", bpo::value<string>()->required(), "Input socket type: sub/pull")
("input-buff-size", bpo::value<int>()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)")
("input-method", bpo::value<string>()->required(), "Input method: bind/connect")
("input-address", bpo::value<string>()->required(), "Input address, e.g.: \"tcp://*:5555\"")
("help", "Print help messages");
bpo::variables_map vm;
bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm);
if ( vm.count("help") )
{
LOG(INFO) << "FairMQ Proto Sink" << endl << desc;
return false;
}
bpo::notify(vm);
if ( vm.count("id") )
_options->id = vm["id"].as<string>();
if ( vm.count("io-threads") )
_options->ioThreads = vm["io-threads"].as<int>();
if ( vm.count("input-socket-type") )
_options->inputSocketType = vm["input-socket-type"].as<string>();
if ( vm.count("input-buff-size") )
_options->inputBufSize = vm["input-buff-size"].as<int>();
if ( vm.count("input-method") )
_options->inputMethod = vm["input-method"].as<string>();
if ( vm.count("input-address") )
_options->inputAddress = vm["input-address"].as<string>();
return true;
}
int main(int argc, char** argv)
{
FairMQProtoSink sink;
sink.CatchSignals();
DeviceOptions_t options;
try
{
if (!parse_cmd_line(argc, argv, &options))
return 0;
}
catch (exception& e)
{
LOG(ERROR) << e.what();
return 1;
}
LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
sink.SetTransport(transportFactory);
FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress);
inputChannel.UpdateSndBufSize(options.inputBufSize);
inputChannel.UpdateRcvBufSize(options.inputBufSize);
inputChannel.UpdateRateLogging(1);
sink.fChannels["data-in"].push_back(inputChannel);
sink.SetProperty(FairMQProtoSink::Id, options.id);
sink.SetProperty(FairMQProtoSink::NumIoThreads, options.ioThreads);
sink.ChangeState("INIT_DEVICE");
sink.WaitForEndOfState("INIT_DEVICE");
sink.ChangeState("INIT_TASK");
sink.WaitForEndOfState("INIT_TASK");
sink.ChangeState("RUN");
sink.InteractiveStateLoop();
return 0;
}

View File

@ -1,18 +0,0 @@
#!/bin/bash
if(@NANOMSG_FOUND@); then
buffSize="50000000" # nanomsg buffer size is in bytes
else
buffSize="1000" # zeromq high-water mark is in messages
fi
SAMPLER="binsampler"
SAMPLER+=" --id 101"
SAMPLER+=" --event-size 10000"
SAMPLER+=" --output-socket-type push --output-buff-size $buffSize --output-method bind --output-address tcp://*:5565"
xterm -e @CMAKE_BINARY_DIR@/bin/$SAMPLER &
SINK="binsink"
SINK+=" --id 201"
SINK+=" --input-socket-type pull --input-buff-size $buffSize --input-method connect --input-address tcp://localhost:5565"
xterm -e @CMAKE_BINARY_DIR@/bin/$SINK &

View File

@ -1,18 +0,0 @@
#!/bin/bash
if(@NANOMSG_FOUND@); then
buffSize="50000000" # nanomsg buffer size is in bytes
else
buffSize="1000" # zeromq high-water mark is in messages
fi
SAMPLER="protosampler"
SAMPLER+=" --id 101"
SAMPLER+=" --event-size 10000"
SAMPLER+=" --output-socket-type push --output-buff-size $buffSize --output-method bind --output-address tcp://*:5565"
xterm -e @CMAKE_BINARY_DIR@/bin/$SAMPLER &
SINK="protosink"
SINK+=" --id 201"
SINK+=" --input-socket-type pull --input-buff-size $buffSize --input-method connect --input-address tcp://localhost:5565"
xterm -e @CMAKE_BINARY_DIR@/bin/$SINK &

View File

@ -22,12 +22,6 @@
#include "FairMQProgOptions.h" #include "FairMQProgOptions.h"
#include "FairMQBenchmarkSampler.h" #include "FairMQBenchmarkSampler.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
using namespace std; using namespace std;
using namespace FairMQParser; using namespace FairMQParser;
using namespace boost::program_options; using namespace boost::program_options;
@ -65,11 +59,7 @@ int main(int argc, char** argv)
LOG(INFO) << "PID: " << getpid(); LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG sampler.SetTransport(config.GetValue<std::string>("transport"));
sampler.SetTransport(new FairMQTransportFactoryNN());
#else
sampler.SetTransport(new FairMQTransportFactoryZMQ());
#endif
sampler.SetProperty(FairMQBenchmarkSampler::Id, id); sampler.SetProperty(FairMQBenchmarkSampler::Id, id);
sampler.SetProperty(FairMQBenchmarkSampler::MsgSize, msgSize); sampler.SetProperty(FairMQBenchmarkSampler::MsgSize, msgSize);

View File

@ -19,23 +19,18 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQBuffer.h" #include "FairMQBuffer.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
using namespace std; using namespace std;
typedef struct DeviceOptions typedef struct DeviceOptions
{ {
DeviceOptions() : DeviceOptions() :
id(), ioThreads(0), id(), ioThreads(0), transport(),
inputSocketType(), inputBufSize(0), inputMethod(), inputAddress(), inputSocketType(), inputBufSize(0), inputMethod(), inputAddress(),
outputSocketType(), outputBufSize(0), outputMethod(), outputAddress() {} outputSocketType(), outputBufSize(0), outputMethod(), outputAddress() {}
string id; string id;
int ioThreads; int ioThreads;
string transport;
string inputSocketType; string inputSocketType;
int inputBufSize; int inputBufSize;
string inputMethod; string inputMethod;
@ -56,6 +51,7 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options)
desc.add_options() desc.add_options()
("id", bpo::value<string>(), "Device ID") ("id", bpo::value<string>(), "Device ID")
("io-threads", bpo::value<int>()->default_value(1), "Number of I/O threads") ("io-threads", bpo::value<int>()->default_value(1), "Number of I/O threads")
("transport", bpo::value<string>()->default_value("zeromq"), "Transport (zeromq/nanomsg)")
("input-socket-type", bpo::value<string>()->required(), "Input socket type: sub/pull") ("input-socket-type", bpo::value<string>()->required(), "Input socket type: sub/pull")
("input-buff-size", bpo::value<int>()->default_value(1000), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") ("input-buff-size", bpo::value<int>()->default_value(1000), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)")
("input-method", bpo::value<string>()->required(), "Input method: bind/connect") ("input-method", bpo::value<string>()->required(), "Input method: bind/connect")
@ -69,7 +65,7 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options)
bpo::variables_map vm; bpo::variables_map vm;
bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm);
if ( vm.count("help") ) if (vm.count("help"))
{ {
LOG(INFO) << "FairMQ Buffer" << endl << desc; LOG(INFO) << "FairMQ Buffer" << endl << desc;
return false; return false;
@ -77,35 +73,17 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options)
bpo::notify(vm); bpo::notify(vm);
if ( vm.count("id") ) if (vm.count("id")) { _options->id = vm["id"].as<string>(); }
_options->id = vm["id"].as<string>(); if (vm.count("io-threads")) { _options->ioThreads = vm["io-threads"].as<int>(); }
if (vm.count("transport")) { _options->transport = vm["transport"].as<string>(); }
if ( vm.count("io-threads") ) if (vm.count("input-socket-type")) { _options->inputSocketType = vm["input-socket-type"].as<string>(); }
_options->ioThreads = vm["io-threads"].as<int>(); if (vm.count("input-buff-size")) { _options->inputBufSize = vm["input-buff-size"].as<int>(); }
if (vm.count("input-method")) { _options->inputMethod = vm["input-method"].as<string>(); }
if ( vm.count("input-socket-type") ) if (vm.count("input-address")) { _options->inputAddress = vm["input-address"].as<string>(); }
_options->inputSocketType = vm["input-socket-type"].as<string>(); if (vm.count("output-socket-type")) { _options->outputSocketType = vm["output-socket-type"].as<string>(); }
if (vm.count("output-buff-size")) { _options->outputBufSize = vm["output-buff-size"].as<int>(); }
if ( vm.count("input-buff-size") ) if (vm.count("output-method")) { _options->outputMethod = vm["output-method"].as<string>(); }
_options->inputBufSize = vm["input-buff-size"].as<int>(); if (vm.count("output-address")) { _options->outputAddress = vm["output-address"].as<string>(); }
if ( vm.count("input-method") )
_options->inputMethod = vm["input-method"].as<string>();
if ( vm.count("input-address") )
_options->inputAddress = vm["input-address"].as<string>();
if ( vm.count("output-socket-type") )
_options->outputSocketType = vm["output-socket-type"].as<string>();
if ( vm.count("output-buff-size") )
_options->outputBufSize = vm["output-buff-size"].as<int>();
if ( vm.count("output-method") )
_options->outputMethod = vm["output-method"].as<string>();
if ( vm.count("output-address") )
_options->outputAddress = vm["output-address"].as<string>();
return true; return true;
} }
@ -129,13 +107,7 @@ int main(int argc, char** argv)
LOG(INFO) << "PID: " << getpid(); LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG buffer.SetTransport(options.transport);
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
buffer.SetTransport(transportFactory);
FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress); FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress);
inputChannel.UpdateSndBufSize(options.inputBufSize); inputChannel.UpdateSndBufSize(options.inputBufSize);

View File

@ -19,23 +19,18 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQMerger.h" #include "FairMQMerger.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
using namespace std; using namespace std;
typedef struct DeviceOptions typedef struct DeviceOptions
{ {
DeviceOptions() : DeviceOptions() :
id(), ioThreads(0), numInputs(0), id(), ioThreads(0), transport(), numInputs(0),
inputSocketType(), inputBufSize(), inputMethod(), inputAddress(), inputSocketType(), inputBufSize(), inputMethod(), inputAddress(),
outputSocketType(), outputBufSize(0), outputMethod(), outputAddress() {} outputSocketType(), outputBufSize(0), outputMethod(), outputAddress() {}
string id; string id;
int ioThreads; int ioThreads;
string transport;
int numInputs; int numInputs;
vector<string> inputSocketType; vector<string> inputSocketType;
vector<int> inputBufSize; vector<int> inputBufSize;
@ -57,6 +52,7 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options)
desc.add_options() desc.add_options()
("id", bpo::value<string>()->required(), "Device ID") ("id", bpo::value<string>()->required(), "Device ID")
("io-threads", bpo::value<int>()->default_value(1), "Number of I/O threads") ("io-threads", bpo::value<int>()->default_value(1), "Number of I/O threads")
("transport", bpo::value<string>()->default_value("zeromq"), "Transport (zeromq/nanomsg)")
("num-inputs", bpo::value<int>()->required(), "Number of Merger input sockets") ("num-inputs", bpo::value<int>()->required(), "Number of Merger input sockets")
("input-socket-type", bpo::value<vector<string>>()->required(), "Input socket type: sub/pull") ("input-socket-type", bpo::value<vector<string>>()->required(), "Input socket type: sub/pull")
("input-buff-size", bpo::value<vector<int>>()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") ("input-buff-size", bpo::value<vector<int>>()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)")
@ -79,38 +75,18 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options)
bpo::notify(vm); bpo::notify(vm);
if (vm.count("id")) if (vm.count("id")) { _options->id = vm["id"].as<string>(); }
_options->id = vm["id"].as<string>(); if (vm.count("io-threads")) { _options->ioThreads = vm["io-threads"].as<int>(); }
if (vm.count("transport")) { _options->transport = vm["transport"].as<string>(); }
if (vm.count("io-threads")) if (vm.count("num-inputs")) { _options->numInputs = vm["num-inputs"].as<int>(); }
_options->ioThreads = vm["io-threads"].as<int>(); if (vm.count("input-socket-type")) { _options->inputSocketType = vm["input-socket-type"].as<vector<string>>(); }
if (vm.count("input-buff-size")) { _options->inputBufSize = vm["input-buff-size"].as<vector<int>>(); }
if (vm.count("num-inputs")) if (vm.count("input-method")) { _options->inputMethod = vm["input-method"].as<vector<string>>(); }
_options->numInputs = vm["num-inputs"].as<int>(); if (vm.count("input-address")) { _options->inputAddress = vm["input-address"].as<vector<string>>(); }
if (vm.count("output-socket-type")) { _options->outputSocketType = vm["output-socket-type"].as<string>(); }
if (vm.count("input-socket-type")) if (vm.count("output-buff-size")) { _options->outputBufSize = vm["output-buff-size"].as<int>(); }
_options->inputSocketType = vm["input-socket-type"].as<vector<string>>(); if (vm.count("output-method")) { _options->outputMethod = vm["output-method"].as<string>(); }
if (vm.count("output-address")) { _options->outputAddress = vm["output-address"].as<string>(); }
if (vm.count("input-buff-size"))
_options->inputBufSize = vm["input-buff-size"].as<vector<int>>();
if (vm.count("input-method"))
_options->inputMethod = vm["input-method"].as<vector<string>>();
if (vm.count("input-address"))
_options->inputAddress = vm["input-address"].as<vector<string>>();
if (vm.count("output-socket-type"))
_options->outputSocketType = vm["output-socket-type"].as<string>();
if (vm.count("output-buff-size"))
_options->outputBufSize = vm["output-buff-size"].as<int>();
if (vm.count("output-method"))
_options->outputMethod = vm["output-method"].as<string>();
if (vm.count("output-address"))
_options->outputAddress = vm["output-address"].as<string>();
return true; return true;
} }
@ -134,13 +110,7 @@ int main(int argc, char** argv)
LOG(INFO) << "PID: " << getpid(); LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG merger.SetTransport(options.transport);
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
merger.SetTransport(transportFactory);
for (unsigned int i = 0; i < options.inputAddress.size(); ++i) for (unsigned int i = 0; i < options.inputAddress.size(); ++i)
{ {

View File

@ -19,23 +19,18 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQProxy.h" #include "FairMQProxy.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
using namespace std; using namespace std;
typedef struct DeviceOptions typedef struct DeviceOptions
{ {
DeviceOptions() : DeviceOptions() :
id(), ioThreads(0), id(), ioThreads(0), transport(),
inputSocketType(), inputBufSize(0), inputMethod(), inputAddress(), inputSocketType(), inputBufSize(0), inputMethod(), inputAddress(),
outputSocketType(), outputBufSize(0), outputMethod(), outputAddress() {} outputSocketType(), outputBufSize(0), outputMethod(), outputAddress() {}
string id; string id;
int ioThreads; int ioThreads;
string transport;
string inputSocketType; string inputSocketType;
int inputBufSize; int inputBufSize;
string inputMethod; string inputMethod;
@ -56,6 +51,7 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options)
desc.add_options() desc.add_options()
("id", bpo::value<string>()->required(), "Device ID") ("id", bpo::value<string>()->required(), "Device ID")
("io-threads", bpo::value<int>()->default_value(1), "Number of I/O threads") ("io-threads", bpo::value<int>()->default_value(1), "Number of I/O threads")
("transport", bpo::value<string>()->default_value("zeromq"), "Transport (zeromq/nanomsg)")
("input-socket-type", bpo::value<string>()->required(), "Input socket type: sub/pull") ("input-socket-type", bpo::value<string>()->required(), "Input socket type: sub/pull")
("input-buff-size", bpo::value<int>()->default_value(1000), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") ("input-buff-size", bpo::value<int>()->default_value(1000), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)")
("input-method", bpo::value<string>()->required(), "Input method: bind/connect") ("input-method", bpo::value<string>()->required(), "Input method: bind/connect")
@ -69,7 +65,7 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options)
bpo::variables_map vm; bpo::variables_map vm;
bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm);
if ( vm.count("help") ) if (vm.count("help"))
{ {
LOG(INFO) << "FairMQ Proxy" << endl << desc; LOG(INFO) << "FairMQ Proxy" << endl << desc;
return false; return false;
@ -77,35 +73,17 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options)
bpo::notify(vm); bpo::notify(vm);
if ( vm.count("id") ) if (vm.count("id")) { _options->id = vm["id"].as<string>(); }
_options->id = vm["id"].as<string>(); if (vm.count("io-threads")) { _options->ioThreads = vm["io-threads"].as<int>(); }
if (vm.count("transport")) { _options->transport = vm["transport"].as<string>(); }
if ( vm.count("io-threads") ) if (vm.count("input-socket-type")) { _options->inputSocketType = vm["input-socket-type"].as<string>(); }
_options->ioThreads = vm["io-threads"].as<int>(); if (vm.count("input-buff-size")) { _options->inputBufSize = vm["input-buff-size"].as<int>(); }
if (vm.count("input-method")) { _options->inputMethod = vm["input-method"].as<string>(); }
if ( vm.count("input-socket-type") ) if (vm.count("input-address")) { _options->inputAddress = vm["input-address"].as<string>(); }
_options->inputSocketType = vm["input-socket-type"].as<string>(); if (vm.count("output-socket-type")) { _options->outputSocketType = vm["output-socket-type"].as<string>(); }
if (vm.count("output-buff-size")) { _options->outputBufSize = vm["output-buff-size"].as<int>(); }
if ( vm.count("input-buff-size") ) if (vm.count("output-method")) { _options->outputMethod = vm["output-method"].as<string>(); }
_options->inputBufSize = vm["input-buff-size"].as<int>(); if (vm.count("output-address")) { _options->outputAddress = vm["output-address"].as<string>(); }
if ( vm.count("input-method") )
_options->inputMethod = vm["input-method"].as<string>();
if ( vm.count("input-address") )
_options->inputAddress = vm["input-address"].as<string>();
if ( vm.count("output-socket-type") )
_options->outputSocketType = vm["output-socket-type"].as<string>();
if ( vm.count("output-buff-size") )
_options->outputBufSize = vm["output-buff-size"].as<int>();
if ( vm.count("output-method") )
_options->outputMethod = vm["output-method"].as<string>();
if ( vm.count("output-address") )
_options->outputAddress = vm["output-address"].as<string>();
return true; return true;
} }
@ -129,13 +107,7 @@ int main(int argc, char** argv)
LOG(INFO) << "PID: " << getpid(); LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG proxy.SetTransport(options.transport);
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
proxy.SetTransport(transportFactory);
FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress); FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress);
inputChannel.UpdateSndBufSize(options.inputBufSize); inputChannel.UpdateSndBufSize(options.inputBufSize);

View File

@ -21,12 +21,6 @@
#include "FairMQProgOptions.h" #include "FairMQProgOptions.h"
#include "FairMQSink.h" #include "FairMQSink.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
using namespace std; using namespace std;
using namespace FairMQParser; using namespace FairMQParser;
using namespace boost::program_options; using namespace boost::program_options;
@ -63,13 +57,7 @@ int main(int argc, char** argv)
LOG(INFO) << "PID: " << getpid(); LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG sink.SetTransport(config.GetValue<std::string>("transport"));
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
sink.SetTransport(transportFactory);
sink.SetProperty(FairMQSink::Id, id); sink.SetProperty(FairMQSink::Id, id);
sink.SetProperty(FairMQSink::NumMsgs, numMsgs); sink.SetProperty(FairMQSink::NumMsgs, numMsgs);

View File

@ -19,24 +19,19 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQSplitter.h" #include "FairMQSplitter.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
using namespace std; using namespace std;
typedef struct DeviceOptions typedef struct DeviceOptions
{ {
DeviceOptions() : DeviceOptions() :
id(), ioThreads(0), numOutputs(0), id(), ioThreads(0), transport(), numOutputs(0),
inputSocketType(), inputBufSize(0), inputMethod(), inputAddress(), inputSocketType(), inputBufSize(0), inputMethod(), inputAddress(),
outputSocketType(), outputBufSize(), outputMethod(), outputAddress() outputSocketType(), outputBufSize(), outputMethod(), outputAddress()
{} {}
string id; string id;
int ioThreads; int ioThreads;
string transport;
int numOutputs; int numOutputs;
string inputSocketType; string inputSocketType;
int inputBufSize; int inputBufSize;
@ -58,6 +53,7 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options)
desc.add_options() desc.add_options()
("id", bpo::value<string>()->required(), "Device ID") ("id", bpo::value<string>()->required(), "Device ID")
("io-threads", bpo::value<int>()->default_value(1), "Number of I/O threads") ("io-threads", bpo::value<int>()->default_value(1), "Number of I/O threads")
("transport", bpo::value<string>()->default_value("zeromq"), "Transport (zeromq/nanomsg)")
("num-outputs", bpo::value<int>()->required(), "Number of Splitter output sockets") ("num-outputs", bpo::value<int>()->required(), "Number of Splitter output sockets")
("input-socket-type", bpo::value<string>()->required(), "Input socket type: sub/pull") ("input-socket-type", bpo::value<string>()->required(), "Input socket type: sub/pull")
("input-buff-size", bpo::value<int>(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") ("input-buff-size", bpo::value<int>(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)")
@ -72,7 +68,7 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options)
bpo::variables_map vm; bpo::variables_map vm;
bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm);
if ( vm.count("help") ) if (vm.count("help"))
{ {
LOG(INFO) << "FairMQ Splitter" << endl << desc; LOG(INFO) << "FairMQ Splitter" << endl << desc;
return false; return false;
@ -80,38 +76,18 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options)
bpo::notify(vm); bpo::notify(vm);
if ( vm.count("id") ) if (vm.count("id")) { _options->id = vm["id"].as<string>(); }
_options->id = vm["id"].as<string>(); if (vm.count("io-threads")) { _options->ioThreads = vm["io-threads"].as<int>(); }
if (vm.count("transport")) { _options->transport = vm["transport"].as<string>(); }
if ( vm.count("io-threads") ) if (vm.count("num-outputs")) { _options->numOutputs = vm["num-outputs"].as<int>(); }
_options->ioThreads = vm["io-threads"].as<int>(); if (vm.count("input-socket-type")) { _options->inputSocketType = vm["input-socket-type"].as<string>(); }
if (vm.count("input-buff-size")) { _options->inputBufSize = vm["input-buff-size"].as<int>(); }
if ( vm.count("num-outputs") ) if (vm.count("input-method")) { _options->inputMethod = vm["input-method"].as<string>(); }
_options->numOutputs = vm["num-outputs"].as<int>(); if (vm.count("input-address")) { _options->inputAddress = vm["input-address"].as<string>(); }
if (vm.count("output-socket-type")) { _options->outputSocketType = vm["output-socket-type"].as<vector<string>>(); }
if ( vm.count("input-socket-type") ) if (vm.count("output-buff-size")) { _options->outputBufSize = vm["output-buff-size"].as<vector<int>>(); }
_options->inputSocketType = vm["input-socket-type"].as<string>(); if (vm.count("output-method")) { _options->outputMethod = vm["output-method"].as<vector<string>>(); }
if (vm.count("output-address")) { _options->outputAddress = vm["output-address"].as<vector<string>>(); }
if ( vm.count("input-buff-size") )
_options->inputBufSize = vm["input-buff-size"].as<int>();
if ( vm.count("input-method") )
_options->inputMethod = vm["input-method"].as<string>();
if ( vm.count("input-address") )
_options->inputAddress = vm["input-address"].as<string>();
if ( vm.count("output-socket-type") )
_options->outputSocketType = vm["output-socket-type"].as<vector<string>>();
if ( vm.count("output-buff-size") )
_options->outputBufSize = vm["output-buff-size"].as<vector<int>>();
if ( vm.count("output-method") )
_options->outputMethod = vm["output-method"].as<vector<string>>();
if ( vm.count("output-address") )
_options->outputAddress = vm["output-address"].as<vector<string>>();
return true; return true;
} }
@ -135,13 +111,7 @@ int main(int argc, char** argv)
LOG(INFO) << "PID: " << getpid(); LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG splitter.SetTransport(options.transport);
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
splitter.SetTransport(transportFactory);
FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress); FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress);
inputChannel.UpdateSndBufSize(options.inputBufSize); inputChannel.UpdateSndBufSize(options.inputBufSize);

View File

@ -12,6 +12,8 @@ configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-req-rep.sh.in ${CMAKE
Set(INCLUDE_DIRECTORIES Set(INCLUDE_DIRECTORIES
${CMAKE_SOURCE_DIR}/fairmq ${CMAKE_SOURCE_DIR}/fairmq
${CMAKE_SOURCE_DIR}/fairmq/zeromq
${CMAKE_SOURCE_DIR}/fairmq/nanomsg
${CMAKE_SOURCE_DIR}/fairmq/devices ${CMAKE_SOURCE_DIR}/fairmq/devices
${CMAKE_SOURCE_DIR}/fairmq/tools ${CMAKE_SOURCE_DIR}/fairmq/tools
${CMAKE_SOURCE_DIR}/fairmq/options ${CMAKE_SOURCE_DIR}/fairmq/options
@ -24,28 +26,10 @@ Set(INCLUDE_DIRECTORIES
Set(SYSTEM_INCLUDE_DIRECTORIES Set(SYSTEM_INCLUDE_DIRECTORIES
${Boost_INCLUDE_DIR} ${Boost_INCLUDE_DIR}
${ZMQ_INCLUDE_DIR}
${NANOMSG_INCLUDE_DIR}
) )
If(NANOMSG_FOUND)
Set(INCLUDE_DIRECTORIES
${INCLUDE_DIRECTORIES}
${CMAKE_SOURCE_DIR}/fairmq/nanomsg
)
Set(SYSTEM_INCLUDE_DIRECTORIES
${SYSTEM_INCLUDE_DIRECTORIES}
${ZMQ_INCLUDE_DIR}
)
Else(NANOMSG_FOUND)
Set(INCLUDE_DIRECTORIES
${INCLUDE_DIRECTORIES}
${CMAKE_SOURCE_DIR}/fairmq/zeromq
)
Set(SYSTEM_INCLUDE_DIRECTORIES
${SYSTEM_INCLUDE_DIRECTORIES}
${ZMQ_INCLUDE_DIR}
)
EndIf(NANOMSG_FOUND)
Include_Directories(${INCLUDE_DIRECTORIES}) Include_Directories(${INCLUDE_DIRECTORIES})
Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES})

View File

@ -12,27 +12,14 @@
* @author A. Rybalchenko * @author A. Rybalchenko
*/ */
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQTestPub.h" #include "FairMQTestPub.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
FairMQTestPub testPub; FairMQTestPub testPub;
testPub.CatchSignals(); testPub.CatchSignals();
testPub.SetTransport("zeromq");
#ifdef NANOMSG
testPub.SetTransport(new FairMQTransportFactoryNN());
#else
testPub.SetTransport(new FairMQTransportFactoryZMQ());
#endif
testPub.SetProperty(FairMQTestPub::Id, "testPub"); testPub.SetProperty(FairMQTestPub::Id, "testPub");

View File

@ -17,22 +17,11 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQTestSub.h" #include "FairMQTestSub.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
FairMQTestSub testSub; FairMQTestSub testSub;
testSub.CatchSignals(); testSub.CatchSignals();
testSub.SetTransport("zeromq");
#ifdef NANOMSG
testSub.SetTransport(new FairMQTransportFactoryNN());
#else
testSub.SetTransport(new FairMQTransportFactoryZMQ());
#endif
testSub.SetProperty(FairMQTestSub::Id, "testSub_" + std::to_string(getpid())); testSub.SetProperty(FairMQTestSub::Id, "testSub_" + std::to_string(getpid()));

View File

@ -15,22 +15,11 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQTestPull.h" #include "FairMQTestPull.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
FairMQTestPull testPull; FairMQTestPull testPull;
testPull.CatchSignals(); testPull.CatchSignals();
testPull.SetTransport("zeromq");
#ifdef NANOMSG
testPull.SetTransport(new FairMQTransportFactoryNN());
#else
testPull.SetTransport(new FairMQTransportFactoryZMQ());
#endif
testPull.SetProperty(FairMQTestPull::Id, "testPull"); testPull.SetProperty(FairMQTestPull::Id, "testPull");

View File

@ -15,22 +15,11 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQTestPush.h" #include "FairMQTestPush.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
FairMQTestPush testPush; FairMQTestPush testPush;
testPush.CatchSignals(); testPush.CatchSignals();
testPush.SetTransport("zeromq");
#ifdef NANOMSG
testPush.SetTransport(new FairMQTransportFactoryNN());
#else
testPush.SetTransport(new FairMQTransportFactoryZMQ());
#endif
testPush.SetProperty(FairMQTestPush::Id, "testPush"); testPush.SetProperty(FairMQTestPush::Id, "testPush");

View File

@ -17,22 +17,11 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQTestRep.h" #include "FairMQTestRep.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
FairMQTestRep testRep; FairMQTestRep testRep;
testRep.CatchSignals(); testRep.CatchSignals();
testRep.SetTransport("zeromq");
#ifdef NANOMSG
testRep.SetTransport(new FairMQTransportFactoryNN());
#else
testRep.SetTransport(new FairMQTransportFactoryZMQ());
#endif
testRep.SetProperty(FairMQTestRep::Id, "testRep"); testRep.SetProperty(FairMQTestRep::Id, "testRep");

View File

@ -17,22 +17,11 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQTestReq.h" #include "FairMQTestReq.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
FairMQTestReq testReq; FairMQTestReq testReq;
testReq.CatchSignals(); testReq.CatchSignals();
testReq.SetTransport("zeromq");
#ifdef NANOMSG
testReq.SetTransport(new FairMQTransportFactoryNN());
#else
testReq.SetTransport(new FairMQTransportFactoryZMQ());
#endif
testReq.SetProperty(FairMQTestReq::Id, "testReq"); testReq.SetProperty(FairMQTestReq::Id, "testReq");

View File

@ -15,12 +15,6 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQDevice.h" #include "FairMQDevice.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
class TransferTimeoutTester : public FairMQDevice class TransferTimeoutTester : public FairMQDevice
{ {
public: public:
@ -98,12 +92,7 @@ int main(int argc, char** argv)
{ {
TransferTimeoutTester timeoutTester; TransferTimeoutTester timeoutTester;
timeoutTester.CatchSignals(); timeoutTester.CatchSignals();
timeoutTester.SetTransport("zeromq");
#ifdef NANOMSG
timeoutTester.SetTransport(new FairMQTransportFactoryNN());
#else
timeoutTester.SetTransport(new FairMQTransportFactoryZMQ());
#endif
timeoutTester.SetProperty(TransferTimeoutTester::Id, "timeoutTester"); timeoutTester.SetProperty(TransferTimeoutTester::Id, "timeoutTester");

View File

@ -16,13 +16,6 @@
/// boost /// boost
#include "boost/program_options.hpp" #include "boost/program_options.hpp"
/// ZMQ/nmsg (in FairSoft)
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
/// FairRoot - FairMQ /// FairRoot - FairMQ
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQParser.h" #include "FairMQParser.h"
@ -47,13 +40,7 @@ inline int runStateMachine(TMQDevice& device, FairMQProgOptions& config)
LOG(INFO) << "PID: " << getpid(); LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG device.SetTransport(config.GetValue<std::string>("transport"));
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
device.SetTransport(transportFactory);
device.ChangeState(TMQDevice::INIT_DEVICE); device.ChangeState(TMQDevice::INIT_DEVICE);
device.WaitForEndOfState(TMQDevice::INIT_DEVICE); device.WaitForEndOfState(TMQDevice::INIT_DEVICE);
@ -84,13 +71,7 @@ inline int runNonInteractiveStateMachine(TMQDevice& device, FairMQProgOptions& c
LOG(INFO) << "PID: " << getpid(); LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG device.SetTransport(config.GetValue<std::string>("transport"));
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
device.SetTransport(transportFactory);
device.ChangeState(TMQDevice::INIT_DEVICE); device.ChangeState(TMQDevice::INIT_DEVICE);
device.WaitForEndOfState(TMQDevice::INIT_DEVICE); device.WaitForEndOfState(TMQDevice::INIT_DEVICE);

View File

@ -50,7 +50,7 @@ class FairMQPollerZMQ : public FairMQPoller
zmq_pollitem_t* items; zmq_pollitem_t* items;
int fNumItems; int fNumItems;
std::unordered_map<std::string,int> fOffsetMap; std::unordered_map<std::string, int> fOffsetMap;
}; };
#endif /* FAIRMQPOLLERZMQ_H_ */ #endif /* FAIRMQPOLLERZMQ_H_ */