Add example and test of a n-to-m topology, incuding sub-channel use

This commit is contained in:
Alexey Rybalchenko 2020-03-11 14:41:18 +01:00 committed by Dennis Klein
parent 539b088ade
commit 24aabdb854
15 changed files with 752 additions and 0 deletions

View File

@ -16,6 +16,7 @@ add_subdirectory(multiple-channels)
if(BUILD_NANOMSG_TRANSPORT) if(BUILD_NANOMSG_TRANSPORT)
add_subdirectory(multiple-transports) add_subdirectory(multiple-transports)
endif() endif()
add_subdirectory(n-m)
add_subdirectory(qc) add_subdirectory(qc)
add_subdirectory(readout) add_subdirectory(readout)
add_subdirectory(region) add_subdirectory(region)

View File

@ -34,6 +34,10 @@ This example demonstrates how to work with multiple channels and multiplex betwe
This examples shows how to combine different channel transports (zeromq/nanomsg/shmem) inside of one device and/or topology. This examples shows how to combine different channel transports (zeromq/nanomsg/shmem) inside of one device and/or topology.
## n-m
A topology consisting of three layers of devices: synchronizer -> n * senders -> m * receivers.
## QC ## QC
A topology consisting of 4 devices - Sampler, QCProducer, QCConsumer and Sink. The data flows from Sampler through QCProducer to Sink. On demand - by setting the corresponding configuration property - the QCProducer device will duplicate the data to the QCConsumer device. The property is set by the topology controller, in this example this is the `fairmq-dds-command-ui` utility. A topology consisting of 4 devices - Sampler, QCProducer, QCConsumer and Sink. The data flows from Sampler through QCProducer to Sink. On demand - by setting the corresponding configuration property - the QCProducer device will duplicate the data to the QCConsumer device. The property is set by the topology controller, in this example this is the `fairmq-dds-command-ui` utility.

View File

@ -0,0 +1,99 @@
################################################################################
# Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
add_executable(fairmq-ex-n-m-synchronizer runSynchronizer.cxx)
target_link_libraries(fairmq-ex-n-m-synchronizer PRIVATE FairMQ)
add_executable(fairmq-ex-n-m-sender runSender.cxx)
target_link_libraries(fairmq-ex-n-m-sender PRIVATE FairMQ)
add_executable(fairmq-ex-n-m-receiver runReceiver.cxx)
target_link_libraries(fairmq-ex-n-m-receiver PRIVATE FairMQ)
add_custom_target(ExampleNM DEPENDS fairmq-ex-n-m-synchronizer fairmq-ex-n-m-sender fairmq-ex-n-m-receiver)
list(JOIN Boost_LIBRARY_DIRS ":" LIB_DIR)
set(BIN_DIR ${CMAKE_CURRENT_BINARY_DIR}:${CMAKE_BINARY_DIR}/fairmq/plugins/DDS)
set(DATA_DIR ${CMAKE_CURRENT_BINARY_DIR})
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ex-n-m-topology.xml ${CMAKE_CURRENT_BINARY_DIR}/ex-n-m-topology.xml @ONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ex-n-m-pair-topology.xml ${CMAKE_CURRENT_BINARY_DIR}/ex-n-m-pair-topology.xml @ONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-ex-n-m-env.sh ${CMAKE_CURRENT_BINARY_DIR}/fairmq-ex-n-m-env.sh @ONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-n-m.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m.sh @ONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-n-m-pair.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-pair.sh @ONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-n-m-dds.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-dds.sh @ONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-n-m-pair-dds.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-pair-dds.sh @ONLY)
# test
if(DDS_FOUND)
add_test(NAME Example.N-M.localhost COMMAND ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-dds.sh localhost)
set_tests_properties(Example.N-M.localhost PROPERTIES TIMEOUT 15 RUN_SERIAL true PASS_REGULAR_EXPRESSION "Example successful")
add_test(NAME Example.N-M-pair.localhost COMMAND ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-pair-dds.sh localhost)
set_tests_properties(Example.N-M-pair.localhost PROPERTIES TIMEOUT 15 RUN_SERIAL true PASS_REGULAR_EXPRESSION "Example successful")
endif()
# install
install(
TARGETS
fairmq-ex-n-m-synchronizer
fairmq-ex-n-m-sender
fairmq-ex-n-m-receiver
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
set(BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR})
set(DATA_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_DATADIR})
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ex-n-m-topology.xml ${CMAKE_CURRENT_BINARY_DIR}/ex-n-m-topology.xml_install @ONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ex-n-m-pair-topology.xml ${CMAKE_CURRENT_BINARY_DIR}/ex-n-m-pair-topology.xml_install @ONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-ex-n-m-env.sh ${CMAKE_CURRENT_BINARY_DIR}/fairmq-ex-n-m-env.sh_install @ONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-n-m.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m.sh_install @ONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-n-m-pair.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-pair.sh_install @ONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-n-m-dds.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-dds.sh_install @ONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-n-m-pair-dds.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-pair-dds.sh_install @ONLY)
install(
FILES ${CMAKE_CURRENT_BINARY_DIR}/ex-n-m-topology.xml_install
DESTINATION ${PROJECT_INSTALL_DATADIR}
RENAME ex-n-m-topology.xml
)
install(
FILES ${CMAKE_CURRENT_BINARY_DIR}/ex-n-m-pair-topology.xml_install
DESTINATION ${PROJECT_INSTALL_DATADIR}
RENAME ex-n-m-pair-topology.xml
)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-ex-n-m-env.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-ex-n-m-env.sh
)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-n-m.sh
)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-pair.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-n-m-pair.sh
)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-dds.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-n-m-dds.sh
)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-n-m-pair-dds.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-n-m-pair-dds.sh
)

