From 14d6d717a3369da73b1301a4ab58c9afef56d702 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Thu, 13 Feb 2020 17:05:38 +0100 Subject: [PATCH] Add qc example --- examples/CMakeLists.txt | 3 +- examples/qc/CMakeLists.txt | 71 ++++++++ examples/qc/ex-qc-topology.xml | 48 ++++++ examples/qc/fairmq-ex-qc-env.sh | 16 ++ examples/qc/fairmq-start-ex-qc.sh.in | 79 +++++++++ examples/qc/runQCConsumer.cxx | 26 +++ examples/qc/runQCProducer.cxx | 66 +++++++ examples/qc/runSampler.cxx | 55 ++++++ examples/qc/runSink.cxx | 51 ++++++ fairmq/plugins/DDS/CMakeLists.txt | 15 +- fairmq/plugins/DDS/runDDSCommandUInew.cxx | 199 ++++++++++++++++++++++ 11 files changed, 623 insertions(+), 6 deletions(-) create mode 100644 examples/qc/CMakeLists.txt create mode 100644 examples/qc/ex-qc-topology.xml create mode 100755 examples/qc/fairmq-ex-qc-env.sh create mode 100755 examples/qc/fairmq-start-ex-qc.sh.in create mode 100644 examples/qc/runQCConsumer.cxx create mode 100644 examples/qc/runQCProducer.cxx create mode 100644 examples/qc/runSampler.cxx create mode 100644 examples/qc/runSink.cxx create mode 100644 fairmq/plugins/DDS/runDDSCommandUInew.cxx diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index bd248d52..8348574e 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -15,7 +15,8 @@ add_subdirectory(multipart) add_subdirectory(multiple-channels) if(BUILD_NANOMSG_TRANSPORT) add_subdirectory(multiple-transports) -endif() + endif() +add_subdirectory(qc) add_subdirectory(readout) add_subdirectory(region) add_subdirectory(req-rep) diff --git a/examples/qc/CMakeLists.txt b/examples/qc/CMakeLists.txt new file mode 100644 index 00000000..0204a3ff --- /dev/null +++ b/examples/qc/CMakeLists.txt @@ -0,0 +1,71 @@ +################################################################################ +# Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # +# # +# This software is distributed under the terms of the # +# GNU Lesser General Public Licence (LGPL) version 3, # +# copied verbatim in the file "LICENSE" # +################################################################################ + +add_executable(fairmq-ex-qc-sampler runSampler.cxx) +target_link_libraries(fairmq-ex-qc-sampler PRIVATE FairMQ) + +add_executable(fairmq-ex-qc-producer runQCProducer.cxx) +target_link_libraries(fairmq-ex-qc-producer PRIVATE FairMQ) + +add_executable(fairmq-ex-qc-consumer runQCConsumer.cxx) +target_link_libraries(fairmq-ex-qc-consumer PRIVATE FairMQ) + +add_executable(fairmq-ex-qc-sink runSink.cxx) +target_link_libraries(fairmq-ex-qc-sink PRIVATE FairMQ) + +add_custom_target(ExampleQC DEPENDS fairmq-ex-qc-sampler fairmq-ex-qc-producer fairmq-ex-qc-consumer fairmq-ex-qc-sink) + +list(JOIN Boost_LIBRARY_DIRS ":" LIB_DIR) +set(BIN_DIR ${CMAKE_CURRENT_BINARY_DIR}:${CMAKE_BINARY_DIR}/fairmq/plugins/DDS) +set(DATA_DIR ${CMAKE_CURRENT_BINARY_DIR}) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ex-qc-topology.xml ${CMAKE_CURRENT_BINARY_DIR}/ex-qc-topology.xml @ONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-ex-qc-env.sh ${CMAKE_CURRENT_BINARY_DIR}/fairmq-ex-qc-env.sh @ONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-qc.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-qc.sh @ONLY) + +# test +if(DDS_FOUND) +add_test(NAME Example.QC.localhost COMMAND ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-qc.sh localhost) +set_tests_properties(Example.QC.localhost PROPERTIES TIMEOUT 15 PASS_REGULAR_EXPRESSION "Example successful") +endif() + +# install +install( + TARGETS + fairmq-ex-qc-sampler + fairmq-ex-qc-producer + fairmq-ex-qc-consumer + fairmq-ex-qc-sink + + LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR} + RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR} +) + +# configure run script with different executable paths for build and for install directories +set(BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}) +set(DATA_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_DATADIR}) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ex-qc-topology.xml ${CMAKE_CURRENT_BINARY_DIR}/ex-qc-topology.xml_install @ONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-ex-qc-env.sh ${CMAKE_CURRENT_BINARY_DIR}/fairmq-ex-qc-env.sh_install @ONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-qc.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-qc.sh_install @ONLY) + +install( + FILES ${CMAKE_CURRENT_BINARY_DIR}/ex-qc-topology.xml_install + DESTINATION ${PROJECT_INSTALL_DATADIR} + RENAME ex-qc-topology.xml +) + +install( + PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-ex-qc-env.sh_install + DESTINATION ${PROJECT_INSTALL_BINDIR} + RENAME fairmq-ex-qc-env.sh +) + +install( + PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-qc.sh_install + DESTINATION ${PROJECT_INSTALL_BINDIR} + RENAME fairmq-start-ex-qc.sh +) diff --git a/examples/qc/ex-qc-topology.xml b/examples/qc/ex-qc-topology.xml new file mode 100644 index 00000000..2fd2942d --- /dev/null +++ b/examples/qc/ex-qc-topology.xml @@ -0,0 +1,48 @@ + + + + + + + + fairmq-ex-qc-sampler --color false --channel-config name=data1,type=push,method=bind -P dds --max-iterations 1000 + fairmq-ex-qc-env.sh + + fmqchan_data1 + + + + + fairmq-ex-qc-producer --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect name=qc,type=push,method=connect -P dds + fairmq-ex-qc-env.sh + + fmqchan_data1 + fmqchan_data2 + fmqchan_qc + + + + + fairmq-ex-qc-consumer --color false --channel-config name=qc,type=pull,method=bind -P dds + fairmq-ex-qc-env.sh + + fmqchan_qc + + + + + fairmq-ex-qc-sink --color false --channel-config name=data2,type=pull,method=bind -P dds --max-iterations 1000 + fairmq-ex-qc-env.sh + + fmqchan_data2 + + + +
+ Sampler + QCProducer + QCConsumer + Sink +
+ +
diff --git a/examples/qc/fairmq-ex-qc-env.sh b/examples/qc/fairmq-ex-qc-env.sh new file mode 100755 index 00000000..6f2e45c5 --- /dev/null +++ b/examples/qc/fairmq-ex-qc-env.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +################################################################################ +# Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # +# # +# This software is distributed under the terms of the # +# GNU Lesser General Public Licence (LGPL) version 3, # +# copied verbatim in the file "LICENSE" # +################################################################################ + +export PATH=@BIN_DIR@:$PATH + +OS=$(uname -s 2>&1) +if [ "$OS" == "Darwin" ]; then + export DYLD_LIBRARY_PATH=@LIB_DIR@:$DYLD_LIBRARY_PATH +fi diff --git a/examples/qc/fairmq-start-ex-qc.sh.in b/examples/qc/fairmq-start-ex-qc.sh.in new file mode 100755 index 00000000..5b9bf346 --- /dev/null +++ b/examples/qc/fairmq-start-ex-qc.sh.in @@ -0,0 +1,79 @@ +#!/bin/bash + +################################################################################ +# Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # +# # +# This software is distributed under the terms of the # +# GNU Lesser General Public Licence (LGPL) version 3, # +# copied verbatim in the file "LICENSE" # +################################################################################ + +# fairmq-start-ex-qc.sh -> submit agents with localhost plugin + +set -e + +cleanup() { + dds-session stop $1 + echo "CLEANUP PERFORMED" +} + +source @DDS_INSTALL_PREFIX@/DDS_env.sh +export PATH=@BIN_DIR@:$PATH + +exec 5>&1 +output=$(dds-session start | tee >(cat - >&5)) +export FAIRMQ_DDS_SESSION_ID=$(echo ${output} | grep "DDS session ID: " | cut -d' ' -f4) +echo "SESSION ID: ${FAIRMQ_DDS_SESSION_ID}" + +trap "cleanup ${FAIRMQ_DDS_SESSION_ID}" EXIT + +requiredNofSlots=4 +dds-submit -r localhost --slots ${requiredNofSlots} +echo "...waiting for ${requiredNofSlots} idle slots..." +dds-info --idle-count --wait ${requiredNofSlots} + +export FAIRMQ_DDS_TOPO_FILE=@DATA_DIR@/ex-qc-topology.xml +echo "TOPOLOGY FILE: ${FAIRMQ_DDS_TOPO_FILE}" +echo "TOPOLOGY NAME: $(dds-topology --disable-validation --topology-name ${FAIRMQ_DDS_TOPO_FILE})" + +dds-info --active-topology +dds-topology --activate ${FAIRMQ_DDS_TOPO_FILE} +dds-info --active-topology +echo "...waiting for ${requiredNofSlots} executing slots..." +dds-info --executing-count --wait ${requiredNofSlots} + +echo "------------------------" +echo "...waiting for Topology to finish..." +# TODO Retrieve number of devices from DDS topology API instead of having the user pass it explicitely +fairmq-dds-command-ui -w "IDLE" -n ${requiredNofSlots} +fairmq-dds-command-ui -c i -w "INITIALIZING DEVICE" -n ${requiredNofSlots} +fairmq-dds-command-ui -c k -w "INITIALIZED" -n ${requiredNofSlots} +fairmq-dds-command-ui -c b -w "BOUND" -n ${requiredNofSlots} +fairmq-dds-command-ui -c x -w "DEVICE READY" -n ${requiredNofSlots} +fairmq-dds-command-ui -c j -w "READY" -n ${requiredNofSlots} +fairmq-dds-command-ui -c r +sampler_and_sink="main/(Sampler|Sink)" +# processors="main/ProcessorGroup/Processor" +fairmq-dds-command-ui -p $sampler_and_sink -w "RUNNING->READY" -n 2 +echo "...$sampler_and_sink are READY, sending shutdown..." +fairmq-dds-command-ui -c s -w "RUNNING->READY" -n ${requiredNofSlots} +fairmq-dds-command-ui -c t -w "DEVICE READY" -n ${requiredNofSlots} +fairmq-dds-command-ui -c d -w "IDLE" -n ${requiredNofSlots} +fairmq-dds-command-ui -c q -w "EXITING" -n ${requiredNofSlots} +echo "...waiting for ${requiredNofSlots} idle slots..." +dds-info --idle-count --wait ${requiredNofSlots} +echo "------------------------" + +dds-info --active-topology +dds-topology --stop +dds-info --active-topology + +dds-agent-cmd getlog -a +logDir="${wrkDir}/logs" +for file in $(find "${logDir}" -name "*.tar.gz"); do tar -xf ${file} -C "${logDir}" ; done +echo "AGENT LOG FILES IN: ${logDir}" + +# This string is used by ctest to detect success +echo "Example successful :)" + +# Cleanup function is called by EXIT trap diff --git a/examples/qc/runQCConsumer.cxx b/examples/qc/runQCConsumer.cxx new file mode 100644 index 00000000..af31808a --- /dev/null +++ b/examples/qc/runQCConsumer.cxx @@ -0,0 +1,26 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "runFairMQDevice.h" +#include "FairMQDevice.h" + +class QCConsumer : public FairMQDevice +{ + public: + QCConsumer() + { + OnData("qc", [](FairMQMessagePtr& /*msg*/, int){ + LOG(info) << "received data"; + return true; + }); + } +}; + +namespace bpo = boost::program_options; +void addCustomOptions(bpo::options_description& /*options*/) {} +FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/) { return new QCConsumer(); } diff --git a/examples/qc/runQCProducer.cxx b/examples/qc/runQCProducer.cxx new file mode 100644 index 00000000..e3e5536f --- /dev/null +++ b/examples/qc/runQCProducer.cxx @@ -0,0 +1,66 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "runFairMQDevice.h" +#include "FairMQDevice.h" + +class QCProducer : public FairMQDevice +{ + public: + QCProducer() + : fDoQC(false) + , fCounter(0) + , fInterval(100) + { + OnData("data1", &QCProducer::HandleData); + } + + void InitTask() override + { + GetConfig()->Subscribe("qc", [&](const std::string& key, std::string value) { + if (key == "qc") { + if (value == "active") { + fDoQC.store(true); + } else if (value == "inactive") { + fDoQC.store(false); + } + } + }); + } + + protected: + bool HandleData(FairMQMessagePtr& msg, int) + { + if (fDoQC.load() == true) { + if (++fCounter == fInterval) { + fCounter = 0; + FairMQMessagePtr msgCopy(NewMessage()); + msgCopy->Copy(*msg); + if (Send(msg, "qc") < 0) { + return false; + } + } + } + + if (Send(msg, "data2") < 0) { + return false; + } + + return true; + } + + void ResetTask() override { GetConfig()->Unsubscribe("qc"); } + + private: + std::atomic fDoQC; + int fCounter; + int fInterval; +}; + +void addCustomOptions(boost::program_options::options_description& /*options*/) {} +FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/) { return new QCProducer(); } diff --git a/examples/qc/runSampler.cxx b/examples/qc/runSampler.cxx new file mode 100644 index 00000000..403c3356 --- /dev/null +++ b/examples/qc/runSampler.cxx @@ -0,0 +1,55 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "runFairMQDevice.h" +#include "FairMQDevice.h" + +#include // this_thread::sleep_for +#include + +class Sampler : public FairMQDevice +{ + public: + Sampler() + : fMaxIterations(0) + , fNumIterations(0) + {} + + protected: + uint64_t fMaxIterations; + uint64_t fNumIterations; + + virtual void InitTask() + { + fMaxIterations = fConfig->GetProperty("max-iterations"); + } + + virtual bool ConditionalRun() + { + FairMQMessagePtr msg(NewMessage(1000)); + + if (Send(msg, "data1") < 0) { + return false; + } else if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { + LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state."; + return false; + } + + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + return true; + } +}; + +namespace bpo = boost::program_options; +void addCustomOptions(bpo::options_description& options) +{ + options.add_options() + ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); +} +FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/) { return new Sampler(); } diff --git a/examples/qc/runSink.cxx b/examples/qc/runSink.cxx new file mode 100644 index 00000000..c5920669 --- /dev/null +++ b/examples/qc/runSink.cxx @@ -0,0 +1,51 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "runFairMQDevice.h" +#include "FairMQDevice.h" + +#include + +class Sink : public FairMQDevice +{ + public: + Sink() + : fMaxIterations(0) + , fNumIterations(0) + { + OnData("data2", &Sink::HandleData); + } + + protected: + virtual void InitTask() + { + fMaxIterations = fConfig->GetProperty("max-iterations"); + } + + bool HandleData(FairMQMessagePtr& /*msg*/, int /*index*/) + { + if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { + LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state."; + return false; + } + + return true; + } + + private: + uint64_t fMaxIterations; + uint64_t fNumIterations; +}; + +namespace bpo = boost::program_options; +void addCustomOptions(bpo::options_description& options) +{ + options.add_options() + ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); +} +FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/) { return new Sink(); } diff --git a/fairmq/plugins/DDS/CMakeLists.txt b/fairmq/plugins/DDS/CMakeLists.txt index 2b48d259..52555131 100644 --- a/fairmq/plugins/DDS/CMakeLists.txt +++ b/fairmq/plugins/DDS/CMakeLists.txt @@ -17,12 +17,17 @@ set_target_properties(${plugin} PROPERTIES LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/fairmq ) -set(exe fairmq-dds-command-ui) -add_executable(${exe} ${CMAKE_CURRENT_SOURCE_DIR}/runDDSCommandUI.cxx) -target_link_libraries(${exe} FairMQ Commands StateMachine DDS::dds_intercom_lib DDS::dds_protocol_lib) -target_include_directories(${exe} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}) +set(exe1 fairmq-dds-command-ui) +add_executable(${exe1} ${CMAKE_CURRENT_SOURCE_DIR}/runDDSCommandUI.cxx) +target_link_libraries(${exe1} FairMQ Commands StateMachine DDS::dds_intercom_lib DDS::dds_protocol_lib) +target_include_directories(${exe1} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}) -install(TARGETS ${plugin} ${exe} +set(exe2 fairmq-dds-command-ui-new) +add_executable(${exe2} ${CMAKE_CURRENT_SOURCE_DIR}/runDDSCommandUInew.cxx) +target_link_libraries(${exe2} FairMQ Commands SDK StateMachine DDS::dds_intercom_lib DDS::dds_protocol_lib) +target_include_directories(${exe2} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}) + +install(TARGETS ${plugin} ${exe1} ${exe2} EXPORT ${PROJECT_EXPORT_SET} LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR} RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR} diff --git a/fairmq/plugins/DDS/runDDSCommandUInew.cxx b/fairmq/plugins/DDS/runDDSCommandUInew.cxx new file mode 100644 index 00000000..066125b2 --- /dev/null +++ b/fairmq/plugins/DDS/runDDSCommandUInew.cxx @@ -0,0 +1,199 @@ +/******************************************************************************** + * Copyright (C) 2014-2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include +#include +#include + +#include + +#include // raw mode console input +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace std; +using namespace fair::mq; +using namespace fair::mq::sdk; +using namespace fair::mq::sdk::cmd; +namespace bpo = boost::program_options; + +struct TerminalConfig +{ + explicit TerminalConfig() + { + termios t; + tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure + t.c_lflag &= ~ICANON; // disable canonical input + // t.c_lflag &= ~ECHO; // do not echo input chars + tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings + } + + ~TerminalConfig() + { + termios t; + tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure + t.c_lflag |= ICANON; // re-enable canonical input + // t.c_lflag |= ECHO; // echo input chars + tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings + } +}; + +void printControlsHelp() +{ + cout << "Use keys to control the devices:" << endl; + cout << "[c] check states, [o] dump config, [h] help, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device, [k] complete init, [b] bind, [x] connect" << endl; + cout << "To quit press Ctrl+C" << endl; +} + +void sendCommand(const string& commandIn, const string& topologyPath, Topology& topo) +{ + char c; + string command(commandIn); + TerminalConfig tconfig; + + if (command == "") { + printControlsHelp(); + cin >> c; + command = c; + } + + while (true) { + if (command == "c") { + cout << "> checking state of the devices" << endl; + topo.GetCurrentState(); + // TODO: extend me + } else if (command == "o") { + cout << "> dumping config of the devices" << endl; + auto const result = topo.GetProperties("^(session|id)$", topologyPath); + // TODO: extend me + } else if (command == "i") { + cout << "> init devices" << endl; + topo.ChangeState(TopologyTransition::InitDevice); + } else if (command == "k") { + cout << "> complete init" << endl; + topo.ChangeState(TopologyTransition::CompleteInit); + } else if (command == "b") { + cout << "> bind devices" << endl; + topo.ChangeState(TopologyTransition::Bind); + } else if (command == "x") { + cout << "> connect devices" << endl; + topo.ChangeState(TopologyTransition::Connect); + } else if (command == "j") { + cout << "> init tasks" << endl; + topo.ChangeState(TopologyTransition::InitTask); + } else if (command == "r") { + cout << "> run tasks" << endl; + topo.ChangeState(TopologyTransition::Run); + } else if (command == "s") { + cout << "> stop devices" << endl; + topo.ChangeState(TopologyTransition::Stop); + } else if (command == "t") { + cout << "> reset tasks" << endl; + topo.ChangeState(TopologyTransition::ResetTask); + } else if (command == "d") { + cout << "> reset devices" << endl; + topo.ChangeState(TopologyTransition::ResetDevice); + } else if (command == "h") { + cout << "> help" << endl; + printControlsHelp(); + } else if (command == "q") { + cout << "> end" << endl; + topo.ChangeState(TopologyTransition::End); + // TODO: extend me..? + } else { + cout << "\033[01;32mInvalid input: [" << c << "]\033[0m" << endl; + printControlsHelp(); + } + + if (commandIn != "") { + this_thread::sleep_for(chrono::milliseconds(100)); // give dds a chance to complete request + break; + } else { + cin >> c; + command = c; + } + } +} + +int main(int argc, char* argv[]) +try { + string topoFile; + string sessionID; + string command; + string topologyPath; + string targetState; + unsigned int timeout; + + bpo::options_description opts("Common options"); + + auto envSessionId = getenv("FAIRMQ_DDS_SESSION_ID"); + if (envSessionId) { + opts.add_options()("session,s", bpo::value(&sessionID)->default_value(envSessionId), "DDS Session ID (overrides any value in env var $FAIRMQ_DDS_SESSION_ID)"); + } else { + opts.add_options()("session,s", bpo::value(&sessionID)->required(), "DDS Session ID (overrides any value in env var $FAIRMQ_DDS_SESSION_ID)"); + } + + auto envTopoFile = getenv("FAIRMQ_DDS_TOPO_FILE"); + if (envTopoFile) { + opts.add_options()("topology-file,f", bpo::value(&topoFile)->default_value(envTopoFile), "DDS topology file path"); + } else { + opts.add_options()("topology-file,f", bpo::value(&topoFile)->required(), "DDS topology file path"); + } + + opts.add_options() + ("command,c", bpo::value(&command)->default_value(""), "Command character") + ("path,p", bpo::value(&topologyPath)->default_value(""), "DDS Topology path to send command to (empty - send to all tasks)") + ("wait-for-state,w", bpo::value(&targetState)->default_value(""), "Wait until targeted FairMQ devices reach the given state") + ("timeout,t", bpo::value(&timeout)->default_value(0), "Timeout in milliseconds when waiting for a device state (0 - wait infinitely)") + ("help,h", "Produce help message"); + + bpo::variables_map vm; + bpo::store(bpo::command_line_parser(argc, argv).options(opts).run(), vm); + + if (vm.count("help")) { + cout << "FairMQ DDS Command UI" << endl << opts << endl; + cout << "Commands: [c] check state, [o] dump config, [h] help, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device, [k] complete init, [b] bind, [x] connect" << endl; + return EXIT_SUCCESS; + } + + bpo::notify(vm); + + DDSEnvironment env; + DDSSession session(sessionID, env); + DDSTopology ddsTopo(DDSTopology::Path(topoFile), env); + + int n = ddsTopo.GetNumRequiredAgents(); + cout << "Number of required agents/slots: " << n << endl; + cout << "creating Topology" << endl; + + Topology topo(ddsTopo, session); + for (auto transition : { TopologyTransition::InitDevice, + TopologyTransition::CompleteInit, + TopologyTransition::Bind, + TopologyTransition::Connect, + TopologyTransition::InitTask, + TopologyTransition::Run, + TopologyTransition::Stop, + TopologyTransition::ResetTask, + TopologyTransition::ResetDevice, + TopologyTransition::End }) { + topo.ChangeState(transition); + } + + cout << "Finishing..." << endl; + return EXIT_SUCCESS; +} catch (exception& e) { + cerr << "Error: " << e.what() << endl; + return EXIT_FAILURE; +}