From 70e46a0b8634f031231f559fa5c88832f971bed8 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 1 Sep 2017 10:00:05 +0200 Subject: [PATCH] Cleanup base/MQ. --- fairmq/CMakeLists.txt | 2 +- fairmq/FairMQDevice.h | 10 ++--- fairmq/FairMQLogger.h | 2 +- fairmq/FairMQStateMachine.cxx | 8 ++-- fairmq/FairMQStateMachine.h | 8 ++-- fairmq/devices/FairMQSink.h | 58 ++++++++++++++++++++++---- fairmq/logger/fairroot_null_deleter.h | 2 +- fairmq/logger/logger.cxx | 6 +-- fairmq/logger/logger.h | 4 +- fairmq/logger/logger_def.h | 3 +- fairmq/options/FairMQEventManager.h | 2 +- fairmq/options/FairMQParser.h | 2 +- fairmq/options/FairMQProgOptions.h | 2 +- fairmq/options/FairProgOptions.h | 4 +- fairmq/options/FairProgOptionsHelper.h | 4 +- fairmq/options/runConfigEx.cxx | 2 +- 16 files changed, 81 insertions(+), 38 deletions(-) diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 9ebd713f..8e4e5d50 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -138,7 +138,7 @@ set(FAIRMQ_SOURCE_FILES devices/FairMQMerger.cxx devices/FairMQMultiplier.cxx devices/FairMQProxy.cxx - devices/FairMQSink.cxx + # devices/FairMQSink.cxx devices/FairMQSplitter.cxx logger/logger.cxx options/FairMQParser.cxx diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 33cd72d3..faa9b508 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -35,10 +35,10 @@ #include -typedef std::unordered_map> FairMQChannelMap; +using FairMQChannelMap = std::unordered_map>; -typedef std::function InputMsgCallback; -typedef std::function InputMultipartCallback; +using InputMsgCallback = std::function; +using InputMultipartCallback = std::function; class FairMQProgOptions; @@ -357,7 +357,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// @param rhs Left hand side value for comparison static bool SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs); - template + template void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQMessagePtr& msg, int index)) { fDataCallbacks = true; @@ -383,7 +383,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable } } - template + template void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQParts& parts, int index)) { fDataCallbacks = true; diff --git a/fairmq/FairMQLogger.h b/fairmq/FairMQLogger.h index 052dee0a..a8d901a6 100644 --- a/fairmq/FairMQLogger.h +++ b/fairmq/FairMQLogger.h @@ -45,7 +45,7 @@ ADD_LOG_FILESINK(filename,ERROR); // => Print severity >= ERROR to file (extension is added) */ -typedef unsigned long long timestamp_t; +using timestamp_t = unsigned long long; timestamp_t get_timestamp(); diff --git a/fairmq/FairMQStateMachine.cxx b/fairmq/FairMQStateMachine.cxx index 9fece256..be889dc9 100644 --- a/fairmq/FairMQStateMachine.cxx +++ b/fairmq/FairMQStateMachine.cxx @@ -32,7 +32,7 @@ int FairMQStateMachine::GetInterfaceVersion() return FAIRMQ_INTERFACE_VERSION; } -int FairMQStateMachine::GetEventNumber(std::string event) +int FairMQStateMachine::GetEventNumber(const std::string& event) { if (event == "INIT_DEVICE") return INIT_DEVICE; if (event == "INIT_TASK") return INIT_TASK; @@ -142,7 +142,7 @@ bool FairMQStateMachine::ChangeState(int event) return false; } -bool FairMQStateMachine::ChangeState(std::string event) +bool FairMQStateMachine::ChangeState(const std::string& event) { return ChangeState(GetEventNumber(event)); } @@ -178,7 +178,7 @@ void FairMQStateMachine::WaitForEndOfState(int event) } } -void FairMQStateMachine::WaitForEndOfState(std::string event) +void FairMQStateMachine::WaitForEndOfState(const std::string& event) { return WaitForEndOfState(GetEventNumber(event)); } @@ -219,7 +219,7 @@ bool FairMQStateMachine::WaitForEndOfStateForMs(int event, int durationInMs) return false; } -bool FairMQStateMachine::WaitForEndOfStateForMs(std::string event, int durationInMs) +bool FairMQStateMachine::WaitForEndOfStateForMs(const std::string& event, int durationInMs) { return WaitForEndOfStateForMs(GetEventNumber(event), durationInMs); } diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index b27bd03b..247a8266 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -574,16 +574,16 @@ class FairMQStateMachine : public FairMQFSM::FairMQFSM int GetInterfaceVersion(); - int GetEventNumber(std::string event); + int GetEventNumber(const std::string& event); bool ChangeState(int event); - bool ChangeState(std::string event); + bool ChangeState(const std::string& event); void WaitForEndOfState(int state); - void WaitForEndOfState(std::string state); + void WaitForEndOfState(const std::string& state); bool WaitForEndOfStateForMs(int state, int durationInMs); - bool WaitForEndOfStateForMs(std::string state, int durationInMs); + bool WaitForEndOfStateForMs(const std::string& state, int durationInMs); void SubscribeToStateChange(const std::string& key, std::function callback); void UnsubscribeFromStateChange(const std::string& key); diff --git a/fairmq/devices/FairMQSink.h b/fairmq/devices/FairMQSink.h index 211357e7..a2768879 100644 --- a/fairmq/devices/FairMQSink.h +++ b/fairmq/devices/FairMQSink.h @@ -16,21 +16,65 @@ #define FAIRMQSINK_H_ #include +#include -#include "FairMQDevice.h" +#include "../FairMQDevice.h" +#include "../FairMQLogger.h" +#include "../options/FairMQProgOptions.h" -class FairMQSink : public FairMQDevice +// template +class FairMQSink : public FairMQDevice//, public OutputPolicy { public: - FairMQSink(); - virtual ~FairMQSink(); + FairMQSink() + : fMaxIterations(0) + , fNumIterations(0) + , fInChannelName() + {} + + virtual ~FairMQSink() + {} protected: - uint64_t fNumMsgs; + uint64_t fMaxIterations; + uint64_t fNumIterations; std::string fInChannelName; - virtual void Run(); - virtual void InitTask(); + virtual void InitTask() + { + fMaxIterations = fConfig->GetValue("num-iterations"); + fInChannelName = fConfig->GetValue("in-channel"); + } + + virtual void Run() + { + // store the channel reference to avoid traversing the map on every loop iteration + FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0); + + LOG(INFO) << "Starting the benchmark and expecting to receive " << fMaxIterations << " messages."; + auto tStart = std::chrono::high_resolution_clock::now(); + + while (CheckCurrentState(RUNNING)) + { + FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage()); + + if (dataInChannel.Receive(msg) >= 0) + { + if (fMaxIterations > 0) + { + if (fNumIterations >= fMaxIterations) + { + break; + } + } + fNumIterations++; + } + } + + auto tEnd = std::chrono::high_resolution_clock::now(); + + LOG(INFO) << "Leaving RUNNING state. Received " << fNumIterations << " messages in " << std::chrono::duration(tEnd - tStart).count() << "ms."; + } }; #endif /* FAIRMQSINK_H_ */ diff --git a/fairmq/logger/fairroot_null_deleter.h b/fairmq/logger/fairroot_null_deleter.h index 4c6613c1..7c161f9e 100644 --- a/fairmq/logger/fairroot_null_deleter.h +++ b/fairmq/logger/fairroot_null_deleter.h @@ -14,7 +14,7 @@ namespace fairroot struct null_deleter { //! Function object result type - typedef void result_type; + using result_type = void; /*! * Does nothing */ diff --git a/fairmq/logger/logger.cxx b/fairmq/logger/logger.cxx index 9c21a44b..1d1c18a5 100644 --- a/fairmq/logger/logger.cxx +++ b/fairmq/logger/logger.cxx @@ -15,10 +15,10 @@ #if BOOST_VERSION < 105600 #include "fairroot_null_deleter.h" -typedef fairroot::null_deleter empty_deleter_t; +using empty_deleter_t = fairroot::null_deleter; #else #include -typedef boost::null_deleter empty_deleter_t; +using empty_deleter_t = boost::null_deleter; #endif #include @@ -79,7 +79,7 @@ void reinit_logger(bool color, const std::string& filename, custom_severity_leve void DefaultConsoleInit(bool color/* = true*/) { // add a text sink - typedef sinks::synchronous_sink text_sink; + using text_sink = sinks::synchronous_sink; RemoveRegisteredSinks(); diff --git a/fairmq/logger/logger.h b/fairmq/logger/logger.h index 7d242d23..b48dd402 100644 --- a/fairmq/logger/logger.h +++ b/fairmq/logger/logger.h @@ -131,8 +131,8 @@ namespace Logger void AddFileSink(FunT&& func, Args&&... args) { // add a text sink - typedef boost::log::sinks::text_file_backend sink_backend_t; - typedef boost::log::sinks::synchronous_sink sink_t; + using sink_backend_t = boost::log::sinks::text_file_backend; + using sink_t = boost::log::sinks::synchronous_sink; // forward keywords args for setting log file properties boost::shared_ptr backend = boost::make_shared(std::forward(args)...); diff --git a/fairmq/logger/logger_def.h b/fairmq/logger/logger_def.h index 19d7d235..f029e27b 100644 --- a/fairmq/logger/logger_def.h +++ b/fairmq/logger/logger_def.h @@ -85,8 +85,7 @@ inline std::string write_in(const std::string& text_in_bold) return os.str(); } -// typedef -typedef FairMQ::severity_level custom_severity_level; +using custom_severity_level = FairMQ::severity_level; #define SEVERITY_MINIMUM custom_severity_level::TRACE #define SEVERITY_ERROR custom_severity_level::ERROR #define SEVERITY_NOLOG custom_severity_level::NOLOG diff --git a/fairmq/options/FairMQEventManager.h b/fairmq/options/FairMQEventManager.h index a1e4629a..71244b25 100644 --- a/fairmq/options/FairMQEventManager.h +++ b/fairmq/options/FairMQEventManager.h @@ -83,7 +83,7 @@ template <> struct Traits2 { using signal_type = boo class FairMQEventManager { public: - typedef std::pair EventKey; + using EventKey = std::pair; FairMQEventManager() : fEventMap() diff --git a/fairmq/options/FairMQParser.h b/fairmq/options/FairMQParser.h index 575946be..ffe8696b 100644 --- a/fairmq/options/FairMQParser.h +++ b/fairmq/options/FairMQParser.h @@ -20,7 +20,7 @@ namespace FairMQParser { -typedef std::unordered_map> FairMQMap; +using FairMQMap = std::unordered_map>; FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string& deviceId, const std::string& rootNode, const std::string& formatFlag = "json"); diff --git a/fairmq/options/FairMQProgOptions.h b/fairmq/options/FairMQProgOptions.h index b24495e5..bd33ba75 100644 --- a/fairmq/options/FairMQProgOptions.h +++ b/fairmq/options/FairMQProgOptions.h @@ -296,7 +296,7 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager } } - typedef std::tuple MQKey;//store key info + using MQKey = std::tuple;//store key info std::map fMQKeyMap;// key=full path - val=key info virtual int NotifySwitchOption(); // for custom help & version printing diff --git a/fairmq/options/FairProgOptions.h b/fairmq/options/FairProgOptions.h index 18c60a04..11c40294 100644 --- a/fairmq/options/FairProgOptions.h +++ b/fairmq/options/FairProgOptions.h @@ -208,8 +208,8 @@ class FairProgOptions private: // Methods below are helper functions used in the PrintOptions method - typedef std::tuple VarValInfo_t; - typedef std::map MapVarValInfo_t; + using VarValInfo_t = std::tuple; + using MapVarValInfo_t = std::map; VarValInfo_t GetVariableValueInfo(const po::variable_value& varValue); diff --git a/fairmq/options/FairProgOptionsHelper.h b/fairmq/options/FairProgOptionsHelper.h index da2d080f..49cbfe3b 100644 --- a/fairmq/options/FairProgOptionsHelper.h +++ b/fairmq/options/FairProgOptionsHelper.h @@ -94,7 +94,7 @@ inline std::string ConvertVariableValueToString(const p // policy to convert boost variable value into string struct ToString { - typedef std::string returned_type; + using returned_type = std::string; template std::string Value(const po::variable_value& varValue, const std::string&, const std::string&, const std::string&) { @@ -110,7 +110,7 @@ struct ToString // policy to convert variable value content into a tuple with value, type, defaulted, empty information struct ToVarInfo { - typedef std::tuple returned_type; + using returned_type = std::tuple; template returned_type Value(const po::variable_value& varValue, const std::string& type, const std::string& defaulted, const std::string& empty) { diff --git a/fairmq/options/runConfigEx.cxx b/fairmq/options/runConfigEx.cxx index 37b30a99..4b79d9d5 100644 --- a/fairmq/options/runConfigEx.cxx +++ b/fairmq/options/runConfigEx.cxx @@ -22,7 +22,7 @@ using namespace std; -typedef unordered_map> FairMQMap; +using FairMQMap = unordered_map>; class MyDevice : public FairMQDevice {