24
examples/n-m/Header.h Normal file
View File

@ -0,0 +1,24 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_EXAMPLE_N_M_HEADER_H
#define FAIR_MQ_EXAMPLE_N_M_HEADER_H
#include <cstdint>
namespace example_n_m
{
struct Header
{
std::uint16_t id;
int senderIndex;
};
} // namespace example_n_m
#endif /* FAIR_MQ_EXAMPLE_N_M_HEADER_H */

4
examples/n-m/README.md Normal file
View File

@ -0,0 +1,4 @@
N-M
==========================
A topology consisting of three layers of devices: synchronizer -> sender(s) -> receiver(s). Senders distribute data to receivers based on the data id contained in the message from the synchronizer (same id goes to the same receiver from every sender). The senders send the data in a non-blocking fashion - if queue is full or receiver is down, data is discarded. Two configurations are provided - one using push/pull channels between senders/receivers, another using pair channels. In push/pull case there is only one receiving channel on the receiver device. In pair case there are as many receiver (sub-)channels as there are senders.

View File

@ -0,0 +1,44 @@
<topology name="myTopology">
<var name="numSenders" value="3" />
<var name="numReceivers" value="4" />
<property name="fmqchan_sync" />
<property name="fmqchan_data" />
<decltask name="Synchronizer">
<exe reachable="true">fairmq-ex-n-m-synchronizer --id sync --rate 100 --color false -P dds --channel-config name=sync,type=pub,method=bind</exe>
<env reachable="false">fairmq-ex-n-m-env.sh</env>
<properties>
<name access="write">fmqchan_sync</id>
</properties>
</decltask>
<decltask name="Sender">
<exe reachable="true">fairmq-ex-n-m-sender --id sender%taskIndex% --timeframe-size 100000 --num-receivers ${numReceivers} --color false -P dds --channel-config name=sync,type=sub,method=connect name=data,type=pair,method=connect,numSockets=${numReceivers} --dds-i data:%taskIndex%</exe>
<env reachable="false">fairmq-ex-n-m-env.sh</env>
<properties>
<name access="read">fmqchan_sync</id>
<name access="read">fmqchan_data</id>
</properties>
</decltask>
<decltask name="Receiver">
<exe reachable="true">fairmq-ex-n-m-receiver --id receiver%taskIndex% --num-senders ${numSenders} --color false -P dds --max-timeframes 10 --channel-config name=data,type=pair,method=bind,numSockets=${numSenders}</exe>
<env reachable="false">fairmq-ex-n-m-env.sh</env>
<properties>
<name access="write">fmqchan_data</id>
</properties>
</decltask>
<main name="main">
<task>Synchronizer</task>
<group name="Senders" n="${numSenders}">
<task>Sender</task>
</group>
<group name="Receivers" n="${numReceivers}">
<task>Receiver</task>
</group>
</main>
</topology>

View File

