From f191c5099c231ed439b6d9309d06ca5c4ed22000 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Mon, 4 Mar 2019 11:39:43 +0100 Subject: [PATCH] Fix region example by moving our test code to a separate one --- examples/CMakeLists.txt | 1 + examples/{region => readout}/Builder.h | 0 examples/readout/CMakeLists.txt | 54 +++++++++++ examples/readout/README.md | 5 + examples/readout/Sampler.cxx | 91 +++++++++++++++++++ examples/readout/Sampler.h | 46 ++++++++++ examples/readout/Sink.cxx | 56 ++++++++++++ examples/readout/Sink.h | 42 +++++++++ .../readout/fairmq-start-ex-readout.sh.in | 30 ++++++ examples/{region => readout}/runBuilder.cxx | 0 examples/readout/runSampler.cxx | 24 +++++ examples/readout/runSink.cxx | 23 +++++ examples/region/CMakeLists.txt | 4 - examples/region/Sampler.cxx | 6 +- examples/region/fairmq-start-ex-region.sh.in | 15 +-- 15 files changed, 380 insertions(+), 17 deletions(-) rename examples/{region => readout}/Builder.h (100%) create mode 100644 examples/readout/CMakeLists.txt create mode 100644 examples/readout/README.md create mode 100644 examples/readout/Sampler.cxx create mode 100644 examples/readout/Sampler.h create mode 100644 examples/readout/Sink.cxx create mode 100644 examples/readout/Sink.h create mode 100755 examples/readout/fairmq-start-ex-readout.sh.in rename examples/{region => readout}/runBuilder.cxx (100%) create mode 100644 examples/readout/runSampler.cxx create mode 100644 examples/readout/runSink.cxx diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 5f4c76e8..bd248d52 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -16,5 +16,6 @@ add_subdirectory(multiple-channels) if(BUILD_NANOMSG_TRANSPORT) add_subdirectory(multiple-transports) endif() +add_subdirectory(readout) add_subdirectory(region) add_subdirectory(req-rep) diff --git a/examples/region/Builder.h b/examples/readout/Builder.h similarity index 100% rename from examples/region/Builder.h rename to examples/readout/Builder.h diff --git a/examples/readout/CMakeLists.txt b/examples/readout/CMakeLists.txt new file mode 100644 index 00000000..5e309485 --- /dev/null +++ b/examples/readout/CMakeLists.txt @@ -0,0 +1,54 @@ + ################################################################################ + # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # + # # + # This software is distributed under the terms of the # + # GNU Lesser General Public Licence (LGPL) version 3, # + # copied verbatim in the file "LICENSE" # + ################################################################################ + +add_library(ExampleReadoutLib STATIC + "Sampler.cxx" + "Sampler.h" + "Builder.h" + "Sink.cxx" + "Sink.h" +) + +target_link_libraries(ExampleReadoutLib PUBLIC FairMQ) + +add_executable(fairmq-ex-readout-sampler runSampler.cxx) +target_link_libraries(fairmq-ex-readout-sampler PRIVATE ExampleReadoutLib) + +add_executable(fairmq-ex-readout-builder runBuilder.cxx) +target_link_libraries(fairmq-ex-readout-builder PRIVATE ExampleReadoutLib) + +add_executable(fairmq-ex-readout-sink runSink.cxx) +target_link_libraries(fairmq-ex-readout-sink PRIVATE ExampleReadoutLib) + +add_custom_target(Examplereadout DEPENDS fairmq-ex-readout-sampler fairmq-ex-readout-sink) + +set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR}) +set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh) + +# install + +install( + TARGETS + fairmq-ex-readout-sampler + fairmq-ex-readout-sink + + LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR} + RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR} +) + +# configure run script with different executable paths for build and for install directories +set(EX_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}) +set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh_install) + +install( + PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh_install + DESTINATION ${PROJECT_INSTALL_BINDIR} + RENAME fairmq-start-ex-readout.sh +) diff --git a/examples/readout/README.md b/examples/readout/README.md new file mode 100644 index 00000000..7be16bf3 --- /dev/null +++ b/examples/readout/README.md @@ -0,0 +1,5 @@ +Region example +============== + +This example demonstrates the use of a more advanced feature - UnmanagedRegion, that can be used to create a buffer through one of FairMQ transports. The contents of this buffer are managed by the user, who can also create messages out of sub-buffers of the created buffer. Such feature can be interesting in environments that have special requirements by the hardware that writes the data, to keep the transfer efficient (e.g. shared memory). + diff --git a/examples/readout/Sampler.cxx b/examples/readout/Sampler.cxx new file mode 100644 index 00000000..ba986d3d --- /dev/null +++ b/examples/readout/Sampler.cxx @@ -0,0 +1,91 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * Sampler.cpp + * + * @since 2014-10-10 + * @author A. Rybalchenko + */ + +#include "Sampler.h" + +#include + +using namespace std; + +namespace example_region +{ + +Sampler::Sampler() + : fMsgSize(10000) + , fMaxIterations(0) + , fNumIterations(0) + , fRegion(nullptr) + , fNumUnackedMsgs(0) +{ +} + +void Sampler::InitTask() +{ + fMsgSize = fConfig->GetValue("msg-size"); + fMaxIterations = fConfig->GetValue("max-iterations"); + + fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data1", + 0, + 10000000, + [this](void* /*data*/, size_t /*size*/, void* /*hint*/) { // callback to be called when message buffers no longer needed by transport + --fNumUnackedMsgs; + if (fMaxIterations > 0) + { + LOG(debug) << "Received ack"; + } + } + )); +} + +bool Sampler::ConditionalRun() +{ + FairMQMessagePtr msg(NewMessageFor("data1", // channel + 0, // sub-channel + fRegion, // region + fRegion->GetData(), // ptr within region + fMsgSize, // offset from ptr + nullptr // hint + )); + + if (Send(msg, "data1", 0) > 0) + { + ++fNumUnackedMsgs; + + if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) + { + LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state."; + return false; + } + } + + return true; +} + +void Sampler::ResetTask() +{ + // if not all messages acknowledged, wait for a bit. But only once, since receiver could be already dead. + if (fNumUnackedMsgs != 0) + { + LOG(debug) << "waiting for all acknowledgements... (" << fNumUnackedMsgs << ")"; + this_thread::sleep_for(chrono::milliseconds(500)); + LOG(debug) << "done, still unacked: " << fNumUnackedMsgs; + } + fRegion.reset(); +} + +Sampler::~Sampler() +{ +} + +} // namespace example_region diff --git a/examples/readout/Sampler.h b/examples/readout/Sampler.h new file mode 100644 index 00000000..4056fb3d --- /dev/null +++ b/examples/readout/Sampler.h @@ -0,0 +1,46 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * Sampler.h + * + * @since 2014-10-10 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQEXAMPLEREGIONSAMPLER_H +#define FAIRMQEXAMPLEREGIONSAMPLER_H + +#include + +#include "FairMQDevice.h" + +namespace example_region +{ + +class Sampler : public FairMQDevice +{ + public: + Sampler(); + virtual ~Sampler(); + + protected: + virtual void InitTask(); + virtual bool ConditionalRun(); + virtual void ResetTask(); + + private: + int fMsgSize; + uint64_t fMaxIterations; + uint64_t fNumIterations; + FairMQUnmanagedRegionPtr fRegion; + std::atomic fNumUnackedMsgs; +}; + +} // namespace example_region + +#endif /* FAIRMQEXAMPLEREGIONSAMPLER_H */ diff --git a/examples/readout/Sink.cxx b/examples/readout/Sink.cxx new file mode 100644 index 00000000..55c1c04b --- /dev/null +++ b/examples/readout/Sink.cxx @@ -0,0 +1,56 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * Sink.cxx + * + * @since 2014-10-10 + * @author A. Rybalchenko + */ + +#include "Sink.h" + +using namespace std; + +namespace example_region +{ + +Sink::Sink() + : fMaxIterations(0) + , fNumIterations(0) +{ +} + +void Sink::InitTask() +{ + // Get the fMaxIterations value from the command line options (via fConfig) + fMaxIterations = fConfig->GetValue("max-iterations"); +} + +void Sink::Run() +{ + FairMQChannel& dataInChannel = fChannels.at("data").at(0); + + while (!NewStatePending()) + { + FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage()); + dataInChannel.Receive(msg); + // void* ptr = msg->GetData(); + + if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) + { + LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state."; + break; + } + } +} + +Sink::~Sink() +{ +} + +} // namespace example_region diff --git a/examples/readout/Sink.h b/examples/readout/Sink.h new file mode 100644 index 00000000..7e372e7a --- /dev/null +++ b/examples/readout/Sink.h @@ -0,0 +1,42 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * Sink.h + * + * @since 2014-10-10 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQEXAMPLEREGIONSINK_H +#define FAIRMQEXAMPLEREGIONSINK_H + +#include + +#include "FairMQDevice.h" + +namespace example_region +{ + +class Sink : public FairMQDevice +{ + public: + Sink(); + virtual ~Sink(); + + protected: + virtual void Run(); + virtual void InitTask(); + + private: + uint64_t fMaxIterations; + uint64_t fNumIterations; +}; + +} // namespace example_region + +#endif /* FAIRMQEXAMPLEREGIONSINK_H */ diff --git a/examples/readout/fairmq-start-ex-readout.sh.in b/examples/readout/fairmq-start-ex-readout.sh.in new file mode 100755 index 00000000..c3518b84 --- /dev/null +++ b/examples/readout/fairmq-start-ex-readout.sh.in @@ -0,0 +1,30 @@ +#!/bin/bash + +export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ + +msgSize="1000000" + +if [[ $1 =~ ^[0-9]+$ ]]; then + msgSize=$1 +fi + +SAMPLER="fairmq-ex-readout-sampler" +SAMPLER+=" --id sampler1" +SAMPLER+=" --severity debug" +SAMPLER+=" --msg-size $msgSize" +# SAMPLER+=" --rate 10" +SAMPLER+=" --channel-config name=data1,type=pair,method=bind,address=tcp://127.0.0.1:7777,transport=shmem" +xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER & + +BUILDER="fairmq-ex-readout-builder" +BUILDER+=" --id builder1" +BUILDER+=" --severity debug" +BUILDER+=" --channel-config name=data1,type=pair,method=connect,address=tcp://127.0.0.1:7777,transport=shmem" +BUILDER+=" name=data2,type=pair,method=connect,address=tcp://127.0.0.1:7778,transport=ofi" +xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$BUILDER & + +SINK="fairmq-ex-readout-sink" +SINK+=" --id sink1" +SINK+=" --severity debug" +SINK+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:7778,transport=ofi" +xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SINK & diff --git a/examples/region/runBuilder.cxx b/examples/readout/runBuilder.cxx similarity index 100% rename from examples/region/runBuilder.cxx rename to examples/readout/runBuilder.cxx diff --git a/examples/readout/runSampler.cxx b/examples/readout/runSampler.cxx new file mode 100644 index 00000000..6b8166da --- /dev/null +++ b/examples/readout/runSampler.cxx @@ -0,0 +1,24 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "runFairMQDevice.h" +#include "Sampler.h" + +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& options) +{ + options.add_options() + ("msg-size", bpo::value()->default_value(1000), "Message size in bytes") + ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); +} + +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) +{ + return new example_region::Sampler(); +} diff --git a/examples/readout/runSink.cxx b/examples/readout/runSink.cxx new file mode 100644 index 00000000..1ad8f4bd --- /dev/null +++ b/examples/readout/runSink.cxx @@ -0,0 +1,23 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "runFairMQDevice.h" +#include "Sink.h" + +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& options) +{ + options.add_options() + ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); +} + +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) +{ + return new example_region::Sink(); +} diff --git a/examples/region/CMakeLists.txt b/examples/region/CMakeLists.txt index 4723c409..07bd8408 100644 --- a/examples/region/CMakeLists.txt +++ b/examples/region/CMakeLists.txt @@ -9,7 +9,6 @@ add_library(ExampleRegionLib STATIC "Sampler.cxx" "Sampler.h" - "Builder.h" "Sink.cxx" "Sink.h" ) @@ -19,9 +18,6 @@ target_link_libraries(ExampleRegionLib PUBLIC FairMQ) add_executable(fairmq-ex-region-sampler runSampler.cxx) target_link_libraries(fairmq-ex-region-sampler PRIVATE ExampleRegionLib) -add_executable(fairmq-ex-region-builder runBuilder.cxx) -target_link_libraries(fairmq-ex-region-builder PRIVATE ExampleRegionLib) - add_executable(fairmq-ex-region-sink runSink.cxx) target_link_libraries(fairmq-ex-region-sink PRIVATE ExampleRegionLib) diff --git a/examples/region/Sampler.cxx b/examples/region/Sampler.cxx index ba986d3d..678c321e 100644 --- a/examples/region/Sampler.cxx +++ b/examples/region/Sampler.cxx @@ -35,7 +35,7 @@ void Sampler::InitTask() fMsgSize = fConfig->GetValue("msg-size"); fMaxIterations = fConfig->GetValue("max-iterations"); - fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data1", + fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", 0, 10000000, [this](void* /*data*/, size_t /*size*/, void* /*hint*/) { // callback to be called when message buffers no longer needed by transport @@ -50,7 +50,7 @@ void Sampler::InitTask() bool Sampler::ConditionalRun() { - FairMQMessagePtr msg(NewMessageFor("data1", // channel + FairMQMessagePtr msg(NewMessageFor("data", // channel 0, // sub-channel fRegion, // region fRegion->GetData(), // ptr within region @@ -58,7 +58,7 @@ bool Sampler::ConditionalRun() nullptr // hint )); - if (Send(msg, "data1", 0) > 0) + if (Send(msg, "data", 0) > 0) { ++fNumUnackedMsgs; diff --git a/examples/region/fairmq-start-ex-region.sh.in b/examples/region/fairmq-start-ex-region.sh.in index e0e39a81..56a3fd66 100755 --- a/examples/region/fairmq-start-ex-region.sh.in +++ b/examples/region/fairmq-start-ex-region.sh.in @@ -13,18 +13,13 @@ SAMPLER+=" --id sampler1" SAMPLER+=" --severity debug" SAMPLER+=" --msg-size $msgSize" # SAMPLER+=" --rate 10" -SAMPLER+=" --channel-config name=data1,type=pair,method=bind,address=tcp://127.0.0.1:7777,transport=shmem" +SAMPLER+=" --transport shmem" +SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992" xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER & -BUILDER="fairmq-ex-region-builder" -BUILDER+=" --id builder1" -BUILDER+=" --severity debug" -BUILDER+=" --channel-config name=data1,type=pair,method=connect,address=tcp://127.0.0.1:7777,transport=shmem" -BUILDER+=" name=data2,type=pair,method=connect,address=tcp://127.0.0.1:7778,transport=ofi" -xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$BUILDER & - SINK="fairmq-ex-region-sink" SINK+=" --id sink1" SINK+=" --severity debug" -SINK+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:7778,transport=ofi" -xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SINK & +SINK+=" --transport shmem" +SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992" +xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$SINK &