mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
Add FlatBuffers & MessagePack examples
- Add FlatBuffers serialization example to Tutorial 3 - Add MessagePack serialization example to Tutorial 3 - Performance improvements in Boost serialization example - Use `GetEntriesFast()` for FairTestDetectorRecoTask - Use `Clear()` instead of `Delete()` in MQ parts of Tutorial 3 - Fix CMake variables from preventing compilation without nanomsg. - create macro/data directories in install directory - Get rid of data duplication in fill_parameters.C - Various cleanups and fixes
This commit is contained in:
parent
f1abb9ecdd
commit
82ab7670a9
|
@ -479,9 +479,9 @@ void FairMQDevice::SetTransport(const string& transport)
|
||||||
#endif
|
#endif
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
LOG(ERROR) << "Unknown transport implementation requested: "
|
LOG(ERROR) << "Unavailable transport implementation requested: "
|
||||||
<< transport
|
<< "\"" << transport << "\""
|
||||||
<< ". Supported are "
|
<< ". Available are: "
|
||||||
<< "\"zeromq\""
|
<< "\"zeromq\""
|
||||||
#ifdef NANOMSG_FOUND
|
#ifdef NANOMSG_FOUND
|
||||||
<< ", \"nanomsg\""
|
<< ", \"nanomsg\""
|
||||||
|
|
|
@ -29,17 +29,17 @@ FairMQMerger::~FairMQMerger()
|
||||||
|
|
||||||
void FairMQMerger::Run()
|
void FairMQMerger::Run()
|
||||||
{
|
{
|
||||||
std::unique_ptr<FairMQPoller> poller(fTransportFactory->CreatePoller(fChannels.at("data-in")));
|
|
||||||
|
|
||||||
// store the channel references to avoid traversing the map on every loop iteration
|
// store the channel references to avoid traversing the map on every loop iteration
|
||||||
|
auto& dataInChannelRef = fChannels.at("data-in");
|
||||||
const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0);
|
const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0);
|
||||||
std::vector<FairMQChannel*> dataInChannels(fChannels.at("data-in").size());
|
int numInputs = dataInChannelRef.size();
|
||||||
for (unsigned int i = 0; i < fChannels.at("data-in").size(); ++i)
|
std::vector<FairMQChannel*> dataInChannels(numInputs);
|
||||||
|
for (int i = 0; i < numInputs; ++i)
|
||||||
{
|
{
|
||||||
dataInChannels.at(i) = &(fChannels.at("data-in").at(i));
|
dataInChannels.at(i) = &(dataInChannelRef.at(i));
|
||||||
}
|
}
|
||||||
|
|
||||||
int numInputs = fChannels.at("data-in").size();
|
std::unique_ptr<FairMQPoller> poller(fTransportFactory->CreatePoller(dataInChannelRef));
|
||||||
|
|
||||||
while (CheckCurrentState(RUNNING))
|
while (CheckCurrentState(RUNNING))
|
||||||
{
|
{
|
||||||
|
|
|
@ -28,17 +28,18 @@ FairMQSplitter::~FairMQSplitter()
|
||||||
|
|
||||||
void FairMQSplitter::Run()
|
void FairMQSplitter::Run()
|
||||||
{
|
{
|
||||||
int direction = 0;
|
|
||||||
int numOutputs = fChannels.at("data-out").size();
|
|
||||||
|
|
||||||
// store the channel references to avoid traversing the map on every loop iteration
|
// store the channel references to avoid traversing the map on every loop iteration
|
||||||
|
auto& dataOutChannelRef = fChannels.at("data-out");
|
||||||
const FairMQChannel& dataInChannel = fChannels.at("data-in").at(0);
|
const FairMQChannel& dataInChannel = fChannels.at("data-in").at(0);
|
||||||
FairMQChannel* dataOutChannels[fChannels.at("data-out").size()];
|
int numOutputs = dataOutChannelRef.size();
|
||||||
|
std::vector<FairMQChannel*> dataOutChannels(numOutputs);
|
||||||
for (int i = 0; i < numOutputs; ++i)
|
for (int i = 0; i < numOutputs; ++i)
|
||||||
{
|
{
|
||||||
dataOutChannels[i] = &(fChannels.at("data-out").at(i));
|
dataOutChannels[i] = &(dataOutChannelRef.at(i));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int direction = 0;
|
||||||
|
|
||||||
while (CheckCurrentState(RUNNING))
|
while (CheckCurrentState(RUNNING))
|
||||||
{
|
{
|
||||||
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
|
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifndef LOGGER_DEF_H
|
#ifndef LOGGER_DEF_H
|
||||||
#define LOGGER_DEF_H
|
#define LOGGER_DEF_H
|
||||||
|
|
||||||
#include <array>
|
#include <array>
|
||||||
#include <fstream>
|
#include <fstream>
|
||||||
|
@ -34,28 +34,25 @@ namespace fairmq
|
||||||
ERROR,
|
ERROR,
|
||||||
STATE,
|
STATE,
|
||||||
NOLOG
|
NOLOG
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
static const std::array<std::string, 8> g_LogSeverityLevelString
|
static const std::array<std::string, 8> g_LogSeverityLevelString
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
"TRACE",
|
"TRACE",
|
||||||
"DEBUG",
|
"DEBUG",
|
||||||
"RESULTS",
|
"RESULTS",
|
||||||
"INFO",
|
"INFO",
|
||||||
"WARN",
|
"WARN",
|
||||||
"ERROR",
|
"ERROR",
|
||||||
"STATE",
|
"STATE",
|
||||||
"NOLOG"
|
"NOLOG"
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
namespace color
|
||||||
|
|
||||||
namespace color
|
|
||||||
{
|
{
|
||||||
enum code
|
enum code
|
||||||
{
|
{
|
||||||
FG_BLACK = 30,
|
FG_BLACK = 30,
|
||||||
FG_RED = 31,
|
FG_RED = 31,
|
||||||
|
@ -94,25 +91,24 @@ typedef fairmq::severity_level custom_severity_level;
|
||||||
struct tag_console;
|
struct tag_console;
|
||||||
struct tag_file;
|
struct tag_file;
|
||||||
|
|
||||||
|
|
||||||
// overload operator for console output
|
// overload operator for console output
|
||||||
inline boost::log::formatting_ostream& operator<<
|
inline boost::log::formatting_ostream& operator<<
|
||||||
(
|
(
|
||||||
boost::log::formatting_ostream& strm,
|
boost::log::formatting_ostream& strm,
|
||||||
boost::log::to_log_manip< custom_severity_level, tag_console > const& manip
|
boost::log::to_log_manip<custom_severity_level, tag_console> const& manip
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
custom_severity_level level = manip.get();
|
custom_severity_level level = manip.get();
|
||||||
std::size_t idx=static_cast< std::size_t >(level);
|
std::size_t idx = static_cast<std::size_t>(level);
|
||||||
if ( idx < fairmq::g_LogSeverityLevelString.size() )
|
if (idx < fairmq::g_LogSeverityLevelString.size())
|
||||||
{
|
{
|
||||||
//strm <<" idx = "<<idx <<" ";
|
// strm << " idx = " << idx << " ";
|
||||||
switch (level)
|
switch (level)
|
||||||
{
|
{
|
||||||
case custom_severity_level::TRACE :
|
case custom_severity_level::TRACE :
|
||||||
strm << write_in<fairmq::color::FG_BLUE>(fairmq::g_LogSeverityLevelString.at(idx));
|
strm << write_in<fairmq::color::FG_BLUE>(fairmq::g_LogSeverityLevelString.at(idx));
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case custom_severity_level::DEBUG :
|
case custom_severity_level::DEBUG :
|
||||||
strm << write_in<fairmq::color::FG_BLUE>(fairmq::g_LogSeverityLevelString.at(idx));
|
strm << write_in<fairmq::color::FG_BLUE>(fairmq::g_LogSeverityLevelString.at(idx));
|
||||||
break;
|
break;
|
||||||
|
@ -123,34 +119,32 @@ inline boost::log::formatting_ostream& operator<<
|
||||||
|
|
||||||
case custom_severity_level::INFO :
|
case custom_severity_level::INFO :
|
||||||
strm << write_in<fairmq::color::FG_GREEN>(fairmq::g_LogSeverityLevelString.at(idx));
|
strm << write_in<fairmq::color::FG_GREEN>(fairmq::g_LogSeverityLevelString.at(idx));
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case custom_severity_level::WARN :
|
case custom_severity_level::WARN :
|
||||||
strm << write_in<fairmq::color::FG_YELLOW>(fairmq::g_LogSeverityLevelString.at(idx));
|
strm << write_in<fairmq::color::FG_YELLOW>(fairmq::g_LogSeverityLevelString.at(idx));
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case custom_severity_level::STATE :
|
case custom_severity_level::STATE :
|
||||||
strm << write_in<fairmq::color::FG_MAGENTA>(fairmq::g_LogSeverityLevelString.at(idx));
|
strm << write_in<fairmq::color::FG_MAGENTA>(fairmq::g_LogSeverityLevelString.at(idx));
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case custom_severity_level::ERROR :
|
case custom_severity_level::ERROR :
|
||||||
strm << write_in<fairmq::color::FG_RED>(fairmq::g_LogSeverityLevelString.at(idx));
|
strm << write_in<fairmq::color::FG_RED>(fairmq::g_LogSeverityLevelString.at(idx));
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case custom_severity_level::NOLOG :
|
case custom_severity_level::NOLOG :
|
||||||
strm << write_in<fairmq::color::FG_DEFAULT>(fairmq::g_LogSeverityLevelString.at(idx));
|
strm << write_in<fairmq::color::FG_DEFAULT>(fairmq::g_LogSeverityLevelString.at(idx));
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
strm << write_in<fairmq::color::FG_RED>("Unknown log level ")
|
strm << write_in<fairmq::color::FG_RED>("Unknown log level ")
|
||||||
<< "(int level = "<<static_cast< int >(level)
|
<< "(int level = " << static_cast<int>(level) << ")";
|
||||||
<<")";
|
|
||||||
}
|
}
|
||||||
return strm;
|
return strm;
|
||||||
}
|
}
|
||||||
|
@ -159,27 +153,21 @@ inline boost::log::formatting_ostream& operator<<
|
||||||
inline boost::log::formatting_ostream& operator<<
|
inline boost::log::formatting_ostream& operator<<
|
||||||
(
|
(
|
||||||
boost::log::formatting_ostream& strm,
|
boost::log::formatting_ostream& strm,
|
||||||
boost::log::to_log_manip< custom_severity_level, tag_file > const& manip
|
boost::log::to_log_manip<custom_severity_level, tag_file> const& manip
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
custom_severity_level level = manip.get();
|
custom_severity_level level = manip.get();
|
||||||
std::size_t idx=static_cast< std::size_t >(level);
|
std::size_t idx = static_cast<std::size_t>(level);
|
||||||
if ( idx < fairmq::g_LogSeverityLevelString.size() )
|
if (idx < fairmq::g_LogSeverityLevelString.size())
|
||||||
|
{
|
||||||
strm << fairmq::g_LogSeverityLevelString.at(idx);
|
strm << fairmq::g_LogSeverityLevelString.at(idx);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
strm << write_in<fairmq::color::FG_RED>("Unknown log level ")
|
strm << write_in<fairmq::color::FG_RED>("Unknown log level ")
|
||||||
<< "(int level = "<<static_cast< int >(level)
|
<< "(int level = " << static_cast<int>(level) << ")";
|
||||||
<<")";
|
|
||||||
}
|
}
|
||||||
return strm;
|
return strm;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#endif /* LOGGER_DEF_H */
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#endif /* LOGGER_DEF_H */
|
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,6 @@ FairProgOptions::FairProgOptions() :
|
||||||
fVarMap(),
|
fVarMap(),
|
||||||
fSeverityMap()
|
fSeverityMap()
|
||||||
{
|
{
|
||||||
// define generic options
|
|
||||||
fGenericDesc.add_options()
|
fGenericDesc.add_options()
|
||||||
("help,h", "produce help")
|
("help,h", "produce help")
|
||||||
("version,v", "print version")
|
("version,v", "print version")
|
||||||
|
|
|
@ -1,14 +1,36 @@
|
||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
|
|
||||||
|
numMsgs="0"
|
||||||
|
msgSize="1000000"
|
||||||
|
|
||||||
|
if [[ $1 =~ ^[0-9]+$ ]]; then
|
||||||
|
msgSize=$1
|
||||||
|
fi
|
||||||
|
|
||||||
|
echo "Starting benchmark with message size of $msgSize bytes."
|
||||||
|
|
||||||
|
if [[ $2 =~ ^[0-9]+$ ]]; then
|
||||||
|
numMsgs=$2
|
||||||
|
fi
|
||||||
|
|
||||||
|
if [ $numMsgs = 0 ]; then
|
||||||
|
echo "Unlimited number of messages."
|
||||||
|
else
|
||||||
|
echo "Number of messages: $numMsgs."
|
||||||
|
fi
|
||||||
|
|
||||||
|
echo ""
|
||||||
|
echo "Usage: startBenchmark [message size=1000000] [number of messages=0]"
|
||||||
|
|
||||||
SAMPLER="bsampler"
|
SAMPLER="bsampler"
|
||||||
SAMPLER+=" --id bsampler1"
|
SAMPLER+=" --id bsampler1"
|
||||||
SAMPLER+=" --msg-size 1000000"
|
SAMPLER+=" --msg-size $msgSize"
|
||||||
#SAMPLER+=" --num-msgs 1000"
|
SAMPLER+=" --num-msgs $numMsgs"
|
||||||
SAMPLER+=" --config-json-file @CMAKE_BINARY_DIR@/bin/config/benchmark.json"
|
SAMPLER+=" --config-json-file @CMAKE_BINARY_DIR@/bin/config/benchmark.json"
|
||||||
xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SAMPLER &
|
xterm -geometry 80x23+0+0 -hold -e taskset 0x1 @CMAKE_BINARY_DIR@/bin/$SAMPLER &
|
||||||
|
|
||||||
SINK="sink"
|
SINK="sink"
|
||||||
SINK+=" --id sink1"
|
SINK+=" --id sink1"
|
||||||
#SINK+=" --num-msgs 1000"
|
SINK+=" --num-msgs $numMsgs"
|
||||||
SINK+=" --config-json-file @CMAKE_BINARY_DIR@/bin/config/benchmark.json"
|
SINK+=" --config-json-file @CMAKE_BINARY_DIR@/bin/config/benchmark.json"
|
||||||
xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK &
|
xterm -geometry 80x23+500+0 -hold -e taskset 0x2 @CMAKE_BINARY_DIR@/bin/$SINK &
|
||||||
|
|
|
@ -27,9 +27,15 @@ Set(INCLUDE_DIRECTORIES
|
||||||
Set(SYSTEM_INCLUDE_DIRECTORIES
|
Set(SYSTEM_INCLUDE_DIRECTORIES
|
||||||
${Boost_INCLUDE_DIR}
|
${Boost_INCLUDE_DIR}
|
||||||
${ZMQ_INCLUDE_DIR}
|
${ZMQ_INCLUDE_DIR}
|
||||||
${NANOMSG_INCLUDE_DIR}
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
If(NANOMSG_FOUND)
|
||||||
|
Set(SYSTEM_INCLUDE_DIRECTORIES
|
||||||
|
${SYSTEM_INCLUDE_DIRECTORIES}
|
||||||
|
${NANOMSG_INCLUDE_DIR}
|
||||||
|
)
|
||||||
|
EndIf(NANOMSG_FOUND)
|
||||||
|
|
||||||
Include_Directories(${INCLUDE_DIRECTORIES})
|
Include_Directories(${INCLUDE_DIRECTORIES})
|
||||||
Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES})
|
Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES})
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user