@ -0,0 +1,44 @@
<topology name="myTopology">
<var name="numSenders" value="3" />
<var name="numReceivers" value="4" />
<property name="fmqchan_sync" />
<property name="fmqchan_data" />
<decltask name="Synchronizer">
<exe reachable="true">fairmq-ex-n-m-synchronizer --id sync --rate 100 --color false -P dds --channel-config name=sync,type=pub,method=bind</exe>
<env reachable="false">fairmq-ex-n-m-env.sh</env>
<properties>
<name access="write">fmqchan_sync</id>
</properties>
</decltask>
<decltask name="Sender">
<exe reachable="true">fairmq-ex-n-m-sender --id sender%taskIndex% --timeframe-size 100000 --num-receivers ${numReceivers} --color false -P dds --channel-config name=sync,type=sub,method=connect name=data,type=push,method=connect,numSockets=${numReceivers}</exe>
<env reachable="false">fairmq-ex-n-m-env.sh</env>
<properties>
<name access="read">fmqchan_sync</id>
<name access="read">fmqchan_data</id>
</properties>
</decltask>
<decltask name="Receiver">
<exe reachable="true">fairmq-ex-n-m-receiver --id receiver%taskIndex% --num-senders ${numSenders} --color false -P dds --max-timeframes 10 --channel-config name=data,type=pull,method=bind</exe>
<env reachable="false">fairmq-ex-n-m-env.sh</env>
<properties>
<name access="write">fmqchan_data</id>
</properties>
</decltask>
<main name="main">
<task>Synchronizer</task>
<group name="Senders" n="${numSenders}">
<task>Sender</task>
</group>
<group name="Receivers" n="${numReceivers}">
<task>Receiver</task>
</group>
</main>
</topology>

View File

@ -0,0 +1,16 @@
#!/bin/bash
################################################################################
# Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
export PATH=@BIN_DIR@:$PATH
OS=$(uname -s 2>&1)
if [ "$OS" == "Darwin" ]; then
export DYLD_LIBRARY_PATH=@LIB_DIR@:$DYLD_LIBRARY_PATH
fi

View File

@ -0,0 +1,76 @@
#!/bin/bash
################################################################################
# Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
set -e
cleanup() {
dds-session stop $1
echo "CLEANUP PERFORMED"
}
source @DDS_INSTALL_PREFIX@/DDS_env.sh
export PATH=@BIN_DIR@:$PATH
exec 5>&1
output=$(dds-session start | tee >(cat - >&5))
export DDS_SESSION_ID=$(echo ${output} | grep "DDS session ID: " | cut -d' ' -f4)
echo "SESSION ID: ${DDS_SESSION_ID}"
trap "cleanup ${DDS_SESSION_ID}" EXIT
requiredNofSlots=8
dds-submit -r localhost --slots ${requiredNofSlots}
echo "...waiting for ${requiredNofSlots} idle slots..."
dds-info --idle-count --wait ${requiredNofSlots}
export FAIRMQ_DDS_TOPO_FILE=@DATA_DIR@/ex-n-m-topology.xml
echo "TOPOLOGY FILE: ${FAIRMQ_DDS_TOPO_FILE}"
echo "TOPOLOGY NAME: $(dds-topology --disable-validation --topology-name ${FAIRMQ_DDS_TOPO_FILE})"
dds-info --active-topology
dds-topology --activate ${FAIRMQ_DDS_TOPO_FILE}
dds-info --active-topology
echo "...waiting for ${requiredNofSlots} executing slots..."
dds-info --executing-count --wait ${requiredNofSlots}
echo "------------------------"
echo "...waiting for Topology to finish..."
# TODO Retrieve number of devices from DDS topology API instead of having the user pass it explicitely
fairmq-dds-command-ui -w "IDLE"
fairmq-dds-command-ui -c i
fairmq-dds-command-ui -c k
fairmq-dds-command-ui -c b
fairmq-dds-command-ui -c x
fairmq-dds-command-ui -c j
fairmq-dds-command-ui -c r
receivers="main/Receivers.*"
fairmq-dds-command-ui -w "RUNNING->READY" -p $receivers
echo "All receivers transitioned from RUNNING to READY"
fairmq-dds-command-ui -c s
fairmq-dds-command-ui -c t
fairmq-dds-command-ui -c d
fairmq-dds-command-ui -c q
echo "...waiting for ${requiredNofSlots} idle slots..."
dds-info --idle-count --wait ${requiredNofSlots}
echo "------------------------"
dds-info --active-topology
dds-topology --stop
dds-info --active-topology
dds-agent-cmd getlog -a
logDir="${wrkDir}/logs"
for file in $(find "${logDir}" -name "*.tar.gz"); do tar -xf ${file} -C "${logDir}" ; done
echo "AGENT LOG FILES IN: ${logDir}"
# This string is used by ctest to detect success
echo "Example successful :)"
# Cleanup function is called by EXIT trap

