diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 9bef85f2..3856dab9 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -16,6 +16,7 @@ add_subdirectory(multiple-channels) if(BUILD_NANOMSG_TRANSPORT) add_subdirectory(multiple-transports) endif() +add_subdirectory(n-m) add_subdirectory(qc) add_subdirectory(readout) add_subdirectory(region) diff --git a/examples/README.md b/examples/README.md index 19b9a1fb..762b6ee7 100644 --- a/examples/README.md +++ b/examples/README.md @@ -34,6 +34,10 @@ This example demonstrates how to work with multiple channels and multiplex betwe This examples shows how to combine different channel transports (zeromq/nanomsg/shmem) inside of one device and/or topology. +## n-m + +A topology consisting of three layers of devices: synchronizer -> n * senders -> m * receivers. + ## QC A topology consisting of 4 devices - Sampler, QCProducer, QCConsumer and Sink. The data flows from Sampler through QCProducer to Sink. On demand - by setting the corresponding configuration property - the QCProducer device will duplicate the data to the QCConsumer device. The property is set by the topology controller, in this example this is the `fairmq-dds-command-ui` utility. diff --git a/examples/n-m/CMakeLists.txt b/examples/n-m/CMakeLists.txt new file mode 100644 index 00000000..eb2970cd --- /dev/null +++ b/examples/n-m/CMakeLists.txt @@ -0,0 +1,99 @@ +################################################################################ +# Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # +# # +# This software is distributed under the terms of the # +# GNU Lesser General Public Licence (LGPL) version 3, # +# copied verbatim in the file "LICENSE" # +################################################################################ + +add_executable(fairmq-ex-n-m-synchronizer runSynchronizer.cxx) +target_link_libraries(fairmq-ex-n-m-synchronizer PRIVATE FairMQ) + +add_executable(fairmq-ex-n-m-sender runSender.cxx) +target_link_libraries(fairmq-ex-n-m-sender PRIVATE FairMQ) + +add_executable(fairmq-ex-n-m-receiver runReceiver.cxx) +target_link_libraries(fairmq-ex-n-m-receiver PRIVATE FairMQ) + +add_custom_target(ExampleNM DEPENDS fairmq-ex-n-m-synchronizer fairmq-ex-n-m-sender fairmq-ex-n-m-receiver) + +list(JOIN Boost_LIBRARY_DIRS ":" LIB_DIR) +set(BIN_DIR ${CMAKE_CURRENT_BINARY_DIR}:${CMAKE_BINARY_DIR}/fairmq/plugins/DDS) +set(DATA_DIR ${CMAKE_CURRENT_BINARY_DIR}) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ex-n-m-topology.xml ${CMAKE_CURRENT_BINARY_DIR}/ex-n-m-topology.xml @ONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ex-n-m-pair-topology.xml ${CMAKE_CURRENT_BINARY_DIR}/ex-n-m-pair-topology.xml @ONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-ex-n-m-env.sh ${CMAKE_CURRENT_BINARY_DIR}/fairmq-ex-n-m-env.sh @ONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-n-m.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m.sh @ONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-n-m-pair.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-pair.sh @ONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-n-m-dds.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-dds.sh @ONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-n-m-pair-dds.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-pair-dds.sh @ONLY) + +# test +if(DDS_FOUND) + add_test(NAME Example.N-M.localhost COMMAND ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-dds.sh localhost) + set_tests_properties(Example.N-M.localhost PROPERTIES TIMEOUT 15 RUN_SERIAL true PASS_REGULAR_EXPRESSION "Example successful") + add_test(NAME Example.N-M-pair.localhost COMMAND ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-pair-dds.sh localhost) + set_tests_properties(Example.N-M-pair.localhost PROPERTIES TIMEOUT 15 RUN_SERIAL true PASS_REGULAR_EXPRESSION "Example successful") +endif() + +# install +install( + TARGETS + fairmq-ex-n-m-synchronizer + fairmq-ex-n-m-sender + fairmq-ex-n-m-receiver + + LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR} + RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR} +) + +# configure run script with different executable paths for build and for install directories +set(BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}) +set(DATA_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_DATADIR}) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ex-n-m-topology.xml ${CMAKE_CURRENT_BINARY_DIR}/ex-n-m-topology.xml_install @ONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ex-n-m-pair-topology.xml ${CMAKE_CURRENT_BINARY_DIR}/ex-n-m-pair-topology.xml_install @ONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-ex-n-m-env.sh ${CMAKE_CURRENT_BINARY_DIR}/fairmq-ex-n-m-env.sh_install @ONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-n-m.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m.sh_install @ONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-n-m-pair.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-pair.sh_install @ONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-n-m-dds.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-dds.sh_install @ONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-n-m-pair-dds.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-pair-dds.sh_install @ONLY) + +install( + FILES ${CMAKE_CURRENT_BINARY_DIR}/ex-n-m-topology.xml_install + DESTINATION ${PROJECT_INSTALL_DATADIR} + RENAME ex-n-m-topology.xml +) +install( + FILES ${CMAKE_CURRENT_BINARY_DIR}/ex-n-m-pair-topology.xml_install + DESTINATION ${PROJECT_INSTALL_DATADIR} + RENAME ex-n-m-pair-topology.xml +) + +install( + PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-ex-n-m-env.sh_install + DESTINATION ${PROJECT_INSTALL_BINDIR} + RENAME fairmq-ex-n-m-env.sh +) + +install( + PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m.sh_install + DESTINATION ${PROJECT_INSTALL_BINDIR} + RENAME fairmq-start-ex-n-m.sh +) + +install( + PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-pair.sh_install + DESTINATION ${PROJECT_INSTALL_BINDIR} + RENAME fairmq-start-ex-n-m-pair.sh +) + +install( + PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-dds.sh_install + DESTINATION ${PROJECT_INSTALL_BINDIR} + RENAME fairmq-start-ex-n-m-dds.sh +) +install( + PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-pair-dds.sh_install + DESTINATION ${PROJECT_INSTALL_BINDIR} + RENAME fairmq-start-ex-n-m-pair-dds.sh +) diff --git a/examples/n-m/Header.h b/examples/n-m/Header.h new file mode 100644 index 00000000..b3526059 --- /dev/null +++ b/examples/n-m/Header.h @@ -0,0 +1,24 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +#ifndef FAIR_MQ_EXAMPLE_N_M_HEADER_H +#define FAIR_MQ_EXAMPLE_N_M_HEADER_H + +#include + +namespace example_n_m +{ + +struct Header +{ + std::uint16_t id; + int senderIndex; +}; + +} // namespace example_n_m + +#endif /* FAIR_MQ_EXAMPLE_N_M_HEADER_H */ diff --git a/examples/n-m/README.md b/examples/n-m/README.md new file mode 100644 index 00000000..618432a0 --- /dev/null +++ b/examples/n-m/README.md @@ -0,0 +1,4 @@ +N-M +========================== + +A topology consisting of three layers of devices: synchronizer -> sender(s) -> receiver(s). Senders distribute data to receivers based on the data id contained in the message from the synchronizer (same id goes to the same receiver from every sender). The senders send the data in a non-blocking fashion - if queue is full or receiver is down, data is discarded. Two configurations are provided - one using push/pull channels between senders/receivers, another using pair channels. In push/pull case there is only one receiving channel on the receiver device. In pair case there are as many receiver (sub-)channels as there are senders. diff --git a/examples/n-m/ex-n-m-pair-topology.xml b/examples/n-m/ex-n-m-pair-topology.xml new file mode 100644 index 00000000..3d88eacc --- /dev/null +++ b/examples/n-m/ex-n-m-pair-topology.xml @@ -0,0 +1,44 @@ + + + + + + + + + + fairmq-ex-n-m-synchronizer --id sync --rate 100 --color false -P dds --channel-config name=sync,type=pub,method=bind + fairmq-ex-n-m-env.sh + + fmqchan_sync + + + + + fairmq-ex-n-m-sender --id sender%taskIndex% --timeframe-size 100000 --num-receivers ${numReceivers} --color false -P dds --channel-config name=sync,type=sub,method=connect name=data,type=pair,method=connect,numSockets=${numReceivers} --dds-i data:%taskIndex% + fairmq-ex-n-m-env.sh + + fmqchan_sync + fmqchan_data + + + + + fairmq-ex-n-m-receiver --id receiver%taskIndex% --num-senders ${numSenders} --color false -P dds --max-timeframes 10 --channel-config name=data,type=pair,method=bind,numSockets=${numSenders} + fairmq-ex-n-m-env.sh + + fmqchan_data + + + +
+ Synchronizer + + Sender + + + Receiver + +
+ +
diff --git a/examples/n-m/ex-n-m-topology.xml b/examples/n-m/ex-n-m-topology.xml new file mode 100644 index 00000000..04c2769a --- /dev/null +++ b/examples/n-m/ex-n-m-topology.xml @@ -0,0 +1,44 @@ + + + + + + + + + + fairmq-ex-n-m-synchronizer --id sync --rate 100 --color false -P dds --channel-config name=sync,type=pub,method=bind + fairmq-ex-n-m-env.sh + + fmqchan_sync + + + + + fairmq-ex-n-m-sender --id sender%taskIndex% --timeframe-size 100000 --num-receivers ${numReceivers} --color false -P dds --channel-config name=sync,type=sub,method=connect name=data,type=push,method=connect,numSockets=${numReceivers} + fairmq-ex-n-m-env.sh + + fmqchan_sync + fmqchan_data + + + + + fairmq-ex-n-m-receiver --id receiver%taskIndex% --num-senders ${numSenders} --color false -P dds --max-timeframes 10 --channel-config name=data,type=pull,method=bind + fairmq-ex-n-m-env.sh + + fmqchan_data + + + +
+ Synchronizer + + Sender + + + Receiver + +
+ +
diff --git a/examples/n-m/fairmq-ex-n-m-env.sh b/examples/n-m/fairmq-ex-n-m-env.sh new file mode 100755 index 00000000..6f2e45c5 --- /dev/null +++ b/examples/n-m/fairmq-ex-n-m-env.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +################################################################################ +# Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # +# # +# This software is distributed under the terms of the # +# GNU Lesser General Public Licence (LGPL) version 3, # +# copied verbatim in the file "LICENSE" # +################################################################################ + +export PATH=@BIN_DIR@:$PATH + +OS=$(uname -s 2>&1) +if [ "$OS" == "Darwin" ]; then + export DYLD_LIBRARY_PATH=@LIB_DIR@:$DYLD_LIBRARY_PATH +fi diff --git a/examples/n-m/fairmq-start-ex-n-m-dds.sh.in b/examples/n-m/fairmq-start-ex-n-m-dds.sh.in new file mode 100755 index 00000000..f6e2998a --- /dev/null +++ b/examples/n-m/fairmq-start-ex-n-m-dds.sh.in @@ -0,0 +1,76 @@ +#!/bin/bash + +################################################################################ +# Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # +# # +# This software is distributed under the terms of the # +# GNU Lesser General Public Licence (LGPL) version 3, # +# copied verbatim in the file "LICENSE" # +################################################################################ + +set -e + +cleanup() { + dds-session stop $1 + echo "CLEANUP PERFORMED" +} + +source @DDS_INSTALL_PREFIX@/DDS_env.sh +export PATH=@BIN_DIR@:$PATH + +exec 5>&1 +output=$(dds-session start | tee >(cat - >&5)) +export DDS_SESSION_ID=$(echo ${output} | grep "DDS session ID: " | cut -d' ' -f4) +echo "SESSION ID: ${DDS_SESSION_ID}" + +trap "cleanup ${DDS_SESSION_ID}" EXIT + +requiredNofSlots=8 +dds-submit -r localhost --slots ${requiredNofSlots} +echo "...waiting for ${requiredNofSlots} idle slots..." +dds-info --idle-count --wait ${requiredNofSlots} + +export FAIRMQ_DDS_TOPO_FILE=@DATA_DIR@/ex-n-m-topology.xml +echo "TOPOLOGY FILE: ${FAIRMQ_DDS_TOPO_FILE}" +echo "TOPOLOGY NAME: $(dds-topology --disable-validation --topology-name ${FAIRMQ_DDS_TOPO_FILE})" + +dds-info --active-topology +dds-topology --activate ${FAIRMQ_DDS_TOPO_FILE} +dds-info --active-topology +echo "...waiting for ${requiredNofSlots} executing slots..." +dds-info --executing-count --wait ${requiredNofSlots} + +echo "------------------------" +echo "...waiting for Topology to finish..." +# TODO Retrieve number of devices from DDS topology API instead of having the user pass it explicitely +fairmq-dds-command-ui -w "IDLE" +fairmq-dds-command-ui -c i +fairmq-dds-command-ui -c k +fairmq-dds-command-ui -c b +fairmq-dds-command-ui -c x +fairmq-dds-command-ui -c j +fairmq-dds-command-ui -c r +receivers="main/Receivers.*" +fairmq-dds-command-ui -w "RUNNING->READY" -p $receivers +echo "All receivers transitioned from RUNNING to READY" +fairmq-dds-command-ui -c s +fairmq-dds-command-ui -c t +fairmq-dds-command-ui -c d +fairmq-dds-command-ui -c q +echo "...waiting for ${requiredNofSlots} idle slots..." +dds-info --idle-count --wait ${requiredNofSlots} +echo "------------------------" + +dds-info --active-topology +dds-topology --stop +dds-info --active-topology + +dds-agent-cmd getlog -a +logDir="${wrkDir}/logs" +for file in $(find "${logDir}" -name "*.tar.gz"); do tar -xf ${file} -C "${logDir}" ; done +echo "AGENT LOG FILES IN: ${logDir}" + +# This string is used by ctest to detect success +echo "Example successful :)" + +# Cleanup function is called by EXIT trap diff --git a/examples/n-m/fairmq-start-ex-n-m-pair-dds.sh.in b/examples/n-m/fairmq-start-ex-n-m-pair-dds.sh.in new file mode 100755 index 00000000..a65e3273 --- /dev/null +++ b/examples/n-m/fairmq-start-ex-n-m-pair-dds.sh.in @@ -0,0 +1,76 @@ +#!/bin/bash + +################################################################################ +# Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # +# # +# This software is distributed under the terms of the # +# GNU Lesser General Public Licence (LGPL) version 3, # +# copied verbatim in the file "LICENSE" # +################################################################################ + +set -e + +cleanup() { + dds-session stop $1 + echo "CLEANUP PERFORMED" +} + +source @DDS_INSTALL_PREFIX@/DDS_env.sh +export PATH=@BIN_DIR@:$PATH + +exec 5>&1 +output=$(dds-session start | tee >(cat - >&5)) +export DDS_SESSION_ID=$(echo ${output} | grep "DDS session ID: " | cut -d' ' -f4) +echo "SESSION ID: ${DDS_SESSION_ID}" + +trap "cleanup ${DDS_SESSION_ID}" EXIT + +requiredNofSlots=8 +dds-submit -r localhost --slots ${requiredNofSlots} +echo "...waiting for ${requiredNofSlots} idle slots..." +dds-info --idle-count --wait ${requiredNofSlots} + +export FAIRMQ_DDS_TOPO_FILE=@DATA_DIR@/ex-n-m-pair-topology.xml +echo "TOPOLOGY FILE: ${FAIRMQ_DDS_TOPO_FILE}" +echo "TOPOLOGY NAME: $(dds-topology --disable-validation --topology-name ${FAIRMQ_DDS_TOPO_FILE})" + +dds-info --active-topology +dds-topology --activate ${FAIRMQ_DDS_TOPO_FILE} +dds-info --active-topology +echo "...waiting for ${requiredNofSlots} executing slots..." +dds-info --executing-count --wait ${requiredNofSlots} + +echo "------------------------" +echo "...waiting for Topology to finish..." +# TODO Retrieve number of devices from DDS topology API instead of having the user pass it explicitely +fairmq-dds-command-ui -w "IDLE" +fairmq-dds-command-ui -c i +fairmq-dds-command-ui -c k +fairmq-dds-command-ui -c b +fairmq-dds-command-ui -c x +fairmq-dds-command-ui -c j +fairmq-dds-command-ui -c r +receivers="main/Receivers.*" +fairmq-dds-command-ui -w "RUNNING->READY" -p $receivers +echo "All receivers transitioned from RUNNING to READY" +fairmq-dds-command-ui -c s +fairmq-dds-command-ui -c t +fairmq-dds-command-ui -c d +fairmq-dds-command-ui -c q +echo "...waiting for ${requiredNofSlots} idle slots..." +dds-info --idle-count --wait ${requiredNofSlots} +echo "------------------------" + +dds-info --active-topology +dds-topology --stop +dds-info --active-topology + +dds-agent-cmd getlog -a +logDir="${wrkDir}/logs" +for file in $(find "${logDir}" -name "*.tar.gz"); do tar -xf ${file} -C "${logDir}" ; done +echo "AGENT LOG FILES IN: ${logDir}" + +# This string is used by ctest to detect success +echo "Example successful :)" + +# Cleanup function is called by EXIT trap diff --git a/examples/n-m/fairmq-start-ex-n-m-pair.sh.in b/examples/n-m/fairmq-start-ex-n-m-pair.sh.in new file mode 100755 index 00000000..fffe5ea7 --- /dev/null +++ b/examples/n-m/fairmq-start-ex-n-m-pair.sh.in @@ -0,0 +1,60 @@ +#!/bin/bash + +export PATH=@BIN_DIR@:$PATH + +SYNC="fairmq-ex-n-m-synchronizer" +SYNC+=" --id Sync" +SYNC+=" --channel-config name=sync,type=pub,method=bind,address=tcp://localhost:8010" +SYNC+=" --rate 100" +xterm -geometry 80x25+0+0 -hold -e $SYNC & + +SENDER0="fairmq-ex-n-m-sender" +SENDER0+=" --id Sender1" +SENDER0+=" --channel-config name=sync,type=sub,method=connect,address=tcp://localhost:8010" +SENDER0+=" name=data,type=pair,method=connect,address=tcp://localhost:8021,address=tcp://localhost:8022,address=tcp://localhost:8023,address=tcp://localhost:8024" +SENDER0+=" --sender-index 0" +SENDER0+=" --subtimeframe-size 1000000" +SENDER0+=" --num-receivers 4" +xterm -geometry 80x25+500+0 -hold -e $SENDER0 & + +SENDER1="fairmq-ex-n-m-sender" +SENDER1+=" --id Sender2" +SENDER1+=" --channel-config name=sync,type=sub,method=connect,address=tcp://localhost:8010" +SENDER1+=" name=data,type=pair,method=connect,address=tcp://localhost:8031,address=tcp://localhost:8032,address=tcp://localhost:8033,address=tcp://localhost:8034" +SENDER1+=" --sender-index 1" +SENDER1+=" --subtimeframe-size 1000000" +SENDER1+=" --num-receivers 4" +xterm -geometry 80x25+500+350 -hold -e $SENDER1 & + +SENDER2="fairmq-ex-n-m-sender" +SENDER2+=" --id Sender3" +SENDER2+=" --channel-config name=sync,type=sub,method=connect,address=tcp://localhost:8010" +SENDER2+=" name=data,type=pair,method=connect,address=tcp://localhost:8041,address=tcp://localhost:8042,address=tcp://localhost:8043,address=tcp://localhost:8044" +SENDER2+=" --sender-index 2" +SENDER2+=" --subtimeframe-size 1000000" +SENDER2+=" --num-receivers 4" +xterm -geometry 80x25+500+700 -hold -e $SENDER2 & + +RECEIVER0="fairmq-ex-n-m-receiver" +RECEIVER0+=" --id Receiver1" +RECEIVER0+=" --channel-config name=data,type=pair,method=bind,address=tcp://localhost:8021,address=tcp://localhost:8031,address=tcp://localhost:8041" +RECEIVER0+=" --num-senders 3" +xterm -geometry 80x25+1000+0 -hold -e $RECEIVER0 & + +RECEIVER1="fairmq-ex-n-m-receiver" +RECEIVER1+=" --id Receiver2" +RECEIVER1+=" --channel-config name=data,type=pair,method=bind,address=tcp://localhost:8022,address=tcp://localhost:8032,address=tcp://localhost:8042" +RECEIVER1+=" --num-senders 3" +xterm -geometry 80x25+1000+350 -hold -e $RECEIVER1 & + +RECEIVER2="fairmq-ex-n-m-receiver" +RECEIVER2+=" --id Receiver3" +RECEIVER2+=" --channel-config name=data,type=pair,method=bind,address=tcp://localhost:8023,address=tcp://localhost:8033,address=tcp://localhost:8043" +RECEIVER2+=" --num-senders 3" +xterm -geometry 80x25+1000+700 -hold -e $RECEIVER2 & + +RECEIVER3="fairmq-ex-n-m-receiver" +RECEIVER3+=" --id Receiver4" +RECEIVER3+=" --channel-config name=data,type=pair,method=bind,address=tcp://localhost:8024,address=tcp://localhost:8034,address=tcp://localhost:8044" +RECEIVER3+=" --num-senders 3" +xterm -geometry 80x25+1000+1050 -hold -e $RECEIVER3 & diff --git a/examples/n-m/fairmq-start-ex-n-m.sh.in b/examples/n-m/fairmq-start-ex-n-m.sh.in new file mode 100755 index 00000000..d0e3e4f4 --- /dev/null +++ b/examples/n-m/fairmq-start-ex-n-m.sh.in @@ -0,0 +1,60 @@ +#!/bin/bash + +export PATH=@BIN_DIR@:$PATH + +SYNC="fairmq-ex-n-m-synchronizer" +SYNC+=" --id Sync" +SYNC+=" --channel-config name=sync,type=pub,method=bind,address=tcp://localhost:8010" +SYNC+=" --rate 100" +xterm -geometry 80x25+0+0 -hold -e $SYNC & + +SENDER0="fairmq-ex-n-m-sender" +SENDER0+=" --id Sender1" +SENDER0+=" --channel-config name=sync,type=sub,method=connect,address=tcp://localhost:8010" +SENDER0+=" name=data,type=push,method=connect,address=tcp://localhost:8021,address=tcp://localhost:8022,address=tcp://localhost:8023,address=tcp://localhost:8024" +SENDER0+=" --sender-index 0" +SENDER0+=" --subtimeframe-size 1000000" +SENDER0+=" --num-receivers 4" +xterm -geometry 80x25+500+0 -hold -e $SENDER0 & + +SENDER1="fairmq-ex-n-m-sender" +SENDER1+=" --id Sender2" +SENDER1+=" --channel-config name=sync,type=sub,method=connect,address=tcp://localhost:8010" +SENDER1+=" name=data,type=push,method=connect,address=tcp://localhost:8021,address=tcp://localhost:8022,address=tcp://localhost:8023,address=tcp://localhost:8024" +SENDER1+=" --sender-index 1" +SENDER1+=" --subtimeframe-size 1000000" +SENDER1+=" --num-receivers 4" +xterm -geometry 80x25+500+350 -hold -e $SENDER1 & + +SENDER2="fairmq-ex-n-m-sender" +SENDER2+=" --id Sender3" +SENDER2+=" --channel-config name=sync,type=sub,method=connect,address=tcp://localhost:8010" +SENDER2+=" name=data,type=push,method=connect,address=tcp://localhost:8021,address=tcp://localhost:8022,address=tcp://localhost:8023,address=tcp://localhost:8024" +SENDER2+=" --sender-index 2" +SENDER2+=" --subtimeframe-size 1000000" +SENDER2+=" --num-receivers 4" +xterm -geometry 80x25+500+700 -hold -e $SENDER2 & + +RECEIVER0="fairmq-ex-n-m-receiver" +RECEIVER0+=" --id Receiver1" +RECEIVER0+=" --channel-config name=data,type=pull,method=bind,address=tcp://localhost:8021" +RECEIVER0+=" --num-senders 3" +xterm -geometry 80x25+1000+0 -hold -e $RECEIVER0 & + +RECEIVER1="fairmq-ex-n-m-receiver" +RECEIVER1+=" --id Receiver2" +RECEIVER1+=" --channel-config name=data,type=pull,method=bind,address=tcp://localhost:8022" +RECEIVER1+=" --num-senders 3" +xterm -geometry 80x25+1000+350 -hold -e $RECEIVER1 & + +RECEIVER2="fairmq-ex-n-m-receiver" +RECEIVER2+=" --id Receiver3" +RECEIVER2+=" --channel-config name=data,type=pull,method=bind,address=tcp://localhost:8023" +RECEIVER2+=" --num-senders 3" +xterm -geometry 80x25+1000+700 -hold -e $RECEIVER2 & + +RECEIVER3="fairmq-ex-n-m-receiver" +RECEIVER3+=" --id Receiver4" +RECEIVER3+=" --channel-config name=data,type=pull,method=bind,address=tcp://localhost:8024" +RECEIVER3+=" --num-senders 3" +xterm -geometry 80x25+1000+1050 -hold -e $RECEIVER3 & diff --git a/examples/n-m/runReceiver.cxx b/examples/n-m/runReceiver.cxx new file mode 100644 index 00000000..a1ea4b8b --- /dev/null +++ b/examples/n-m/runReceiver.cxx @@ -0,0 +1,119 @@ +/******************************************************************************** + * Copyright (C) 2020 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "Header.h" + +#include +#include + +#include +#include +#include +#include +#include +#include + +using namespace std; +using namespace example_n_m; +namespace bpo = boost::program_options; + +struct TFBuffer +{ + FairMQParts parts; + chrono::steady_clock::time_point start; + chrono::steady_clock::time_point end; +}; + +class Receiver : public FairMQDevice +{ + public: + Receiver() + : fBuffer() + , fDiscardedSet() + , fNumSenders(0) + , fBufferTimeoutInMs(5000) + , fMaxTimeframes(0) + , fTimeframeCounter(0) + { + OnData("data", &Receiver::HandleData); + } + + ~Receiver() = default; + + void InitTask() override + { + fNumSenders = GetConfig()->GetValue("num-senders"); + fBufferTimeoutInMs = GetConfig()->GetValue("buffer-timeout"); + fMaxTimeframes = GetConfig()->GetValue("max-timeframes"); + } + + protected: + bool HandleData(FairMQParts& parts, int /* index */) + { + Header& h = *(static_cast(parts.At(0)->GetData())); + // LOG(info) << "Received sub-time frame #" << h.id << " from Sender" << h.senderIndex; + + if (fDiscardedSet.find(h.id) == fDiscardedSet.end()) { + if (fBuffer.find(h.id) == fBuffer.end()) { + // if this is the first part with this ID, save the receive time. + fBuffer[h.id].start = chrono::steady_clock::now(); + } + // if the received ID has not previously been discarded, store the data part in the buffer + fBuffer[h.id].parts.AddPart(move(parts.At(1))); + } else { + // if received ID has been previously discarded. + LOG(debug) << "Received part from an already discarded timeframe with id " << h.id; + } + + if (fBuffer[h.id].parts.Size() == fNumSenders) { + LOG(info) << "Successfully completed timeframe #" << h.id; + fBuffer.erase(h.id); + + if (fMaxTimeframes > 0 && ++fTimeframeCounter >= fMaxTimeframes) { + LOG(info) << "Reached configured maximum number of timeframes (" << fMaxTimeframes << "). Exiting RUNNING state."; + return false; + } + } + + return true; + } + + void DiscardIncompleteTimeframes() + { + auto it = fBuffer.begin(); + while (it != fBuffer.end()) { + if (chrono::duration_cast(chrono::steady_clock::now() - (it->second).start).count() > fBufferTimeoutInMs) { + LOG(debug) << "Timeframe #" << it->first << " incomplete after " << fBufferTimeoutInMs << " milliseconds, discarding"; + fDiscardedSet.insert(it->first); + fBuffer.erase(it++); + LOG(debug) << "Number of discarded timeframes: " << fDiscardedSet.size(); + } else { + // LOG(info) << "Timeframe #" << it->first << " within timeout, buffering..."; + ++it; + } + } + } + + unordered_map fBuffer; + unordered_set fDiscardedSet; + + int fNumSenders; + int fBufferTimeoutInMs; + int fMaxTimeframes; + int fTimeframeCounter; +}; + +void addCustomOptions(bpo::options_description& options) +{ + options.add_options() + ("buffer-timeout", bpo::value()->default_value(1000), "Buffer timeout in milliseconds") + ("num-senders", bpo::value()->required(), "Number of senders") + ("max-timeframes", bpo::value()->default_value(0), "Maximum number of timeframes to receive (0 - unlimited)"); +} + +FairMQDevice* getDevice(const FairMQProgOptions& /* config */) { return new Receiver(); } diff --git a/examples/n-m/runSender.cxx b/examples/n-m/runSender.cxx new file mode 100644 index 00000000..c51a438a --- /dev/null +++ b/examples/n-m/runSender.cxx @@ -0,0 +1,79 @@ +/******************************************************************************** + * Copyright (C) 2020 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "Header.h" + +#include +#include + +#include + +using namespace std; +using namespace example_n_m; +namespace bpo = boost::program_options; + +class Sender : public FairMQDevice +{ + public: + Sender() + : fNumReceivers(0) + , fIndex(0) + , fSubtimeframeSize(10000) + {} + + ~Sender() = default; + + protected: + void InitTask() override + { + fIndex = GetConfig()->GetProperty("sender-index"); + fSubtimeframeSize = GetConfig()->GetProperty("subtimeframe-size"); + fNumReceivers = GetConfig()->GetProperty("num-receivers"); + } + + void Run() override + { + FairMQChannel& dataInChannel = fChannels.at("sync").at(0); + + while (!NewStatePending()) { + Header h; + FairMQMessagePtr id(NewMessage()); + if (dataInChannel.Receive(id) > 0) { + h.id = *(static_cast(id->GetData())); + h.senderIndex = fIndex; + } else { + continue; + } + + FairMQParts parts; + parts.AddPart(NewSimpleMessage(h)); + parts.AddPart(NewMessage(fSubtimeframeSize)); + + uint64_t currentDataId = h.id; + int direction = currentDataId % fNumReceivers; + + if (Send(parts, "data", direction, 0) < 0) { + LOG(debug) << "Failed to queue Subtimeframe #" << currentDataId << " to Receiver[" << direction << "]"; + } + } + } + + private: + int fNumReceivers; + unsigned int fIndex; + int fSubtimeframeSize; +}; + +void addCustomOptions(bpo::options_description& options) +{ + options.add_options() + ("sender-index", bpo::value()->default_value(0), "Sender Index") + ("subtimeframe-size", bpo::value()->default_value(1000), "Subtimeframe size in bytes") + ("num-receivers", bpo::value()->required(), "Number of EPNs"); +} +FairMQDevice* getDevice(const FairMQProgOptions& /* config */) { return new Sender(); } diff --git a/examples/n-m/runSynchronizer.cxx b/examples/n-m/runSynchronizer.cxx new file mode 100644 index 00000000..58627602 --- /dev/null +++ b/examples/n-m/runSynchronizer.cxx @@ -0,0 +1,46 @@ +/******************************************************************************** + * Copyright (C) 2020 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include +#include + +#include +#include + +using namespace std; +namespace bpo = boost::program_options; + +class Synchronizer : public FairMQDevice +{ + public: + Synchronizer() + : fTimeframeId(0) + {} + ~Synchronizer() = default; + + protected: + bool ConditionalRun() override + { + FairMQMessagePtr msg(NewSimpleMessage(fTimeframeId)); + + if (Send(msg, "sync") > 0) { + if (++fTimeframeId == UINT16_MAX - 1) { + fTimeframeId = 0; + } + } else { + return false; + } + + return true; + } + + uint16_t fTimeframeId; +}; + +void addCustomOptions(bpo::options_description& /* options */) {} +FairMQDevice* getDevice(const FairMQProgOptions& /* config */) { return new Synchronizer(); }