diff --git a/examples/readout/Builder.h b/examples/readout/Builder.h index 2420bbde..cb832edf 100644 --- a/examples/readout/Builder.h +++ b/examples/readout/Builder.h @@ -5,36 +5,42 @@ * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ - #ifndef FAIRMQEXAMPLEREGIONBUILDER_H #define FAIRMQEXAMPLEREGIONBUILDER_H -#include - #include "FairMQDevice.h" -namespace example_region +#include + +namespace example_readout { class Builder : public FairMQDevice { public: - Builder() { - OnData("data1", &Builder::HandleData); - } - virtual ~Builder() {} + Builder() + : fOutputChannelName() + {} + + void Init() override + { + fOutputChannelName = fConfig->GetValue("output-name"); + OnData("rb", &Builder::HandleData); + } - protected: bool HandleData(FairMQMessagePtr& msg, int /*index*/) { - if (Send(msg, "data2") < 0) { + if (Send(msg, fOutputChannelName) < 0) { return false; } return true; } + + private: + std::string fOutputChannelName; }; -} // namespace example_region +} // namespace example_readout #endif /* FAIRMQEXAMPLEREGIONBUILDER_H */ diff --git a/examples/readout/CMakeLists.txt b/examples/readout/CMakeLists.txt index 5e309485..c8141d27 100644 --- a/examples/readout/CMakeLists.txt +++ b/examples/readout/CMakeLists.txt @@ -6,37 +6,35 @@ # copied verbatim in the file "LICENSE" # ################################################################################ -add_library(ExampleReadoutLib STATIC - "Sampler.cxx" - "Sampler.h" - "Builder.h" - "Sink.cxx" - "Sink.h" -) - -target_link_libraries(ExampleReadoutLib PUBLIC FairMQ) - -add_executable(fairmq-ex-readout-sampler runSampler.cxx) -target_link_libraries(fairmq-ex-readout-sampler PRIVATE ExampleReadoutLib) +add_executable(fairmq-ex-readout-readout runReadout.cxx) +target_link_libraries(fairmq-ex-readout-readout PRIVATE FairMQ) add_executable(fairmq-ex-readout-builder runBuilder.cxx) -target_link_libraries(fairmq-ex-readout-builder PRIVATE ExampleReadoutLib) +target_link_libraries(fairmq-ex-readout-builder PRIVATE FairMQ) -add_executable(fairmq-ex-readout-sink runSink.cxx) -target_link_libraries(fairmq-ex-readout-sink PRIVATE ExampleReadoutLib) +add_executable(fairmq-ex-readout-processor runProcessor.cxx) +target_link_libraries(fairmq-ex-readout-processor PRIVATE FairMQ) -add_custom_target(Examplereadout DEPENDS fairmq-ex-readout-sampler fairmq-ex-readout-sink) +add_executable(fairmq-ex-readout-sender runSender.cxx) +target_link_libraries(fairmq-ex-readout-sender PRIVATE FairMQ) + +add_executable(fairmq-ex-readout-receiver runReceiver.cxx) +target_link_libraries(fairmq-ex-readout-receiver PRIVATE FairMQ) set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR}) set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout-processing.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout-processing.sh) # install install( TARGETS - fairmq-ex-readout-sampler - fairmq-ex-readout-sink + fairmq-ex-readout-readout + fairmq-ex-readout-builder + fairmq-ex-readout-processor + fairmq-ex-readout-sender + fairmq-ex-readout-receiver LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR} RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR} @@ -46,9 +44,16 @@ install( set(EX_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}) set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh_install) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout-processing.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout-processing.sh_install) install( PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh_install DESTINATION ${PROJECT_INSTALL_BINDIR} RENAME fairmq-start-ex-readout.sh ) + +install( + PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout-processing.sh_install + DESTINATION ${PROJECT_INSTALL_BINDIR} + RENAME fairmq-start-ex-readout-processing.sh +) diff --git a/examples/readout/Sink.h b/examples/readout/Processor.h similarity index 55% rename from examples/readout/Sink.h rename to examples/readout/Processor.h index 7e372e7a..76fcb927 100644 --- a/examples/readout/Sink.h +++ b/examples/readout/Processor.h @@ -5,38 +5,33 @@ * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * Sink.h - * - * @since 2014-10-10 - * @author A. Rybalchenko - */ - -#ifndef FAIRMQEXAMPLEREGIONSINK_H -#define FAIRMQEXAMPLEREGIONSINK_H - -#include +#ifndef FAIRMQEXAMPLEREGIONPROCESSOR_H +#define FAIRMQEXAMPLEREGIONPROCESSOR_H #include "FairMQDevice.h" -namespace example_region +namespace example_readout { -class Sink : public FairMQDevice +class Processor : public FairMQDevice { public: - Sink(); - virtual ~Sink(); + Processor() { + OnData("bp", &Processor::HandleData); + } protected: - virtual void Run(); - virtual void InitTask(); + bool HandleData(FairMQMessagePtr& msg, int /*index*/) + { + FairMQMessagePtr msg2(NewMessageFor("ps", 0, msg->GetSize())); + if (Send(msg2, "ps") < 0) { + return false; + } - private: - uint64_t fMaxIterations; - uint64_t fNumIterations; + return true; + } }; -} // namespace example_region +} // namespace example_readout -#endif /* FAIRMQEXAMPLEREGIONSINK_H */ +#endif /* FAIRMQEXAMPLEREGIONPROCESSOR_H */ diff --git a/examples/readout/README.md b/examples/readout/README.md index 7be16bf3..ac49c19a 100644 --- a/examples/readout/README.md +++ b/examples/readout/README.md @@ -1,5 +1,27 @@ -Region example -============== +# Readout example -This example demonstrates the use of a more advanced feature - UnmanagedRegion, that can be used to create a buffer through one of FairMQ transports. The contents of this buffer are managed by the user, who can also create messages out of sub-buffers of the created buffer. Such feature can be interesting in environments that have special requirements by the hardware that writes the data, to keep the transfer efficient (e.g. shared memory). +This examples shows two possible topologies (out of many) for a node connected to a detector readout (followed by a processing node). +## Setup without new data generation + +``` +|------------------------------- Readout Node ---------------------------| |- Processing Node -| +| Readout --> Builder --> Sender | --> | Receiver | +| [# shared memory segment (unused in this topology) ##################] | ofi | | +| [# shmem unmanaged region (readout writes here, others read) ########] | | | +|------------------------------------------------------------------------| |-------------------| +``` + +The devices one the Readout Node communicate via shared memory transport. Readout device writes into shared memory unmanaged region. The data is then forwarded through Builder to Sender process, which sends it out via OFI transport. + +## Setup with generating new data on the Readout node + +``` +|------------------------------- Readout Node ---------------------------| |- Processing Node -| +| Readout --> Builder --> Processor --> Sender | --> | Receiver | +| [# shared memory segment (used between Proccessor and Sender) #######] | ofi | | +| [# shmem unmanaged region (readout writes here, builder & proc read) ] | | | +|------------------------------------------------------------------------| |-------------------| +``` + +In this topology one more device is added - Processor. It examines the arriving data and creates new data in shared memory. This data is not part of the unmanaged region, but lives in the general shared memory segment (unused in the previous setup). This new data is then forwarded to Sender and the Readout device is notified that the corresponding data piece in the unmanaged region is no longer used. diff --git a/examples/readout/Readout.h b/examples/readout/Readout.h new file mode 100644 index 00000000..6d686372 --- /dev/null +++ b/examples/readout/Readout.h @@ -0,0 +1,91 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +#ifndef FAIRMQEXAMPLEREADOUTREADOUT_H +#define FAIRMQEXAMPLEREADOUTREADOUT_H + +#include +#include +#include + +#include "FairMQDevice.h" + +namespace example_readout +{ + +class Readout : public FairMQDevice +{ + public: + Readout() + : fMsgSize(10000) + , fMaxIterations(0) + , fNumIterations(0) + , fRegion(nullptr) + , fNumUnackedMsgs(0) + {} + + protected: + void InitTask() override + { + fMsgSize = fConfig->GetValue("msg-size"); + fMaxIterations = fConfig->GetValue("max-iterations"); + + fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("rb", + 0, + 10000000, + [this](void* /*data*/, size_t /*size*/, void* /*hint*/) { // callback to be called when message buffers no longer needed by transport + --fNumUnackedMsgs; + if (fMaxIterations > 0) { + LOG(debug) << "Received ack"; + } + } + )); + } + + bool ConditionalRun() override + { + FairMQMessagePtr msg(NewMessageFor("rb", // channel + 0, // sub-channel + fRegion, // region + fRegion->GetData(), // ptr within region + fMsgSize, // offset from ptr + nullptr // hint + )); + + if (Send(msg, "rb", 0) > 0) { + ++fNumUnackedMsgs; + + if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { + LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state."; + return false; + } + } + + return true; + } + void ResetTask() override + { + // if not all messages acknowledged, wait for a bit. But only once, since receiver could be already dead. + if (fNumUnackedMsgs != 0) { + LOG(debug) << "waiting for all acknowledgements... (" << fNumUnackedMsgs << ")"; + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + LOG(debug) << "done, still unacked: " << fNumUnackedMsgs; + } + fRegion.reset(); + } + + private: + int fMsgSize; + uint64_t fMaxIterations; + uint64_t fNumIterations; + FairMQUnmanagedRegionPtr fRegion; + std::atomic fNumUnackedMsgs; +}; + +} // namespace example_readout + +#endif /* FAIRMQEXAMPLEREADOUTREADOUT_H */ diff --git a/examples/readout/Receiver.h b/examples/readout/Receiver.h new file mode 100644 index 00000000..dae126ce --- /dev/null +++ b/examples/readout/Receiver.h @@ -0,0 +1,54 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +#ifndef FAIRMQEXAMPLEREGIONRECEIVER_H +#define FAIRMQEXAMPLEREGIONRECEIVER_H + +#include "FairMQDevice.h" + +namespace example_readout +{ + +class Receiver : public FairMQDevice +{ + public: + Receiver() + : fMaxIterations(0) + , fNumIterations(0) + {} + + protected: + void InitTask() override + { + // Get the fMaxIterations value from the command line options (via fConfig) + fMaxIterations = fConfig->GetValue("max-iterations"); + } + + void Run() override + { + FairMQChannel& dataInChannel = fChannels.at("sr").at(0); + + while (!NewStatePending()) { + FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage()); + dataInChannel.Receive(msg); + // void* ptr = msg->GetData(); + + if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { + LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state."; + break; + } + } + } + + private: + uint64_t fMaxIterations; + uint64_t fNumIterations; +}; + +} // namespace example_readout + +#endif /* FAIRMQEXAMPLEREGIONRECEIVER_H */ diff --git a/examples/readout/Sampler.cxx b/examples/readout/Sampler.cxx deleted file mode 100644 index ba986d3d..00000000 --- a/examples/readout/Sampler.cxx +++ /dev/null @@ -1,91 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * Sampler.cpp - * - * @since 2014-10-10 - * @author A. Rybalchenko - */ - -#include "Sampler.h" - -#include - -using namespace std; - -namespace example_region -{ - -Sampler::Sampler() - : fMsgSize(10000) - , fMaxIterations(0) - , fNumIterations(0) - , fRegion(nullptr) - , fNumUnackedMsgs(0) -{ -} - -void Sampler::InitTask() -{ - fMsgSize = fConfig->GetValue("msg-size"); - fMaxIterations = fConfig->GetValue("max-iterations"); - - fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data1", - 0, - 10000000, - [this](void* /*data*/, size_t /*size*/, void* /*hint*/) { // callback to be called when message buffers no longer needed by transport - --fNumUnackedMsgs; - if (fMaxIterations > 0) - { - LOG(debug) << "Received ack"; - } - } - )); -} - -bool Sampler::ConditionalRun() -{ - FairMQMessagePtr msg(NewMessageFor("data1", // channel - 0, // sub-channel - fRegion, // region - fRegion->GetData(), // ptr within region - fMsgSize, // offset from ptr - nullptr // hint - )); - - if (Send(msg, "data1", 0) > 0) - { - ++fNumUnackedMsgs; - - if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) - { - LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state."; - return false; - } - } - - return true; -} - -void Sampler::ResetTask() -{ - // if not all messages acknowledged, wait for a bit. But only once, since receiver could be already dead. - if (fNumUnackedMsgs != 0) - { - LOG(debug) << "waiting for all acknowledgements... (" << fNumUnackedMsgs << ")"; - this_thread::sleep_for(chrono::milliseconds(500)); - LOG(debug) << "done, still unacked: " << fNumUnackedMsgs; - } - fRegion.reset(); -} - -Sampler::~Sampler() -{ -} - -} // namespace example_region diff --git a/examples/readout/Sampler.h b/examples/readout/Sender.h similarity index 50% rename from examples/readout/Sampler.h rename to examples/readout/Sender.h index 4056fb3d..35f54ddb 100644 --- a/examples/readout/Sampler.h +++ b/examples/readout/Sender.h @@ -5,42 +5,42 @@ * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * Sampler.h - * - * @since 2014-10-10 - * @author A. Rybalchenko - */ - -#ifndef FAIRMQEXAMPLEREGIONSAMPLER_H -#define FAIRMQEXAMPLEREGIONSAMPLER_H - -#include +#ifndef FAIRMQEXAMPLEREGIONSENDER_H +#define FAIRMQEXAMPLEREGIONSENDER_H #include "FairMQDevice.h" -namespace example_region +#include + +namespace example_readout { -class Sampler : public FairMQDevice +class Sender : public FairMQDevice { public: - Sampler(); - virtual ~Sampler(); + Sender() + : fInputChannelName() + {} - protected: - virtual void InitTask(); - virtual bool ConditionalRun(); - virtual void ResetTask(); + void Init() override + { + fInputChannelName = fConfig->GetValue("input-name"); + OnData(fInputChannelName, &Sender::HandleData); + } + + bool HandleData(FairMQMessagePtr& msg, int /*index*/) + { + if (Send(msg, "sr") < 0) { + return false; + } + + return true; + } private: - int fMsgSize; - uint64_t fMaxIterations; - uint64_t fNumIterations; - FairMQUnmanagedRegionPtr fRegion; - std::atomic fNumUnackedMsgs; + std::string fInputChannelName; }; -} // namespace example_region +} // namespace example_readout -#endif /* FAIRMQEXAMPLEREGIONSAMPLER_H */ +#endif /* FAIRMQEXAMPLEREGIONSENDER_H */ diff --git a/examples/readout/Sink.cxx b/examples/readout/Sink.cxx deleted file mode 100644 index 55c1c04b..00000000 --- a/examples/readout/Sink.cxx +++ /dev/null @@ -1,56 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * Sink.cxx - * - * @since 2014-10-10 - * @author A. Rybalchenko - */ - -#include "Sink.h" - -using namespace std; - -namespace example_region -{ - -Sink::Sink() - : fMaxIterations(0) - , fNumIterations(0) -{ -} - -void Sink::InitTask() -{ - // Get the fMaxIterations value from the command line options (via fConfig) - fMaxIterations = fConfig->GetValue("max-iterations"); -} - -void Sink::Run() -{ - FairMQChannel& dataInChannel = fChannels.at("data").at(0); - - while (!NewStatePending()) - { - FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage()); - dataInChannel.Receive(msg); - // void* ptr = msg->GetData(); - - if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) - { - LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state."; - break; - } - } -} - -Sink::~Sink() -{ -} - -} // namespace example_region diff --git a/examples/readout/fairmq-start-ex-readout-processing.sh.in b/examples/readout/fairmq-start-ex-readout-processing.sh.in new file mode 100755 index 00000000..7cfbda3d --- /dev/null +++ b/examples/readout/fairmq-start-ex-readout-processing.sh.in @@ -0,0 +1,40 @@ +#!/bin/bash + +export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ + +msgSize="1000000" + +if [[ $1 =~ ^[0-9]+$ ]]; then + msgSize=$1 +fi + +READOUT="fairmq-ex-readout-readout" +READOUT+=" --id readout1" +READOUT+=" --msg-size $msgSize" +READOUT+=" --channel-config name=rb,type=pair,method=bind,address=tcp://localhost:7777,transport=shmem" +xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$READOUT & + +BUILDER="fairmq-ex-readout-builder" +BUILDER+=" --id builder1" +BUILDER+=" --output-name bp" +BUILDER+=" --channel-config name=rb,type=pair,method=connect,address=tcp://localhost:7777,transport=shmem" +BUILDER+=" name=bp,type=pair,method=connect,address=tcp://localhost:7778,transport=shmem" +xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$BUILDER & + +PROCESSOR="fairmq-ex-readout-processor" +PROCESSOR+=" --id processor1" +PROCESSOR+=" --channel-config name=bp,type=pair,method=bind,address=tcp://localhost:7778,transport=shmem" +PROCESSOR+=" name=ps,type=pair,method=connect,address=tcp://localhost:7779,transport=shmem" +xterm -geometry 80x23+750+500 -hold -e @EX_BIN_DIR@/$PROCESSOR & + +SENDER="fairmq-ex-readout-sender" +SENDER+=" --id sender1" +SENDER+=" --input-name ps" +SENDER+=" --channel-config name=ps,type=pair,method=bind,address=tcp://localhost:7779,transport=shmem" +SENDER+=" name=sr,type=pair,method=connect,address=tcp://localhost:7780,transport=ofi" +xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SENDER & + +RECEIVER="fairmq-ex-readout-receiver" +RECEIVER+=" --id receiver1" +RECEIVER+=" --channel-config name=sr,type=pair,method=bind,address=tcp://localhost:7780,transport=ofi" +xterm -geometry 80x23+1500+0 -hold -e @EX_BIN_DIR@/$RECEIVER & diff --git a/examples/readout/fairmq-start-ex-readout.sh.in b/examples/readout/fairmq-start-ex-readout.sh.in index c3518b84..3f895ad6 100755 --- a/examples/readout/fairmq-start-ex-readout.sh.in +++ b/examples/readout/fairmq-start-ex-readout.sh.in @@ -8,23 +8,27 @@ if [[ $1 =~ ^[0-9]+$ ]]; then msgSize=$1 fi -SAMPLER="fairmq-ex-readout-sampler" -SAMPLER+=" --id sampler1" -SAMPLER+=" --severity debug" -SAMPLER+=" --msg-size $msgSize" -# SAMPLER+=" --rate 10" -SAMPLER+=" --channel-config name=data1,type=pair,method=bind,address=tcp://127.0.0.1:7777,transport=shmem" -xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER & +READOUT="fairmq-ex-readout-readout" +READOUT+=" --id readout1" +READOUT+=" --msg-size $msgSize" +READOUT+=" --channel-config name=rb,type=pair,method=bind,address=tcp://localhost:7777,transport=shmem" +xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$READOUT & BUILDER="fairmq-ex-readout-builder" BUILDER+=" --id builder1" -BUILDER+=" --severity debug" -BUILDER+=" --channel-config name=data1,type=pair,method=connect,address=tcp://127.0.0.1:7777,transport=shmem" -BUILDER+=" name=data2,type=pair,method=connect,address=tcp://127.0.0.1:7778,transport=ofi" +BUILDER+=" --output-name bs" +BUILDER+=" --channel-config name=rb,type=pair,method=connect,address=tcp://localhost:7777,transport=shmem" +BUILDER+=" name=bs,type=pair,method=connect,address=tcp://localhost:7778,transport=shmem" xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$BUILDER & -SINK="fairmq-ex-readout-sink" -SINK+=" --id sink1" -SINK+=" --severity debug" -SINK+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:7778,transport=ofi" -xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SINK & +SENDER="fairmq-ex-readout-sender" +SENDER+=" --id sender1" +SENDER+=" --input-name bs" +SENDER+=" --channel-config name=bs,type=pair,method=bind,address=tcp://localhost:7778,transport=shmem" +SENDER+=" name=sr,type=pair,method=connect,address=tcp://localhost:7779,transport=ofi" +xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SENDER & + +RECEIVER="fairmq-ex-readout-receiver" +RECEIVER+=" --id receiver1" +RECEIVER+=" --channel-config name=sr,type=pair,method=bind,address=tcp://localhost:7779,transport=ofi" +xterm -geometry 80x23+1500+0 -hold -e @EX_BIN_DIR@/$RECEIVER & diff --git a/examples/readout/runBuilder.cxx b/examples/readout/runBuilder.cxx index 4d11eaab..89476fc4 100644 --- a/examples/readout/runBuilder.cxx +++ b/examples/readout/runBuilder.cxx @@ -11,10 +11,13 @@ namespace bpo = boost::program_options; -void addCustomOptions(bpo::options_description& /* options */) -{} +void addCustomOptions(bpo::options_description& options) +{ + options.add_options() + ("output-name", bpo::value()->default_value("bs"), "Output channel name"); +} FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { - return new example_region::Builder(); + return new example_readout::Builder(); } diff --git a/examples/readout/runProcessor.cxx b/examples/readout/runProcessor.cxx new file mode 100644 index 00000000..1da2e3df --- /dev/null +++ b/examples/readout/runProcessor.cxx @@ -0,0 +1,20 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "runFairMQDevice.h" +#include "Processor.h" + +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& /* options */) +{} + +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) +{ + return new example_readout::Processor(); +} diff --git a/examples/readout/runSampler.cxx b/examples/readout/runReadout.cxx similarity index 94% rename from examples/readout/runSampler.cxx rename to examples/readout/runReadout.cxx index 6b8166da..4c2fbef3 100644 --- a/examples/readout/runSampler.cxx +++ b/examples/readout/runReadout.cxx @@ -7,7 +7,7 @@ ********************************************************************************/ #include "runFairMQDevice.h" -#include "Sampler.h" +#include "Readout.h" namespace bpo = boost::program_options; @@ -20,5 +20,5 @@ void addCustomOptions(bpo::options_description& options) FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { - return new example_region::Sampler(); + return new example_readout::Readout(); } diff --git a/examples/readout/runSink.cxx b/examples/readout/runReceiver.cxx similarity index 93% rename from examples/readout/runSink.cxx rename to examples/readout/runReceiver.cxx index 1ad8f4bd..aeed1e91 100644 --- a/examples/readout/runSink.cxx +++ b/examples/readout/runReceiver.cxx @@ -7,7 +7,7 @@ ********************************************************************************/ #include "runFairMQDevice.h" -#include "Sink.h" +#include "Receiver.h" namespace bpo = boost::program_options; @@ -19,5 +19,5 @@ void addCustomOptions(bpo::options_description& options) FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { - return new example_region::Sink(); + return new example_readout::Receiver(); } diff --git a/examples/readout/runSender.cxx b/examples/readout/runSender.cxx new file mode 100644 index 00000000..2a4e4c99 --- /dev/null +++ b/examples/readout/runSender.cxx @@ -0,0 +1,23 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "runFairMQDevice.h" +#include "Sender.h" + +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& options) +{ + options.add_options() + ("input-name", bpo::value()->default_value("bs"), "Input channel name"); +} + +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) +{ + return new example_readout::Sender(); +} diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 0796bb18..ad99e4bd 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -26,33 +26,34 @@ #include // std::max using namespace std; +using namespace fair::mq; -static map backwardsCompatibilityWaitForEndOfStateHelper = +static map backwardsCompatibilityWaitForEndOfStateHelper = { - { fair::mq::Transition::InitDevice, fair::mq::State::InitializingDevice }, - { fair::mq::Transition::CompleteInit, fair::mq::State::Initialized }, - { fair::mq::Transition::Bind, fair::mq::State::Bound }, - { fair::mq::Transition::Connect, fair::mq::State::DeviceReady }, - { fair::mq::Transition::InitTask, fair::mq::State::Ready }, - { fair::mq::Transition::Run, fair::mq::State::Ready }, - { fair::mq::Transition::Stop, fair::mq::State::Ready }, - { fair::mq::Transition::ResetTask, fair::mq::State::DeviceReady }, - { fair::mq::Transition::ResetDevice, fair::mq::State::Idle } + { Transition::InitDevice, State::InitializingDevice }, + { Transition::CompleteInit, State::Initialized }, + { Transition::Bind, State::Bound }, + { Transition::Connect, State::DeviceReady }, + { Transition::InitTask, State::Ready }, + { Transition::Run, State::Ready }, + { Transition::Stop, State::Ready }, + { Transition::ResetTask, State::DeviceReady }, + { Transition::ResetDevice, State::Idle } }; -static map backwardsCompatibilityChangeStateHelper = +static map backwardsCompatibilityChangeStateHelper = { - { FairMQDevice::Event::INIT_DEVICE, fair::mq::Transition::InitDevice }, - { FairMQDevice::Event::internal_DEVICE_READY, fair::mq::Transition::Auto }, - { FairMQDevice::Event::INIT_TASK, fair::mq::Transition::InitTask }, - { FairMQDevice::Event::internal_READY, fair::mq::Transition::Auto }, - { FairMQDevice::Event::RUN, fair::mq::Transition::Run }, - { FairMQDevice::Event::STOP, fair::mq::Transition::Stop }, - { FairMQDevice::Event::RESET_TASK, fair::mq::Transition::ResetTask }, - { FairMQDevice::Event::RESET_DEVICE, fair::mq::Transition::ResetDevice }, - { FairMQDevice::Event::internal_IDLE, fair::mq::Transition::Auto }, - { FairMQDevice::Event::END, fair::mq::Transition::End }, - { FairMQDevice::Event::ERROR_FOUND, fair::mq::Transition::ErrorFound } + { FairMQDevice::Event::INIT_DEVICE, Transition::InitDevice }, + { FairMQDevice::Event::internal_DEVICE_READY, Transition::Auto }, + { FairMQDevice::Event::INIT_TASK, Transition::InitTask }, + { FairMQDevice::Event::internal_READY, Transition::Auto }, + { FairMQDevice::Event::RUN, Transition::Run }, + { FairMQDevice::Event::STOP, Transition::Stop }, + { FairMQDevice::Event::RESET_TASK, Transition::ResetTask }, + { FairMQDevice::Event::RESET_DEVICE, Transition::ResetDevice }, + { FairMQDevice::Event::internal_IDLE, Transition::Auto }, + { FairMQDevice::Event::END, Transition::End }, + { FairMQDevice::Event::ERROR_FOUND, Transition::ErrorFound } }; FairMQDevice::FairMQDevice() @@ -65,21 +66,21 @@ FairMQDevice::FairMQDevice(FairMQProgOptions& config) { } -FairMQDevice::FairMQDevice(const fair::mq::tools::Version version) +FairMQDevice::FairMQDevice(const tools::Version version) : FairMQDevice(nullptr, version) { } -FairMQDevice::FairMQDevice(FairMQProgOptions& config, const fair::mq::tools::Version version) +FairMQDevice::FairMQDevice(FairMQProgOptions& config, const tools::Version version) : FairMQDevice(&config, version) { } -FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Version version) +FairMQDevice::FairMQDevice(FairMQProgOptions* config, const tools::Version version) : fTransportFactory(nullptr) , fTransports() , fChannels() - , fInternalConfig(config ? nullptr : fair::mq::tools::make_unique()) + , fInternalConfig(config ? nullptr : tools::make_unique()) , fConfig(config ? config : fInternalConfig.get()) , fId() , fDefaultTransportType(fair::mq::Transport::ZMQ) @@ -99,11 +100,11 @@ FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Ver , fMaxRunRuntimeInS(0) , fRawCmdLineArgs() { - SubscribeToNewTransition("device", [&](fair::mq::Transition transition) { + SubscribeToNewTransition("device", [&](Transition transition) { LOG(trace) << "device notified on new transition: " << transition; switch (transition) { - case fair::mq::Transition::Stop: + case Transition::Stop: UnblockTransports(); break; default: @@ -182,7 +183,7 @@ bool FairMQDevice::ChangeState(const int transition) return ChangeState(backwardsCompatibilityChangeStateHelper.at(transition)); } -void FairMQDevice::WaitForEndOfState(fair::mq::Transition transition) +void FairMQDevice::WaitForEndOfState(Transition transition) { WaitForState(backwardsCompatibilityWaitForEndOfStateHelper.at(transition)); } @@ -226,7 +227,7 @@ void FairMQDevice::InitWrapper() int subChannelIndex = 0; for (auto& vi : mi.second) { // set channel name: name + vector index - vi.fName = fair::mq::tools::ToString(mi.first, "[", subChannelIndex, "]"); + vi.fName = tools::ToString(mi.first, "[", subChannelIndex, "]"); // set channel transport LOG(debug) << "Initializing transport for channel " << vi.fName << ": " << fair::mq::TransportNames.at(vi.fTransportType); @@ -237,9 +238,9 @@ void FairMQDevice::InitWrapper() if (vi.fAddress == "unspecified" || vi.fAddress == "") { // if the configured network interface is default, get its name from the default route if (networkInterface == "default") { - networkInterface = fair::mq::tools::getDefaultRouteNetworkInterface(); + networkInterface = tools::getDefaultRouteNetworkInterface(); } - vi.fAddress = "tcp://" + fair::mq::tools::getInterfaceIP(networkInterface) + ":1"; + vi.fAddress = "tcp://" + tools::getInterfaceIP(networkInterface) + ":1"; } // fill the uninitialized list fUninitializedBindingChannels.push_back(&vi); @@ -251,14 +252,14 @@ void FairMQDevice::InitWrapper() fUninitializedConnectingChannels.push_back(&vi); } else { LOG(error) << "Cannot update configuration. Socket method (bind/connect) for channel '" << vi.fName << "' not specified."; - throw runtime_error(fair::mq::tools::ToString("Cannot update configuration. Socket method (bind/connect) for channel ", vi.fName, " not specified.")); + throw runtime_error(tools::ToString("Cannot update configuration. Socket method (bind/connect) for channel ", vi.fName, " not specified.")); } subChannelIndex++; } } - // ChangeState(fair::mq::Transition::Auto); + // ChangeState(Transition::Auto); } void FairMQDevice::BindWrapper() @@ -269,12 +270,12 @@ void FairMQDevice::BindWrapper() if (!fUninitializedBindingChannels.empty()) { LOG(error) << fUninitializedBindingChannels.size() << " of the binding channels could not initialize. Initial configuration incomplete."; - throw runtime_error(fair::mq::tools::ToString(fUninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete.")); + throw runtime_error(tools::ToString(fUninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete.")); } Bind(); - ChangeState(fair::mq::Transition::Auto); + ChangeState(Transition::Auto); } void FairMQDevice::ConnectWrapper() @@ -301,7 +302,7 @@ void FairMQDevice::ConnectWrapper() if (numAttempts++ > maxAttempts) { LOG(error) << "could not connect all channels after " << initializationTimeoutInS << " attempts"; - throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", initializationTimeoutInS, " attempts")); + throw runtime_error(tools::ToString("could not connect all channels after ", initializationTimeoutInS, " attempts")); } AttachChannels(fUninitializedConnectingChannels); @@ -313,7 +314,7 @@ void FairMQDevice::ConnectWrapper() Connect(); - ChangeState(fair::mq::Transition::Auto); + ChangeState(Transition::Auto); } void FairMQDevice::AttachChannels(vector& chans) @@ -366,7 +367,7 @@ bool FairMQDevice::AttachChannel(FairMQChannel& chan) string hostPart = addressString.substr(0, pos); if (!(bind && hostPart == "*")) { string portPart = addressString.substr(pos + 1); - string resolvedHost = fair::mq::tools::getIpFromHostname(hostPart); + string resolvedHost = tools::getIpFromHostname(hostPart); if (resolvedHost == "") { return false; } @@ -414,7 +415,7 @@ void FairMQDevice::InitTaskWrapper() { InitTask(); - ChangeState(fair::mq::Transition::Auto); + ChangeState(Transition::Auto); } bool FairMQDevice::SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs) @@ -433,7 +434,7 @@ void FairMQDevice::SortChannel(const string& name, const bool reindex) for (auto vi = fChannels.at(name).begin(); vi != fChannels.at(name).end(); ++vi) { // set channel name: name + vector index - vi->fName = fair::mq::tools::ToString(name, "[", vi - fChannels.at(name).begin(), "]"); + vi->fName = tools::ToString(name, "[", vi - fChannels.at(name).begin(), "]"); } } } @@ -467,7 +468,7 @@ void FairMQDevice::RunWrapper() HandleMultipleChannelInput(); } } else { - fair::mq::tools::RateLimiter rateLimiter(fRate); + tools::RateLimiter rateLimiter(fRate); while (!NewStatePending() && ConditionalRun()) { if (fRate > 0.001) { @@ -481,7 +482,7 @@ void FairMQDevice::RunWrapper() // if Run() exited and the state is still RUNNING, transition to READY. if (!NewStatePending()) { UnblockTransports(); - ChangeState(fair::mq::Transition::Stop); + ChangeState(Transition::Stop); } PostRun(); @@ -489,10 +490,10 @@ void FairMQDevice::RunWrapper() } catch (const out_of_range& oor) { LOG(error) << "out of range: " << oor.what(); LOG(error) << "incorrect/incomplete channel configuration?"; - ChangeState(fair::mq::Transition::ErrorFound); + ChangeState(Transition::ErrorFound); throw; } catch (...) { - ChangeState(fair::mq::Transition::ErrorFound); + ChangeState(Transition::ErrorFound); throw; } @@ -664,7 +665,7 @@ void FairMQDevice::PollForTransport(const FairMQTransportFactory* factory, const catch (exception& e) { LOG(error) << "FairMQDevice::PollForTransport() failed: " << e.what() << ", going to ERROR state."; - throw runtime_error(fair::mq::tools::ToString("FairMQDevice::PollForTransport() failed: ", e.what(), ", going to ERROR state.")); + throw runtime_error(tools::ToString("FairMQDevice::PollForTransport() failed: ", e.what(), ", going to ERROR state.")); } } @@ -738,7 +739,7 @@ void FairMQDevice::LogSocketRates() filteredSockets.push_back(vi->fSocket.get()); logIntervals.push_back(vi->fRateLogging); intervalCounters.push_back(0); - filteredChannelNames.push_back(fair::mq::tools::ToString(mi.first, "[", vi - (mi.second).begin(), "]")); + filteredChannelNames.push_back(tools::ToString(mi.first, "[", vi - (mi.second).begin(), "]")); chanNameLen = max(chanNameLen, filteredChannelNames.back().length()); } } @@ -814,7 +815,7 @@ void FairMQDevice::LogSocketRates() t0 = t1; if (fMaxRunRuntimeInS > 0 && ++secondsElapsed >= fMaxRunRuntimeInS) { - ChangeState(fair::mq::Transition::Stop); + ChangeState(Transition::Stop); } } } @@ -830,7 +831,7 @@ void FairMQDevice::ResetTaskWrapper() { ResetTask(); - ChangeState(fair::mq::Transition::Auto); + ChangeState(Transition::Auto); } void FairMQDevice::ResetWrapper() @@ -850,7 +851,7 @@ void FairMQDevice::ResetWrapper() Reset(); - ChangeState(fair::mq::Transition::Auto); + ChangeState(Transition::Auto); } FairMQDevice::~FairMQDevice()