View File

@ -0,0 +1,76 @@
#!/bin/bash
################################################################################
# Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
set -e
cleanup() {
dds-session stop $1
echo "CLEANUP PERFORMED"
}
source @DDS_INSTALL_PREFIX@/DDS_env.sh
export PATH=@BIN_DIR@:$PATH
exec 5>&1
output=$(dds-session start | tee >(cat - >&5))
export DDS_SESSION_ID=$(echo ${output} | grep "DDS session ID: " | cut -d' ' -f4)
echo "SESSION ID: ${DDS_SESSION_ID}"
trap "cleanup ${DDS_SESSION_ID}" EXIT
requiredNofSlots=8
dds-submit -r localhost --slots ${requiredNofSlots}
echo "...waiting for ${requiredNofSlots} idle slots..."
dds-info --idle-count --wait ${requiredNofSlots}
export FAIRMQ_DDS_TOPO_FILE=@DATA_DIR@/ex-n-m-pair-topology.xml
echo "TOPOLOGY FILE: ${FAIRMQ_DDS_TOPO_FILE}"
echo "TOPOLOGY NAME: $(dds-topology --disable-validation --topology-name ${FAIRMQ_DDS_TOPO_FILE})"
dds-info --active-topology
dds-topology --activate ${FAIRMQ_DDS_TOPO_FILE}
dds-info --active-topology
echo "...waiting for ${requiredNofSlots} executing slots..."
dds-info --executing-count --wait ${requiredNofSlots}
echo "------------------------"
echo "...waiting for Topology to finish..."
# TODO Retrieve number of devices from DDS topology API instead of having the user pass it explicitely
fairmq-dds-command-ui -w "IDLE"
fairmq-dds-command-ui -c i
fairmq-dds-command-ui -c k
fairmq-dds-command-ui -c b
fairmq-dds-command-ui -c x
fairmq-dds-command-ui -c j
fairmq-dds-command-ui -c r
receivers="main/Receivers.*"
fairmq-dds-command-ui -w "RUNNING->READY" -p $receivers
echo "All receivers transitioned from RUNNING to READY"
fairmq-dds-command-ui -c s
fairmq-dds-command-ui -c t
fairmq-dds-command-ui -c d
fairmq-dds-command-ui -c q
echo "...waiting for ${requiredNofSlots} idle slots..."
dds-info --idle-count --wait ${requiredNofSlots}
echo "------------------------"
dds-info --active-topology
dds-topology --stop
dds-info --active-topology
dds-agent-cmd getlog -a
logDir="${wrkDir}/logs"
for file in $(find "${logDir}" -name "*.tar.gz"); do tar -xf ${file} -C "${logDir}" ; done
echo "AGENT LOG FILES IN: ${logDir}"
# This string is used by ctest to detect success
echo "Example successful :)"
# Cleanup function is called by EXIT trap

View File

