mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
Cleanup base/MQ.
This commit is contained in:
parent
334b91785b
commit
70e46a0b86
|
@ -138,7 +138,7 @@ set(FAIRMQ_SOURCE_FILES
|
|||
devices/FairMQMerger.cxx
|
||||
devices/FairMQMultiplier.cxx
|
||||
devices/FairMQProxy.cxx
|
||||
devices/FairMQSink.cxx
|
||||
# devices/FairMQSink.cxx
|
||||
devices/FairMQSplitter.cxx
|
||||
logger/logger.cxx
|
||||
options/FairMQParser.cxx
|
||||
|
|
|
@ -35,10 +35,10 @@
|
|||
|
||||
#include <fairmq/Tools.h>
|
||||
|
||||
typedef std::unordered_map<std::string, std::vector<FairMQChannel>> FairMQChannelMap;
|
||||
using FairMQChannelMap = std::unordered_map<std::string, std::vector<FairMQChannel>>;
|
||||
|
||||
typedef std::function<bool(FairMQMessagePtr&, int)> InputMsgCallback;
|
||||
typedef std::function<bool(FairMQParts&, int)> InputMultipartCallback;
|
||||
using InputMsgCallback = std::function<bool(FairMQMessagePtr&, int)>;
|
||||
using InputMultipartCallback = std::function<bool(FairMQParts&, int)>;
|
||||
|
||||
class FairMQProgOptions;
|
||||
|
||||
|
@ -357,7 +357,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
|
|||
/// @param rhs Left hand side value for comparison
|
||||
static bool SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs);
|
||||
|
||||
template<class T>
|
||||
template<typename T>
|
||||
void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQMessagePtr& msg, int index))
|
||||
{
|
||||
fDataCallbacks = true;
|
||||
|
@ -383,7 +383,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
|
|||
}
|
||||
}
|
||||
|
||||
template<class T>
|
||||
template<typename T>
|
||||
void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQParts& parts, int index))
|
||||
{
|
||||
fDataCallbacks = true;
|
||||
|
|
|
@ -45,7 +45,7 @@
|
|||
ADD_LOG_FILESINK(filename,ERROR); // => Print severity >= ERROR to file (extension is added)
|
||||
*/
|
||||
|
||||
typedef unsigned long long timestamp_t;
|
||||
using timestamp_t = unsigned long long;
|
||||
|
||||
timestamp_t get_timestamp();
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@ int FairMQStateMachine::GetInterfaceVersion()
|
|||
return FAIRMQ_INTERFACE_VERSION;
|
||||
}
|
||||
|
||||
int FairMQStateMachine::GetEventNumber(std::string event)
|
||||
int FairMQStateMachine::GetEventNumber(const std::string& event)
|
||||
{
|
||||
if (event == "INIT_DEVICE") return INIT_DEVICE;
|
||||
if (event == "INIT_TASK") return INIT_TASK;
|
||||
|
@ -142,7 +142,7 @@ bool FairMQStateMachine::ChangeState(int event)
|
|||
return false;
|
||||
}
|
||||
|
||||
bool FairMQStateMachine::ChangeState(std::string event)
|
||||
bool FairMQStateMachine::ChangeState(const std::string& event)
|
||||
{
|
||||
return ChangeState(GetEventNumber(event));
|
||||
}
|
||||
|
@ -178,7 +178,7 @@ void FairMQStateMachine::WaitForEndOfState(int event)
|
|||
}
|
||||
}
|
||||
|
||||
void FairMQStateMachine::WaitForEndOfState(std::string event)
|
||||
void FairMQStateMachine::WaitForEndOfState(const std::string& event)
|
||||
{
|
||||
return WaitForEndOfState(GetEventNumber(event));
|
||||
}
|
||||
|
@ -219,7 +219,7 @@ bool FairMQStateMachine::WaitForEndOfStateForMs(int event, int durationInMs)
|
|||
return false;
|
||||
}
|
||||
|
||||
bool FairMQStateMachine::WaitForEndOfStateForMs(std::string event, int durationInMs)
|
||||
bool FairMQStateMachine::WaitForEndOfStateForMs(const std::string& event, int durationInMs)
|
||||
{
|
||||
return WaitForEndOfStateForMs(GetEventNumber(event), durationInMs);
|
||||
}
|
||||
|
|
|
@ -574,16 +574,16 @@ class FairMQStateMachine : public FairMQFSM::FairMQFSM
|
|||
|
||||
int GetInterfaceVersion();
|
||||
|
||||
int GetEventNumber(std::string event);
|
||||
int GetEventNumber(const std::string& event);
|
||||
|
||||
bool ChangeState(int event);
|
||||
bool ChangeState(std::string event);
|
||||
bool ChangeState(const std::string& event);
|
||||
|
||||
void WaitForEndOfState(int state);
|
||||
void WaitForEndOfState(std::string state);
|
||||
void WaitForEndOfState(const std::string& state);
|
||||
|
||||
bool WaitForEndOfStateForMs(int state, int durationInMs);
|
||||
bool WaitForEndOfStateForMs(std::string state, int durationInMs);
|
||||
bool WaitForEndOfStateForMs(const std::string& state, int durationInMs);
|
||||
|
||||
void SubscribeToStateChange(const std::string& key, std::function<void(const State)> callback);
|
||||
void UnsubscribeFromStateChange(const std::string& key);
|
||||
|
|
|
@ -16,21 +16,65 @@
|
|||
#define FAIRMQSINK_H_
|
||||
|
||||
#include <string>
|
||||
#include <chrono>
|
||||
|
||||
#include "FairMQDevice.h"
|
||||
#include "../FairMQDevice.h"
|
||||
#include "../FairMQLogger.h"
|
||||
#include "../options/FairMQProgOptions.h"
|
||||
|
||||
class FairMQSink : public FairMQDevice
|
||||
// template<typename OutputPolicy>
|
||||
class FairMQSink : public FairMQDevice//, public OutputPolicy
|
||||
{
|
||||
public:
|
||||
FairMQSink();
|
||||
virtual ~FairMQSink();
|
||||
FairMQSink()
|
||||
: fMaxIterations(0)
|
||||
, fNumIterations(0)
|
||||
, fInChannelName()
|
||||
{}
|
||||
|
||||
virtual ~FairMQSink()
|
||||
{}
|
||||
|
||||
protected:
|
||||
uint64_t fNumMsgs;
|
||||
uint64_t fMaxIterations;
|
||||
uint64_t fNumIterations;
|
||||
std::string fInChannelName;
|
||||
|
||||
virtual void Run();
|
||||
virtual void InitTask();
|
||||
virtual void InitTask()
|
||||
{
|
||||
fMaxIterations = fConfig->GetValue<uint64_t>("num-iterations");
|
||||
fInChannelName = fConfig->GetValue<std::string>("in-channel");
|
||||
}
|
||||
|
||||
virtual void Run()
|
||||
{
|
||||
// store the channel reference to avoid traversing the map on every loop iteration
|
||||
FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0);
|
||||
|
||||
LOG(INFO) << "Starting the benchmark and expecting to receive " << fMaxIterations << " messages.";
|
||||
auto tStart = std::chrono::high_resolution_clock::now();
|
||||
|
||||
while (CheckCurrentState(RUNNING))
|
||||
{
|
||||
FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage());
|
||||
|
||||
if (dataInChannel.Receive(msg) >= 0)
|
||||
{
|
||||
if (fMaxIterations > 0)
|
||||
{
|
||||
if (fNumIterations >= fMaxIterations)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
fNumIterations++;
|
||||
}
|
||||
}
|
||||
|
||||
auto tEnd = std::chrono::high_resolution_clock::now();
|
||||
|
||||
LOG(INFO) << "Leaving RUNNING state. Received " << fNumIterations << " messages in " << std::chrono::duration<double, std::milli>(tEnd - tStart).count() << "ms.";
|
||||
}
|
||||
};
|
||||
|
||||
#endif /* FAIRMQSINK_H_ */
|
||||
|
|
|
@ -14,7 +14,7 @@ namespace fairroot
|
|||
struct null_deleter
|
||||
{
|
||||
//! Function object result type
|
||||
typedef void result_type;
|
||||
using result_type = void;
|
||||
/*!
|
||||
* Does nothing
|
||||
*/
|
||||
|
|
|
@ -15,10 +15,10 @@
|
|||
|
||||
#if BOOST_VERSION < 105600
|
||||
#include "fairroot_null_deleter.h"
|
||||
typedef fairroot::null_deleter empty_deleter_t;
|
||||
using empty_deleter_t = fairroot::null_deleter;
|
||||
#else
|
||||
#include <boost/core/null_deleter.hpp>
|
||||
typedef boost::null_deleter empty_deleter_t;
|
||||
using empty_deleter_t = boost::null_deleter;
|
||||
#endif
|
||||
|
||||
#include <boost/make_shared.hpp>
|
||||
|
@ -79,7 +79,7 @@ void reinit_logger(bool color, const std::string& filename, custom_severity_leve
|
|||
void DefaultConsoleInit(bool color/* = true*/)
|
||||
{
|
||||
// add a text sink
|
||||
typedef sinks::synchronous_sink<sinks::text_ostream_backend> text_sink;
|
||||
using text_sink = sinks::synchronous_sink<sinks::text_ostream_backend>;
|
||||
|
||||
RemoveRegisteredSinks();
|
||||
|
||||
|
|
|
@ -131,8 +131,8 @@ namespace Logger
|
|||
void AddFileSink(FunT&& func, Args&&... args)
|
||||
{
|
||||
// add a text sink
|
||||
typedef boost::log::sinks::text_file_backend sink_backend_t;
|
||||
typedef boost::log::sinks::synchronous_sink<sink_backend_t> sink_t;
|
||||
using sink_backend_t = boost::log::sinks::text_file_backend;
|
||||
using sink_t = boost::log::sinks::synchronous_sink<sink_backend_t>;
|
||||
|
||||
// forward keywords args for setting log file properties
|
||||
boost::shared_ptr<sink_backend_t> backend = boost::make_shared<sink_backend_t>(std::forward<Args>(args)...);
|
||||
|
|
|
@ -85,8 +85,7 @@ inline std::string write_in(const std::string& text_in_bold)
|
|||
return os.str();
|
||||
}
|
||||
|
||||
// typedef
|
||||
typedef FairMQ::severity_level custom_severity_level;
|
||||
using custom_severity_level = FairMQ::severity_level;
|
||||
#define SEVERITY_MINIMUM custom_severity_level::TRACE
|
||||
#define SEVERITY_ERROR custom_severity_level::ERROR
|
||||
#define SEVERITY_NOLOG custom_severity_level::NOLOG
|
||||
|
|
|
@ -83,7 +83,7 @@ template <> struct Traits2<EventId::UpdateParamInt> { using signal_type = boo
|
|||
class FairMQEventManager
|
||||
{
|
||||
public:
|
||||
typedef std::pair<EventId, std::string> EventKey;
|
||||
using EventKey = std::pair<EventId, std::string>;
|
||||
|
||||
FairMQEventManager() :
|
||||
fEventMap()
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
namespace FairMQParser
|
||||
{
|
||||
|
||||
typedef std::unordered_map<std::string, std::vector<FairMQChannel>> FairMQMap;
|
||||
using FairMQMap = std::unordered_map<std::string, std::vector<FairMQChannel>>;
|
||||
|
||||
FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string& deviceId, const std::string& rootNode, const std::string& formatFlag = "json");
|
||||
|
||||
|
|
|
@ -296,7 +296,7 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager
|
|||
}
|
||||
}
|
||||
|
||||
typedef std::tuple<std::string, int, std::string> MQKey;//store key info
|
||||
using MQKey = std::tuple<std::string, int, std::string>;//store key info
|
||||
std::map<std::string, MQKey> fMQKeyMap;// key=full path - val=key info
|
||||
|
||||
virtual int NotifySwitchOption(); // for custom help & version printing
|
||||
|
|
|
@ -208,8 +208,8 @@ class FairProgOptions
|
|||
|
||||
private:
|
||||
// Methods below are helper functions used in the PrintOptions method
|
||||
typedef std::tuple<std::string, std::string, std::string, std::string> VarValInfo_t;
|
||||
typedef std::map<std::string, VarValInfo_t> MapVarValInfo_t;
|
||||
using VarValInfo_t = std::tuple<std::string, std::string, std::string, std::string>;
|
||||
using MapVarValInfo_t = std::map<std::string, VarValInfo_t>;
|
||||
|
||||
VarValInfo_t GetVariableValueInfo(const po::variable_value& varValue);
|
||||
|
||||
|
|
|
@ -94,7 +94,7 @@ inline std::string ConvertVariableValueToString<boost::filesystem::path>(const p
|
|||
// policy to convert boost variable value into string
|
||||
struct ToString
|
||||
{
|
||||
typedef std::string returned_type;
|
||||
using returned_type = std::string;
|
||||
template<typename T>
|
||||
std::string Value(const po::variable_value& varValue, const std::string&, const std::string&, const std::string&)
|
||||
{
|
||||
|
@ -110,7 +110,7 @@ struct ToString
|
|||
// policy to convert variable value content into a tuple with value, type, defaulted, empty information
|
||||
struct ToVarInfo
|
||||
{
|
||||
typedef std::tuple<std::string, std::string,std::string, std::string> returned_type;
|
||||
using returned_type = std::tuple<std::string, std::string,std::string, std::string>;
|
||||
template<typename T>
|
||||
returned_type Value(const po::variable_value& varValue, const std::string& type, const std::string& defaulted, const std::string& empty)
|
||||
{
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
|
||||
using namespace std;
|
||||
|
||||
typedef unordered_map<string, vector<FairMQChannel>> FairMQMap;
|
||||
using FairMQMap = unordered_map<string, vector<FairMQChannel>>;
|
||||
|
||||
class MyDevice : public FairMQDevice
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue
Block a user