From 319bdc91a1105bbe7eed621c9e4e6643d7173a37 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 23 Aug 2017 11:12:29 +0200 Subject: [PATCH] Tests for MQ examples --- examples/MQ/1-sampler-sink/CMakeLists.txt | 40 ++++++++------- .../1-sampler-sink/FairMQExample1Sampler.cxx | 14 ++++-- .../MQ/1-sampler-sink/FairMQExample1Sampler.h | 2 + .../MQ/1-sampler-sink/FairMQExample1Sink.cxx | 17 ++++++- .../MQ/1-sampler-sink/FairMQExample1Sink.h | 5 ++ .../MQ/1-sampler-sink/runExample1Sampler.cxx | 3 +- .../MQ/1-sampler-sink/runExample1Sink.cxx | 4 +- examples/MQ/1-sampler-sink/testMQEx1.sh.in | 26 ++++++++++ .../2-sampler-processor-sink/CMakeLists.txt | 46 +++++++++-------- .../FairMQExample2Sampler.cxx | 14 ++++-- .../FairMQExample2Sampler.h | 2 + .../FairMQExample2Sink.cxx | 17 ++++++- .../FairMQExample2Sink.h | 5 ++ .../runExample2Sampler.cxx | 3 +- .../runExample2Sink.cxx | 4 +- .../2-sampler-processor-sink/testMQEx2.sh.in | 50 +++++++++++++++++++ examples/MQ/4-copypush/CMakeLists.txt | 12 +++-- .../MQ/4-copypush/FairMQExample4Sampler.cxx | 16 +++++- .../MQ/4-copypush/FairMQExample4Sampler.h | 2 + examples/MQ/4-copypush/FairMQExample4Sink.cxx | 15 ++++++ examples/MQ/4-copypush/FairMQExample4Sink.h | 5 ++ examples/MQ/4-copypush/runExample4Sampler.cxx | 4 +- examples/MQ/4-copypush/runExample4Sink.cxx | 4 +- examples/MQ/4-copypush/testMQEx4.sh.in | 35 +++++++++++++ examples/MQ/5-req-rep/CMakeLists.txt | 30 ++++++----- .../MQ/5-req-rep/FairMQExample5Client.cxx | 13 ++++- examples/MQ/5-req-rep/FairMQExample5Client.h | 2 + .../MQ/5-req-rep/FairMQExample5Server.cxx | 15 ++++++ examples/MQ/5-req-rep/FairMQExample5Server.h | 5 ++ examples/MQ/5-req-rep/runExample5Client.cxx | 3 +- examples/MQ/5-req-rep/runExample5Server.cxx | 4 +- examples/MQ/5-req-rep/testMQEx5.sh.in | 26 ++++++++++ .../MQ/6-multiple-channels/CMakeLists.txt | 40 ++++++++------- .../FairMQExample6Sampler.cxx | 9 ++++ .../FairMQExample6Sampler.h | 2 + .../FairMQExample6Sink.cxx | 29 ++++++++++- .../6-multiple-channels/FairMQExample6Sink.h | 8 +++ .../runExample6Sampler.cxx | 3 +- .../6-multiple-channels/runExample6Sink.cxx | 4 +- .../MQ/6-multiple-channels/testMQEx6.sh.in | 40 +++++++++++++++ examples/MQ/8-multipart/CMakeLists.txt | 12 +++-- .../MQ/8-multipart/FairMQExample8Sampler.cxx | 27 +++++++--- .../MQ/8-multipart/FairMQExample8Sampler.h | 4 +- .../MQ/8-multipart/FairMQExample8Sink.cxx | 6 ++- .../MQ/8-multipart/runExample8Sampler.cxx | 4 +- examples/MQ/8-multipart/testMQEx8.sh.in | 23 +++++++++ .../MQ/multiple-transports/CMakeLists.txt | 12 +++-- .../FairMQExampleMTSampler1.cxx | 9 ++++ .../FairMQExampleMTSampler1.h | 2 + .../FairMQExampleMTSampler2.cxx | 13 +++++ .../FairMQExampleMTSampler2.h | 5 ++ .../FairMQExampleMTSink.cxx | 29 ++++++++++- .../multiple-transports/FairMQExampleMTSink.h | 9 ++++ .../runExampleMTSampler1.cxx | 2 +- .../runExampleMTSampler2.cxx | 2 +- .../multiple-transports/runExampleMTSink.cxx | 4 +- .../MQ/multiple-transports/testMTEx.sh.in | 35 +++++++++++++ 57 files changed, 658 insertions(+), 118 deletions(-) create mode 100755 examples/MQ/1-sampler-sink/testMQEx1.sh.in create mode 100755 examples/MQ/2-sampler-processor-sink/testMQEx2.sh.in create mode 100755 examples/MQ/4-copypush/testMQEx4.sh.in create mode 100755 examples/MQ/5-req-rep/testMQEx5.sh.in create mode 100755 examples/MQ/6-multiple-channels/testMQEx6.sh.in create mode 100755 examples/MQ/8-multipart/testMQEx8.sh.in create mode 100755 examples/MQ/multiple-transports/testMTEx.sh.in diff --git a/examples/MQ/1-sampler-sink/CMakeLists.txt b/examples/MQ/1-sampler-sink/CMakeLists.txt index ae5c904a..96380de8 100644 --- a/examples/MQ/1-sampler-sink/CMakeLists.txt +++ b/examples/MQ/1-sampler-sink/CMakeLists.txt @@ -6,12 +6,11 @@ # copied verbatim in the file "LICENSE" # ################################################################################ -configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/1-sampler-sink/ex1-sampler-sink.json - ${CMAKE_BINARY_DIR}/bin/config/ex1-sampler-sink.json) -configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/1-sampler-sink/startMQEx1.sh.in - ${CMAKE_BINARY_DIR}/bin/examples/MQ/1-sampler-sink/startMQEx1.sh) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/1-sampler-sink/ex1-sampler-sink.json ${CMAKE_BINARY_DIR}/bin/config/ex1-sampler-sink.json) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/1-sampler-sink/startMQEx1.sh.in ${CMAKE_BINARY_DIR}/bin/examples/MQ/1-sampler-sink/startMQEx1.sh) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/1-sampler-sink/testMQEx1.sh.in ${CMAKE_BINARY_DIR}/bin/examples/MQ/1-sampler-sink/testMQEx1.sh) -Set(INCLUDE_DIRECTORIES +set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq ${CMAKE_SOURCE_DIR}/fairmq/devices ${CMAKE_SOURCE_DIR}/fairmq/tools @@ -20,40 +19,40 @@ Set(INCLUDE_DIRECTORIES ${CMAKE_CURRENT_BINARY_DIR} ) -Set(SYSTEM_INCLUDE_DIRECTORIES +set(SYSTEM_INCLUDE_DIRECTORIES ${Boost_INCLUDE_DIR} ${ZeroMQ_INCLUDE_DIR} ) -Include_Directories(${INCLUDE_DIRECTORIES}) -Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) +include_directories(${INCLUDE_DIRECTORIES}) +include_directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) -Set(LINK_DIRECTORIES +set(LINK_DIRECTORIES ${Boost_LIBRARY_DIRS} ) -Link_Directories(${LINK_DIRECTORIES}) +link_directories(${LINK_DIRECTORIES}) -Set(SRCS +set(SRCS "FairMQExample1Sampler.cxx" "FairMQExample1Sink.cxx" ) -Set(DEPENDENCIES +set(DEPENDENCIES ${DEPENDENCIES} FairMQ ) -Set(LIBRARY_NAME FairMQExample1) +set(LIBRARY_NAME FairMQExample1) GENERATE_LIBRARY() -Set(Exe_Names +set(Exe_Names ex1-sampler ex1-sink ) -Set(Exe_Source +set(Exe_Source runExample1Sampler.cxx runExample1Sink.cxx ) @@ -64,16 +63,21 @@ math(EXPR _length ${_length}-1) set(BIN_DESTINATION share/fairbase/examples/MQ/1-sampler-sink/bin) set(EXECUTABLE_OUTPUT_PATH "${EXECUTABLE_OUTPUT_PATH}/examples/MQ/1-sampler-sink") -ForEach(_file RANGE 0 ${_length}) +foreach(_file RANGE 0 ${_length}) list(GET Exe_Names ${_file} _name) list(GET Exe_Source ${_file} _src) set(EXE_NAME ${_name}) set(SRCS ${_src}) set(DEPENDENCIES FairMQExample1) GENERATE_EXECUTABLE() -EndForEach(_file RANGE 0 ${_length}) +endforeach(_file RANGE 0 ${_length}) -Install( +add_test(NAME MQ.ex1-sampler-sink COMMAND ${CMAKE_BINARY_DIR}/bin/examples/MQ/1-sampler-sink/testMQEx1.sh) +set_tests_properties(MQ.ex1-sampler-sink PROPERTIES TIMEOUT "30") +set_tests_properties(MQ.ex1-sampler-sink PROPERTIES RUN_SERIAL true) +set_tests_properties(MQ.ex1-sampler-sink PROPERTIES PASS_REGULAR_EXPRESSION " Received: \"Hello\"") + +install( FILES ex1-sampler-sink.json DESTINATION share/fairbase/examples/MQ/1-sampler-sink/config/ ) diff --git a/examples/MQ/1-sampler-sink/FairMQExample1Sampler.cxx b/examples/MQ/1-sampler-sink/FairMQExample1Sampler.cxx index 963f96cd..2bbb8c2c 100644 --- a/examples/MQ/1-sampler-sink/FairMQExample1Sampler.cxx +++ b/examples/MQ/1-sampler-sink/FairMQExample1Sampler.cxx @@ -23,19 +23,20 @@ using namespace std; FairMQExample1Sampler::FairMQExample1Sampler() : fText() + , fMaxIterations(0) + , fNumIterations(0) { } void FairMQExample1Sampler::InitTask() { - // Get the fText value from the command line option (via fConfig) + // Get the fText and fMaxIterations values from the command line options (via fConfig) fText = fConfig->GetValue("text"); + fMaxIterations = fConfig->GetValue("max-iterations"); } bool FairMQExample1Sampler::ConditionalRun() { - std::this_thread::sleep_for(std::chrono::seconds(1)); - // create a copy of the data with new(), that will be deleted after the transfer is complete string* text = new string(fText); @@ -56,6 +57,13 @@ bool FairMQExample1Sampler::ConditionalRun() { return false; } + else if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) + { + LOG(INFO) << "Configured maximum number of iterations reached. Leaving RUNNING state."; + return false; + } + + this_thread::sleep_for(chrono::seconds(1)); return true; } diff --git a/examples/MQ/1-sampler-sink/FairMQExample1Sampler.h b/examples/MQ/1-sampler-sink/FairMQExample1Sampler.h index 7aebce39..867e8e7b 100644 --- a/examples/MQ/1-sampler-sink/FairMQExample1Sampler.h +++ b/examples/MQ/1-sampler-sink/FairMQExample1Sampler.h @@ -27,6 +27,8 @@ class FairMQExample1Sampler : public FairMQDevice protected: std::string fText; + uint64_t fMaxIterations; + uint64_t fNumIterations; virtual void InitTask(); virtual bool ConditionalRun(); diff --git a/examples/MQ/1-sampler-sink/FairMQExample1Sink.cxx b/examples/MQ/1-sampler-sink/FairMQExample1Sink.cxx index 5aaa47e9..f998cb83 100644 --- a/examples/MQ/1-sampler-sink/FairMQExample1Sink.cxx +++ b/examples/MQ/1-sampler-sink/FairMQExample1Sink.cxx @@ -14,21 +14,36 @@ #include "FairMQExample1Sink.h" #include "FairMQLogger.h" +#include "FairMQProgOptions.h" // device->fConfig using namespace std; FairMQExample1Sink::FairMQExample1Sink() + : fMaxIterations(0) + , fNumIterations(0) { // register a handler for data arriving on "data" channel OnData("data", &FairMQExample1Sink::HandleData); } +void FairMQExample1Sink::InitTask() +{ + // Get the fMaxIterations value from the command line options (via fConfig) + fMaxIterations = fConfig->GetValue("max-iterations"); +} + // handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0) bool FairMQExample1Sink::HandleData(FairMQMessagePtr& msg, int /*index*/) { LOG(INFO) << "Received: \"" << string(static_cast(msg->GetData()), msg->GetSize()) << "\""; - // return true if want to be called again (otherwise go to IDLE state) + if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) + { + LOG(INFO) << "Configured maximum number of iterations reached. Leaving RUNNING state."; + return false; + } + + // return true if want to be called again (otherwise return false go to IDLE state) return true; } diff --git a/examples/MQ/1-sampler-sink/FairMQExample1Sink.h b/examples/MQ/1-sampler-sink/FairMQExample1Sink.h index bd6f35ea..4e559c2c 100644 --- a/examples/MQ/1-sampler-sink/FairMQExample1Sink.h +++ b/examples/MQ/1-sampler-sink/FairMQExample1Sink.h @@ -24,7 +24,12 @@ class FairMQExample1Sink : public FairMQDevice virtual ~FairMQExample1Sink(); protected: + virtual void InitTask(); bool HandleData(FairMQMessagePtr&, int); + + private: + uint64_t fMaxIterations; + uint64_t fNumIterations; }; #endif /* FAIRMQEXAMPLE1SINK_H_ */ diff --git a/examples/MQ/1-sampler-sink/runExample1Sampler.cxx b/examples/MQ/1-sampler-sink/runExample1Sampler.cxx index 27473638..c0d0729d 100644 --- a/examples/MQ/1-sampler-sink/runExample1Sampler.cxx +++ b/examples/MQ/1-sampler-sink/runExample1Sampler.cxx @@ -14,7 +14,8 @@ namespace bpo = boost::program_options; void addCustomOptions(bpo::options_description& options) { options.add_options() - ("text", bpo::value()->default_value("Hello"), "Text to send out"); + ("text", bpo::value()->default_value("Hello"), "Text to send out") + ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/examples/MQ/1-sampler-sink/runExample1Sink.cxx b/examples/MQ/1-sampler-sink/runExample1Sink.cxx index 46656b02..1e4b5f93 100644 --- a/examples/MQ/1-sampler-sink/runExample1Sink.cxx +++ b/examples/MQ/1-sampler-sink/runExample1Sink.cxx @@ -11,8 +11,10 @@ namespace bpo = boost::program_options; -void addCustomOptions(bpo::options_description& /*options*/) +void addCustomOptions(bpo::options_description& options) { + options.add_options() + ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/examples/MQ/1-sampler-sink/testMQEx1.sh.in b/examples/MQ/1-sampler-sink/testMQEx1.sh.in new file mode 100755 index 00000000..434ad500 --- /dev/null +++ b/examples/MQ/1-sampler-sink/testMQEx1.sh.in @@ -0,0 +1,26 @@ +#!/bin/bash + +ex1config="@CMAKE_BINARY_DIR@/bin/config/ex1-sampler-sink.json" + +# setup a trap to kill everything if the test fails/timeouts +trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID;' TERM + +SAMPLER="ex1-sampler" +SAMPLER+=" --id sampler1" +SAMPLER+=" --control static --log-color false" +SAMPLER+=" --max-iterations 1" +SAMPLER+=" --mq-config $ex1config" +@CMAKE_BINARY_DIR@/bin/examples/MQ/1-sampler-sink/$SAMPLER & +SAMPLER_PID=$! + +SINK="ex1-sink" +SINK+=" --id sink1" +SINK+=" --control static --log-color false" +SINK+=" --max-iterations 1" +SINK+=" --mq-config $ex1config" +@CMAKE_BINARY_DIR@/bin/examples/MQ/1-sampler-sink/$SINK & +SINK_PID=$! + +# wait for sampler and sink to finish +wait $SAMPLER_PID +wait $SINK_PID diff --git a/examples/MQ/2-sampler-processor-sink/CMakeLists.txt b/examples/MQ/2-sampler-processor-sink/CMakeLists.txt index b346faaf..12fc90a3 100644 --- a/examples/MQ/2-sampler-processor-sink/CMakeLists.txt +++ b/examples/MQ/2-sampler-processor-sink/CMakeLists.txt @@ -6,12 +6,11 @@ # copied verbatim in the file "LICENSE" # ################################################################################ -configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/2-sampler-processor-sink/ex2-sampler-processor-sink.json - ${CMAKE_BINARY_DIR}/bin/config/ex2-sampler-processor-sink.json) -configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/2-sampler-processor-sink/startMQEx2.sh.in - ${CMAKE_BINARY_DIR}/bin/examples/MQ/2-sampler-processor-sink/startMQEx2.sh) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/2-sampler-processor-sink/ex2-sampler-processor-sink.json ${CMAKE_BINARY_DIR}/bin/config/ex2-sampler-processor-sink.json) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/2-sampler-processor-sink/startMQEx2.sh.in ${CMAKE_BINARY_DIR}/bin/examples/MQ/2-sampler-processor-sink/startMQEx2.sh) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/2-sampler-processor-sink/testMQEx2.sh.in ${CMAKE_BINARY_DIR}/bin/examples/MQ/2-sampler-processor-sink/testMQEx2.sh) -Set(INCLUDE_DIRECTORIES +set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq ${CMAKE_SOURCE_DIR}/fairmq/devices ${CMAKE_SOURCE_DIR}/fairmq/tools @@ -20,42 +19,42 @@ Set(INCLUDE_DIRECTORIES ${CMAKE_CURRENT_BINARY_DIR} ) -Set(SYSTEM_INCLUDE_DIRECTORIES +set(SYSTEM_INCLUDE_DIRECTORIES ${Boost_INCLUDE_DIR} ${ZeroMQ_INCLUDE_DIR} ) -Include_Directories(${INCLUDE_DIRECTORIES}) -Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) +include_directories(${INCLUDE_DIRECTORIES}) +include_directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) -Set(LINK_DIRECTORIES +set(LINK_DIRECTORIES ${Boost_LIBRARY_DIRS} ) -Link_Directories(${LINK_DIRECTORIES}) +link_directories(${LINK_DIRECTORIES}) -Set(SRCS +set(SRCS "FairMQExample2Sampler.cxx" "FairMQExample2Processor.cxx" "FairMQExample2Sink.cxx" ) -Set(DEPENDENCIES +set(DEPENDENCIES ${DEPENDENCIES} FairMQ ) -Set(LIBRARY_NAME FairMQExample2) +set(LIBRARY_NAME FairMQExample2) GENERATE_LIBRARY() -Set(Exe_Names +set(Exe_Names ex2-sampler ex2-processor ex2-sink ) -Set(Exe_Source +set(Exe_Source runExample2Sampler.cxx runExample2Processor.cxx runExample2Sink.cxx @@ -67,16 +66,21 @@ math(EXPR _length ${_length}-1) set(BIN_DESTINATION share/fairbase/examples/MQ/2-sampler-processor-sink/bin) set(EXECUTABLE_OUTPUT_PATH "${EXECUTABLE_OUTPUT_PATH}/examples/MQ/2-sampler-processor-sink") -ForEach(_file RANGE 0 ${_length}) +foreach(_file RANGE 0 ${_length}) list(GET Exe_Names ${_file} _name) list(GET Exe_Source ${_file} _src) - Set(EXE_NAME ${_name}) - Set(SRCS ${_src}) - Set(DEPENDENCIES FairMQExample2) + set(EXE_NAME ${_name}) + set(SRCS ${_src}) + set(DEPENDENCIES FairMQExample2) GENERATE_EXECUTABLE() -EndForEach(_file RANGE 0 ${_length}) +endforeach(_file RANGE 0 ${_length}) -Install( +add_test(NAME MQ.ex2-sampler-processor-sink COMMAND ${CMAKE_BINARY_DIR}/bin/examples/MQ/2-sampler-processor-sink/testMQEx2.sh) +set_tests_properties(MQ.ex2-sampler-processor-sink PROPERTIES TIMEOUT "30") +set_tests_properties(MQ.ex2-sampler-processor-sink PROPERTIES RUN_SERIAL true) +set_tests_properties(MQ.ex2-sampler-processor-sink PROPERTIES PASS_REGULAR_EXPRESSION "Received: ") + +install( FILES ex2-sampler-processor-sink.json DESTINATION share/fairbase/examples/MQ/2-sampler-processor-sink/config/ ) diff --git a/examples/MQ/2-sampler-processor-sink/FairMQExample2Sampler.cxx b/examples/MQ/2-sampler-processor-sink/FairMQExample2Sampler.cxx index 7ee5752e..d034eeb4 100644 --- a/examples/MQ/2-sampler-processor-sink/FairMQExample2Sampler.cxx +++ b/examples/MQ/2-sampler-processor-sink/FairMQExample2Sampler.cxx @@ -23,19 +23,20 @@ using namespace std; FairMQExample2Sampler::FairMQExample2Sampler() : fText() + , fMaxIterations(0) + , fNumIterations(0) { } void FairMQExample2Sampler::InitTask() { - // Get the fText value from the command line option (via fConfig) + // Get the fText and fMaxIterations values from the command line options (via fConfig) fText = fConfig->GetValue("text"); + fMaxIterations = fConfig->GetValue("max-iterations"); } bool FairMQExample2Sampler::ConditionalRun() { - this_thread::sleep_for(chrono::seconds(1)); - // Initializing message with NewStaticMessage will avoid copy // but won't delete the data after the sending is completed. FairMQMessagePtr msg(NewStaticMessage(fText)); @@ -48,6 +49,13 @@ bool FairMQExample2Sampler::ConditionalRun() { return false; } + else if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) + { + LOG(INFO) << "Configured maximum number of iterations reached. Leaving RUNNING state."; + return false; + } + + this_thread::sleep_for(chrono::seconds(1)); return true; } diff --git a/examples/MQ/2-sampler-processor-sink/FairMQExample2Sampler.h b/examples/MQ/2-sampler-processor-sink/FairMQExample2Sampler.h index 82ba081c..988853f3 100644 --- a/examples/MQ/2-sampler-processor-sink/FairMQExample2Sampler.h +++ b/examples/MQ/2-sampler-processor-sink/FairMQExample2Sampler.h @@ -27,6 +27,8 @@ class FairMQExample2Sampler : public FairMQDevice protected: std::string fText; + uint64_t fMaxIterations; + uint64_t fNumIterations; virtual void InitTask(); virtual bool ConditionalRun(); diff --git a/examples/MQ/2-sampler-processor-sink/FairMQExample2Sink.cxx b/examples/MQ/2-sampler-processor-sink/FairMQExample2Sink.cxx index 74ef614c..9043597f 100644 --- a/examples/MQ/2-sampler-processor-sink/FairMQExample2Sink.cxx +++ b/examples/MQ/2-sampler-processor-sink/FairMQExample2Sink.cxx @@ -14,21 +14,36 @@ #include "FairMQExample2Sink.h" #include "FairMQLogger.h" +#include "FairMQProgOptions.h" // device->fConfig using namespace std; FairMQExample2Sink::FairMQExample2Sink() + : fMaxIterations(0) + , fNumIterations(0) { // register a handler for data arriving on "data2" channel OnData("data2", &FairMQExample2Sink::HandleData); } +void FairMQExample2Sink::InitTask() +{ + // Get the fMaxIterations value from the command line options (via fConfig) + fMaxIterations = fConfig->GetValue("max-iterations"); +} + // handler is called whenever a message arrives on "data2", with a reference to the message and a sub-channel index (here 0) bool FairMQExample2Sink::HandleData(FairMQMessagePtr& msg, int /*index*/) { LOG(INFO) << "Received: \"" << string(static_cast(msg->GetData()), msg->GetSize()) << "\""; - // return true if want to be called again (otherwise go to IDLE state) + if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) + { + LOG(INFO) << "Configured maximum number of iterations reached. Leaving RUNNING state."; + return false; + } + + // return true if want to be called again (otherwise return false go to IDLE state) return true; } diff --git a/examples/MQ/2-sampler-processor-sink/FairMQExample2Sink.h b/examples/MQ/2-sampler-processor-sink/FairMQExample2Sink.h index 437d7a3c..b882eba0 100644 --- a/examples/MQ/2-sampler-processor-sink/FairMQExample2Sink.h +++ b/examples/MQ/2-sampler-processor-sink/FairMQExample2Sink.h @@ -24,7 +24,12 @@ class FairMQExample2Sink : public FairMQDevice virtual ~FairMQExample2Sink(); protected: + virtual void InitTask(); bool HandleData(FairMQMessagePtr&, int); + + private: + uint64_t fMaxIterations; + uint64_t fNumIterations; }; #endif /* FAIRMQEXAMPLE2SINK_H_ */ diff --git a/examples/MQ/2-sampler-processor-sink/runExample2Sampler.cxx b/examples/MQ/2-sampler-processor-sink/runExample2Sampler.cxx index c82d076b..375c444e 100644 --- a/examples/MQ/2-sampler-processor-sink/runExample2Sampler.cxx +++ b/examples/MQ/2-sampler-processor-sink/runExample2Sampler.cxx @@ -14,7 +14,8 @@ namespace bpo = boost::program_options; void addCustomOptions(bpo::options_description& options) { options.add_options() - ("text", bpo::value()->default_value("Hello"), "Text to send out"); + ("text", bpo::value()->default_value("Hello"), "Text to send out") + ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/examples/MQ/2-sampler-processor-sink/runExample2Sink.cxx b/examples/MQ/2-sampler-processor-sink/runExample2Sink.cxx index b688fbca..a93387a0 100644 --- a/examples/MQ/2-sampler-processor-sink/runExample2Sink.cxx +++ b/examples/MQ/2-sampler-processor-sink/runExample2Sink.cxx @@ -11,8 +11,10 @@ namespace bpo = boost::program_options; -void addCustomOptions(bpo::options_description& /*options*/) +void addCustomOptions(bpo::options_description& options) { + options.add_options() + ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/examples/MQ/2-sampler-processor-sink/testMQEx2.sh.in b/examples/MQ/2-sampler-processor-sink/testMQEx2.sh.in new file mode 100755 index 00000000..831023df --- /dev/null +++ b/examples/MQ/2-sampler-processor-sink/testMQEx2.sh.in @@ -0,0 +1,50 @@ +#!/bin/bash + +ex2config="@CMAKE_BINARY_DIR@/bin/config/ex2-sampler-processor-sink.json" + +# setup a trap to kill everything if the test fails/timeouts +trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; kill -TERM $PROCESSOR1_PID; kill -TERM $PROCESSOR2_PID; wait $SAMPLER_PID; wait $SINK_PID; wait $PROCESSOR1_PID; wait $PROCESSOR2_PID;' TERM + +SAMPLER="ex2-sampler" +SAMPLER+=" --id sampler1" +SAMPLER+=" --control static --log-color false" +SAMPLER+=" --max-iterations 2" +SAMPLER+=" --mq-config $ex2config" +@CMAKE_BINARY_DIR@/bin/examples/MQ/2-sampler-processor-sink/$SAMPLER & +SAMPLER_PID=$! + +PROCESSOR1="ex2-processor" +PROCESSOR1+=" --id processor1" +PROCESSOR1+=" --control static --log-color false" +PROCESSOR1+=" --mq-config $ex2config" +PROCESSOR1+=" --config-key processor" +@CMAKE_BINARY_DIR@/bin/examples/MQ/2-sampler-processor-sink/$PROCESSOR1 & +PROCESSOR1_PID=$! + +PROCESSOR2="ex2-processor" +PROCESSOR2+=" --id processor2" +PROCESSOR2+=" --control static --log-color false" +PROCESSOR2+=" --mq-config $ex2config" +PROCESSOR2+=" --config-key processor" +@CMAKE_BINARY_DIR@/bin/examples/MQ/2-sampler-processor-sink/$PROCESSOR2 & +PROCESSOR2_PID=$! + +SINK="ex2-sink" +SINK+=" --id sink1" +SINK+=" --control static --log-color false" +SINK+=" --max-iterations 2" +SINK+=" --mq-config $ex2config" +@CMAKE_BINARY_DIR@/bin/examples/MQ/2-sampler-processor-sink/$SINK & +SINK_PID=$! + +# wait for sampler and sink to finish +wait $SAMPLER_PID +wait $SINK_PID + +# stop processors +kill -SIGINT $PROCESSOR1_PID +kill -SIGINT $PROCESSOR2_PID + +# wait for everything to finish +wait $PROCESSOR1_PID +wait $PROCESSOR2_PID diff --git a/examples/MQ/4-copypush/CMakeLists.txt b/examples/MQ/4-copypush/CMakeLists.txt index 6ecba16a..0abc3837 100644 --- a/examples/MQ/4-copypush/CMakeLists.txt +++ b/examples/MQ/4-copypush/CMakeLists.txt @@ -6,10 +6,9 @@ # copied verbatim in the file "LICENSE" # ################################################################################ -configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/4-copypush/ex4-copypush.json - ${CMAKE_BINARY_DIR}/bin/config/ex4-copypush.json) -configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/4-copypush/startMQEx4.sh.in - ${CMAKE_BINARY_DIR}/bin/examples/MQ/4-copypush/startMQEx4.sh) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/4-copypush/ex4-copypush.json ${CMAKE_BINARY_DIR}/bin/config/ex4-copypush.json) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/4-copypush/startMQEx4.sh.in ${CMAKE_BINARY_DIR}/bin/examples/MQ/4-copypush/startMQEx4.sh) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/4-copypush/testMQEx4.sh.in ${CMAKE_BINARY_DIR}/bin/examples/MQ/4-copypush/testMQEx4.sh) Set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq @@ -73,6 +72,11 @@ ForEach(_file RANGE 0 ${_length}) GENERATE_EXECUTABLE() EndForEach(_file RANGE 0 ${_length}) +add_test(NAME MQ.ex4-copypush COMMAND ${CMAKE_BINARY_DIR}/bin/examples/MQ/4-copypush/testMQEx4.sh) +set_tests_properties(MQ.ex4-copypush PROPERTIES TIMEOUT "30") +set_tests_properties(MQ.ex4-copypush PROPERTIES RUN_SERIAL true) +set_tests_properties(MQ.ex4-copypush PROPERTIES PASS_REGULAR_EXPRESSION "Received message: ") + Install( FILES ex4-copypush.json DESTINATION share/fairbase/examples/MQ/4-copypush/config/ diff --git a/examples/MQ/4-copypush/FairMQExample4Sampler.cxx b/examples/MQ/4-copypush/FairMQExample4Sampler.cxx index c833bc05..1a5f10b1 100644 --- a/examples/MQ/4-copypush/FairMQExample4Sampler.cxx +++ b/examples/MQ/4-copypush/FairMQExample4Sampler.cxx @@ -17,21 +17,26 @@ #include "FairMQExample4Sampler.h" #include "FairMQLogger.h" +#include "FairMQProgOptions.h" // device->fConfig + +using namespace std; FairMQExample4Sampler::FairMQExample4Sampler() : fNumDataChannels(0) , fCounter(0) + , fMaxIterations(0) + , fNumIterations(0) { } void FairMQExample4Sampler::InitTask() { fNumDataChannels = fChannels.at("data").size(); + fMaxIterations = fConfig->GetValue("max-iterations"); } bool FairMQExample4Sampler::ConditionalRun() { - std::this_thread::sleep_for(std::chrono::seconds(1)); // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place). // Should only be used for small data because of the cost of an additional copy @@ -44,6 +49,15 @@ bool FairMQExample4Sampler::ConditionalRun() Send(msgCopy, "data", i); } Send(msg, "data", fNumDataChannels - 1); + + if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) + { + LOG(INFO) << "Configured maximum number of iterations reached. Leaving RUNNING state."; + return false; + } + + this_thread::sleep_for(chrono::seconds(1)); + return true; } diff --git a/examples/MQ/4-copypush/FairMQExample4Sampler.h b/examples/MQ/4-copypush/FairMQExample4Sampler.h index fd6b72cc..91e80537 100644 --- a/examples/MQ/4-copypush/FairMQExample4Sampler.h +++ b/examples/MQ/4-copypush/FairMQExample4Sampler.h @@ -31,6 +31,8 @@ class FairMQExample4Sampler : public FairMQDevice int fNumDataChannels; uint64_t fCounter; + uint64_t fMaxIterations; + uint64_t fNumIterations; }; #endif /* FAIRMQEXAMPLE4SAMPLER_H_ */ diff --git a/examples/MQ/4-copypush/FairMQExample4Sink.cxx b/examples/MQ/4-copypush/FairMQExample4Sink.cxx index bf5e44e6..df2f0854 100644 --- a/examples/MQ/4-copypush/FairMQExample4Sink.cxx +++ b/examples/MQ/4-copypush/FairMQExample4Sink.cxx @@ -14,18 +14,33 @@ #include "FairMQExample4Sink.h" #include "FairMQLogger.h" +#include "FairMQProgOptions.h" // device->fConfig #include // uint64_t FairMQExample4Sink::FairMQExample4Sink() + : fMaxIterations(0) + , fNumIterations(0) { OnData("data", &FairMQExample4Sink::HandleData); } +void FairMQExample4Sink::InitTask() +{ + // Get the fMaxIterations value from the command line options (via fConfig) + fMaxIterations = fConfig->GetValue("max-iterations"); +} + bool FairMQExample4Sink::HandleData(FairMQMessagePtr& msg, int /*index*/) { LOG(INFO) << "Received message: \"" << *(static_cast(msg->GetData())) << "\""; + if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) + { + LOG(INFO) << "Configured maximum number of iterations reached. Leaving RUNNING state."; + return false; + } + // return true if want to be called again (otherwise go to IDLE state) return true; } diff --git a/examples/MQ/4-copypush/FairMQExample4Sink.h b/examples/MQ/4-copypush/FairMQExample4Sink.h index e7058b7b..190d3b3b 100644 --- a/examples/MQ/4-copypush/FairMQExample4Sink.h +++ b/examples/MQ/4-copypush/FairMQExample4Sink.h @@ -24,7 +24,12 @@ class FairMQExample4Sink : public FairMQDevice virtual ~FairMQExample4Sink(); protected: + virtual void InitTask(); bool HandleData(FairMQMessagePtr&, int); + + private: + uint64_t fMaxIterations; + uint64_t fNumIterations; }; #endif /* FAIRMQEXAMPLE4SINK_H_ */ diff --git a/examples/MQ/4-copypush/runExample4Sampler.cxx b/examples/MQ/4-copypush/runExample4Sampler.cxx index 58856d88..2bc712f4 100644 --- a/examples/MQ/4-copypush/runExample4Sampler.cxx +++ b/examples/MQ/4-copypush/runExample4Sampler.cxx @@ -11,8 +11,10 @@ namespace bpo = boost::program_options; -void addCustomOptions(bpo::options_description& /*options*/) +void addCustomOptions(bpo::options_description& options) { + options.add_options() + ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/examples/MQ/4-copypush/runExample4Sink.cxx b/examples/MQ/4-copypush/runExample4Sink.cxx index 1179aa0a..5a956e00 100644 --- a/examples/MQ/4-copypush/runExample4Sink.cxx +++ b/examples/MQ/4-copypush/runExample4Sink.cxx @@ -11,8 +11,10 @@ namespace bpo = boost::program_options; -void addCustomOptions(bpo::options_description& /*options*/) +void addCustomOptions(bpo::options_description& options) { + options.add_options() + ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/examples/MQ/4-copypush/testMQEx4.sh.in b/examples/MQ/4-copypush/testMQEx4.sh.in new file mode 100755 index 00000000..5ff37e0b --- /dev/null +++ b/examples/MQ/4-copypush/testMQEx4.sh.in @@ -0,0 +1,35 @@ +#!/bin/bash + +ex4config="@CMAKE_BINARY_DIR@/bin/config/ex4-copypush.json" + +# setup a trap to kill everything if the test fails/timeouts +trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK1_PID; kill -TERM $SINK2_PID; wait $SAMPLER_PID; wait $SINK1_PID; wait $SINK2_PID;' TERM + +SAMPLER="ex4-sampler" +SAMPLER+=" --id sampler1" +SAMPLER+=" --control static --log-color false" +SAMPLER+=" --max-iterations 1" +SAMPLER+=" --mq-config $ex4config" +@CMAKE_BINARY_DIR@/bin/examples/MQ/4-copypush/$SAMPLER & +SAMPLER_PID=$! + +SINK1="ex4-sink" +SINK1+=" --id sink1" +SINK1+=" --control static --log-color false" +SINK1+=" --max-iterations 1" +SINK1+=" --mq-config $ex4config" +@CMAKE_BINARY_DIR@/bin/examples/MQ/4-copypush/$SINK1 & +SINK1_PID=$! + +SINK2="ex4-sink" +SINK2+=" --id sink2" +SINK2+=" --control static --log-color false" +SINK2+=" --max-iterations 1" +SINK2+=" --mq-config $ex4config" +@CMAKE_BINARY_DIR@/bin/examples/MQ/4-copypush/$SINK2 & +SINK2_PID=$! + +# wait for everything to finish +wait $SAMPLER_PID +wait $SINK1_PID +wait $SINK2_PID diff --git a/examples/MQ/5-req-rep/CMakeLists.txt b/examples/MQ/5-req-rep/CMakeLists.txt index dd590ee7..1b9bd322 100644 --- a/examples/MQ/5-req-rep/CMakeLists.txt +++ b/examples/MQ/5-req-rep/CMakeLists.txt @@ -6,12 +6,11 @@ # copied verbatim in the file "LICENSE" # ################################################################################ -configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/5-req-rep/ex5-req-rep.json - ${CMAKE_BINARY_DIR}/bin/config/ex5-req-rep.json) -configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/5-req-rep/startMQEx5.sh.in - ${CMAKE_BINARY_DIR}/bin/examples/MQ/5-req-rep/startMQEx5.sh) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/5-req-rep/ex5-req-rep.json ${CMAKE_BINARY_DIR}/bin/config/ex5-req-rep.json) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/5-req-rep/startMQEx5.sh.in ${CMAKE_BINARY_DIR}/bin/examples/MQ/5-req-rep/startMQEx5.sh) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/5-req-rep/testMQEx5.sh.in ${CMAKE_BINARY_DIR}/bin/examples/MQ/5-req-rep/testMQEx5.sh) -Set(INCLUDE_DIRECTORIES +set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq ${CMAKE_SOURCE_DIR}/fairmq/devices ${CMAKE_SOURCE_DIR}/fairmq/tools @@ -20,19 +19,19 @@ Set(INCLUDE_DIRECTORIES ${CMAKE_CURRENT_BINARY_DIR} ) -Set(SYSTEM_INCLUDE_DIRECTORIES +set(SYSTEM_INCLUDE_DIRECTORIES ${Boost_INCLUDE_DIR} ${ZeroMQ_INCLUDE_DIR} ) -Include_Directories(${INCLUDE_DIRECTORIES}) -Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) +include_directories(${INCLUDE_DIRECTORIES}) +include_directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) -Set(LINK_DIRECTORIES +set(LINK_DIRECTORIES ${Boost_LIBRARY_DIRS} ) -Link_Directories(${LINK_DIRECTORIES}) +link_directories(${LINK_DIRECTORIES}) set(SRCS "FairMQExample5Client.cxx" @@ -64,16 +63,21 @@ math(EXPR _length ${_length}-1) set(BIN_DESTINATION share/fairbase/examples/MQ/5-req-rep/bin) set(EXECUTABLE_OUTPUT_PATH "${EXECUTABLE_OUTPUT_PATH}/examples/MQ/5-req-rep") -ForEach(_file RANGE 0 ${_length}) +foreach(_file RANGE 0 ${_length}) list(GET Exe_Names ${_file} _name) list(GET Exe_Source ${_file} _src) set(EXE_NAME ${_name}) set(SRCS ${_src}) set(DEPENDENCIES FairMQExample5) GENERATE_EXECUTABLE() -EndForEach(_file RANGE 0 ${_length}) +endforeach(_file RANGE 0 ${_length}) -Install( +add_test(NAME MQ.ex5-req-rep COMMAND ${CMAKE_BINARY_DIR}/bin/examples/MQ/5-req-rep/testMQEx5.sh) +set_tests_properties(MQ.ex5-req-rep PROPERTIES TIMEOUT "30") +set_tests_properties(MQ.ex5-req-rep PROPERTIES RUN_SERIAL true) +set_tests_properties(MQ.ex5-req-rep PROPERTIES PASS_REGULAR_EXPRESSION "Received reply from server: ") + +install( FILES ex5-req-rep.json DESTINATION share/fairbase/examples/MQ/5-req-rep/config/ ) diff --git a/examples/MQ/5-req-rep/FairMQExample5Client.cxx b/examples/MQ/5-req-rep/FairMQExample5Client.cxx index edc0585c..7ef5415d 100644 --- a/examples/MQ/5-req-rep/FairMQExample5Client.cxx +++ b/examples/MQ/5-req-rep/FairMQExample5Client.cxx @@ -24,17 +24,19 @@ using namespace std; FairMQExample5Client::FairMQExample5Client() : fText() + , fMaxIterations(0) + , fNumIterations(0) { } void FairMQExample5Client::InitTask() { fText = fConfig->GetValue("text"); + fMaxIterations = fConfig->GetValue("max-iterations"); } bool FairMQExample5Client::ConditionalRun() { - this_thread::sleep_for(chrono::seconds(1)); string* text = new string(fText); @@ -55,6 +57,15 @@ bool FairMQExample5Client::ConditionalRun() if (Receive(reply, "data") >= 0) { LOG(INFO) << "Received reply from server: \"" << string(static_cast(reply->GetData()), reply->GetSize()) << "\""; + + if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) + { + LOG(INFO) << "Configured maximum number of iterations reached. Leaving RUNNING state."; + return false; + } + + this_thread::sleep_for(chrono::seconds(1)); + return true; } } diff --git a/examples/MQ/5-req-rep/FairMQExample5Client.h b/examples/MQ/5-req-rep/FairMQExample5Client.h index 8d28785e..f1ba54cd 100644 --- a/examples/MQ/5-req-rep/FairMQExample5Client.h +++ b/examples/MQ/5-req-rep/FairMQExample5Client.h @@ -27,6 +27,8 @@ class FairMQExample5Client : public FairMQDevice protected: std::string fText; + uint64_t fMaxIterations; + uint64_t fNumIterations; virtual bool ConditionalRun(); virtual void InitTask(); diff --git a/examples/MQ/5-req-rep/FairMQExample5Server.cxx b/examples/MQ/5-req-rep/FairMQExample5Server.cxx index 09fb4059..e4e64d68 100644 --- a/examples/MQ/5-req-rep/FairMQExample5Server.cxx +++ b/examples/MQ/5-req-rep/FairMQExample5Server.cxx @@ -14,14 +14,23 @@ #include "FairMQExample5Server.h" #include "FairMQLogger.h" +#include "FairMQProgOptions.h" // device->fConfig using namespace std; FairMQExample5Server::FairMQExample5Server() + : fMaxIterations(0) + , fNumIterations(0) { OnData("data", &FairMQExample5Server::HandleData); } +void FairMQExample5Server::InitTask() +{ + // Get the fMaxIterations value from the command line options (via fConfig) + fMaxIterations = fConfig->GetValue("max-iterations"); +} + bool FairMQExample5Server::HandleData(FairMQMessagePtr& request, int /*index*/) { LOG(INFO) << "Received request from client: \"" << string(static_cast(request->GetData()), request->GetSize()) << "\""; @@ -37,6 +46,12 @@ bool FairMQExample5Server::HandleData(FairMQMessagePtr& request, int /*index*/) if (Send(reply, "data") > 0) { + if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) + { + LOG(INFO) << "Configured maximum number of iterations reached. Leaving RUNNING state."; + return false; + } + return true; } diff --git a/examples/MQ/5-req-rep/FairMQExample5Server.h b/examples/MQ/5-req-rep/FairMQExample5Server.h index 2291f6ea..be63cb0c 100644 --- a/examples/MQ/5-req-rep/FairMQExample5Server.h +++ b/examples/MQ/5-req-rep/FairMQExample5Server.h @@ -24,7 +24,12 @@ class FairMQExample5Server : public FairMQDevice virtual ~FairMQExample5Server(); protected: + virtual void InitTask(); bool HandleData(FairMQMessagePtr&, int); + + private: + uint64_t fMaxIterations; + uint64_t fNumIterations; }; #endif /* FAIRMQEXAMPLE5SERVER_H_ */ diff --git a/examples/MQ/5-req-rep/runExample5Client.cxx b/examples/MQ/5-req-rep/runExample5Client.cxx index 8c4b4fbe..e43efff5 100644 --- a/examples/MQ/5-req-rep/runExample5Client.cxx +++ b/examples/MQ/5-req-rep/runExample5Client.cxx @@ -14,7 +14,8 @@ namespace bpo = boost::program_options; void addCustomOptions(bpo::options_description& options) { options.add_options() - ("text", bpo::value()->default_value("Hello"), "Text to send out"); + ("text", bpo::value()->default_value("Hello"), "Text to send out") + ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/examples/MQ/5-req-rep/runExample5Server.cxx b/examples/MQ/5-req-rep/runExample5Server.cxx index c990b001..a1c5adf9 100644 --- a/examples/MQ/5-req-rep/runExample5Server.cxx +++ b/examples/MQ/5-req-rep/runExample5Server.cxx @@ -11,8 +11,10 @@ namespace bpo = boost::program_options; -void addCustomOptions(bpo::options_description& /*options*/) +void addCustomOptions(bpo::options_description& options) { + options.add_options() + ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/examples/MQ/5-req-rep/testMQEx5.sh.in b/examples/MQ/5-req-rep/testMQEx5.sh.in new file mode 100755 index 00000000..6e95502e --- /dev/null +++ b/examples/MQ/5-req-rep/testMQEx5.sh.in @@ -0,0 +1,26 @@ +#!/bin/bash + +ex5config="@CMAKE_BINARY_DIR@/bin/config/ex5-req-rep.json" + +# setup a trap to kill everything if the test fails/timeouts +trap 'kill -TERM $CLIENT_PID; kill -TERM $SERVER_PID; wait $CLIENT_PID; wait $SERVER_PID;' TERM + +CLIENT="ex5-client" +CLIENT+=" --id client" +CLIENT+=" --control static --log-color false" +CLIENT+=" --max-iterations 1" +CLIENT+=" --mq-config $ex5config" +@CMAKE_BINARY_DIR@/bin/examples/MQ/5-req-rep/$CLIENT & +CLIENT_PID=$! + +SERVER="ex5-server" +SERVER+=" --id server" +SERVER+=" --control static --log-color false" +SERVER+=" --max-iterations 1" +SERVER+=" --mq-config $ex5config" +@CMAKE_BINARY_DIR@/bin/examples/MQ/5-req-rep/$SERVER & +SERVER_PID=$! + +# wait for everything to finish +wait $CLIENT_PID +wait $SERVER_PID diff --git a/examples/MQ/6-multiple-channels/CMakeLists.txt b/examples/MQ/6-multiple-channels/CMakeLists.txt index e421fb9f..2af88b21 100644 --- a/examples/MQ/6-multiple-channels/CMakeLists.txt +++ b/examples/MQ/6-multiple-channels/CMakeLists.txt @@ -6,12 +6,11 @@ # copied verbatim in the file "LICENSE" # ################################################################################ -configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/6-multiple-channels/ex6-multiple-channels.json - ${CMAKE_BINARY_DIR}/bin/config/ex6-multiple-channels.json) -configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/6-multiple-channels/startMQEx6.sh.in - ${CMAKE_BINARY_DIR}/bin/examples/MQ/6-multiple-channels/startMQEx6.sh) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/6-multiple-channels/ex6-multiple-channels.json ${CMAKE_BINARY_DIR}/bin/config/ex6-multiple-channels.json) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/6-multiple-channels/startMQEx6.sh.in ${CMAKE_BINARY_DIR}/bin/examples/MQ/6-multiple-channels/startMQEx6.sh) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/6-multiple-channels/testMQEx6.sh.in ${CMAKE_BINARY_DIR}/bin/examples/MQ/6-multiple-channels/testMQEx6.sh) -Set(INCLUDE_DIRECTORIES +set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq ${CMAKE_SOURCE_DIR}/fairmq/devices ${CMAKE_SOURCE_DIR}/fairmq/tools @@ -20,42 +19,42 @@ Set(INCLUDE_DIRECTORIES ${CMAKE_CURRENT_BINARY_DIR} ) -Set(SYSTEM_INCLUDE_DIRECTORIES +set(SYSTEM_INCLUDE_DIRECTORIES ${Boost_INCLUDE_DIR} ${ZeroMQ_INCLUDE_DIR} ) -Include_Directories(${INCLUDE_DIRECTORIES}) -Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) +include_directories(${INCLUDE_DIRECTORIES}) +include_directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) -Set(LINK_DIRECTORIES +set(LINK_DIRECTORIES ${Boost_LIBRARY_DIRS} ) -Link_Directories(${LINK_DIRECTORIES}) +link_directories(${LINK_DIRECTORIES}) -Set(SRCS +set(SRCS "FairMQExample6Sampler.cxx" "FairMQExample6Sink.cxx" "FairMQExample6Broadcaster.cxx" ) -Set(DEPENDENCIES +set(DEPENDENCIES ${DEPENDENCIES} FairMQ ) -Set(LIBRARY_NAME FairMQExample6) +set(LIBRARY_NAME FairMQExample6) GENERATE_LIBRARY() -Set(Exe_Names +set(Exe_Names ex6-sampler ex6-sink ex6-broadcaster ) -Set(Exe_Source +set(Exe_Source runExample6Sampler.cxx runExample6Sink.cxx runExample6Broadcaster.cxx @@ -67,16 +66,21 @@ math(EXPR _length ${_length}-1) set(BIN_DESTINATION share/fairbase/examples/MQ/6-multiple-channels/bin) set(EXECUTABLE_OUTPUT_PATH "${EXECUTABLE_OUTPUT_PATH}/examples/MQ/6-multiple-channels") -ForEach(_file RANGE 0 ${_length}) +foreach(_file RANGE 0 ${_length}) list(GET Exe_Names ${_file} _name) list(GET Exe_Source ${_file} _src) set(EXE_NAME ${_name}) set(SRCS ${_src}) set(DEPENDENCIES FairMQExample6) GENERATE_EXECUTABLE() -EndForEach(_file RANGE 0 ${_length}) +endforeach(_file RANGE 0 ${_length}) -Install( +add_test(NAME MQ.ex6-multiple-channels COMMAND ${CMAKE_BINARY_DIR}/bin/examples/MQ/6-multiple-channels/testMQEx6.sh) +set_tests_properties(MQ.ex6-multiple-channels PROPERTIES TIMEOUT "30") +set_tests_properties(MQ.ex6-multiple-channels PROPERTIES RUN_SERIAL true) +set_tests_properties(MQ.ex6-multiple-channels PROPERTIES PASS_REGULAR_EXPRESSION "Received messages from both sources.") + +install( FILES ex6-multiple-channels.json DESTINATION share/fairbase/examples/MQ/6-multiple-channels/config/ ) diff --git a/examples/MQ/6-multiple-channels/FairMQExample6Sampler.cxx b/examples/MQ/6-multiple-channels/FairMQExample6Sampler.cxx index 7c997271..069842a1 100644 --- a/examples/MQ/6-multiple-channels/FairMQExample6Sampler.cxx +++ b/examples/MQ/6-multiple-channels/FairMQExample6Sampler.cxx @@ -25,12 +25,15 @@ using namespace std; FairMQExample6Sampler::FairMQExample6Sampler() : fText() + , fMaxIterations(0) + , fNumIterations(0) { } void FairMQExample6Sampler::InitTask() { fText = fConfig->GetValue("text"); + fMaxIterations = fConfig->GetValue("max-iterations"); } void FairMQExample6Sampler::Run() @@ -62,6 +65,12 @@ void FairMQExample6Sampler::Run() LOG(INFO) << "Sent \"" << fText << "\""; } } + + if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) + { + LOG(INFO) << "Configured maximum number of iterations reached. Leaving RUNNING state."; + break; + } } } diff --git a/examples/MQ/6-multiple-channels/FairMQExample6Sampler.h b/examples/MQ/6-multiple-channels/FairMQExample6Sampler.h index 102d272e..6160d338 100644 --- a/examples/MQ/6-multiple-channels/FairMQExample6Sampler.h +++ b/examples/MQ/6-multiple-channels/FairMQExample6Sampler.h @@ -27,6 +27,8 @@ class FairMQExample6Sampler : public FairMQDevice protected: std::string fText; + uint64_t fMaxIterations; + uint64_t fNumIterations; virtual void Run(); virtual void InitTask(); diff --git a/examples/MQ/6-multiple-channels/FairMQExample6Sink.cxx b/examples/MQ/6-multiple-channels/FairMQExample6Sink.cxx index 87803ddf..810e464b 100644 --- a/examples/MQ/6-multiple-channels/FairMQExample6Sink.cxx +++ b/examples/MQ/6-multiple-channels/FairMQExample6Sink.cxx @@ -15,27 +15,52 @@ #include // unique_ptr #include "FairMQExample6Sink.h" -#include "FairMQPoller.h" #include "FairMQLogger.h" +#include "FairMQProgOptions.h" using namespace std; FairMQExample6Sink::FairMQExample6Sink() + : fReceivedData(false) + , fReceivedBroadcast(false) + , fMaxIterations(0) + , fNumIterations(0) { OnData("broadcast", &FairMQExample6Sink::HandleBroadcast); OnData("data", &FairMQExample6Sink::HandleData); } +void FairMQExample6Sink::InitTask() +{ + fMaxIterations = fConfig->GetValue("max-iterations"); +} + bool FairMQExample6Sink::HandleBroadcast(FairMQMessagePtr& msg, int /*index*/) { LOG(INFO) << "Received broadcast: \"" << string(static_cast(msg->GetData()), msg->GetSize()) << "\""; + fReceivedBroadcast = true; - return true; + return CheckIterations(); } bool FairMQExample6Sink::HandleData(FairMQMessagePtr& msg, int /*index*/) { LOG(INFO) << "Received message: \"" << string(static_cast(msg->GetData()), msg->GetSize()) << "\""; + fReceivedData = true; + + return CheckIterations(); +} + +bool FairMQExample6Sink::CheckIterations() +{ + if (fMaxIterations > 0) + { + if (fReceivedData && fReceivedBroadcast && ++fNumIterations >= fMaxIterations) + { + LOG(INFO) << "Configured maximum number of iterations reached & Received messages from both sources. Leaving RUNNING state."; + return false; + } + } return true; } diff --git a/examples/MQ/6-multiple-channels/FairMQExample6Sink.h b/examples/MQ/6-multiple-channels/FairMQExample6Sink.h index e6a77f72..29bcda39 100644 --- a/examples/MQ/6-multiple-channels/FairMQExample6Sink.h +++ b/examples/MQ/6-multiple-channels/FairMQExample6Sink.h @@ -26,6 +26,14 @@ class FairMQExample6Sink : public FairMQDevice protected: bool HandleBroadcast(FairMQMessagePtr&, int); bool HandleData(FairMQMessagePtr&, int); + bool CheckIterations(); + virtual void InitTask(); + + private: + bool fReceivedData; + bool fReceivedBroadcast; + uint64_t fMaxIterations; + uint64_t fNumIterations; }; #endif /* FAIRMQEXAMPLE6SINK_H_ */ diff --git a/examples/MQ/6-multiple-channels/runExample6Sampler.cxx b/examples/MQ/6-multiple-channels/runExample6Sampler.cxx index 2298ac36..87fe75a5 100644 --- a/examples/MQ/6-multiple-channels/runExample6Sampler.cxx +++ b/examples/MQ/6-multiple-channels/runExample6Sampler.cxx @@ -14,7 +14,8 @@ namespace bpo = boost::program_options; void addCustomOptions(bpo::options_description& options) { options.add_options() - ("text", bpo::value()->default_value("Hello"), "Text to send out"); + ("text", bpo::value()->default_value("Hello"), "Text to send out") + ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/examples/MQ/6-multiple-channels/runExample6Sink.cxx b/examples/MQ/6-multiple-channels/runExample6Sink.cxx index 2b67c6fc..867844f6 100644 --- a/examples/MQ/6-multiple-channels/runExample6Sink.cxx +++ b/examples/MQ/6-multiple-channels/runExample6Sink.cxx @@ -11,8 +11,10 @@ namespace bpo = boost::program_options; -void addCustomOptions(bpo::options_description& /*options*/) +void addCustomOptions(bpo::options_description& options) { + options.add_options() + ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/examples/MQ/6-multiple-channels/testMQEx6.sh.in b/examples/MQ/6-multiple-channels/testMQEx6.sh.in new file mode 100755 index 00000000..94969685 --- /dev/null +++ b/examples/MQ/6-multiple-channels/testMQEx6.sh.in @@ -0,0 +1,40 @@ +#!/bin/bash +ex6config="@CMAKE_BINARY_DIR@/bin/config/ex6-multiple-channels.json" + +# setup a trap to kill everything if the test fails/timeouts +trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; kill -TERM $BROADCASTER_PID; wait $SAMPLER_PID; wait $SINK_PID; wait $BROADCASTER_PID;' TERM + + +SINK="ex6-sink" +SINK+=" --id sink1" +SINK+=" --max-iterations 1" +SINK+=" --control static --log-color false" +SINK+=" --mq-config $ex6config" +@CMAKE_BINARY_DIR@/bin/examples/MQ/6-multiple-channels/$SINK & +SINK_PID=$! + +sleep 1 + +SAMPLER="ex6-sampler" +SAMPLER+=" --id sampler1" +SAMPLER+=" --max-iterations 1" +SAMPLER+=" --control static --log-color false" +SAMPLER+=" --mq-config $ex6config" +@CMAKE_BINARY_DIR@/bin/examples/MQ/6-multiple-channels/$SAMPLER & +SAMPLER_PID=$! + +BROADCASTER="ex6-broadcaster" +BROADCASTER+=" --id broadcaster1" +BROADCASTER+=" --control static --log-color false" +BROADCASTER+=" --mq-config $ex6config" +@CMAKE_BINARY_DIR@/bin/examples/MQ/6-multiple-channels/$BROADCASTER & +BROADCASTER_PID=$! + +wait $SAMPLER_PID +wait $SINK_PID + +# stop broadcaster +kill -SIGINT $BROADCASTER_PID + +# wait for broadcaster to finish +wait $BROADCASTER_PID diff --git a/examples/MQ/8-multipart/CMakeLists.txt b/examples/MQ/8-multipart/CMakeLists.txt index 37e67804..1383f2e3 100644 --- a/examples/MQ/8-multipart/CMakeLists.txt +++ b/examples/MQ/8-multipart/CMakeLists.txt @@ -6,10 +6,9 @@ # copied verbatim in the file "LICENSE" # ################################################################################ -configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/8-multipart/ex8-multipart.json - ${CMAKE_BINARY_DIR}/bin/config/ex8-multipart.json) -configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/8-multipart/startMQEx8.sh.in - ${CMAKE_BINARY_DIR}/bin/examples/MQ/8-multipart/startMQEx8.sh) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/8-multipart/ex8-multipart.json ${CMAKE_BINARY_DIR}/bin/config/ex8-multipart.json) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/8-multipart/startMQEx8.sh.in ${CMAKE_BINARY_DIR}/bin/examples/MQ/8-multipart/startMQEx8.sh) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/8-multipart/testMQEx8.sh.in ${CMAKE_BINARY_DIR}/bin/examples/MQ/8-multipart/testMQEx8.sh) Set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq @@ -73,6 +72,11 @@ ForEach(_file RANGE 0 ${_length}) GENERATE_EXECUTABLE() EndForEach(_file RANGE 0 ${_length}) +add_test(NAME MQ.ex8-multipart COMMAND ${CMAKE_BINARY_DIR}/bin/examples/MQ/8-multipart/testMQEx8.sh) +set_tests_properties(MQ.ex8-multipart PROPERTIES TIMEOUT "30") +set_tests_properties(MQ.ex8-multipart PROPERTIES RUN_SERIAL true) +set_tests_properties(MQ.ex8-multipart PROPERTIES PASS_REGULAR_EXPRESSION "Received message with 2 parts") + Install( FILES ex8-multipart.json DESTINATION share/fairbase/examples/MQ/8-multipart/config/ diff --git a/examples/MQ/8-multipart/FairMQExample8Sampler.cxx b/examples/MQ/8-multipart/FairMQExample8Sampler.cxx index 2677fd2d..83d13b5e 100644 --- a/examples/MQ/8-multipart/FairMQExample8Sampler.cxx +++ b/examples/MQ/8-multipart/FairMQExample8Sampler.cxx @@ -18,22 +18,31 @@ #include "FairMQExample8Sampler.h" #include "FairMQEx8Header.h" #include "FairMQLogger.h" +#include "FairMQProgOptions.h" using namespace std; FairMQExample8Sampler::FairMQExample8Sampler() - : fCounter(0) + : fMaxIterations(5) + , fNumIterations(0) { } +void FairMQExample8Sampler::InitTask() +{ + fMaxIterations = fConfig->GetValue("max-iterations"); +} + bool FairMQExample8Sampler::ConditionalRun() { - // Wait a second to keep the output readable. - this_thread::sleep_for(chrono::seconds(1)); - Ex8Header header; - // Set stopFlag to 1 for the first 4 messages, and to 0 for the 5th. - fCounter < 5 ? header.stopFlag = 0 : header.stopFlag = 1; + header.stopFlag = 0; + + // Set stopFlag to 1 for last message. + if (fMaxIterations > 0 && fNumIterations == fMaxIterations - 1) + { + header.stopFlag = 1; + } LOG(INFO) << "Sending header with stopFlag: " << header.stopFlag; FairMQParts parts; @@ -48,11 +57,15 @@ bool FairMQExample8Sampler::ConditionalRun() Send(parts, "data-out"); // Go out of the sending loop if the stopFlag was sent. - if (fCounter++ == 5) + if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { + LOG(INFO) << "Configured maximum number of iterations reached. Leaving RUNNING state."; return false; } + // Wait a second to keep the output readable. + this_thread::sleep_for(chrono::seconds(1)); + return true; } diff --git a/examples/MQ/8-multipart/FairMQExample8Sampler.h b/examples/MQ/8-multipart/FairMQExample8Sampler.h index 9c55c0cc..8b403c28 100644 --- a/examples/MQ/8-multipart/FairMQExample8Sampler.h +++ b/examples/MQ/8-multipart/FairMQExample8Sampler.h @@ -24,10 +24,12 @@ class FairMQExample8Sampler : public FairMQDevice virtual ~FairMQExample8Sampler(); protected: + virtual void InitTask(); virtual bool ConditionalRun(); private: - int fCounter; + uint64_t fMaxIterations; + uint64_t fNumIterations; }; #endif /* FAIRMQEXAMPLE8SAMPLER_H_ */ diff --git a/examples/MQ/8-multipart/FairMQExample8Sink.cxx b/examples/MQ/8-multipart/FairMQExample8Sink.cxx index 44dfa6e5..e5976749 100644 --- a/examples/MQ/8-multipart/FairMQExample8Sink.cxx +++ b/examples/MQ/8-multipart/FairMQExample8Sink.cxx @@ -27,11 +27,15 @@ bool FairMQExample8Sink::HandleData(FairMQParts& parts, int /*index*/) { Ex8Header header; header.stopFlag = (static_cast(parts.At(0)->GetData()))->stopFlag; + + LOG(INFO) << "Received message with " << parts.Size() << " parts"; + LOG(INFO) << "Received header with stopFlag: " << header.stopFlag; LOG(INFO) << "Received body of size: " << parts.At(1)->GetSize(); + if (header.stopFlag == 1) { - LOG(INFO) << "stopFlag is 0, going IDLE"; + LOG(INFO) << "stopFlag is 1, going IDLE"; return false; } diff --git a/examples/MQ/8-multipart/runExample8Sampler.cxx b/examples/MQ/8-multipart/runExample8Sampler.cxx index 1966286f..4234f445 100644 --- a/examples/MQ/8-multipart/runExample8Sampler.cxx +++ b/examples/MQ/8-multipart/runExample8Sampler.cxx @@ -11,8 +11,10 @@ namespace bpo = boost::program_options; -void addCustomOptions(bpo::options_description& /*options*/) +void addCustomOptions(bpo::options_description& options) { + options.add_options() + ("max-iterations", bpo::value()->default_value(5), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/examples/MQ/8-multipart/testMQEx8.sh.in b/examples/MQ/8-multipart/testMQEx8.sh.in new file mode 100755 index 00000000..3147c4d0 --- /dev/null +++ b/examples/MQ/8-multipart/testMQEx8.sh.in @@ -0,0 +1,23 @@ +#!/bin/bash +ex8config="@CMAKE_BINARY_DIR@/bin/config/ex8-multipart.json" + +# setup a trap to kill everything if the test fails/timeouts +trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID;' TERM + +SAMPLER="ex8-sampler" +SAMPLER+=" --id sampler1" +SAMPLER+=" --max-iterations 1" +SAMPLER+=" --control static --log-color false" +SAMPLER+=" --mq-config $ex8config" +@CMAKE_BINARY_DIR@/bin/examples/MQ/8-multipart/$SAMPLER & +SAMPLER_PID=$! + +SINK="ex8-sink" +SINK+=" --id sink1" +SINK+=" --control static --log-color false" +SINK+=" --mq-config $ex8config" +@CMAKE_BINARY_DIR@/bin/examples/MQ/8-multipart/$SINK & +SINK_PID=$! + +wait $SAMPLER_PID +wait $SINK_PID diff --git a/examples/MQ/multiple-transports/CMakeLists.txt b/examples/MQ/multiple-transports/CMakeLists.txt index 3153e582..c2681954 100644 --- a/examples/MQ/multiple-transports/CMakeLists.txt +++ b/examples/MQ/multiple-transports/CMakeLists.txt @@ -6,10 +6,9 @@ # copied verbatim in the file "LICENSE" # ################################################################################ -configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/multiple-transports/ex-multiple-transports.json - ${CMAKE_BINARY_DIR}/bin/config/ex-multiple-transports.json) -configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/multiple-transports/startMTEx.sh.in - ${CMAKE_BINARY_DIR}/bin/examples/MQ/multiple-transports/startMTEx.sh) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/multiple-transports/ex-multiple-transports.json ${CMAKE_BINARY_DIR}/bin/config/ex-multiple-transports.json) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/multiple-transports/startMTEx.sh.in ${CMAKE_BINARY_DIR}/bin/examples/MQ/multiple-transports/startMTEx.sh) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/multiple-transports/testMTEx.sh.in ${CMAKE_BINARY_DIR}/bin/examples/MQ/multiple-transports/testMTEx.sh) Set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq @@ -76,6 +75,11 @@ ForEach(_file RANGE 0 ${_length}) GENERATE_EXECUTABLE() EndForEach(_file RANGE 0 ${_length}) +add_test(NAME MQ.ex-multiple-transports COMMAND ${CMAKE_BINARY_DIR}/bin/examples/MQ/multiple-transports/testMTEx.sh) +set_tests_properties(MQ.ex-multiple-transports PROPERTIES TIMEOUT "30") +set_tests_properties(MQ.ex-multiple-transports PROPERTIES RUN_SERIAL true) +set_tests_properties(MQ.ex-multiple-transports PROPERTIES PASS_REGULAR_EXPRESSION "Received messages from both sources.") + Install( FILES ex-multiple-transports.json DESTINATION share/fairbase/examples/MQ/multiple-transports/config/ diff --git a/examples/MQ/multiple-transports/FairMQExampleMTSampler1.cxx b/examples/MQ/multiple-transports/FairMQExampleMTSampler1.cxx index 67d242cd..f26d6e67 100644 --- a/examples/MQ/multiple-transports/FairMQExampleMTSampler1.cxx +++ b/examples/MQ/multiple-transports/FairMQExampleMTSampler1.cxx @@ -14,11 +14,14 @@ using namespace std; FairMQExampleMTSampler1::FairMQExampleMTSampler1() : fAckListener() + , fMaxIterations(0) + , fNumIterations(0) { } void FairMQExampleMTSampler1::InitTask() { + fMaxIterations = fConfig->GetValue("max-iterations"); } void FairMQExampleMTSampler1::PreRun() @@ -38,6 +41,12 @@ bool FairMQExampleMTSampler1::ConditionalRun() return false; } + if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) + { + LOG(INFO) << "Configured maximum number of iterations reached. Leaving RUNNING state."; + return false; + } + return true; } diff --git a/examples/MQ/multiple-transports/FairMQExampleMTSampler1.h b/examples/MQ/multiple-transports/FairMQExampleMTSampler1.h index 80b4916e..8dabaa95 100644 --- a/examples/MQ/multiple-transports/FairMQExampleMTSampler1.h +++ b/examples/MQ/multiple-transports/FairMQExampleMTSampler1.h @@ -28,6 +28,8 @@ class FairMQExampleMTSampler1 : public FairMQDevice void ListenForAcks(); std::thread fAckListener; + uint64_t fMaxIterations; + uint64_t fNumIterations; }; #endif /* FAIRMQEXAMPLEMTSAMPLER1_H_ */ diff --git a/examples/MQ/multiple-transports/FairMQExampleMTSampler2.cxx b/examples/MQ/multiple-transports/FairMQExampleMTSampler2.cxx index 40a52054..65838127 100644 --- a/examples/MQ/multiple-transports/FairMQExampleMTSampler2.cxx +++ b/examples/MQ/multiple-transports/FairMQExampleMTSampler2.cxx @@ -13,9 +13,16 @@ using namespace std; FairMQExampleMTSampler2::FairMQExampleMTSampler2() + : fMaxIterations(0) + , fNumIterations(0) { } +void FairMQExampleMTSampler2::InitTask() +{ + fMaxIterations = fConfig->GetValue("max-iterations"); +} + bool FairMQExampleMTSampler2::ConditionalRun() { FairMQMessagePtr msg(NewMessage(1000)); @@ -27,6 +34,12 @@ bool FairMQExampleMTSampler2::ConditionalRun() return false; } + if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) + { + LOG(INFO) << "Configured maximum number of iterations reached. Leaving RUNNING state."; + return false; + } + return true; } diff --git a/examples/MQ/multiple-transports/FairMQExampleMTSampler2.h b/examples/MQ/multiple-transports/FairMQExampleMTSampler2.h index cf08a30e..cc3b1c5d 100644 --- a/examples/MQ/multiple-transports/FairMQExampleMTSampler2.h +++ b/examples/MQ/multiple-transports/FairMQExampleMTSampler2.h @@ -21,7 +21,12 @@ class FairMQExampleMTSampler2 : public FairMQDevice virtual ~FairMQExampleMTSampler2(); protected: + virtual void InitTask(); virtual bool ConditionalRun(); + + private: + uint64_t fMaxIterations; + uint64_t fNumIterations; }; #endif /* FAIRMQEXAMPLEMTSAMPLER2_H_ */ diff --git a/examples/MQ/multiple-transports/FairMQExampleMTSink.cxx b/examples/MQ/multiple-transports/FairMQExampleMTSink.cxx index a557f175..4cb499cd 100644 --- a/examples/MQ/multiple-transports/FairMQExampleMTSink.cxx +++ b/examples/MQ/multiple-transports/FairMQExampleMTSink.cxx @@ -14,19 +14,31 @@ #include "FairMQExampleMTSink.h" #include "FairMQLogger.h" +#include "FairMQProgOptions.h" using namespace std; FairMQExampleMTSink::FairMQExampleMTSink() + : fMaxIterations(0) + , fNumIterations1(0) + , fNumIterations2(0) + , fReceived1(false) + , fReceived2(false) { // register a handler for data arriving on "data" channel OnData("data1", &FairMQExampleMTSink::HandleData1); OnData("data2", &FairMQExampleMTSink::HandleData2); } +void FairMQExampleMTSink::InitTask() +{ + fMaxIterations = fConfig->GetValue("max-iterations"); +} + // handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0) bool FairMQExampleMTSink::HandleData1(FairMQMessagePtr& /*msg*/, int /*index*/) { + fNumIterations1++; // Creates a message using the transport of channel ack FairMQMessagePtr ack(NewMessageFor("ack", 0)); if (Send(ack, "ack") < 0) @@ -35,13 +47,28 @@ bool FairMQExampleMTSink::HandleData1(FairMQMessagePtr& /*msg*/, int /*index*/) } // return true if want to be called again (otherwise go to IDLE state) - return true; + return CheckIterations(); } // handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0) bool FairMQExampleMTSink::HandleData2(FairMQMessagePtr& /*msg*/, int /*index*/) { + fNumIterations2++; // return true if want to be called again (otherwise go to IDLE state) + return CheckIterations(); +} + +bool FairMQExampleMTSink::CheckIterations() +{ + if (fMaxIterations > 0) + { + if (fNumIterations1 >= fMaxIterations && fNumIterations2 >= fMaxIterations) + { + LOG(INFO) << "Configured maximum number of iterations reached & Received messages from both sources. Leaving RUNNING state."; + return false; + } + } + return true; } diff --git a/examples/MQ/multiple-transports/FairMQExampleMTSink.h b/examples/MQ/multiple-transports/FairMQExampleMTSink.h index 1bfbcab6..a2d178b6 100644 --- a/examples/MQ/multiple-transports/FairMQExampleMTSink.h +++ b/examples/MQ/multiple-transports/FairMQExampleMTSink.h @@ -24,8 +24,17 @@ class FairMQExampleMTSink : public FairMQDevice virtual ~FairMQExampleMTSink(); protected: + virtual void InitTask(); bool HandleData1(FairMQMessagePtr&, int); bool HandleData2(FairMQMessagePtr&, int); + bool CheckIterations(); + + private: + uint64_t fMaxIterations; + uint64_t fNumIterations1; + uint64_t fNumIterations2; + bool fReceived1; + bool fReceived2; }; #endif /* FAIRMQEXAMPLEMTSINK_H_ */ diff --git a/examples/MQ/multiple-transports/runExampleMTSampler1.cxx b/examples/MQ/multiple-transports/runExampleMTSampler1.cxx index 0bb35ce7..597a03d6 100644 --- a/examples/MQ/multiple-transports/runExampleMTSampler1.cxx +++ b/examples/MQ/multiple-transports/runExampleMTSampler1.cxx @@ -14,7 +14,7 @@ namespace bpo = boost::program_options; void addCustomOptions(bpo::options_description& options) { options.add_options() - ("text", bpo::value()->default_value("Hello"), "Text to send out"); + ("max-iterations", bpo::value()->default_value(5), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/examples/MQ/multiple-transports/runExampleMTSampler2.cxx b/examples/MQ/multiple-transports/runExampleMTSampler2.cxx index 3c711822..7f5ab2e9 100644 --- a/examples/MQ/multiple-transports/runExampleMTSampler2.cxx +++ b/examples/MQ/multiple-transports/runExampleMTSampler2.cxx @@ -14,7 +14,7 @@ namespace bpo = boost::program_options; void addCustomOptions(bpo::options_description& options) { options.add_options() - ("text", bpo::value()->default_value("Hello"), "Text to send out"); + ("max-iterations", bpo::value()->default_value(5), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/examples/MQ/multiple-transports/runExampleMTSink.cxx b/examples/MQ/multiple-transports/runExampleMTSink.cxx index ab12bc8d..f038a7b2 100644 --- a/examples/MQ/multiple-transports/runExampleMTSink.cxx +++ b/examples/MQ/multiple-transports/runExampleMTSink.cxx @@ -11,8 +11,10 @@ namespace bpo = boost::program_options; -void addCustomOptions(bpo::options_description& /*options*/) +void addCustomOptions(bpo::options_description& options) { + options.add_options() + ("max-iterations", bpo::value()->default_value(5), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/examples/MQ/multiple-transports/testMTEx.sh.in b/examples/MQ/multiple-transports/testMTEx.sh.in new file mode 100755 index 00000000..1f10d132 --- /dev/null +++ b/examples/MQ/multiple-transports/testMTEx.sh.in @@ -0,0 +1,35 @@ +#!/bin/bash +config="@CMAKE_BINARY_DIR@/bin/config/ex-multiple-transports.json" + +trap 'kill -TERM $SAMPLER1_PID; kill -TERM $SAMPLER2_PID; kill -TERM $SINK_PID; wait $SAMPLER1_PID; wait $SAMPLER2_PID; wait $SINK_PID; @CMAKE_BINARY_DIR@/bin/shmmonitor --cleanup;' TERM + +SINK="ex-mt-sink" +SINK+=" --id sink1" +SINK+=" --max-iterations 1" +SINK+=" --control static --log-color false" +SINK+=" --transport shmem" +SINK+=" --mq-config $config" +@CMAKE_BINARY_DIR@/bin/examples/MQ/multiple-transports/$SINK & +SINK_PID=$! + +SAMPLER1="ex-mt-sampler1" +SAMPLER1+=" --id sampler1" +SAMPLER1+=" --max-iterations 1" +SAMPLER1+=" --control static --log-color false" +SAMPLER1+=" --transport shmem" +SAMPLER1+=" --mq-config $config" +@CMAKE_BINARY_DIR@/bin/examples/MQ/multiple-transports/$SAMPLER1 & +SAMPLER1_PID=$! + +SAMPLER2="ex-mt-sampler2" +SAMPLER2+=" --id sampler2" +SAMPLER2+=" --max-iterations 1" +SAMPLER2+=" --control static --log-color false" +SAMPLER2+=" --transport nanomsg" +SAMPLER2+=" --mq-config $config" +@CMAKE_BINARY_DIR@/bin/examples/MQ/multiple-transports/$SAMPLER2 & +SAMPLER2_PID=$! + +wait $SAMPLER1_PID +wait $SAMPLER2_PID +wait $SINK_PID