@ -0,0 +1,60 @@
#!/bin/bash
export PATH=@BIN_DIR@:$PATH
SYNC="fairmq-ex-n-m-synchronizer"
SYNC+=" --id Sync"
SYNC+=" --channel-config name=sync,type=pub,method=bind,address=tcp://localhost:8010"
SYNC+=" --rate 100"
xterm -geometry 80x25+0+0 -hold -e $SYNC &
SENDER0="fairmq-ex-n-m-sender"
SENDER0+=" --id Sender1"
SENDER0+=" --channel-config name=sync,type=sub,method=connect,address=tcp://localhost:8010"
SENDER0+=" name=data,type=pair,method=connect,address=tcp://localhost:8021,address=tcp://localhost:8022,address=tcp://localhost:8023,address=tcp://localhost:8024"
SENDER0+=" --sender-index 0"
SENDER0+=" --subtimeframe-size 1000000"
SENDER0+=" --num-receivers 4"
xterm -geometry 80x25+500+0 -hold -e $SENDER0 &
SENDER1="fairmq-ex-n-m-sender"
SENDER1+=" --id Sender2"
SENDER1+=" --channel-config name=sync,type=sub,method=connect,address=tcp://localhost:8010"
SENDER1+=" name=data,type=pair,method=connect,address=tcp://localhost:8031,address=tcp://localhost:8032,address=tcp://localhost:8033,address=tcp://localhost:8034"
SENDER1+=" --sender-index 1"
SENDER1+=" --subtimeframe-size 1000000"
SENDER1+=" --num-receivers 4"
xterm -geometry 80x25+500+350 -hold -e $SENDER1 &
SENDER2="fairmq-ex-n-m-sender"
SENDER2+=" --id Sender3"
SENDER2+=" --channel-config name=sync,type=sub,method=connect,address=tcp://localhost:8010"
SENDER2+=" name=data,type=pair,method=connect,address=tcp://localhost:8041,address=tcp://localhost:8042,address=tcp://localhost:8043,address=tcp://localhost:8044"
SENDER2+=" --sender-index 2"
SENDER2+=" --subtimeframe-size 1000000"
SENDER2+=" --num-receivers 4"
xterm -geometry 80x25+500+700 -hold -e $SENDER2 &
RECEIVER0="fairmq-ex-n-m-receiver"
RECEIVER0+=" --id Receiver1"
RECEIVER0+=" --channel-config name=data,type=pair,method=bind,address=tcp://localhost:8021,address=tcp://localhost:8031,address=tcp://localhost:8041"
RECEIVER0+=" --num-senders 3"
xterm -geometry 80x25+1000+0 -hold -e $RECEIVER0 &
RECEIVER1="fairmq-ex-n-m-receiver"
RECEIVER1+=" --id Receiver2"
RECEIVER1+=" --channel-config name=data,type=pair,method=bind,address=tcp://localhost:8022,address=tcp://localhost:8032,address=tcp://localhost:8042"
RECEIVER1+=" --num-senders 3"
xterm -geometry 80x25+1000+350 -hold -e $RECEIVER1 &
RECEIVER2="fairmq-ex-n-m-receiver"
RECEIVER2+=" --id Receiver3"
RECEIVER2+=" --channel-config name=data,type=pair,method=bind,address=tcp://localhost:8023,address=tcp://localhost:8033,address=tcp://localhost:8043"
RECEIVER2+=" --num-senders 3"
xterm -geometry 80x25+1000+700 -hold -e $RECEIVER2 &
RECEIVER3="fairmq-ex-n-m-receiver"
RECEIVER3+=" --id Receiver4"
RECEIVER3+=" --channel-config name=data,type=pair,method=bind,address=tcp://localhost:8024,address=tcp://localhost:8034,address=tcp://localhost:8044"
RECEIVER3+=" --num-senders 3"
xterm -geometry 80x25+1000+1050 -hold -e $RECEIVER3 &

View File

@ -0,0 +1,60 @@
#!/bin/bash
export PATH=@BIN_DIR@:$PATH
SYNC="fairmq-ex-n-m-synchronizer"
SYNC+=" --id Sync"
SYNC+=" --channel-config name=sync,type=pub,method=bind,address=tcp://localhost:8010"
SYNC+=" --rate 100"
xterm -geometry 80x25+0+0 -hold -e $SYNC &
SENDER0="fairmq-ex-n-m-sender"
SENDER0+=" --id Sender1"
SENDER0+=" --channel-config name=sync,type=sub,method=connect,address=tcp://localhost:8010"
SENDER0+=" name=data,type=push,method=connect,address=tcp://localhost:8021,address=tcp://localhost:8022,address=tcp://localhost:8023,address=tcp://localhost:8024"
SENDER0+=" --sender-index 0"
SENDER0+=" --subtimeframe-size 1000000"
SENDER0+=" --num-receivers 4"
xterm -geometry 80x25+500+0 -hold -e $SENDER0 &
SENDER1="fairmq-ex-n-m-sender"
SENDER1+=" --id Sender2"
SENDER1+=" --channel-config name=sync,type=sub,method=connect,address=tcp://localhost:8010"
SENDER1+=" name=data,type=push,method=connect,address=tcp://localhost:8021,address=tcp://localhost:8022,address=tcp://localhost:8023,address=tcp://localhost:8024"
SENDER1+=" --sender-index 1"
SENDER1+=" --subtimeframe-size 1000000"
SENDER1+=" --num-receivers 4"
xterm -geometry 80x25+500+350 -hold -e $SENDER1 &
SENDER2="fairmq-ex-n-m-sender"
SENDER2+=" --id Sender3"
SENDER2+=" --channel-config name=sync,type=sub,method=connect,address=tcp://localhost:8010"
SENDER2+=" name=data,type=push,method=connect,address=tcp://localhost:8021,address=tcp://localhost:8022,address=tcp://localhost:8023,address=tcp://localhost:8024"
SENDER2+=" --sender-index 2"
SENDER2+=" --subtimeframe-size 1000000"
SENDER2+=" --num-receivers 4"
xterm -geometry 80x25+500+700 -hold -e $SENDER2 &
RECEIVER0="fairmq-ex-n-m-receiver"
RECEIVER0+=" --id Receiver1"
RECEIVER0+=" --channel-config name=data,type=pull,method=bind,address=tcp://localhost:8021"
RECEIVER0+=" --num-senders 3"
xterm -geometry 80x25+1000+0 -hold -e $RECEIVER0 &
RECEIVER1="fairmq-ex-n-m-receiver"
RECEIVER1+=" --id Receiver2"
RECEIVER1+=" --channel-config name=data,type=pull,method=bind,address=tcp://localhost:8022"
RECEIVER1+=" --num-senders 3"
xterm -geometry 80x25+1000+350 -hold -e $RECEIVER1 &
RECEIVER2="fairmq-ex-n-m-receiver"
RECEIVER2+=" --id Receiver3"
RECEIVER2+=" --channel-config name=data,type=pull,method=bind,address=tcp://localhost:8023"
RECEIVER2+=" --num-senders 3"
xterm -geometry 80x25+1000+700 -hold -e $RECEIVER2 &
RECEIVER3="fairmq-ex-n-m-receiver"
RECEIVER3+=" --id Receiver4"
RECEIVER3+=" --channel-config name=data,type=pull,method=bind,address=tcp://localhost:8024"
RECEIVER3+=" --num-senders 3"
xterm -geometry 80x25+1000+1050 -hold -e $RECEIVER3 &

View File

@ -0,0 +1,119 @@
/********************************************************************************
* Copyright (C) 2020 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Header.h"
#include <FairMQDevice.h>
#include <runFairMQDevice.h>
#include <string>
#include <iomanip>
#include <chrono>
#include <string>
#include <unordered_map>
#include <unordered_set>
using namespace std;
using namespace example_n_m;
namespace bpo = boost::program_options;
struct TFBuffer
{
FairMQParts parts;
chrono::steady_clock::time_point start;
chrono::steady_clock::time_point end;
};
class Receiver : public FairMQDevice
{
public:
Receiver()
: fBuffer()
, fDiscardedSet()
, fNumSenders(0)
, fBufferTimeoutInMs(5000)
, fMaxTimeframes(0)
, fTimeframeCounter(0)
{
OnData("data", &Receiver::HandleData);
}
~Receiver() = default;
void InitTask() override
{
fNumSenders = GetConfig()->GetValue<int>("num-senders");
fBufferTimeoutInMs = GetConfig()->GetValue<int>("buffer-timeout");
fMaxTimeframes = GetConfig()->GetValue<int>("max-timeframes");
}
protected:
bool HandleData(FairMQParts& parts, int /* index */)
{
Header& h = *(static_cast<Header*>(parts.At(0)->GetData()));
// LOG(info) << "Received sub-time frame #" << h.id << " from Sender" << h.senderIndex;
if (fDiscardedSet.find(h.id) == fDiscardedSet.end()) {
if (fBuffer.find(h.id) == fBuffer.end()) {
// if this is the first part with this ID, save the receive time.
fBuffer[h.id].start = chrono::steady_clock::now();
}
// if the received ID has not previously been discarded, store the data part in the buffer
fBuffer[h.id].parts.AddPart(move(parts.At(1)));
} else {
// if received ID has been previously discarded.
LOG(debug) << "Received part from an already discarded timeframe with id " << h.id;
}
if (fBuffer[h.id].parts.Size() == fNumSenders) {
LOG(info) << "Successfully completed timeframe #" << h.id;
fBuffer.erase(h.id);
if (fMaxTimeframes > 0 && ++fTimeframeCounter >= fMaxTimeframes) {
LOG(info) << "Reached configured maximum number of timeframes (" << fMaxTimeframes << "). Exiting RUNNING state.";
return false;
}
}
return true;
}
void DiscardIncompleteTimeframes()
{
auto it = fBuffer.begin();
while (it != fBuffer.end()) {
if (chrono::duration_cast<chrono::milliseconds>(chrono::steady_clock::now() - (it->second).start).count() > fBufferTimeoutInMs) {
LOG(debug) << "Timeframe #" << it->first << " incomplete after " << fBufferTimeoutInMs << " milliseconds, discarding";
fDiscardedSet.insert(it->first);
fBuffer.erase(it++);
LOG(debug) << "Number of discarded timeframes: " << fDiscardedSet.size();
} else {
// LOG(info) << "Timeframe #" << it->first << " within timeout, buffering...";
++it;
}
}
}
unordered_map<uint16_t, TFBuffer> fBuffer;
unordered_set<uint16_t> fDiscardedSet;
int fNumSenders;
int fBufferTimeoutInMs;
int fMaxTimeframes;
int fTimeframeCounter;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("buffer-timeout", bpo::value<int>()->default_value(1000), "Buffer timeout in milliseconds")
("num-senders", bpo::value<int>()->required(), "Number of senders")
("max-timeframes", bpo::value<int>()->default_value(0), "Maximum number of timeframes to receive (0 - unlimited)");
}
FairMQDevice* getDevice(const FairMQProgOptions& /* config */) { return new Receiver(); }

View File

@ -0,0 +1,79 @@
/********************************************************************************
* Copyright (C) 2020 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Header.h"
#include <FairMQDevice.h>
#include <runFairMQDevice.h>
#include <string>
using namespace std;
using namespace example_n_m;
namespace bpo = boost::program_options;
class Sender : public FairMQDevice
{
public:
Sender()
: fNumReceivers(0)
, fIndex(0)
, fSubtimeframeSize(10000)
{}
~Sender() = default;
protected:
void InitTask() override
{
fIndex = GetConfig()->GetProperty<int>("sender-index");
fSubtimeframeSize = GetConfig()->GetProperty<int>("subtimeframe-size");
fNumReceivers = GetConfig()->GetProperty<int>("num-receivers");
}
void Run() override
{
FairMQChannel& dataInChannel = fChannels.at("sync").at(0);
while (!NewStatePending()) {
Header h;
FairMQMessagePtr id(NewMessage());
if (dataInChannel.Receive(id) > 0) {
h.id = *(static_cast<uint16_t*>(id->GetData()));
h.senderIndex = fIndex;
} else {
continue;
}
FairMQParts parts;
parts.AddPart(NewSimpleMessage(h));
parts.AddPart(NewMessage(fSubtimeframeSize));
uint64_t currentDataId = h.id;
int direction = currentDataId % fNumReceivers;
if (Send(parts, "data", direction, 0) < 0) {
LOG(debug) << "Failed to queue Subtimeframe #" << currentDataId << " to Receiver[" << direction << "]";
}
}
}
private:
int fNumReceivers;
unsigned int fIndex;
int fSubtimeframeSize;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("sender-index", bpo::value<int>()->default_value(0), "Sender Index")
("subtimeframe-size", bpo::value<int>()->default_value(1000), "Subtimeframe size in bytes")
("num-receivers", bpo::value<int>()->required(), "Number of EPNs");
}
FairMQDevice* getDevice(const FairMQProgOptions& /* config */) { return new Sender(); }

View File

@ -0,0 +1,46 @@
/********************************************************************************
* Copyright (C) 2020 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <FairMQDevice.h>
#include <runFairMQDevice.h>
#include <string>
#include <cstdint>
using namespace std;
namespace bpo = boost::program_options;
class Synchronizer : public FairMQDevice
{
public:
Synchronizer()
: fTimeframeId(0)
{}
~Synchronizer() = default;
protected:
bool ConditionalRun() override
{
FairMQMessagePtr msg(NewSimpleMessage(fTimeframeId));
if (Send(msg, "sync") > 0) {
if (++fTimeframeId == UINT16_MAX - 1) {
fTimeframeId = 0;
}
} else {
return false;
}
return true;
}
uint16_t fTimeframeId;
};
void addCustomOptions(bpo::options_description& /* options */) {}
FairMQDevice* getDevice(const FairMQProgOptions& /* config */) { return new Synchronizer(); }