Compare commits

...

12 Commits

Author SHA1 Message Date
Alexey Rybalchenko
1a75141fc4 shm: allow monitor::ResetContent to cleanup after a crash 2022-02-02 10:49:00 +01:00
Alexey Rybalchenko
2f82eb4f09 shm: monitor: disable number of msgs in the ack queue output 2022-02-02 10:49:00 +01:00
Alexey Rybalchenko
92a56c26bc shm: remove UR queues on ResetContent 2022-02-02 10:49:00 +01:00
Alexey Rybalchenko
4f9aeda8ec shm: Add size to UnmanagedRegion debug output 2022-02-02 10:49:00 +01:00
Giulio Eulisse
ad894c79cf GUI Controller
provide a controller which can be used to control state
transitions from an external GUI.
2022-01-25 18:02:25 +01:00
Alexey Rybalchenko
5f33401d41 Parallelize more tests 2022-01-25 11:55:38 +01:00
Alexey Rybalchenko
f4d39d224b Avoid fixed ports in the test suites 2022-01-25 11:55:38 +01:00
Alexey Rybalchenko
bfd08bb33f Don't use to-be-deprecated names 2022-01-24 06:40:24 +01:00
Alexey Rybalchenko
f15f669853 use [[maybe_unused]] for values used in assertions 2022-01-24 06:40:24 +01:00
Alexey Rybalchenko
f6bade32bb modify keep-alive example executable a bit, make it configurable 2022-01-12 19:54:49 +01:00
Alexey Rybalchenko
ddf9bc7272 shm: keep mng segment around when skipping cleanup 2022-01-12 19:54:49 +01:00
Alexey Rybalchenko
f79a0714b4 shm: fix double unlock() 2022-01-12 19:54:49 +01:00
128 changed files with 1028 additions and 1052 deletions

View File

@@ -23,10 +23,10 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-1-1.sh.in ${CMAKE_CUR
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-1-1.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-1.sh)
add_test(NAME Example.1-1.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-1.sh zeromq)
set_tests_properties(Example.1-1.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ")
set_tests_properties(Example.1-1.zeromq PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received: ")
add_test(NAME Example.1-1.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-1.sh shmem)
set_tests_properties(Example.1-1.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ")
set_tests_properties(Example.1-1.shmem PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received: ")
# install

View File

@@ -30,7 +30,7 @@ struct Sampler : fair::mq::Device
// create message object with a pointer to the data buffer, its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg(NewMessage(
fair::mq::MessagePtr msg(NewMessage(
const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },

View File

@@ -27,7 +27,7 @@ struct Sink : fair::mq::Device
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool HandleData(FairMQMessagePtr& msg, int)
bool HandleData(fair::mq::MessagePtr& msg, int)
{
LOG(info) << "Received: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";

View File

@@ -8,20 +8,23 @@ if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
chan="data"
chanAddr="/tmp/fmq_$session""_""$chan""_""$transport"
# setup a trap to kill everything if the test fails/timeouts
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID;' TERM
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID; rm $chanAddr' TERM
SAMPLER="fairmq-ex-1-1-sampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --rate 1"
SAMPLER+=" --transport $transport"
SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --session $SESSION"
SAMPLER+=" --session $session"
SAMPLER+=" --shm-segment-size 100000000"
SAMPLER+=" --control static --color false"
SAMPLER+=" --max-iterations 1"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://*:5555,rateLogging=0"
SAMPLER+=" --channel-config name=$chan,type=push,method=bind,address=ipc://$chanAddr,rateLogging=0"
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
SAMPLER_PID=$!
@@ -29,13 +32,16 @@ SINK="fairmq-ex-1-1-sink"
SINK+=" --id sink1"
SINK+=" --transport $transport"
SINK+=" --verbosity veryhigh"
SINK+=" --session $SESSION"
SINK+=" --session $session"
SINK+=" --shm-segment-size 100000000"
SINK+=" --control static --color false"
SINK+=" --max-iterations 1"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://localhost:5555,rateLogging=0"
SINK+=" --channel-config name=$chan,type=pull,method=connect,address=ipc://$chanAddr,rateLogging=0"
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
SINK_PID=$!
# wait for sampler and sink to finish
wait $SAMPLER_PID
wait $SINK_PID
rm $chanAddr

View File

@@ -29,10 +29,10 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ex-1-n-1.json ${CMAKE_CURRENT_BINARY_
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-1-n-1.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-n-1.sh)
add_test(NAME Example.1-n-1.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-n-1.sh zeromq)
set_tests_properties(Example.1-n-1.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ")
set_tests_properties(Example.1-n-1.zeromq PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received: ")
add_test(NAME Example.1-n-1.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-n-1.sh shmem)
set_tests_properties(Example.1-n-1.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ")
set_tests_properties(Example.1-n-1.shmem PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received: ")
# install

View File

@@ -20,7 +20,7 @@ struct Processor : fair::mq::Device
OnData("data1", &Processor::HandleData);
}
bool HandleData(FairMQMessagePtr& msg, int)
bool HandleData(fair::mq::MessagePtr& msg, int)
{
LOG(info) << "Received data, processing...";
@@ -32,7 +32,7 @@ struct Processor : fair::mq::Device
// its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
fair::mq::MessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
text));

View File

@@ -28,7 +28,7 @@ struct Sampler : fair::mq::Device
{
// Initializing message with NewStaticMessage will avoid copy
// but won't delete the data after the sending is completed.
FairMQMessagePtr msg(NewStaticMessage(fText));
fair::mq::MessagePtr msg(NewStaticMessage(fText));
LOG(info) << "Sending \"" << fText << "\"";

View File

@@ -27,7 +27,7 @@ struct Sink : fair::mq::Device
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool HandleData(FairMQMessagePtr& msg, int)
bool HandleData(fair::mq::MessagePtr& msg, int)
{
LOG(info) << "Received: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";

View File

@@ -8,20 +8,25 @@ if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
ex2config="@CMAKE_CURRENT_BINARY_DIR@/ex-1-n-1.json"
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
chan1="data1"
chan2="data2"
chan1Addr="/tmp/fmq_$session""_""$chan1""_""$transport"
chan2Addr="/tmp/fmq_$session""_""$chan2""_""$transport"
# setup a trap to kill everything if the test fails/timeouts
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; kill -TERM $PROCESSOR1_PID; kill -TERM $PROCESSOR2_PID; wait $SAMPLER_PID; wait $SINK_PID; wait $PROCESSOR1_PID; wait $PROCESSOR2_PID;' TERM
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; kill -TERM $PROCESSOR1_PID; kill -TERM $PROCESSOR2_PID; wait $SAMPLER_PID; wait $SINK_PID; wait $PROCESSOR1_PID; wait $PROCESSOR2_PID; rm $chan1Addr; rm $chan2Addr' TERM
SAMPLER="fairmq-ex-1-n-1-sampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --transport $transport"
SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --session $SESSION"
SAMPLER+=" --session $session"
SAMPLER+=" --severity debug"
SAMPLER+=" --shm-segment-size 100000000"
SAMPLER+=" --control static --color false"
SAMPLER+=" --max-iterations 2"
SAMPLER+=" --mq-config $ex2config"
SAMPLER+=" --channel-config name=$chan1,type=push,method=bind,address=ipc://$chan1Addr,rateLogging=0"
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
SAMPLER_PID=$!
@@ -29,10 +34,12 @@ PROCESSOR1="fairmq-ex-1-n-1-processor"
PROCESSOR1+=" --id processor1"
PROCESSOR1+=" --transport $transport"
PROCESSOR1+=" --verbosity veryhigh"
PROCESSOR1+=" --session $SESSION"
PROCESSOR1+=" --session $session"
PROCESSOR1+=" --severity debug"
PROCESSOR1+=" --shm-segment-size 100000000"
PROCESSOR1+=" --control static --color false"
PROCESSOR1+=" --mq-config $ex2config"
PROCESSOR1+=" --config-key processor"
PROCESSOR1+=" --channel-config name=$chan1,type=pull,method=connect,address=ipc://$chan1Addr,rateLogging=0"
PROCESSOR1+=" name=$chan2,type=push,method=connect,address=ipc://$chan2Addr,rateLogging=0"
@CMAKE_CURRENT_BINARY_DIR@/$PROCESSOR1 &
PROCESSOR1_PID=$!
@@ -40,10 +47,12 @@ PROCESSOR2="fairmq-ex-1-n-1-processor"
PROCESSOR2+=" --id processor2"
PROCESSOR2+=" --transport $transport"
PROCESSOR2+=" --verbosity veryhigh"
PROCESSOR2+=" --session $SESSION"
PROCESSOR2+=" --session $session"
PROCESSOR2+=" --severity debug"
PROCESSOR2+=" --shm-segment-size 100000000"
PROCESSOR2+=" --control static --color false"
PROCESSOR2+=" --mq-config $ex2config"
PROCESSOR2+=" --config-key processor"
PROCESSOR2+=" --channel-config name=$chan1,type=pull,method=connect,address=ipc://$chan1Addr,rateLogging=0"
PROCESSOR2+=" name=$chan2,type=push,method=connect,address=ipc://$chan2Addr,rateLogging=0"
@CMAKE_CURRENT_BINARY_DIR@/$PROCESSOR2 &
PROCESSOR2_PID=$!
@@ -51,10 +60,12 @@ SINK="fairmq-ex-1-n-1-sink"
SINK+=" --id sink1"
SINK+=" --transport $transport"
SINK+=" --verbosity veryhigh"
SINK+=" --session $SESSION"
SINK+=" --session $session"
SINK+=" --severity debug"
SINK+=" --shm-segment-size 100000000"
SINK+=" --control static --color false"
SINK+=" --max-iterations 2"
SINK+=" --mq-config $ex2config"
SINK+=" --channel-config name=$chan2,type=pull,method=bind,address=ipc://$chan2Addr,rateLogging=0"
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
SINK_PID=$!
@@ -69,3 +80,5 @@ kill -SIGINT $PROCESSOR2_PID
# wait for everything to finish
wait $PROCESSOR1_PID
wait $PROCESSOR2_PID
rm $chan1Addr; rm $chan2Addr

View File

@@ -15,16 +15,16 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-builtin-devices.sh.in
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-builtin-devices.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh)
add_test(NAME Example.BuiltinDevices.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh zeromq)
set_tests_properties(Example.BuiltinDevices.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached")
set_tests_properties(Example.BuiltinDevices.zeromq PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached")
add_test(NAME Example.BuiltinDevices.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh shmem)
set_tests_properties(Example.BuiltinDevices.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached")
set_tests_properties(Example.BuiltinDevices.shmem PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached")
add_test(NAME Example.BuiltinDevices.multipart.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh zeromq true 2)
set_tests_properties(Example.BuiltinDevices.multipart.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached")
set_tests_properties(Example.BuiltinDevices.multipart.zeromq PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached")
add_test(NAME Example.BuiltinDevices.multipart.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh shmem true 2)
set_tests_properties(Example.BuiltinDevices.multipart.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached")
set_tests_properties(Example.BuiltinDevices.multipart.shmem PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached")
# install

View File

@@ -2,8 +2,6 @@
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
transport="zeromq"
multipart="false"
numParts="1"
@@ -20,8 +18,22 @@ if [[ $3 =~ ^[0-9]+$ ]]; then
numParts=$3
fi
session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
chan1="data1"
chan2="data2"
chan3="data3"
chan4="data4"
chan5="data5"
chan1Addr="/tmp/fmq_$session""_""$chan1""_""$transport"
chan2Addr1="/tmp/fmq_$session""_""$chan2""_1""_""$transport"
chan2Addr2="/tmp/fmq_$session""_""$chan2""_2""_""$transport"
chan3Addr1="/tmp/fmq_$session""_""$chan3""_1""_""$transport"
chan3Addr2="/tmp/fmq_$session""_""$chan3""_2""_""$transport"
chan4Addr="/tmp/fmq_$session""_""$chan4""_""$transport"
chan5Addr="/tmp/fmq_$session""_""$chan5""_""$transport"
# setup a trap to kill everything if the test fails/timeouts
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SPLITTER_PID; kill -TERM $PROXY1_PID; kill -TERM $PROXY2_PID; kill -TERM $MERGER_PID; kill -TERM $MULTIPLIER_PID; kill -TERM $SINK_PID;' TERM
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SPLITTER_PID; kill -TERM $PROXY1_PID; kill -TERM $PROXY2_PID; kill -TERM $MERGER_PID; kill -TERM $MULTIPLIER_PID; kill -TERM $SINK_PID; rm $chan1Addr; rm $chan2Addr1; rm $chan2Addr2; rm $chan3Addr1; rm $chan3Addr2; rm $chan4Addr; rm $chan5Addr' TERM
SAMPLER="fairmq-bsampler"
SAMPLER+=" --id bsampler1"
@@ -30,14 +42,15 @@ SAMPLER+=" --transport $transport"
SAMPLER+=" --color false"
SAMPLER+=" --control static"
SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --shm-segment-size 100000000"
SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size 100000"
SAMPLER+=" --multipart $multipart"
SAMPLER+=" --num-parts $numParts"
SAMPLER+=" --msg-rate 1"
SAMPLER+=" --max-iterations 0"
SAMPLER+=" --out-channel data1"
SAMPLER+=" --channel-config name=data1,type=push,method=bind,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5555"
SAMPLER+=" --out-channel $chan1"
SAMPLER+=" --channel-config name=$chan1,type=push,method=bind,sndBufSize=50,rcvBufSize=50,address=ipc://$chan1Addr"
@FAIRMQ_BIN_DIR@/$SAMPLER &
SAMPLER_PID=$!
@@ -48,11 +61,12 @@ SPLITTER+=" --transport $transport"
SPLITTER+=" --color false"
SPLITTER+=" --control static"
SPLITTER+=" --verbosity veryhigh"
SPLITTER+=" --shm-segment-size 100000000"
SPLITTER+=" --multipart $multipart"
SPLITTER+=" --in-channel data1"
SPLITTER+=" --out-channel data2"
SPLITTER+=" --channel-config name=data1,type=pull,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5555"
SPLITTER+=" name=data2,type=push,method=bind,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5556,address=tcp://localhost:5557"
SPLITTER+=" --in-channel $chan1"
SPLITTER+=" --out-channel $chan2"
SPLITTER+=" --channel-config name=$chan1,type=pull,method=connect,sndBufSize=50,rcvBufSize=50,address=ipc://$chan1Addr"
SPLITTER+=" name=$chan2,type=push,method=bind,sndBufSize=50,rcvBufSize=50,address=ipc://$chan2Addr1,address=ipc://$chan2Addr2"
@FAIRMQ_BIN_DIR@/$SPLITTER &
SPLITTER_PID=$!
@@ -63,11 +77,12 @@ PROXY1+=" --transport $transport"
PROXY1+=" --color false"
PROXY1+=" --control static"
PROXY1+=" --verbosity veryhigh"
PROXY1+=" --shm-segment-size 100000000"
PROXY1+=" --multipart $multipart"
PROXY1+=" --in-channel data2"
PROXY1+=" --out-channel data3"
PROXY1+=" --channel-config name=data2,type=pull,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5556"
PROXY1+=" name=data3,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5558"
PROXY1+=" --in-channel $chan2"
PROXY1+=" --out-channel $chan3"
PROXY1+=" --channel-config name=$chan2,type=pull,method=connect,sndBufSize=50,rcvBufSize=50,address=ipc://$chan2Addr1"
PROXY1+=" name=$chan3,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=ipc://$chan3Addr1"
@FAIRMQ_BIN_DIR@/$PROXY1 &
PROXY1_PID=$!
@@ -78,11 +93,12 @@ PROXY2+=" --transport $transport"
PROXY2+=" --color false"
PROXY2+=" --control static"
PROXY2+=" --verbosity veryhigh"
PROXY2+=" --shm-segment-size 100000000"
PROXY2+=" --multipart $multipart"
PROXY2+=" --in-channel data2"
PROXY2+=" --out-channel data3"
PROXY2+=" --channel-config name=data2,type=pull,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5557"
PROXY2+=" name=data3,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5559"
PROXY2+=" --in-channel $chan2"
PROXY2+=" --out-channel $chan3"
PROXY2+=" --channel-config name=$chan2,type=pull,method=connect,sndBufSize=50,rcvBufSize=50,address=ipc://$chan2Addr2"
PROXY2+=" name=$chan3,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=ipc://$chan3Addr2"
@FAIRMQ_BIN_DIR@/$PROXY2 &
PROXY2_PID=$!
@@ -93,11 +109,12 @@ MERGER+=" --transport $transport"
MERGER+=" --color false"
MERGER+=" --control static"
MERGER+=" --verbosity veryhigh"
MERGER+=" --shm-segment-size 100000000"
MERGER+=" --multipart $multipart"
MERGER+=" --in-channel data3"
MERGER+=" --out-channel data4"
MERGER+=" --channel-config name=data3,type=pull,method=bind,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5558,address=tcp://localhost:5559"
MERGER+=" name=data4,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5560"
MERGER+=" --in-channel $chan3"
MERGER+=" --out-channel $chan4"
MERGER+=" --channel-config name=$chan3,type=pull,method=bind,sndBufSize=50,rcvBufSize=50,address=ipc://$chan3Addr1,address=ipc://$chan3Addr2"
MERGER+=" name=$chan4,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=ipc://$chan4Addr"
@FAIRMQ_BIN_DIR@/$MERGER &
MERGER_PID=$!
@@ -108,11 +125,12 @@ MULTIPLIER+=" --transport $transport"
MULTIPLIER+=" --color false"
MULTIPLIER+=" --control static"
MULTIPLIER+=" --verbosity veryhigh"
MULTIPLIER+=" --shm-segment-size 100000000"
MULTIPLIER+=" --multipart $multipart"
MULTIPLIER+=" --in-channel data4"
MULTIPLIER+=" --out-channel data5"
MULTIPLIER+=" --channel-config name=data4,type=pull,method=bind,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5560"
MULTIPLIER+=" name=data5,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5561,address=tcp://localhost:5561"
MULTIPLIER+=" --in-channel $chan4"
MULTIPLIER+=" --out-channel $chan5"
MULTIPLIER+=" --channel-config name=$chan4,type=pull,method=bind,sndBufSize=50,rcvBufSize=50,address=ipc://$chan4Addr"
MULTIPLIER+=" name=$chan5,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=ipc://$chan5Addr,address=ipc://$chan5Addr"
@FAIRMQ_BIN_DIR@/$MULTIPLIER &
MULTIPLIER_PID=$!
@@ -126,8 +144,8 @@ SINK+=" --verbosity veryhigh"
SINK+=" --severity debug"
SINK+=" --multipart $multipart"
SINK+=" --max-iterations 2"
SINK+=" --in-channel data5"
SINK+=" --channel-config name=data5,type=pull,method=bind,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5561"
SINK+=" --in-channel $chan5"
SINK+=" --channel-config name=$chan5,type=pull,method=bind,sndBufSize=50,rcvBufSize=50,address=ipc://$chan5Addr"
@FAIRMQ_BIN_DIR@/$SINK &
SINK_PID=$!
@@ -146,3 +164,5 @@ wait $PROXY1_PID
wait $PROXY2_PID
wait $MERGER_PID
wait $MULTIPLIER_PID
rm $chan1Addr; rm $chan2Addr1; rm $chan2Addr2; rm $chan3Addr1; rm $chan3Addr2; rm $chan4Addr; rm $chan5Addr

View File

@@ -24,10 +24,10 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-copypush.sh.in ${CMAK
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-copypush.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-copypush.sh)
add_test(NAME Example.CopyPush.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-copypush.sh zeromq)
set_tests_properties(Example.CopyPush.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message: ")
set_tests_properties(Example.CopyPush.zeromq PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received message: ")
add_test(NAME Example.CopyPush.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-copypush.sh shmem)
set_tests_properties(Example.CopyPush.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message: ")
set_tests_properties(Example.CopyPush.shmem PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received message: ")
# install

View File

@@ -27,10 +27,10 @@ struct Sampler : fair::mq::Device
{
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage(fCounter++));
fair::mq::MessagePtr msg(NewSimpleMessage(fCounter++));
for (int i = 0; i < fNumDataChannels - 1; ++i) {
FairMQMessagePtr msgCopy(NewMessage());
fair::mq::MessagePtr msgCopy(NewMessage());
msgCopy->Copy(*msg);
Send(msgCopy, "data", i);
}

View File

@@ -27,7 +27,7 @@ struct Sink : fair::mq::Device
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool HandleData(FairMQMessagePtr& msg, int)
bool HandleData(fair::mq::MessagePtr& msg, int)
{
LOG(info) << "Received message: \"" << *(static_cast<uint64_t*>(msg->GetData())) << "\"";

View File

@@ -8,19 +8,24 @@ if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
chan="data"
chanAddr1="/tmp/fmq_$session""_""$chan""_1""_""$transport"
chanAddr2="/tmp/fmq_$session""_""$chan""_2""_""$transport"
# setup a trap to kill everything if the test fails/timeouts
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK1_PID; kill -TERM $SINK2_PID; wait $SAMPLER_PID; wait $SINK1_PID; wait $SINK2_PID;' TERM
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK1_PID; kill -TERM $SINK2_PID; wait $SAMPLER_PID; wait $SINK1_PID; wait $SINK2_PID; rm $chanAddr1; rm $chanAddr2' TERM
SAMPLER="fairmq-ex-copypush-sampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --transport $transport"
SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --session $SESSION"
SAMPLER+=" --severity debug"
SAMPLER+=" --shm-segment-size 100000000"
SAMPLER+=" --session $session"
SAMPLER+=" --control static --color false"
SAMPLER+=" --max-iterations 1"
SAMPLER+=" --channel-config name=data,type=push,method=bind,rateLogging=0,address=tcp://*:5555,address=tcp://*:5556"
SAMPLER+=" --channel-config name=$chan,type=push,method=bind,rateLogging=0,address=ipc://$chanAddr1,address=ipc://$chanAddr2"
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
SAMPLER_PID=$!
@@ -28,10 +33,12 @@ SINK1="fairmq-ex-copypush-sink"
SINK1+=" --id sink1"
SINK1+=" --transport $transport"
SINK1+=" --verbosity veryhigh"
SINK1+=" --session $SESSION"
SINK1+=" --severity debug"
SINK1+=" --shm-segment-size 100000000"
SINK1+=" --session $session"
SINK1+=" --control static --color false"
SINK1+=" --max-iterations 1"
SINK1+=" --channel-config name=data,type=pull,method=connect,rateLogging=0,address=tcp://localhost:5555"
SINK1+=" --channel-config name=$chan,type=pull,method=connect,rateLogging=0,address=ipc://$chanAddr1"
@CMAKE_CURRENT_BINARY_DIR@/$SINK1 &
SINK1_PID=$!
@@ -39,10 +46,12 @@ SINK2="fairmq-ex-copypush-sink"
SINK2+=" --id sink2"
SINK2+=" --transport $transport"
SINK2+=" --verbosity veryhigh"
SINK2+=" --session $SESSION"
SINK2+=" --severity debug"
SINK2+=" --shm-segment-size 100000000"
SINK2+=" --session $session"
SINK2+=" --control static --color false"
SINK2+=" --max-iterations 1"
SINK2+=" --channel-config name=data,type=pull,method=connect,rateLogging=0,address=tcp://localhost:5556"
SINK2+=" --channel-config name=$chan,type=pull,method=connect,rateLogging=0,address=ipc://$chanAddr2"
@CMAKE_CURRENT_BINARY_DIR@/$SINK2 &
SINK2_PID=$!
@@ -50,3 +59,5 @@ SINK2_PID=$!
wait $SAMPLER_PID
wait $SINK1_PID
wait $SINK2_PID
rm $chanAddr1; rm $chanAddr2

View File

@@ -20,7 +20,7 @@ struct Processor : fair::mq::Device
OnData("data1", &Processor::HandleData);
}
bool HandleData(FairMQMessagePtr& msg, int)
bool HandleData(fair::mq::MessagePtr& msg, int)
{
LOG(info) << "Received data, processing...";
@@ -32,7 +32,7 @@ struct Processor : fair::mq::Device
// its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
fair::mq::MessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
text));

View File

@@ -24,7 +24,7 @@ struct Sampler : fair::mq::Device
{
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage("Data"));
fair::mq::MessagePtr msg(NewSimpleMessage("Data"));
LOG(info) << "Sending \"Data\"";

View File

@@ -25,7 +25,7 @@ struct Sink : fair::mq::Device
fIterations = fConfig->GetValue<uint64_t>("iterations");
}
bool HandleData(FairMQMessagePtr& msg, int)
bool HandleData(fair::mq::MessagePtr& msg, int)
{
LOG(info) << "Received: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";

View File

@@ -23,14 +23,14 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multipart.sh.in ${CMA
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-multipart.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh)
add_test(NAME Example.Multipart.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh zeromq)
set_tests_properties(Example.Multipart.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 7 parts")
set_tests_properties(Example.Multipart.zeromq PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received message with 7 parts")
add_test(NAME Example.Multipart.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh shmem)
set_tests_properties(Example.Multipart.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 7 parts")
set_tests_properties(Example.Multipart.shmem PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received message with 7 parts")
if(BUILD_OFI_TRANSPORT)
add_test(NAME Example.Multipart.ofi COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh ofi)
set_tests_properties(Example.Multipart.ofi PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 7 parts")
set_tests_properties(Example.Multipart.ofi PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received message with 7 parts")
endif()
# install

View File

@@ -5,7 +5,7 @@ A topology of two devices - Sampler and Sink, communicating with PUSH-PULL patte
The Sampler sends a multipart message to the Sink, consisting of two message parts - header and body.
Each message part is a regular FairMQMessage. To combine them into a multi-part message use `FairMQParts`. Add messages to `FairMQParts` with `AddPart` method.
Each message part is a regular fair::mq::Message. To combine them into a multi-part message use `fair::mq::Parts`. Add messages to `fair::mq::Parts` with `AddPart` method.
All parts are guaranteed to be delivered together. The Receive call in the sink will recive the entire parts structure.

View File

@@ -35,15 +35,15 @@ struct Sampler : fair::mq::Device
}
LOG(info) << "Sending header with stopFlag: " << header.stopFlag;
FairMQParts parts;
fair::mq::Parts parts;
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
parts.AddPart(NewSimpleMessage(header));
parts.AddPart(NewMessage(1000));
// create more data parts, testing the FairMQParts in-place constructor
FairMQParts auxData{ NewMessage(500), NewMessage(600), NewMessage(700) };
// create more data parts, testing the fair::mq::Parts in-place constructor
fair::mq::Parts auxData{ NewMessage(500), NewMessage(600), NewMessage(700) };
assert(auxData.Size() == 3);
parts.AddPart(std::move(auxData));
assert(auxData.Size() == 0);

View File

@@ -20,7 +20,7 @@ struct Sink : fair::mq::Device
OnData("data", &Sink::HandleData);
}
bool HandleData(FairMQParts& parts, int)
bool HandleData(fair::mq::Parts& parts, int)
{
LOG(info) << "Received message with " << parts.Size() << " parts";

View File

@@ -8,19 +8,28 @@ if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
chan="data"
chanAddr=""
chanIpcFile="/tmp/fmq_$session""_""$chan""_""$transport"
if [ $transport = "ofi" ]; then
chanAddr="tcp://127.0.0.1:5656"
else
chanAddr="ipc://""$chanIpcFile"
fi
# setup a trap to kill everything if the test fails/timeouts
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID;' TERM
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID; rm $chanIpcFile' TERM
SAMPLER="fairmq-ex-multipart-sampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --transport $transport"
SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --session $SESSION"
SAMPLER+=" --session $session"
SAMPLER+=" --shm-segment-size 100000000"
SAMPLER+=" --max-iterations 1"
SAMPLER+=" --control static --color false"
SAMPLER+=" --channel-config name=data,type=pair,method=connect,rateLogging=0,address=tcp://127.0.0.1:5555,linger=1000"
SAMPLER+=" --channel-config name=$chan,type=pair,method=connect,rateLogging=0,address=$chanAddr,linger=1000"
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
SAMPLER_PID=$!
@@ -28,11 +37,14 @@ SINK="fairmq-ex-multipart-sink"
SINK+=" --id sink1"
SINK+=" --transport $transport"
SINK+=" --verbosity veryhigh"
SINK+=" --session $SESSION"
SINK+=" --session $session"
SINK+=" --shm-segment-size 100000000"
SINK+=" --control static --color false"
SINK+=" --channel-config name=data,type=pair,method=bind,rateLogging=0,address=tcp://127.0.0.1:5555"
SINK+=" --channel-config name=$chan,type=pair,method=bind,rateLogging=0,address=$chanAddr"
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
SINK_PID=$!
wait $SAMPLER_PID
wait $SINK_PID
rm $chanIpcFile

View File

@@ -26,7 +26,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multiple-channels.sh.
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-multiple-channels.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multiple-channels.sh)
add_test(NAME Example.MultipleChannels.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multiple-channels.sh zeromq)
set_tests_properties(Example.MultipleChannels.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received messages from both sources.")
set_tests_properties(Example.MultipleChannels.zeromq PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received messages from both sources.")
# install

View File

@@ -5,4 +5,4 @@ This example demonstrates how to work with multiple channels and multiplex betwe
A topology of three devices - **Sampler**, **Sink** and **Broadcaster**. The Sampler sends data to the Sink via the PUSH-PULL pattern. The Broadcaster device sends a message to both Sampler and Sink containing a string "OK" every second. The Broadcaster sends the message via PUB pattern. Both Sampler and Sink, besides doing their PUSH-PULL job, listen via SUB to the Broadcaster.
The multiplexing between their data channels and the broadcast channels happens with `FairMQPoller`.
The multiplexing between their data channels and the broadcast channels happens with `fair::mq::Poller`.

View File

@@ -22,7 +22,7 @@ struct Broadcaster : fair::mq::Device
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage("OK"));
fair::mq::MessagePtr msg(NewSimpleMessage("OK"));
LOG(info) << "Sending OK";

View File

@@ -7,7 +7,7 @@
********************************************************************************/
#include <fairmq/Device.h>
#include <FairMQPoller.h>
#include <fairmq/Poller.h>
#include <fairmq/runDevice.h>
#include <chrono>
@@ -26,13 +26,13 @@ struct Sampler : fair::mq::Device
void Run() override
{
FairMQPollerPtr poller(NewPoller("data", "broadcast"));
fair::mq::PollerPtr poller(NewPoller("data", "broadcast"));
while (!NewStatePending()) {
poller->Poll(100);
if (poller->CheckInput("broadcast", 0)) {
FairMQMessagePtr msg(NewMessage());
fair::mq::MessagePtr msg(NewMessage());
if (Receive(msg, "broadcast") > 0) {
LOG(info) << "Received broadcast: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
@@ -42,7 +42,7 @@ struct Sampler : fair::mq::Device
if (poller->CheckOutput("data", 0)) {
std::this_thread::sleep_for(std::chrono::seconds(1));
FairMQMessagePtr msg(NewSimpleMessage(fText));
fair::mq::MessagePtr msg(NewSimpleMessage(fText));
if (Send(msg, "data") > 0) {
LOG(info) << "Sent \"" << fText << "\"";

View File

@@ -27,7 +27,7 @@ struct Sink : fair::mq::Device
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool HandleBroadcast(FairMQMessagePtr& msg, int /*index*/)
bool HandleBroadcast(fair::mq::MessagePtr& msg, int /*index*/)
{
LOG(info) << "Received broadcast: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
fReceivedBroadcast = true;
@@ -35,7 +35,7 @@ struct Sink : fair::mq::Device
return CheckIterations();
}
bool HandleData(FairMQMessagePtr& msg, int /*index*/)
bool HandleData(fair::mq::MessagePtr& msg, int /*index*/)
{
LOG(info) << "Received message: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
fReceivedData = true;

View File

@@ -8,18 +8,25 @@ if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
# setup a trap to kill everything if the test fails/timeouts
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; kill -TERM $BROADCASTER_PID; wait $SAMPLER_PID; wait $SINK_PID; wait $BROADCASTER_PID;' TERM
session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
dataChan="data"
broadcastChan="broadcast"
dataChanAddr="/tmp/fmq_$session""_""$dataChan""_""$transport"
broadcastChanAddr="/tmp/fmq_$session""_""$broadcastChan""_""$transport"
# setup a trap to kill everything if the test fails/timeouts
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; kill -TERM $BROADCASTER_PID; wait $SAMPLER_PID; wait $SINK_PID; wait $BROADCASTER_PID; rm $dataChanAddr; rm $broadcastChanAddr' TERM
SINK="fairmq-ex-multiple-channels-sink"
SINK+=" --id sink1"
SINK+=" --session $session"
SINK+=" --transport $transport"
SINK+=" --verbosity veryhigh"
SINK+=" --verbosity veryhigh --severity debug"
SINK+=" --shm-segment-size 100000000"
SINK+=" --max-iterations 1"
SINK+=" --control static --color false"
SINK+=" --channel-config name=data,type=pull,method=connect,rateLogging=0,address=tcp://localhost:5555"
SINK+=" name=broadcast,type=sub,method=connect,rateLogging=0,address=tcp://localhost:5005"
SINK+=" --channel-config name=$dataChan,type=pull,method=connect,rateLogging=0,address=ipc://$dataChanAddr"
SINK+=" name=$broadcastChan,type=sub,method=connect,rateLogging=0,address=ipc://$broadcastChanAddr"
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
SINK_PID=$!
@@ -27,21 +34,25 @@ sleep 1
SAMPLER="fairmq-ex-multiple-channels-sampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --session $session"
SAMPLER+=" --transport $transport"
SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --verbosity veryhigh --severity debug"
SAMPLER+=" --shm-segment-size 100000000"
SAMPLER+=" --max-iterations 1"
SAMPLER+=" --control static --color false"
SAMPLER+=" --channel-config name=data,type=push,method=bind,rateLogging=0,address=tcp://*:5555"
SAMPLER+=" name=broadcast,type=sub,method=connect,rateLogging=0,address=tcp://localhost:5005"
SAMPLER+=" --channel-config name=$dataChan,type=push,method=bind,rateLogging=0,address=ipc://$dataChanAddr"
SAMPLER+=" name=$broadcastChan,type=sub,method=connect,rateLogging=0,address=ipc://$broadcastChanAddr"
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
SAMPLER_PID=$!
BROADCASTER="fairmq-ex-multiple-channels-broadcaster"
BROADCASTER+=" --id broadcaster1"
BROADCASTER+=" --session $session"
BROADCASTER+=" --transport $transport"
BROADCASTER+=" --verbosity veryhigh"
BROADCASTER+=" --verbosity veryhigh --severity debug"
BROADCASTER+=" --shm-segment-size 100000000"
BROADCASTER+=" --control static --color false"
BROADCASTER+=" --channel-config name=broadcast,type=pub,method=bind,rateLogging=0,address=tcp://*:5005"
BROADCASTER+=" --channel-config name=$broadcastChan,type=pub,method=bind,rateLogging=0,address=ipc://$broadcastChanAddr"
@CMAKE_CURRENT_BINARY_DIR@/$BROADCASTER &
BROADCASTER_PID=$!
@@ -53,3 +64,5 @@ kill -SIGINT $BROADCASTER_PID
# wait for broadcaster to finish
wait $BROADCASTER_PID
rm $dataChanAddr; rm $broadcastChanAddr

View File

@@ -25,7 +25,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multiple-transports.s
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-multiple-transports.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multiple-transports.sh)
add_test(NAME Example.MultipleTransports COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multiple-transports.sh)
set_tests_properties(Example.MultipleTransports PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received messages from both sources.")
set_tests_properties(Example.MultipleTransports PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received messages from both sources.")
# install

View File

@@ -28,7 +28,7 @@ struct Sampler1 : fair::mq::Device
bool ConditionalRun() override
{
// Creates a message using the transport of channel data1
FairMQMessagePtr msg(NewMessageFor("data1", 0, 1000000));
fair::mq::MessagePtr msg(NewMessageFor("data1", 0, 1000000));
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
@@ -54,7 +54,7 @@ struct Sampler1 : fair::mq::Device
uint64_t numAcks = 0;
while (!NewStatePending()) {
FairMQMessagePtr ack(NewMessageFor("ack", 0));
fair::mq::MessagePtr ack(NewMessageFor("ack", 0));
if (Receive(ack, "ack") < 0) {
break;
}

View File

@@ -22,7 +22,7 @@ struct Sampler2 : fair::mq::Device
bool ConditionalRun() override
{
FairMQMessagePtr msg(NewMessage(1000));
fair::mq::MessagePtr msg(NewMessage(1000));
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).

View File

@@ -26,11 +26,11 @@ struct Sink : fair::mq::Device
}
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool HandleData1(FairMQMessagePtr& /*msg*/, int /*index*/)
bool HandleData1(fair::mq::MessagePtr& /*msg*/, int /*index*/)
{
fNumIterations1++;
// Creates a message using the transport of channel ack
FairMQMessagePtr ack(NewMessageFor("ack", 0));
fair::mq::MessagePtr ack(NewMessageFor("ack", 0));
if (Send(ack, "ack") < 0) {
return false;
}
@@ -40,7 +40,7 @@ struct Sink : fair::mq::Device
}
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool HandleData2(FairMQMessagePtr& /*msg*/, int /*index*/)
bool HandleData2(fair::mq::MessagePtr& /*msg*/, int /*index*/)
{
fNumIterations2++;
// return true if want to be called again (otherwise go to IDLE state)

View File

@@ -2,46 +2,57 @@
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
chan1="data1"
chan2="data2"
ackChan="ack"
chan1Addr="/tmp/fmq_$session""_""$chan1"
chan2Addr="/tmp/fmq_$session""_""$chan2"
ackChanAddr="/tmp/fmq_$session""_""$ackChan"
trap 'kill -TERM $SAMPLER1_PID; kill -TERM $SAMPLER2_PID; kill -TERM $SINK_PID; wait $SAMPLER1_PID; wait $SAMPLER2_PID; wait $SINK_PID; @CMAKE_BINARY_DIR@/fairmq/fairmq-shmmonitor --cleanup --session $SESSION;' TERM
trap 'kill -TERM $SAMPLER1_PID; kill -TERM $SAMPLER2_PID; kill -TERM $SINK_PID; wait $SAMPLER1_PID; wait $SAMPLER2_PID; wait $SINK_PID; @CMAKE_BINARY_DIR@/fairmq/fairmq-shmmonitor --cleanup --session $SESSION; rm $chan1Addr; rm $chan2Addr; rm $ackChanAddr' TERM
SINK="fairmq-ex-multiple-transports-sink"
SINK+=" --id sink1"
SINK+=" --verbosity veryhigh"
SINK+=" --session $SESSION"
SINK+=" --verbosity veryhigh --severity debug"
SINK+=" --shm-segment-size 100000000"
SINK+=" --session $session"
SINK+=" --max-iterations 1"
SINK+=" --control static --color false"
SINK+=" --transport shmem"
SINK+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:5555"
SINK+=" name=data2,type=pull,method=connect,address=tcp://127.0.0.1:5556,transport=zeromq"
SINK+=" name=ack,type=pub,method=connect,address=tcp://127.0.0.1:5557,transport=zeromq"
SINK+=" --channel-config name=$chan1,type=pull,method=connect,address=ipc://$chan1Addr"
SINK+=" name=$chan2,type=pull,method=connect,address=ipc://$chan2Addr,transport=zeromq"
SINK+=" name=$ackChan,type=pub,method=connect,address=ipc://$ackChanAddr,transport=zeromq"
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
SINK_PID=$!
SAMPLER1="fairmq-ex-multiple-transports-sampler1"
SAMPLER1+=" --id sampler1"
SAMPLER1+=" --session $SESSION"
SAMPLER1+=" --verbosity veryhigh"
SAMPLER1+=" --session $session"
SAMPLER1+=" --verbosity veryhigh --severity debug"
SAMPLER1+=" --shm-segment-size 100000000"
SAMPLER1+=" --max-iterations 1"
SAMPLER1+=" --control static --color false"
SAMPLER1+=" --transport shmem"
SAMPLER1+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:5555"
SAMPLER1+=" name=ack,type=sub,method=bind,address=tcp://127.0.0.1:5557,transport=zeromq"
SAMPLER1+=" --channel-config name=$chan1,type=push,method=bind,address=ipc://$chan1Addr"
SAMPLER1+=" name=$ackChan,type=sub,method=bind,address=ipc://$ackChanAddr,transport=zeromq"
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER1 &
SAMPLER1_PID=$!
SAMPLER2="fairmq-ex-multiple-transports-sampler2"
SAMPLER2+=" --id sampler2"
SAMPLER2+=" --session $SESSION"
SAMPLER2+=" --verbosity veryhigh"
SAMPLER2+=" --session $session"
SAMPLER2+=" --verbosity veryhigh --severity debug"
SAMPLER2+=" --shm-segment-size 100000000"
SAMPLER2+=" --max-iterations 1"
SAMPLER2+=" --control static --color false"
SAMPLER2+=" --transport zeromq"
SAMPLER2+=" --channel-config name=data2,type=push,method=bind,address=tcp://127.0.0.1:5556"
SAMPLER2+=" --channel-config name=$chan2,type=push,method=bind,address=ipc://$chan2Addr"
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER2 &
SAMPLER2_PID=$!
wait $SAMPLER1_PID
wait $SAMPLER2_PID
wait $SINK_PID
rm $chan1Addr; rm $chan2Addr; rm $ackChanAddr

View File

@@ -24,7 +24,7 @@ namespace bpo = boost::program_options;
struct TFBuffer
{
FairMQParts parts;
fair::mq::Parts parts;
chrono::steady_clock::time_point start;
chrono::steady_clock::time_point end;
};
@@ -43,7 +43,7 @@ struct Receiver : fair::mq::Device
fMaxTimeframes = GetConfig()->GetValue<int>("max-timeframes");
}
bool HandleData(FairMQParts& parts, int /* index */)
bool HandleData(fair::mq::Parts& parts, int /* index */)
{
Header& h = *(static_cast<Header*>(parts.At(0)->GetData()));
// LOG(info) << "Received sub-time frame #" << h.id << " from Sender" << h.senderIndex;
@@ -107,7 +107,7 @@ void addCustomOptions(bpo::options_description& options)
("max-timeframes", bpo::value<int>()->default_value(0), "Maximum number of timeframes to receive (0 - unlimited)");
}
std::unique_ptr<fair::mq::Device> getDevice(FairMQProgOptions& /* config */)
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /* config */)
{
return std::make_unique<Receiver>();
}

View File

@@ -28,11 +28,11 @@ struct Sender : fair::mq::Device
void Run() override
{
FairMQChannel& dataInChannel = GetChannel("sync", 0);
fair::mq::Channel& dataInChannel = GetChannel("sync", 0);
while (!NewStatePending()) {
Header h;
FairMQMessagePtr id(NewMessage());
fair::mq::MessagePtr id(NewMessage());
if (dataInChannel.Receive(id) > 0) {
h.id = *(static_cast<uint16_t*>(id->GetData()));
h.senderIndex = fIndex;
@@ -40,7 +40,7 @@ struct Sender : fair::mq::Device
continue;
}
FairMQParts parts;
fair::mq::Parts parts;
parts.AddPart(NewSimpleMessage(h));
parts.AddPart(NewMessage(fSubtimeframeSize));
@@ -66,7 +66,7 @@ void addCustomOptions(bpo::options_description& options)
("subtimeframe-size", bpo::value<int>()->default_value(1000), "Subtimeframe size in bytes")
("num-receivers", bpo::value<int>()->required(), "Number of EPNs");
}
std::unique_ptr<fair::mq::Device> getDevice(FairMQProgOptions& /* config */)
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /* config */)
{
return std::make_unique<Sender>();
}

View File

@@ -19,7 +19,7 @@ struct Synchronizer : fair::mq::Device
{
bool ConditionalRun() override
{
FairMQMessagePtr msg(NewSimpleMessage(fTimeframeId));
fair::mq::MessagePtr msg(NewSimpleMessage(fTimeframeId));
if (Send(msg, "sync") > 0) {
if (++fTimeframeId == UINT16_MAX - 1) {
@@ -37,7 +37,7 @@ struct Synchronizer : fair::mq::Device
};
void addCustomOptions(bpo::options_description& /* options */) {}
std::unique_ptr<fair::mq::Device> getDevice(FairMQProgOptions& /* config */)
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /* config */)
{
return std::make_unique<Synchronizer>();
}

View File

@@ -30,10 +30,10 @@ struct QCDispatcher : fair::mq::Device
});
}
bool HandleData(FairMQMessagePtr& msg, int)
bool HandleData(fair::mq::MessagePtr& msg, int)
{
if (fDoQC.load() == true) {
FairMQMessagePtr msgCopy(NewMessage());
fair::mq::MessagePtr msgCopy(NewMessage());
msgCopy->Copy(*msg);
if (Send(msg, "qc") < 0) {
return false;

View File

@@ -9,12 +9,12 @@
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
class QCTask : public FairMQDevice
class QCTask : public fair::mq::Device
{
public:
QCTask()
{
OnData("qc", [](FairMQMessagePtr& /*msg*/, int) {
OnData("qc", [](fair::mq::MessagePtr& /*msg*/, int) {
LOG(info) << "received data";
return false;
});

View File

@@ -16,7 +16,7 @@ struct Sampler : fair::mq::Device
{
bool ConditionalRun() override
{
FairMQMessagePtr msg(NewMessage(1000));
fair::mq::MessagePtr msg(NewMessage(1000));
if (Send(msg, "data1") < 0) {
return false;

View File

@@ -14,7 +14,7 @@
struct Sink : fair::mq::Device
{
Sink() { OnData("data2", &Sink::HandleData); }
bool HandleData(FairMQMessagePtr& /*msg*/, int /*index*/) { return true; }
bool HandleData(fair::mq::MessagePtr& /*msg*/, int /*index*/) { return true; }
};
namespace bpo = boost::program_options;

View File

@@ -13,7 +13,7 @@
namespace bpo = boost::program_options;
class Builder : public FairMQDevice
class Builder : public fair::mq::Device
{
public:
Builder() = default;
@@ -24,7 +24,7 @@ class Builder : public FairMQDevice
OnData("rb", &Builder::HandleData);
}
bool HandleData(FairMQMessagePtr& msg, int /*index*/)
bool HandleData(fair::mq::MessagePtr& msg, int /*index*/)
{
if (Send(msg, fOutputChannelName) < 0) {
return false;

View File

@@ -18,9 +18,9 @@ struct Processor : fair::mq::Device
OnData("bp", &Processor::HandleData);
}
bool HandleData(FairMQMessagePtr& msg, int /*index*/)
bool HandleData(fair::mq::MessagePtr& msg, int /*index*/)
{
FairMQMessagePtr msg2(NewMessageFor("ps", 0, msg->GetSize()));
fair::mq::MessagePtr msg2(NewMessageFor("ps", 0, msg->GetSize()));
if (Send(msg2, "ps") < 0) {
return false;
}

View File

@@ -22,7 +22,7 @@ struct Readout : fair::mq::Device
fMsgSize = fConfig->GetProperty<int>("msg-size");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("rb",
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor("rb",
0,
10000000,
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
@@ -36,7 +36,7 @@ struct Readout : fair::mq::Device
bool ConditionalRun() override
{
FairMQMessagePtr msg(NewMessageFor("rb", // channel
fair::mq::MessagePtr msg(NewMessageFor("rb", // channel
0, // sub-channel
fRegion, // region
fRegion->GetData(), // ptr within region
@@ -71,7 +71,7 @@ struct Readout : fair::mq::Device
int fMsgSize = 10000;
uint64_t fMaxIterations = 0;
uint64_t fNumIterations = 0;
FairMQUnmanagedRegionPtr fRegion = nullptr;
fair::mq::UnmanagedRegionPtr fRegion = nullptr;
std::atomic<uint64_t> fNumUnackedMsgs = 0;
};

View File

@@ -21,7 +21,7 @@ struct Sender : fair::mq::Device
OnData(fInputChannelName, &Sender::HandleData);
}
bool HandleData(FairMQMessagePtr& msg, int /*index*/)
bool HandleData(fair::mq::MessagePtr& msg, int /*index*/)
{
if (Send(msg, "sr") < 0) {
return false;

View File

@@ -26,10 +26,10 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-region.sh.in ${CMAKE_
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-region.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-region.sh)
add_test(NAME Example.Region.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-region.sh zeromq)
set_tests_properties(Example.Region.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received [0-9*] acks")
set_tests_properties(Example.Region.zeromq PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received [0-9*] acks")
add_test(NAME Example.Region.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-region.sh shmem)
set_tests_properties(Example.Region.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received [0-9*] acks")
set_tests_properties(Example.Region.shmem PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received [0-9*] acks")
# install

View File

@@ -14,13 +14,18 @@
#include <fairlogger/Logger.h>
#include <boost/algorithm/string.hpp>
#include <boost/program_options.hpp>
#include <csignal>
#include <chrono>
#include <map>
#include <string>
#include <thread>
using namespace std;
using namespace boost::program_options;
namespace
{
@@ -32,19 +37,69 @@ void signalHandler(int /* signal */)
gStopping = 1;
}
struct ShmRemover
struct ShmManager
{
ShmRemover(std::string _shmId) : shmId(std::move(_shmId)) {}
~ShmRemover()
ShmManager(uint64_t _shmId, const vector<string>& _segments, const vector<string>& _regions)
: shmId(fair::mq::shmem::makeShmIdStr(_shmId))
{
// This will clean all segments, regions and any other shmem objects belonging to this shmId
for (const auto& s : _segments) {
vector<string> segmentConf;
boost::algorithm::split(segmentConf, s, boost::algorithm::is_any_of(","));
if (segmentConf.size() != 2) {
LOG(error) << "incorrect format for --segments. Expecting pairs of <id>,<size>.";
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
throw runtime_error("incorrect format for --segments. Expecting pairs of <id>,<size>.");
}
uint16_t id = stoi(segmentConf.at(0));
uint64_t size = stoull(segmentConf.at(1));
auto ret = segments.emplace(id, fair::mq::shmem::Segment(shmId, id, size, fair::mq::shmem::rbTreeBestFit));
fair::mq::shmem::Segment& segment = ret.first->second;
LOG(info) << "Created segment " << id << " of size " << segment.GetSize() << ", starting at " << segment.GetData() << ". Locking...";
segment.Lock();
LOG(info) << "Done.";
LOG(info) << "Zeroing...";
segment.Zero();
LOG(info) << "Done.";
}
for (const auto& r : _regions) {
vector<string> regionConf;
boost::algorithm::split(regionConf, r, boost::algorithm::is_any_of(","));
if (regionConf.size() != 2) {
LOG(error) << "incorrect format for --regions. Expecting pairs of <id>,<size>.";
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
throw runtime_error("incorrect format for --regions. Expecting pairs of <id>,<size>.");
}
uint16_t id = stoi(regionConf.at(0));
uint64_t size = stoull(regionConf.at(1));
auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, id, size));
fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second);
LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize() << ", starting at " << region.GetData() << ". Locking...";
region.Lock();
LOG(info) << "Done.";
LOG(info) << "Zeroing...";
region.Zero();
LOG(info) << "Done.";
}
}
void ResetContent()
{
fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId});
}
~ShmManager()
{
// clean all segments, regions and any other shmem objects belonging to this shmId
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
}
std::string shmId;
map<uint16_t, fair::mq::shmem::Segment> segments;
map<uint16_t, unique_ptr<fair::mq::shmem::UnmanagedRegion>> regions;
};
int main(int /* argc */, char** /* argv */)
int main(int argc, char** argv)
{
fair::Logger::SetConsoleColor(true);
@@ -52,47 +107,28 @@ int main(int /* argc */, char** /* argv */)
signal(SIGTERM, signalHandler);
try {
const string session = "default"; // to_string(fair::mq::tools::UuidHash());
// generate shmId out of session id + user id (geteuid).
const string shmId = fair::mq::shmem::makeShmIdStr(session);
uint64_t shmId = 0;
vector<string> segments;
vector<string> regions;
const uint16_t s1id = 0;
const uint64_t s1size = 100000000;
const uint16_t s2id = 1;
const uint64_t s2size = 200000000;
options_description desc("Options");
desc.add_options()
("shmid", value<uint64_t>(&shmId)->required(), "Shm id")
("segments", value<vector<string>>(&segments)->multitoken()->composing(), "Segments, as <id>,<size> <id>,<size> <id>,<size> ...")
("regions", value<vector<string>>(&regions)->multitoken()->composing(), "Regions, as <id>,<size> <id>,<size> <id>,<size> ...")
("help,h", "Print help");
const uint16_t r1id = 0;
const uint64_t r1size = 100000000;
const uint16_t r2id = 1;
const uint64_t r2size = 200000000;
variables_map vm;
store(parse_command_line(argc, argv, desc), vm);
// cleanup when done
ShmRemover shmRemover(shmId);
if (vm.count("help")) {
LOG(info) << "ShmManager" << "\n" << desc;
return 0;
}
// managed segments
fair::mq::shmem::Segment segment1(shmId, s1id, s1size, fair::mq::shmem::rbTreeBestFit);
segment1.Lock();
segment1.Zero();
LOG(info) << "Created segment " << s1id << " of size " << segment1.GetSize() << " starting at " << segment1.GetData();
notify(vm);
fair::mq::shmem::Segment segment2(shmId, s2id, s2size, fair::mq::shmem::rbTreeBestFit);
segment2.Lock();
segment2.Zero();
LOG(info) << "Created segment " << s2id << " of size " << segment2.GetSize() << " starting at " << segment2.GetData();
// unmanaged regions
fair::mq::shmem::UnmanagedRegion region1(shmId, r1id, r1size);
region1.Lock();
region1.Zero();
LOG(info) << "Created region " << r1id << " of size " << region1.GetSize() << " starting at " << region1.GetData();
fair::mq::shmem::UnmanagedRegion region2(shmId, r2id, r2size);
region2.Lock();
region2.Zero();
LOG(info) << "Created region " << r2id << " of size " << region2.GetSize() << " starting at " << region2.GetData();
// for a "soft reset" call (shmem should not be in active use by (no messages in flight) devices during this call):
// fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId});
ShmManager shmManager(shmId, segments, regions);
while (!gStopping) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));

View File

@@ -23,7 +23,7 @@ struct Sampler : fair::mq::Device
fLinger = fConfig->GetProperty<uint32_t>("region-linger");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
GetChannel("data", 0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
GetChannel("data", 0).Transport()->SubscribeToRegionEvents([](fair::mq::RegionInfo info) {
LOG(info) << "Region event: " << info.event << ": "
<< (info.managed ? "managed" : "unmanaged")
<< ", id: " << info.id
@@ -36,7 +36,7 @@ struct Sampler : fair::mq::Device
regionCfg.linger = fLinger; // delay in ms before region destruction to collect outstanding events
regionCfg.lock = true; // mlock region after creation
regionCfg.zero = true; // zero region content after creation
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel...
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel...
0, // ... and this sub-channel
10000000, // region size
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
@@ -50,7 +50,7 @@ struct Sampler : fair::mq::Device
bool ConditionalRun() override
{
FairMQMessagePtr msg(NewMessageFor("data", // channel
fair::mq::MessagePtr msg(NewMessageFor("data", // channel
0, // sub-channel
fRegion, // region
fRegion->GetData(), // ptr within region
@@ -93,7 +93,7 @@ struct Sampler : fair::mq::Device
uint32_t fLinger = 100;
uint64_t fMaxIterations = 0;
uint64_t fNumIterations = 0;
FairMQUnmanagedRegionPtr fRegion = nullptr;
fair::mq::UnmanagedRegionPtr fRegion = nullptr;
std::mutex fMtx;
uint64_t fNumUnackedMsgs = 0;
};

View File

@@ -9,22 +9,25 @@ if [[ $1 =~ ^[a-z]+$ ]]; then
fi
msgSize="1000000"
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
chan="data"
chanAddr="/tmp/fmq_$session""_""$chan""_""$transport"
# setup a trap to kill everything if the test fails/timeouts
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID; @CMAKE_BINARY_DIR@/fairmq/fairmq-shmmonitor --cleanup --session $SESSION' TERM
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID; @CMAKE_BINARY_DIR@/fairmq/fairmq-shmmonitor --cleanup --session $SESSION; rm $chanAddr' TERM
SAMPLER="fairmq-ex-region-sampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --transport $transport"
SAMPLER+=" --severity debug"
SAMPLER+=" --session $SESSION"
SAMPLER+=" --session $session"
SAMPLER+=" --shm-segment-size 100000000"
SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --control static --color false"
SAMPLER+=" --max-iterations 1"
SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --region-linger 500"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777"
SAMPLER+=" --channel-config name=$chan,type=push,method=bind,address=ipc://$chanAddr"
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
SAMPLER_PID=$!
@@ -32,14 +35,17 @@ SINK="fairmq-ex-region-sink"
SINK+=" --id sink1"
SINK+=" --transport $transport"
SINK+=" --severity debug"
SINK+=" --session $SESSION"
SINK+=" --session $session"
SINK+=" --shm-segment-size 100000000"
SINK+=" --verbosity veryhigh"
SINK+=" --control static --color false"
SINK+=" --max-iterations 1"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777"
SINK+=" --channel-config name=$chan,type=pull,method=connect,address=ipc://$chanAddr"
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
SINK_PID=$!
# wait for sampler and sink to finish
wait $SAMPLER_PID
wait $SINK_PID
rm $chanAddr

View File

@@ -24,10 +24,10 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-req-rep.sh.in ${CMAKE
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-req-rep.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-req-rep.sh)
add_test(NAME Example.ReqRep.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-req-rep.sh zeromq)
set_tests_properties(Example.ReqRep.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received reply from server: ")
set_tests_properties(Example.ReqRep.zeromq PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received reply from server: ")
add_test(NAME Example.ReqRep.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-req-rep.sh shmem)
set_tests_properties(Example.ReqRep.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received reply from server: ")
set_tests_properties(Example.ReqRep.shmem PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received reply from server: ")
# install

View File

@@ -32,11 +32,11 @@ struct Client : fair::mq::Device
// its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr req(NewMessage(const_cast<char*>(text->c_str()), // data
fair::mq::MessagePtr req(NewMessage(const_cast<char*>(text->c_str()), // data
text->length(), // size
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); }, // deletion callback
text)); // object that manages the data
FairMQMessagePtr rep(NewMessage());
fair::mq::MessagePtr rep(NewMessage());
LOG(info) << "Sending \"" << fText << "\" to server.";

View File

@@ -26,7 +26,7 @@ struct Server : fair::mq::Device
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool HandleData(FairMQMessagePtr& req, int)
bool HandleData(fair::mq::MessagePtr& req, int)
{
LOG(info) << "Received request from client: \"" << std::string(static_cast<char*>(req->GetData()), req->GetSize()) << "\"";
@@ -34,7 +34,7 @@ struct Server : fair::mq::Device
LOG(info) << "Sending reply to client.";
FairMQMessagePtr rep(NewMessage(const_cast<char*>(text->c_str()), // data
fair::mq::MessagePtr rep(NewMessage(const_cast<char*>(text->c_str()), // data
text->length(), // size
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); }, // deletion callback
text)); // object that manages the data

View File

@@ -8,19 +8,22 @@ if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
chan="data"
chanAddr="/tmp/fmq_$session""_""$chan""_""$transport"
# setup a trap to kill everything if the test fails/timeouts
trap 'kill -TERM $CLIENT_PID; kill -TERM $SERVER_PID; wait $CLIENT_PID; wait $SERVER_PID;' TERM
trap 'kill -TERM $CLIENT_PID; kill -TERM $SERVER_PID; wait $CLIENT_PID; wait $SERVER_PID; rm $chanAddr' TERM
CLIENT="fairmq-ex-req-rep-client"
CLIENT+=" --id client"
CLIENT+=" --transport $transport"
CLIENT+=" --verbosity veryhigh"
CLIENT+=" --session $SESSION"
CLIENT+=" --session $session"
CLIENT+=" --shm-segment-size 100000000"
CLIENT+=" --control static --color false"
CLIENT+=" --max-iterations 1"
CLIENT+=" --channel-config name=data,type=req,method=connect,rateLogging=0,address=tcp://127.0.0.1:5005"
CLIENT+=" --channel-config name=$chan,type=req,method=connect,rateLogging=0,address=ipc://$chanAddr"
@CMAKE_CURRENT_BINARY_DIR@/$CLIENT &
CLIENT_PID=$!
@@ -28,13 +31,16 @@ SERVER="fairmq-ex-req-rep-server"
SERVER+=" --id server"
SERVER+=" --transport $transport"
SERVER+=" --verbosity veryhigh"
SERVER+=" --session $SESSION"
SERVER+=" --session $session"
SERVER+=" --shm-segment-size 100000000"
SERVER+=" --control static --color false"
SERVER+=" --max-iterations 1"
SERVER+=" --channel-config name=data,type=rep,method=bind,rateLogging=0,address=tcp://127.0.0.1:5005"
SERVER+=" --channel-config name=$chan,type=rep,method=bind,rateLogging=0,address=ipc://$chanAddr"
@CMAKE_CURRENT_BINARY_DIR@/$SERVER &
SERVER_PID=$!
# wait for everything to finish
wait $CLIENT_PID
wait $SERVER_PID
rm $chanAddr

View File

@@ -30,13 +30,13 @@
using namespace std;
using namespace fair::mq;
using namespace fair::mq::tools;
using namespace tools;
using namespace boost::property_tree;
namespace fair::mq
{
fair::mq::Properties PtreeParser(const ptree& pt, const string& id)
Properties PtreeParser(const ptree& pt, const string& id)
{
if (id.empty()) {
throw ParserError("no device ID provided. Provide with `--id` cmd option");
@@ -47,7 +47,7 @@ fair::mq::Properties PtreeParser(const ptree& pt, const string& id)
return helper::DeviceParser(pt.get_child("fairMQOptions"), id);
}
fair::mq::Properties JSONParser(const string& filename, const string& deviceId)
Properties JSONParser(const string& filename, const string& deviceId)
{
ptree pt;
LOG(debug) << "Parsing JSON from " << filename << " ...";
@@ -58,9 +58,9 @@ fair::mq::Properties JSONParser(const string& filename, const string& deviceId)
namespace helper
{
fair::mq::Properties DeviceParser(const ptree& fairMQOptions, const string& deviceId)
Properties DeviceParser(const ptree& fairMQOptions, const string& deviceId)
{
fair::mq::Properties properties;
Properties properties;
for (const auto& node : fairMQOptions) {
if (node.first == "devices") {
@@ -82,27 +82,27 @@ fair::mq::Properties DeviceParser(const ptree& fairMQOptions, const string& devi
return properties;
}
void ChannelParser(const ptree& tree, fair::mq::Properties& properties)
void ChannelParser(const ptree& tree, Properties& properties)
{
for (const auto& node : tree) {
if (node.first == "channels") {
for (const auto& cn : node.second) {
fair::mq::Properties commonProperties;
commonProperties.emplace("type", cn.second.get<string>("type", FairMQChannel::DefaultType));
commonProperties.emplace("method", cn.second.get<string>("method", FairMQChannel::DefaultMethod));
commonProperties.emplace("address", cn.second.get<string>("address", FairMQChannel::DefaultAddress));
commonProperties.emplace("transport", cn.second.get<string>("transport", FairMQChannel::DefaultTransportName));
commonProperties.emplace("sndBufSize", cn.second.get<int>("sndBufSize", FairMQChannel::DefaultSndBufSize));
commonProperties.emplace("rcvBufSize", cn.second.get<int>("rcvBufSize", FairMQChannel::DefaultRcvBufSize));
commonProperties.emplace("sndKernelSize", cn.second.get<int>("sndKernelSize", FairMQChannel::DefaultSndKernelSize));
commonProperties.emplace("rcvKernelSize", cn.second.get<int>("rcvKernelSize", FairMQChannel::DefaultRcvKernelSize));
commonProperties.emplace("sndTimeoutMs", cn.second.get<int>("sndTimeoutMs", FairMQChannel::DefaultSndTimeoutMs));
commonProperties.emplace("rcvTimeoutMs", cn.second.get<int>("rcvTimeoutMs", FairMQChannel::DefaultRcvTimeoutMs));
commonProperties.emplace("linger", cn.second.get<int>("linger", FairMQChannel::DefaultLinger));
commonProperties.emplace("rateLogging", cn.second.get<int>("rateLogging", FairMQChannel::DefaultRateLogging));
commonProperties.emplace("portRangeMin", cn.second.get<int>("portRangeMin", FairMQChannel::DefaultPortRangeMin));
commonProperties.emplace("portRangeMax", cn.second.get<int>("portRangeMax", FairMQChannel::DefaultPortRangeMax));
commonProperties.emplace("autoBind", cn.second.get<bool>("autoBind", FairMQChannel::DefaultAutoBind));
Properties commonProperties;
commonProperties.emplace("type", cn.second.get<string>("type", Channel::DefaultType));
commonProperties.emplace("method", cn.second.get<string>("method", Channel::DefaultMethod));
commonProperties.emplace("address", cn.second.get<string>("address", Channel::DefaultAddress));
commonProperties.emplace("transport", cn.second.get<string>("transport", Channel::DefaultTransportName));
commonProperties.emplace("sndBufSize", cn.second.get<int>("sndBufSize", Channel::DefaultSndBufSize));
commonProperties.emplace("rcvBufSize", cn.second.get<int>("rcvBufSize", Channel::DefaultRcvBufSize));
commonProperties.emplace("sndKernelSize", cn.second.get<int>("sndKernelSize", Channel::DefaultSndKernelSize));
commonProperties.emplace("rcvKernelSize", cn.second.get<int>("rcvKernelSize", Channel::DefaultRcvKernelSize));
commonProperties.emplace("sndTimeoutMs", cn.second.get<int>("sndTimeoutMs", Channel::DefaultSndTimeoutMs));
commonProperties.emplace("rcvTimeoutMs", cn.second.get<int>("rcvTimeoutMs", Channel::DefaultRcvTimeoutMs));
commonProperties.emplace("linger", cn.second.get<int>("linger", Channel::DefaultLinger));
commonProperties.emplace("rateLogging", cn.second.get<int>("rateLogging", Channel::DefaultRateLogging));
commonProperties.emplace("portRangeMin", cn.second.get<int>("portRangeMin", Channel::DefaultPortRangeMin));
commonProperties.emplace("portRangeMax", cn.second.get<int>("portRangeMax", Channel::DefaultPortRangeMax));
commonProperties.emplace("autoBind", cn.second.get<bool>("autoBind", Channel::DefaultAutoBind));
string name = cn.second.get<string>("name");
int numSockets = cn.second.get<int>("numSockets", 0);
@@ -128,7 +128,7 @@ void ChannelParser(const ptree& tree, fair::mq::Properties& properties)
}
}
void SubChannelParser(const ptree& channelTree, fair::mq::Properties& properties, const string& channelName, const fair::mq::Properties& commonProperties)
void SubChannelParser(const ptree& channelTree, Properties& properties, const string& channelName, const Properties& commonProperties)
{
// for each socket in channel
int i = 0;
@@ -137,7 +137,7 @@ void SubChannelParser(const ptree& channelTree, fair::mq::Properties& properties
if (node.first == "sockets") {
for (const auto& sn : node.second) {
// a sub-channel inherits relevant properties from the common channel ...
fair::mq::Properties newProperties(commonProperties);
Properties newProperties(commonProperties);
// ... and adds/overwrites its own properties
newProperties["type"] = sn.second.get<string>("type", boost::any_cast<string>(commonProperties.at("type")));
@@ -177,7 +177,7 @@ void SubChannelParser(const ptree& channelTree, fair::mq::Properties& properties
LOG(trace) << "\tNo sockets specified,";
LOG(trace) << "\tapplying common settings to the channel:";
fair::mq::Properties newProperties(commonProperties);
Properties newProperties(commonProperties);
for (auto& p : newProperties) {
LOG(trace) << "\t" << setw(13) << left << p.first << " : " << p.second;

View File

@@ -12,7 +12,7 @@
///
/// @author Mikolaj Krzewicki, mkrzewic@cern.ch
#include <fairmq/FairMQTransportFactory.h>
#include <fairmq/TransportFactory.h>
#include <fairmq/MemoryResources.h>
namespace fair::mq
@@ -28,13 +28,13 @@ template<typename ContainerT>
// pmr::polymorphic_allocator<typename
// ContainerT::value_type>,
// typename ContainerT::allocator_type>::value == true,
// FairMQMessagePtr>::type
FairMQMessagePtr getMessage(ContainerT &&container_, FairMQMemoryResource *targetResource = nullptr)
// MessagePtr>::type
MessagePtr getMessage(ContainerT &&container_, MemoryResource *targetResource = nullptr)
{
auto container = std::move(container_);
auto alloc = container.get_allocator();
auto resource = dynamic_cast<FairMQMemoryResource *>(alloc.resource());
auto resource = dynamic_cast<MemoryResource *>(alloc.resource());
if (!resource && !targetResource) {
throw std::runtime_error("Neither the container or target resource specified");
}

View File

@@ -7,7 +7,7 @@
********************************************************************************/
#include <fairmq/Plugin.h>
#include <FairMQLogger.h>
#include <fairlogger/Logger.h>
#include <utility>
using namespace std;

View File

@@ -351,7 +351,7 @@ void ProgOptions::DeleteProperty(const string& key)
vm.erase(key);
}
void ProgOptions::AddChannel(const string& name, const FairMQChannel& channel)
void ProgOptions::AddChannel(const string& name, const Channel& channel)
{
lock_guard<mutex> lock(fMtx);
unordered_map<string, int> existingChannels = GetChannelInfoImpl();

View File

@@ -133,6 +133,7 @@ struct RegionConfig
bool removeOnDestruction = true; /// remove the region on object destruction
int creationFlags = 0; /// flags passed to the underlying transport on region creation
int64_t userFlags = 0; /// custom flags that have no effect on the transport, but can be retrieved from the region by the user
uint64_t size = 0; /// region size
std::string path = ""; /// file path, if the region is backed by a file
std::optional<uint16_t> id = std::nullopt; /// region id
uint32_t linger = 100; /// delay in ms before region destruction to collect outstanding events

View File

@@ -44,7 +44,7 @@ class BenchmarkSampler : public Device
void Run() override
{
// store the channel reference to avoid traversing the map on every loop iteration
FairMQChannel& dataOutChannel = GetChannel(fOutChannelName, 0);
Channel& dataOutChannel = GetChannel(fOutChannelName, 0);
LOG(info) << "Starting the benchmark with message size of " << fMsgSize << " and " << fMaxIterations << " iterations.";
auto tStart = std::chrono::high_resolution_clock::now();
@@ -53,7 +53,7 @@ class BenchmarkSampler : public Device
while (!NewStatePending()) {
if (fMultipart) {
FairMQParts parts;
Parts parts;
for (size_t i = 0; i < fNumParts; ++i) {
parts.AddPart(dataOutChannel.NewMessage(fMsgSize, fair::mq::Alignment{fMsgAlignment}));
@@ -71,7 +71,7 @@ class BenchmarkSampler : public Device
++fNumIterations;
}
} else {
FairMQMessagePtr msg(dataOutChannel.NewMessage(fMsgSize, fair::mq::Alignment{fMsgAlignment}));
MessagePtr msg(dataOutChannel.NewMessage(fMsgSize, fair::mq::Alignment{fMsgAlignment}));
if (fMemSet) {
std::memset(msg->GetData(), 0, msg->GetSize());
}

View File

@@ -9,7 +9,7 @@
#ifndef FAIR_MQ_MERGER_H
#define FAIR_MQ_MERGER_H
#include <FairMQPoller.h>
#include <fairmq/Poller.h>
#include <fairmq/Device.h>
#include <fairlogger/Logger.h>
@@ -45,13 +45,13 @@ class Merger : public Device
{
int numInputs = GetNumSubChannels(fInChannelName);
std::vector<FairMQChannel*> chans;
std::vector<Channel*> chans;
for (auto& chan : fChannels.at(fInChannelName)) {
chans.push_back(&chan);
}
FairMQPollerPtr poller(NewPoller(chans));
PollerPtr poller(NewPoller(chans));
if (fMultipart) {
while (!NewStatePending()) {
@@ -61,7 +61,7 @@ class Merger : public Device
for (int i = 0; i < numInputs; ++i) {
// Check if the channel has data ready to be received.
if (poller->CheckInput(i)) {
FairMQParts payload;
Parts payload;
if (Receive(payload, fInChannelName, i) >= 0) {
if (Send(payload, fOutChannelName) < 0) {
@@ -83,7 +83,7 @@ class Merger : public Device
for (int i = 0; i < numInputs; ++i) {
// Check if the channel has data ready to be received.
if (poller->CheckInput(i)) {
FairMQMessagePtr payload(fTransportFactory->CreateMessage());
MessagePtr payload(fTransportFactory->CreateMessage());
if (Receive(payload, fInChannelName, i) >= 0) {
if (Send(payload, fOutChannelName) < 0) {

View File

@@ -40,11 +40,11 @@ class Multiplier : public Device
}
bool HandleSingleData(std::unique_ptr<FairMQMessage>& payload, int)
bool HandleSingleData(std::unique_ptr<Message>& payload, int)
{
for (unsigned int i = 0; i < fOutChannelNames.size() - 1; ++i) { // all except last channel
for (unsigned int j = 0; j < GetNumSubChannels(fOutChannelNames.at(i)); ++j) { // all subChannels in a channel
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
MessagePtr msgCopy(fTransportFactory->CreateMessage());
msgCopy->Copy(*payload);
Send(msgCopy, fOutChannelNames.at(i), j);
@@ -54,7 +54,7 @@ class Multiplier : public Device
unsigned int lastChannelSize = GetNumSubChannels(fOutChannelNames.back());
for (unsigned int i = 0; i < lastChannelSize - 1; ++i) { // iterate over all except last subChannels of the last channel
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
MessagePtr msgCopy(fTransportFactory->CreateMessage());
msgCopy->Copy(*payload);
Send(msgCopy, fOutChannelNames.back(), i);
@@ -65,14 +65,14 @@ class Multiplier : public Device
return true;
}
bool HandleMultipartData(FairMQParts& payload, int)
bool HandleMultipartData(Parts& payload, int)
{
for (unsigned int i = 0; i < fOutChannelNames.size() - 1; ++i) { // all except last channel
for (unsigned int j = 0; j < GetNumSubChannels(fOutChannelNames.at(i)); ++j) { // all subChannels in a channel
FairMQParts parts;
Parts parts;
for (int k = 0; k < payload.Size(); ++k) {
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
MessagePtr msgCopy(fTransportFactory->CreateMessage());
msgCopy->Copy(payload.AtRef(k));
parts.AddPart(std::move(msgCopy));
}
@@ -84,10 +84,10 @@ class Multiplier : public Device
unsigned int lastChannelSize = GetNumSubChannels(fOutChannelNames.back());
for (unsigned int i = 0; i < lastChannelSize - 1; ++i) { // iterate over all except last subChannels of the last channel
FairMQParts parts;
Parts parts;
for (int k = 0; k < payload.Size(); ++k) {
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
MessagePtr msgCopy(fTransportFactory->CreateMessage());
msgCopy->Copy(payload.AtRef(k));
parts.AddPart(std::move(msgCopy));
}

View File

@@ -34,7 +34,7 @@ class Proxy : public Device
{
if (fMultipart) {
while (!NewStatePending()) {
FairMQParts payload;
Parts payload;
if (Receive(payload, fInChannelName) >= 0) {
if (Send(payload, fOutChannelName) < 0) {
LOG(debug) << "Transfer interrupted";
@@ -47,7 +47,7 @@ class Proxy : public Device
}
} else {
while (!NewStatePending()) {
FairMQMessagePtr payload(fTransportFactory->CreateMessage());
MessagePtr payload(fTransportFactory->CreateMessage());
if (Receive(payload, fInChannelName) >= 0) {
if (Send(payload, fOutChannelName) < 0) {
LOG(debug) << "Transfer interrupted";

View File

@@ -9,7 +9,6 @@
#ifndef FAIR_MQ_SINK_H
#define FAIR_MQ_SINK_H
#include <FairMQPoller.h>
#include <fairmq/Device.h>
#include <fairmq/tools/Strings.h>
@@ -48,7 +47,7 @@ class Sink : public Device
void Run() override
{
// store the channel reference to avoid traversing the map on every loop iteration
FairMQChannel& dataInChannel = GetChannel(fInChannelName, 0);
Channel& dataInChannel = GetChannel(fInChannelName, 0);
LOG(info) << "Starting sink and expecting to receive " << fMaxIterations << " messages.";
auto tStart = std::chrono::high_resolution_clock::now();
@@ -70,7 +69,7 @@ class Sink : public Device
while (!NewStatePending()) {
if (fMultipart) {
FairMQParts parts;
Parts parts;
if (dataInChannel.Receive(parts) < 0) {
continue;
}
@@ -80,7 +79,7 @@ class Sink : public Device
}
}
} else {
FairMQMessagePtr msg(dataInChannel.NewMessage());
MessagePtr msg(dataInChannel.NewMessage());
if (dataInChannel.Receive(msg) < 0) {
continue;
}

View File

@@ -34,9 +34,9 @@ class Splitter : public Device
fDirection = 0;
if (fMultipart) {
OnData(fInChannelName, &Splitter::HandleData<FairMQParts>);
OnData(fInChannelName, &Splitter::HandleData<Parts>);
} else {
OnData(fInChannelName, &Splitter::HandleData<FairMQMessagePtr>);
OnData(fInChannelName, &Splitter::HandleData<MessagePtr>);
}
}

View File

@@ -10,7 +10,7 @@
#include <fairmq/ofi/Socket.h>
#include <fairmq/ofi/TransportFactory.h>
#include <fairmq/tools/Strings.h>
#include <FairMQLogger.h>
#include <fairlogger/Logger.h>
#include <asiofi.hpp>
#include <asio/buffer.hpp>

View File

@@ -11,7 +11,7 @@
#include "PMIx.hpp"
#include <FairMQLogger.h>
#include <fairlogger/Logger.h>
#include <fairmq/tools/Semaphore.h>
#include <memory> // make_unique
#include <string>

View File

@@ -14,7 +14,7 @@
#include <fairmq/Plugin.h>
#include <fairmq/Version.h>
#include <FairMQLogger.h>
#include <fairlogger/Logger.h>
#include <string>
#include <sstream>

View File

@@ -72,6 +72,9 @@ Control::Control(const string& name, Plugin::Version version, const string& main
if (control == "static") {
LOG(debug) << "Running builtin controller: static";
fControllerThread = thread(&Control::StaticMode, this);
} else if (control == "gui") {
LOG(debug) << "Running builtin controller: gui";
fControllerThread = thread(&Control::GUIMode, this);
} else if (control == "dynamic" || control == "external" || control == "interactive") {
LOG(debug) << "Running builtin controller: interactive";
fControllerThread = thread(&Control::InteractiveMode, this);
@@ -380,6 +383,28 @@ try {
ReleaseDeviceControl();
}
auto Control::GUIMode() -> void
try {
RunStartupSequence();
{
// Wait for next state, which is DeviceState::Ready,
// or for device shutdown request (Ctrl-C)
pair<bool, fair::mq::State> result;
do {
result = fStateQueue.WaitForNext(chrono::milliseconds(50));
} while (!fDeviceShutdownRequested);
}
RunShutdownSequence();
} catch (PluginServices::DeviceControlError& e) {
// If we are here, it means another plugin has taken control. That's fine, just print the
// exception message and do nothing else.
LOG(debug) << e.what();
} catch (DeviceErrorState&) {
ReleaseDeviceControl();
}
auto Control::SignalHandler() -> void
{
while (gSignalCount == 0 && !fPluginShutdownRequested) {

View File

@@ -43,6 +43,7 @@ class Control : public Plugin
static auto PrintStateMachine() -> void;
auto PrintNumberOfConnectedPeers() -> void;
auto StaticMode() -> void;
auto GUIMode() -> void;
auto SignalHandler() -> void;
auto RunShutdownSequence() -> void;
auto RunStartupSequence() -> void;

View File

@@ -11,7 +11,7 @@
#include <memory>
#include <string>
using FairMQDevicePtr = FairMQDevice*;
using FairMQDevicePtr = fair::mq::Device*;
// to be implemented by the user to return a child class of FairMQDevice
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& config);
@@ -45,7 +45,7 @@ int main(int argc, char* argv[])
// });
runner.AddHook<InstantiateDevice>([](DeviceRunner& r){
r.fDevice = std::unique_ptr<FairMQDevice>{getDevice(r.fConfig)};
r.fDevice = std::unique_ptr<fair::mq::Device>{getDevice(r.fConfig)};
});
return runner.Run();

View File

@@ -28,6 +28,8 @@
namespace fair::mq::shmem
{
static constexpr uint64_t kManagementSegmentSize = 6553600;
struct SharedMemoryError : std::runtime_error { using std::runtime_error::runtime_error; };
using SimpleSeqFitSegment = boost::interprocess::basic_managed_shared_memory<char,
@@ -58,19 +60,22 @@ struct RegionInfo
: fPath("", alloc)
, fCreationFlags(0)
, fUserFlags(0)
, fSize(0)
, fDestroyed(false)
{}
RegionInfo(const char* path, const int flags, const uint64_t userFlags, const VoidAlloc& alloc)
RegionInfo(const char* path, int flags, uint64_t userFlags, uint64_t size, const VoidAlloc& alloc)
: fPath(path, alloc)
, fCreationFlags(flags)
, fUserFlags(userFlags)
, fSize(size)
, fDestroyed(false)
{}
Str fPath;
int fCreationFlags;
uint64_t fUserFlags;
uint64_t fSize;
bool fDestroyed;
};

View File

@@ -132,7 +132,7 @@ class Manager
: fShmId64(config ? config->GetProperty<uint64_t>("shmid", makeShmIdUint64(sessionName)) : makeShmIdUint64(sessionName))
, fShmId(makeShmIdStr(fShmId64))
, fSegmentId(config ? config->GetProperty<uint16_t>("shm-segment-id", 0) : 0)
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600)
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), kManagementSegmentSize)
, fShmVoidAlloc(fManagementSegment.get_segment_manager())
, fShmMtx(fManagementSegment.find_or_construct<boost::interprocess::interprocess_mutex>(boost::interprocess::unique_instance)())
, fNumObservedEvents(0)
@@ -466,6 +466,7 @@ class Manager
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, true, std::move(cfg)));
r.first->second->InitializeQueues();
r.first->second->StartAckSender();
lockedShmLock.lock();
return r.first->second.get();
} catch (std::out_of_range& oor) {
LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?";
@@ -776,8 +777,6 @@ class Manager
if (lastRemoved) {
if (!fNoCleanup) {
Monitor::Cleanup(ShmId{fShmId});
} else {
Monitor::RemoveObject("fmq_" + fShmId + "_mng");
}
}
}

View File

@@ -35,7 +35,7 @@ class Message final : public fair::mq::Message
friend class Socket;
public:
Message(Manager& manager, FairMQTransportFactory* factory = nullptr)
Message(Manager& manager, fair::mq::TransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fManager(manager)
, fQueued(false)
@@ -46,7 +46,7 @@ class Message final : public fair::mq::Message
fManager.IncrementMsgCounter();
}
Message(Manager& manager, Alignment alignment, FairMQTransportFactory* factory = nullptr)
Message(Manager& manager, Alignment alignment, fair::mq::TransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fManager(manager)
, fQueued(false)
@@ -58,7 +58,7 @@ class Message final : public fair::mq::Message
fManager.IncrementMsgCounter();
}
Message(Manager& manager, const size_t size, FairMQTransportFactory* factory = nullptr)
Message(Manager& manager, const size_t size, fair::mq::TransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fManager(manager)
, fQueued(false)
@@ -70,7 +70,7 @@ class Message final : public fair::mq::Message
fManager.IncrementMsgCounter();
}
Message(Manager& manager, const size_t size, Alignment alignment, FairMQTransportFactory* factory = nullptr)
Message(Manager& manager, const size_t size, Alignment alignment, fair::mq::TransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fManager(manager)
, fQueued(false)
@@ -83,7 +83,7 @@ class Message final : public fair::mq::Message
fManager.IncrementMsgCounter();
}
Message(Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr)
Message(Manager& manager, void* data, const size_t size, fair::mq::FreeFn* ffn, void* hint = nullptr, fair::mq::TransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fManager(manager)
, fQueued(false)
@@ -102,7 +102,7 @@ class Message final : public fair::mq::Message
fManager.IncrementMsgCounter();
}
Message(Manager& manager, UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr)
Message(Manager& manager, UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, fair::mq::TransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fManager(manager)
, fQueued(false)
@@ -125,7 +125,7 @@ class Message final : public fair::mq::Message
fManager.IncrementMsgCounter();
}
Message(Manager& manager, MetaHeader& hdr, FairMQTransportFactory* factory = nullptr)
Message(Manager& manager, MetaHeader& hdr, fair::mq::TransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fManager(manager)
, fQueued(false)
@@ -169,7 +169,7 @@ class Message final : public fair::mq::Message
InitializeChunk(size, fAlignment);
}
void Rebuild(void* data, size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override
void Rebuild(void* data, size_t size, fair::mq::FreeFn* ffn, void* hint = nullptr) override
{
CloseMessage();
fQueued = false;

View File

@@ -6,9 +6,10 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Monitor.h"
#include "Common.h"
#include "UnmanagedRegion.h"
#include "Monitor.h"
#include "Segment.h"
#include <fairmq/shmem/UnmanagedRegion.h>
#include <fairmq/tools/IO.h>
#include <fairmq/tools/Strings.h>
@@ -267,13 +268,14 @@ bool Monitor::PrintShm(const ShmId& shmId)
ss << "\n unmanaged regions:";
for (const auto& r : *shmRegions) {
ss << "\n [" << r.first << "]: " << (r.second.fDestroyed ? "destroyed" : "alive");
ss << ", size: " << r.second.fSize;
try {
boost::interprocess::message_queue q(open_only, std::string("fmq_" + std::string(shmId) + "_rgq_" + to_string(r.first)).c_str());
ss << ", ack queue: " << q.get_num_msg() << " messages";
} catch (bie&) {
ss << ", ack queue: not found";
}
// try {
// boost::interprocess::message_queue q(open_only, std::string("fmq_" + std::string(shmId) + "_rgq_" + to_string(r.first)).c_str());
// ss << ", ack queue: " << q.get_num_msg() << " messages";
// } catch (bie&) {
// ss << ", ack queue: not found";
// }
}
}
LOGV(info, user1) << ss.str();
@@ -414,24 +416,28 @@ void Monitor::PrintDebugInfo(const ShmId& shmId __attribute__((unused)))
size_t numMessages = 0;
for (const auto& e : *debug) {
numMessages += e.second.size();
}
LOG(info) << endl << "found " << numMessages << " messages.";
for (const auto& s : *debug) {
for (const auto& e : s.second) {
using time_point = chrono::system_clock::time_point;
time_point tmpt{chrono::duration_cast<time_point::duration>(chrono::nanoseconds(e.second.fCreationTime))};
time_t t = chrono::system_clock::to_time_t(tmpt);
uint64_t ms = e.second.fCreationTime % 1000000;
auto tm = localtime(&t);
LOG(info) << "segment: " << setw(3) << setfill(' ') << s.first
<< ", offset: " << setw(12) << setfill(' ') << e.first
<< ", size: " << setw(10) << setfill(' ') << e.second.fSize
<< ", creator PID: " << e.second.fPid << setfill('0')
<< ", at: " << setw(2) << tm->tm_hour << ":" << setw(2) << tm->tm_min << ":" << setw(2) << tm->tm_sec << "." << setw(6) << ms;
if (debug) {
for (const auto& e : *debug) {
numMessages += e.second.size();
}
LOG(info) << endl << "found " << numMessages << " messages.";
for (const auto& s : *debug) {
for (const auto& e : s.second) {
using time_point = chrono::system_clock::time_point;
time_point tmpt{chrono::duration_cast<time_point::duration>(chrono::nanoseconds(e.second.fCreationTime))};
time_t t = chrono::system_clock::to_time_t(tmpt);
uint64_t ms = e.second.fCreationTime % 1000000;
auto tm = localtime(&t);
LOG(info) << "segment: " << setw(3) << setfill(' ') << s.first
<< ", offset: " << setw(12) << setfill(' ') << e.first
<< ", size: " << setw(10) << setfill(' ') << e.second.fSize
<< ", creator PID: " << e.second.fPid << setfill('0')
<< ", at: " << setw(2) << tm->tm_hour << ":" << setw(2) << tm->tm_min << ":" << setw(2) << tm->tm_sec << "." << setw(6) << ms;
}
}
} else {
LOG(info) << "no debug data found";
}
} catch (bie&) {
LOG(info) << "no segments found";
@@ -462,11 +468,16 @@ unordered_map<uint16_t, std::vector<BufferDebugInfo>> Monitor::GetDebugInfo(cons
result.reserve(debug->size());
for (const auto& s : *debug) {
result[s.first].reserve(s.second.size());
for (const auto& e : s.second) {
result[s.first][e.first] = BufferDebugInfo(e.first, e.second.fPid, e.second.fSize, e.second.fCreationTime);
if (debug) {
for (const auto& s : *debug) {
result[s.first].reserve(s.second.size());
for (const auto& e : s.second) {
result[s.first][e.first] = BufferDebugInfo(e.first, e.second.fPid, e.second.fSize, e.second.fCreationTime);
}
}
} else {
LOG(info) << "no debug data found";
}
} catch (bie&) {
LOG(info) << "no segments found";
@@ -552,15 +563,16 @@ std::pair<std::string, bool> Remove(const std::string& name, bool verbose)
}
}
std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, bool verbose /* = true */)
std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmIdT, bool verbose /* = true */)
{
std::string shmId = shmIdT.shmId;
std::vector<std::pair<std::string, bool>> result;
if (verbose) {
LOG(info) << "Cleaning up for shared memory id '" << shmId.shmId << "'...";
LOG(info) << "Cleaning up for shared memory id '" << shmId << "'...";
}
string managementSegmentName("fmq_" + shmId.shmId + "_mng");
string managementSegmentName("fmq_" + shmId + "_mng");
try {
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str());
@@ -578,22 +590,21 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
LOG(info) << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << info.fDestroyed << ".";
}
if (!path.empty()) {
result.emplace_back(Remove<bipc::file_mapping>(path + "fmq_" + shmId.shmId + "_rg_" + to_string(id), verbose));
result.emplace_back(Remove<bipc::file_mapping>(path + "fmq_" + shmId + "_rg_" + to_string(id), verbose));
} else {
result.emplace_back(Remove<bipc::shared_memory_object>("fmq_" + shmId.shmId + "_rg_" + to_string(id), verbose));
result.emplace_back(Remove<bipc::shared_memory_object>("fmq_" + shmId + "_rg_" + to_string(id), verbose));
}
result.emplace_back(Remove<bipc::message_queue>("fmq_" + shmId.shmId + "_rgq_" + to_string(id), verbose));
result.emplace_back(Remove<bipc::message_queue>("fmq_" + shmId + "_rgq_" + to_string(id), verbose));
}
}
Uint16SegmentInfoHashMap* shmSegments = managementSegment.find<Uint16SegmentInfoHashMap>(bipc::unique_instance).first;
if (shmSegments) {
if (verbose) {
LOG(info) << "Found " << shmSegments->size() << " managed segments...";
}
for (const auto& segment : *shmSegments) {
result.emplace_back(Remove<bipc::shared_memory_object>("fmq_" + shmId.shmId + "_m_" + to_string(segment.first), verbose));
result.emplace_back(Remove<bipc::shared_memory_object>("fmq_" + shmId + "_m_" + to_string(segment.first), verbose));
}
} else {
if (verbose) {
@@ -636,41 +647,49 @@ std::vector<std::pair<std::string, bool>> Monitor::CleanupFull(const SessionId&
return CleanupFull(shmId, verbose);
}
void Monitor::ResetContent(const ShmId& shmId, bool verbose /* = true */)
void Monitor::ResetContent(const ShmId& shmIdT, bool verbose /* = true */)
{
std::string shmId = shmIdT.shmId;
if (verbose) {
cout << "Resetting segments content for shared memory id '" << shmId.shmId << "'..." << endl;
cout << "Resetting segments content for shared memory id '" << shmId << "'..." << endl;
}
string managementSegmentName("fmq_" + shmId.shmId + "_mng");
string managementSegmentName("fmq_" + shmId + "_mng");
try {
using namespace boost::interprocess;
managed_shared_memory managementSegment(open_only, managementSegmentName.c_str());
Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
for (const auto& s : *segmentInfos) {
if (verbose) {
cout << "Resetting content of segment '" << "fmq_" << shmId.shmId << "_m_" << s.first << "'..." << endl;
cout << "Resetting content of segment '" << "fmq_" << shmId << "_m_" << s.first << "'..." << endl;
}
try {
if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
RBTreeBestFitSegment segment(open_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str());
RBTreeBestFitSegment segment(open_only, std::string("fmq_" + shmId + "_m_" + to_string(s.first)).c_str());
void* ptr = segment.get_segment_manager();
size_t size = segment.get_segment_manager()->get_size();
new(ptr) segment_manager<char, rbtree_best_fit<mutex_family, offset_ptr<void>>, null_index>(size);
} else {
SimpleSeqFitSegment segment(open_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str());
SimpleSeqFitSegment segment(open_only, std::string("fmq_" + shmId + "_m_" + to_string(s.first)).c_str());
void* ptr = segment.get_segment_manager();
size_t size = segment.get_segment_manager()->get_size();
new(ptr) segment_manager<char, simple_seq_fit<mutex_family, offset_ptr<void>>, null_index>(size);
}
} catch (bie& e) {
if (verbose) {
cout << "Error resetting content of segment '" << std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)) << "': " << e.what() << endl;
cout << "Error resetting content of segment '" << std::string("fmq_" + shmId + "_m_" + to_string(s.first)) << "': " << e.what() << endl;
}
}
}
Uint16RegionInfoHashMap* shmRegions = managementSegment.find<Uint16RegionInfoHashMap>(bipc::unique_instance).first;
if (shmRegions) {
for (const auto& region : *shmRegions) {
uint16_t id = region.first;
Remove<bipc::message_queue>("fmq_" + shmId + "_rgq_" + to_string(id), verbose);
}
}
} catch (bie& e) {
if (verbose) {
cout << "Could not find '" << managementSegmentName << "' segment. Nothing to cleanup." << endl;
@@ -679,7 +698,7 @@ void Monitor::ResetContent(const ShmId& shmId, bool verbose /* = true */)
}
if (verbose) {
cout << "Done resetting segment content for shared memory id '" << shmId.shmId << "'." << endl;
cout << "Done resetting segment content for shared memory id '" << shmId << "'." << endl;
}
}
@@ -692,6 +711,43 @@ void Monitor::ResetContent(const SessionId& sessionId, bool verbose /* = true */
ResetContent(shmId, verbose);
}
void Monitor::ResetContent(const ShmId& shmIdT, const std::vector<SegmentConfig>& segmentCfgs, const std::vector<RegionConfig>& regionCfgs, bool verbose /* = true */)
{
using namespace boost::interprocess;
std::string shmId = shmIdT.shmId;
std::string managementSegmentName("fmq_" + shmId + "_mng");
// reset managed segments
ResetContent(shmIdT, verbose);
// delete management segment
Remove<bipc::shared_memory_object>(managementSegmentName, verbose);
// recreate management segment
managed_shared_memory mngSegment(create_only, managementSegmentName.c_str(), kManagementSegmentSize);
// fill management segment with segment & region infos
for (const auto& s : segmentCfgs) {
if (s.allocationAlgorithm == "rbtree_best_fit") {
Segment::Register(shmId, s.id, AllocationAlgorithm::rbtree_best_fit);
} else if (s.allocationAlgorithm == "simple_seq_fit") {
Segment::Register(shmId, s.id, AllocationAlgorithm::simple_seq_fit);
} else {
LOG(error) << "Unknown allocation algorithm provided: " << s.allocationAlgorithm;
throw MonitorError("Unknown allocation algorithm provided: " + s.allocationAlgorithm);
}
}
for (const auto& r : regionCfgs) {
fair::mq::shmem::UnmanagedRegion::Register(shmId, r);
}
}
void Monitor::ResetContent(const SessionId& sessionId, const std::vector<SegmentConfig>& segmentCfgs, const std::vector<RegionConfig>& regionCfgs, bool verbose /* = true */)
{
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
if (verbose) {
cout << "ResetContent called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl;
}
ResetContent(shmId, segmentCfgs, regionCfgs, verbose);
}
Monitor::~Monitor()
{
if (fSignalThread.joinable()) {

View File

@@ -8,6 +8,8 @@
#ifndef FAIR_MQ_SHMEM_MONITOR_H_
#define FAIR_MQ_SHMEM_MONITOR_H_
#include <fairmq/UnmanagedRegion.h>
#include <fairlogger/Logger.h>
#include <thread>
@@ -49,6 +51,13 @@ struct BufferDebugInfo
uint64_t fCreationTime;
};
struct SegmentConfig
{
uint16_t id;
uint64_t size;
std::string allocationAlgorithm;
};
class Monitor
{
public:
@@ -88,6 +97,14 @@ class Monitor
/// @param sessionId session id
/// Only call this when segment is not in use
static void ResetContent(const SessionId& sessionId, bool verbose = true);
/// @brief [EXPERIMENTAL] cleanup the content of the shem segment, without recreating it
/// @param shmId shared memory id
/// Only call this when segment is not in use
static void ResetContent(const ShmId& shmId, const std::vector<SegmentConfig>& segmentCfgs, const std::vector<RegionConfig>& regionCfgs, bool verbose = true);
/// @brief [EXPERIMENTAL] cleanup the content of the shem segment, without recreating it
/// @param sessionId session id
/// Only call this when segment is not in use
static void ResetContent(const SessionId& sessionId, const std::vector<SegmentConfig>& segmentCfgs, const std::vector<RegionConfig>& regionCfgs, bool verbose = true);
/// @brief Outputs list of messages in shmem (if compiled with FAIRMQ_DEBUG_MODE=ON)
/// @param shmId shmem id

View File

@@ -26,6 +26,8 @@ static const RBTreeBestFit rbTreeBestFit = RBTreeBestFit();
struct Segment
{
friend class Monitor;
Segment(const std::string& shmId, uint16_t id, size_t size, SimpleSeqFit)
: fSegment(SimpleSeqFitSegment(boost::interprocess::open_or_create,
std::string("fmq_" + shmId + "_m_" + std::to_string(id)).c_str(),
@@ -66,15 +68,12 @@ struct Segment
static void Register(const std::string& shmId, uint16_t id, AllocationAlgorithm allocAlgo)
{
using namespace boost::interprocess;
managed_shared_memory mngSegment(open_or_create, std::string("fmq_" + shmId + "_mng").c_str(), 6553600);
managed_shared_memory mngSegment(open_or_create, std::string("fmq_" + shmId + "_mng").c_str(), kManagementSegmentSize);
VoidAlloc alloc(mngSegment.get_segment_manager());
Uint16SegmentInfoHashMap* shmSegments = mngSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(alloc);
EventCounter* eventCounter = mngSegment.find<EventCounter>(unique_instance).first;
if (!eventCounter) {
eventCounter = mngSegment.construct<EventCounter>(unique_instance)(0);
}
EventCounter* eventCounter = mngSegment.find_or_construct<EventCounter>(unique_instance)(0);
bool newSegmentRegistered = shmSegments->emplace(id, allocAlgo).second;
if (newSegmentRegistered) {

View File

@@ -51,7 +51,7 @@ struct ZMsg
class Socket final : public fair::mq::Socket
{
public:
Socket(Manager& manager, const std::string& type, const std::string& name, const std::string& id, void* context, FairMQTransportFactory* fac = nullptr)
Socket(Manager& manager, const std::string& type, const std::string& name, const std::string& id, void* context, fair::mq::TransportFactory* fac = nullptr)
: fair::mq::Socket(fac)
, fManager(manager)
, fId(id + "." + name + "." + type)

View File

@@ -113,7 +113,7 @@ class TransportFactory final : public fair::mq::TransportFactory
return std::make_unique<Message>(*fManager, size, alignment, this);
}
MessagePtr CreateMessage(void* data, size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override
MessagePtr CreateMessage(void* data, size_t size, fair::mq::FreeFn* ffn, void* hint = nullptr) override
{
return std::make_unique<Message>(*fManager, data, size, ffn, hint, this);
}
@@ -128,17 +128,17 @@ class TransportFactory final : public fair::mq::TransportFactory
return std::make_unique<Socket>(*fManager, type, name, GetId(), fZmqCtx, this);
}
PollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override
PollerPtr CreatePoller(const std::vector<Channel>& channels) const override
{
return std::make_unique<Poller>(channels);
}
PollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override
PollerPtr CreatePoller(const std::vector<Channel*>& channels) const override
{
return std::make_unique<Poller>(channels);
}
PollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override
PollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<Channel>>& channelsMap, const std::vector<std::string>& channelList) const override
{
return std::make_unique<Poller>(channelsMap, channelList);
}

View File

@@ -41,6 +41,7 @@ struct UnmanagedRegion
{
friend class Message;
friend class Manager;
friend class Monitor;
UnmanagedRegion(const std::string& shmId, uint16_t id, uint64_t size)
: UnmanagedRegion(shmId, size, false, makeRegionConfig(id))
@@ -50,6 +51,10 @@ struct UnmanagedRegion
: UnmanagedRegion(shmId, size, false, std::move(cfg))
{}
UnmanagedRegion(const std::string& shmId, RegionConfig cfg)
: UnmanagedRegion(shmId, cfg.size, false, std::move(cfg))
{}
UnmanagedRegion(const std::string& shmId, uint64_t size, bool remote, RegionConfig cfg)
: fRemote(remote)
, fRemoveOnDestruction(cfg.removeOnDestruction)
@@ -66,6 +71,9 @@ struct UnmanagedRegion
{
using namespace boost::interprocess;
// TODO: refactor this
cfg.size = size;
if (!cfg.path.empty()) {
fName = std::string(cfg.path + fName);
@@ -223,20 +231,17 @@ struct UnmanagedRegion
return regionCfg;
}
static void Register(const std::string& shmId, RegionConfig& cfg)
static void Register(const std::string& shmId, const RegionConfig& cfg)
{
using namespace boost::interprocess;
managed_shared_memory mngSegment(open_or_create, std::string("fmq_" + shmId + "_mng").c_str(), 6553600);
managed_shared_memory mngSegment(open_or_create, std::string("fmq_" + shmId + "_mng").c_str(), kManagementSegmentSize);
VoidAlloc alloc(mngSegment.get_segment_manager());
Uint16RegionInfoHashMap* shmRegions = mngSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(alloc);
EventCounter* eventCounter = mngSegment.find<EventCounter>(unique_instance).first;
if (!eventCounter) {
eventCounter = mngSegment.construct<EventCounter>(unique_instance)(0);
}
EventCounter* eventCounter = mngSegment.find_or_construct<EventCounter>(unique_instance)(0);
bool newShmRegionCreated = shmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, alloc)).second;
bool newShmRegionCreated = shmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, cfg.size, alloc)).second;
if (newShmRegionCreated) {
(eventCounter->fCount)++;
}

View File

@@ -34,7 +34,7 @@ class UnmanagedRegionImpl final : public fair::mq::UnmanagedRegion
RegionCallback callback,
RegionBulkCallback bulkCallback,
fair::mq::RegionConfig cfg,
FairMQTransportFactory* factory)
fair::mq::TransportFactory* factory)
: fair::mq::UnmanagedRegion(factory)
, fManager(manager)
, fRegion(nullptr)

View File

@@ -141,7 +141,7 @@ inline auto makeMonitorSocket(void* zmqCtx, void* socketToMonitor, std::string_v
// the FD is still valid by the time your code receives this event.
// ZMQ_EVENT_DISCONNECTED - The socket was disconnected unexpectedly. The event value is the
// FD of the underlying network socket. Warning: this socket will be closed.
auto const rc =
[[maybe_unused]] auto const rc =
zmq_socket_monitor(socketToMonitor,
address.c_str(),
ZMQ_EVENT_CONNECTED | ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_DISCONNECTED);
@@ -156,11 +156,11 @@ inline auto makeMonitorSocket(void* zmqCtx, void* socketToMonitor, std::string_v
// Progress only happens, when a user calls GetNumberOfConnectedPeers()`.
// The assumption here is, that not too many events will pile up anyways.
int const unlimited(0);
auto const rc = zmq_setsockopt(mon, ZMQ_RCVHWM, &unlimited, sizeof(unlimited));
[[maybe_unused]] auto const rc = zmq_setsockopt(mon, ZMQ_RCVHWM, &unlimited, sizeof(unlimited));
assertm(rc == 0, "Setting rcv queue size to unlimited succeeded"); // NOLINT
}
{ // Connect the reading monitor socket
auto const rc = zmq_connect(mon, address.c_str());
[[maybe_unused]] auto const rc = zmq_connect(mon, address.c_str());
assertm(rc == 0, "Connecting reading monitor socket succeeded"); // NOLINT
}
return mon;
@@ -192,7 +192,7 @@ inline auto getMonitorEvent(void* monitorSocket) -> int
assertm(zmq_msg_more(&msg), "A second frame is pending"); // NOLINT
zmq_msg_init(&msg);
{
auto const rc = zmq_msg_recv(&msg, monitorSocket, 0);
[[maybe_unused]] auto const rc = zmq_msg_recv(&msg, monitorSocket, 0);
assertm(rc >= 0, "second monitor event frame successfully received"); // NOLINT
}
assertm(!zmq_msg_more(&msg), "No more frames are pending"); // NOLINT

View File

@@ -10,8 +10,9 @@
#define FAIR_MQ_ZMQ_CONTEXT_H_
#include <fairmq/tools/Strings.h>
#include <FairMQLogger.h>
#include <FairMQUnmanagedRegion.h>
#include <fairmq/UnmanagedRegion.h>
#include <fairlogger/Logger.h>
#include <zmq.h>

View File

@@ -10,9 +10,10 @@
#define FAIR_MQ_ZMQ_MESSAGE_H
#include <fairmq/zeromq/UnmanagedRegion.h>
#include <FairMQLogger.h>
#include <FairMQMessage.h>
#include <FairMQUnmanagedRegion.h>
#include <fairmq/Message.h>
#include <fairmq/UnmanagedRegion.h>
#include <fairlogger/Logger.h>
#include <zmq.h>
@@ -38,7 +39,7 @@ class Message final : public fair::mq::Message
Message& operator=(const Message&) = delete;
Message& operator=(Message&&) = delete;
Message(FairMQTransportFactory* factory = nullptr)
Message(fair::mq::TransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fMsg(std::make_unique<zmq_msg_t>())
{
@@ -47,7 +48,7 @@ class Message final : public fair::mq::Message
}
}
Message(Alignment alignment, FairMQTransportFactory* factory = nullptr)
Message(Alignment alignment, fair::mq::TransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fAlignment(alignment.alignment)
, fMsg(std::make_unique<zmq_msg_t>())
@@ -57,7 +58,7 @@ class Message final : public fair::mq::Message
}
}
Message(const size_t size, FairMQTransportFactory* factory = nullptr)
Message(const size_t size, fair::mq::TransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fMsg(std::make_unique<zmq_msg_t>())
{
@@ -80,7 +81,7 @@ class Message final : public fair::mq::Message
return {static_cast<void*>(fullBufferPtr), static_cast<void*>(alignedPartPtr)};
}
Message(const size_t size, Alignment alignment, FairMQTransportFactory* factory = nullptr)
Message(const size_t size, Alignment alignment, fair::mq::TransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fAlignment(alignment.alignment)
, fMsg(std::make_unique<zmq_msg_t>())
@@ -97,7 +98,7 @@ class Message final : public fair::mq::Message
}
}
Message(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr)
Message(void* data, const size_t size, fair::mq::FreeFn* ffn, void* hint = nullptr, fair::mq::TransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fMsg(std::make_unique<zmq_msg_t>())
{
@@ -106,7 +107,7 @@ class Message final : public fair::mq::Message
}
}
Message(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr)
Message(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, fair::mq::TransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fMsg(std::make_unique<zmq_msg_t>())
{
@@ -184,7 +185,7 @@ class Message final : public fair::mq::Message
}
}
void Rebuild(void* data, size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override
void Rebuild(void* data, size_t size, fair::mq::FreeFn* ffn, void* hint = nullptr) override
{
CloseMessage();
fMsg = std::make_unique<zmq_msg_t>();

View File

@@ -9,14 +9,15 @@
#ifndef FAIR_MQ_ZMQ_SOCKET_H
#define FAIR_MQ_ZMQ_SOCKET_H
#include <FairMQLogger.h>
#include <FairMQMessage.h>
#include <FairMQSocket.h>
#include <fairmq/Message.h>
#include <fairmq/Socket.h>
#include <fairmq/tools/Strings.h>
#include <fairmq/zeromq/Common.h>
#include <fairmq/zeromq/Context.h>
#include <fairmq/zeromq/Message.h>
#include <fairlogger/Logger.h>
#include <zmq.h>
#include <atomic>
@@ -30,7 +31,7 @@ namespace fair::mq::zmq
class Socket final : public fair::mq::Socket
{
public:
Socket(Context& ctx, const std::string& type, const std::string& name, const std::string& id, FairMQTransportFactory* factory = nullptr)
Socket(Context& ctx, const std::string& type, const std::string& name, const std::string& id, fair::mq::TransportFactory* factory = nullptr)
: fair::mq::Socket(factory)
, fCtx(ctx)
, fId(id + "." + name + "." + type)
@@ -219,7 +220,7 @@ class Socket final : public fair::mq::Socket
bool repeat = false;
do {
FairMQMessagePtr part = std::make_unique<Message>(GetTransport());
fair::mq::MessagePtr part = std::make_unique<Message>(GetTransport());
int nbytes = zmq_msg_recv(static_cast<Message*>(part.get())->GetMessage(), fSocket, flags);
if (nbytes >= 0) {

View File

@@ -14,7 +14,7 @@
#include <fairmq/zeromq/Socket.h>
#include <fairmq/zeromq/Poller.h>
#include <fairmq/zeromq/UnmanagedRegion.h>
#include <FairMQTransportFactory.h>
#include <fairmq/TransportFactory.h>
#include <fairmq/ProgOptions.h>
#include <memory> // unique_ptr, make_unique
@@ -24,11 +24,11 @@
namespace fair::mq::zmq
{
class TransportFactory final : public FairMQTransportFactory
class TransportFactory final : public fair::mq::TransportFactory
{
public:
TransportFactory(const std::string& id = "", const ProgOptions* config = nullptr)
: FairMQTransportFactory(id)
: fair::mq::TransportFactory(id)
, fCtx(nullptr)
{
int major = 0, minor = 0, patch = 0;
@@ -68,7 +68,7 @@ class TransportFactory final : public FairMQTransportFactory
return std::make_unique<Message>(size, alignment, this);
}
MessagePtr CreateMessage(void* data, size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override
MessagePtr CreateMessage(void* data, size_t size, fair::mq::FreeFn* ffn, void* hint = nullptr) override
{
return std::make_unique<Message>(data, size, ffn, hint, this);
}
@@ -83,17 +83,17 @@ class TransportFactory final : public FairMQTransportFactory
return std::make_unique<Socket>(*fCtx, type, name, GetId(), this);
}
PollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override
PollerPtr CreatePoller(const std::vector<Channel>& channels) const override
{
return std::make_unique<Poller>(channels);
}
PollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override
PollerPtr CreatePoller(const std::vector<Channel*>& channels) const override
{
return std::make_unique<Poller>(channels);
}
PollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override
PollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<Channel>>& channelsMap, const std::vector<std::string>& channelList) const override
{
return std::make_unique<Poller>(channelsMap, channelList);
}

View File

@@ -10,8 +10,9 @@
#define FAIR_MQ_ZMQ_UNMANAGEDREGION_H
#include <fairmq/zeromq/Context.h>
#include <FairMQUnmanagedRegion.h>
#include <FairMQLogger.h>
#include <fairmq/UnmanagedRegion.h>
#include <fairlogger/Logger.h>
#include <cstddef> // size_t
#include <string>
@@ -33,7 +34,7 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
int64_t userFlags,
RegionCallback callback,
RegionBulkCallback bulkCallback,
FairMQTransportFactory* factory,
fair::mq::TransportFactory* factory,
fair::mq::RegionConfig cfg)
: fair::mq::UnmanagedRegion(factory)
, fCtx(ctx)

View File

@@ -45,11 +45,9 @@ add_testhelper(runTestDevice
${definitions}
)
set(MQ_CONFIG "${CMAKE_BINARY_DIR}/test/testsuite_FairMQ.IOPatterns_config.json")
set(RUN_TEST_DEVICE "${CMAKE_BINARY_DIR}/test/testhelper_runTestDevice")
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
set(SDK_TESTSUITE_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/sdk)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/protocols/config.json.in ${MQ_CONFIG})
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/runner.cxx.in ${CMAKE_CURRENT_BINARY_DIR}/runner.cxx)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/TestEnvironment.h.in ${CMAKE_CURRENT_BINARY_DIR}/TestEnvironment.h)
@@ -68,7 +66,6 @@ add_testsuite(Protocols
${CMAKE_CURRENT_SOURCE_DIR}/protocols
${CMAKE_CURRENT_BINARY_DIR}
TIMEOUT 5
RUN_SERIAL ON
${definitions}
)
@@ -131,7 +128,6 @@ add_testsuite(Device
${CMAKE_CURRENT_SOURCE_DIR}/device
${CMAKE_CURRENT_BINARY_DIR}
TIMEOUT 5
# RUN_SERIAL ON
)
set(VERSION_MAJOR 1)
@@ -282,7 +278,6 @@ add_testsuite(Poller
INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR}
TIMEOUT 5
RUN_SERIAL ON
${definitions}
)

View File

@@ -9,8 +9,8 @@
#ifndef FAIR_MQ_TEST_TESTRECEIVER_H
#define FAIR_MQ_TEST_TESTRECEIVER_H
#include <FairMQDevice.h>
#include <FairMQLogger.h>
#include <fairmq/Device.h>
#include <fairlogger/Logger.h>
#include <string>
#include <thread>

View File

@@ -9,8 +9,8 @@
#ifndef FAIR_MQ_TEST_TESTSENDER_H
#define FAIR_MQ_TEST_TESTSENDER_H
#include <FairMQDevice.h>
#include <FairMQLogger.h>
#include <fairmq/Device.h>
#include <fairlogger/Logger.h>
#include <string>
#include <thread>

View File

@@ -6,7 +6,7 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <FairMQDevice.h>
#include <fairmq/Device.h>
#include <fairmq/DeviceRunner.h>
#include <fairmq/ProgOptions.h>
@@ -19,52 +19,53 @@ namespace _config
{
using namespace std;
using namespace fair::mq;
void control(FairMQDevice& device)
void control(Device& device)
{
device.ChangeState(fair::mq::Transition::InitDevice);
device.WaitForState(fair::mq::State::InitializingDevice);
device.ChangeState(fair::mq::Transition::CompleteInit);
device.WaitForState(fair::mq::State::Initialized);
device.ChangeState(fair::mq::Transition::Bind);
device.WaitForState(fair::mq::State::Bound);
device.ChangeState(fair::mq::Transition::Connect);
device.WaitForState(fair::mq::State::DeviceReady);
device.ChangeState(fair::mq::Transition::InitTask);
device.WaitForState(fair::mq::State::Ready);
device.ChangeState(Transition::InitDevice);
device.WaitForState(State::InitializingDevice);
device.ChangeState(Transition::CompleteInit);
device.WaitForState(State::Initialized);
device.ChangeState(Transition::Bind);
device.WaitForState(State::Bound);
device.ChangeState(Transition::Connect);
device.WaitForState(State::DeviceReady);
device.ChangeState(Transition::InitTask);
device.WaitForState(State::Ready);
device.ChangeState(fair::mq::Transition::Run);
device.WaitForState(fair::mq::State::Ready);
device.ChangeState(Transition::Run);
device.WaitForState(State::Ready);
device.ChangeState(fair::mq::Transition::ResetTask);
device.WaitForState(fair::mq::State::DeviceReady);
device.ChangeState(fair::mq::Transition::ResetDevice);
device.WaitForState(fair::mq::State::Idle);
device.ChangeState(Transition::ResetTask);
device.WaitForState(State::DeviceReady);
device.ChangeState(Transition::ResetDevice);
device.WaitForState(State::Idle);
device.ChangeState(fair::mq::Transition::End);
device.ChangeState(Transition::End);
}
class TestDevice : public FairMQDevice
class TestDevice : public Device
{
public:
TestDevice(const string& transport)
{
fDeviceThread = thread(&FairMQDevice::RunStateMachine, this);
fDeviceThread = thread(&Device::RunStateMachine, this);
SetTransport(transport);
ChangeState(fair::mq::Transition::InitDevice);
WaitForState(fair::mq::State::InitializingDevice);
ChangeState(fair::mq::Transition::CompleteInit);
WaitForState(fair::mq::State::Initialized);
ChangeState(fair::mq::Transition::Bind);
WaitForState(fair::mq::State::Bound);
ChangeState(fair::mq::Transition::Connect);
WaitForState(fair::mq::State::DeviceReady);
ChangeState(fair::mq::Transition::InitTask);
WaitForState(fair::mq::State::Ready);
ChangeState(Transition::InitDevice);
WaitForState(State::InitializingDevice);
ChangeState(Transition::CompleteInit);
WaitForState(State::Initialized);
ChangeState(Transition::Bind);
WaitForState(State::Bound);
ChangeState(Transition::Connect);
WaitForState(State::DeviceReady);
ChangeState(Transition::InitTask);
WaitForState(State::Ready);
ChangeState(fair::mq::Transition::Run);
ChangeState(Transition::Run);
}
TestDevice(const TestDevice&) = delete;
@@ -74,15 +75,15 @@ class TestDevice : public FairMQDevice
~TestDevice() override
{
WaitForState(fair::mq::State::Running);
ChangeState(fair::mq::Transition::Stop);
WaitForState(fair::mq::State::Ready);
ChangeState(fair::mq::Transition::ResetTask);
WaitForState(fair::mq::State::DeviceReady);
ChangeState(fair::mq::Transition::ResetDevice);
WaitForState(fair::mq::State::Idle);
WaitForState(State::Running);
ChangeState(Transition::Stop);
WaitForState(State::Ready);
ChangeState(Transition::ResetTask);
WaitForState(State::DeviceReady);
ChangeState(Transition::ResetDevice);
WaitForState(State::Idle);
ChangeState(fair::mq::Transition::End);
ChangeState(Transition::End);
if (fDeviceThread.joinable()) {
fDeviceThread.join();
@@ -100,7 +101,7 @@ class Config : public ::testing::Test
string TestDeviceSetConfig(const string& transport)
{
fair::mq::ProgOptions config;
ProgOptions config;
vector<string> emptyArgs = {"dummy", "--id", "test", "--color", "false"};
@@ -108,10 +109,10 @@ class Config : public ::testing::Test
config.SetProperty("transport", transport);
FairMQDevice device;
Device device;
device.SetConfig(config);
FairMQChannel channel;
Channel channel;
channel.UpdateType("pub");
channel.UpdateMethod("connect");
channel.UpdateAddress("tcp://localhost:5558");
@@ -130,14 +131,14 @@ class Config : public ::testing::Test
string TestDeviceSetConfigWithPlugins(const string& transport)
{
fair::mq::ProgOptions config;
ProgOptions config;
vector<string> emptyArgs = {"dummy", "--id", "test", "--color", "false"};
config.SetProperty("transport", transport);
FairMQDevice device;
fair::mq::PluginManager mgr;
Device device;
PluginManager mgr;
mgr.LoadPlugin("s:config");
mgr.ForEachPluginProgOptions([&](boost::program_options::options_description options) {
config.AddToCmdLineOptions(options);
@@ -146,10 +147,10 @@ class Config : public ::testing::Test
mgr.InstantiatePlugins();
config.ParseAll(emptyArgs, true);
fair::mq::DeviceRunner::HandleGeneralOptions(config);
DeviceRunner::HandleGeneralOptions(config);
device.SetConfig(config);
FairMQChannel channel;
Channel channel;
channel.UpdateType("pub");
channel.UpdateMethod("connect");
channel.UpdateAddress("tcp://localhost:5558");
@@ -177,16 +178,16 @@ class Config : public ::testing::Test
string TestDeviceSetTransport(const string& transport)
{
FairMQDevice device;
Device device;
device.SetTransport(transport);
FairMQChannel channel;
Channel channel;
channel.UpdateType("pub");
channel.UpdateMethod("connect");
channel.UpdateAddress("tcp://localhost:5558");
device.AddChannel("data", std::move(channel));
thread t(&FairMQDevice::RunStateMachine, &device);
thread t(&Device::RunStateMachine, &device);
control(device);

View File

@@ -12,7 +12,7 @@
#include <boost/process.hpp>
#include <fairmq/tools/Process.h>
#include <fairmq/tools/Unique.h>
#include <FairMQDevice.h>
#include <fairmq/Device.h>
#include <string>
#include <thread>
@@ -22,24 +22,25 @@ namespace
{
using namespace std;
using namespace fair::mq;
using namespace fair::mq::test;
using namespace fair::mq::tools;
class BadDevice : public FairMQDevice
class BadDevice : public Device
{
public:
BadDevice()
{
fDeviceThread = thread([&](){
EXPECT_THROW(RunStateMachine(), fair::mq::MessageError);
EXPECT_THROW(RunStateMachine(), MessageError);
});
SetTransport("shmem");
ChangeState(fair::mq::Transition::InitDevice);
WaitForState(fair::mq::State::InitializingDevice);
ChangeState(fair::mq::Transition::CompleteInit);
WaitForState(fair::mq::State::Initialized);
ChangeState(Transition::InitDevice);
WaitForState(State::InitializingDevice);
ChangeState(Transition::CompleteInit);
WaitForState(State::Initialized);
parts.AddPart(NewMessage());
}
@@ -51,7 +52,7 @@ class BadDevice : public FairMQDevice
~BadDevice() override
{
ChangeState(fair::mq::Transition::ResetDevice);
ChangeState(Transition::ResetDevice);
if (fDeviceThread.joinable()) {
fDeviceThread.join();
@@ -60,12 +61,12 @@ class BadDevice : public FairMQDevice
private:
thread fDeviceThread;
FairMQParts parts;
Parts parts;
};
void RunErrorStateIn(const string& state, const string& control, const string& input = "")
{
size_t session{fair::mq::tools::UuidHash()};
size_t session{tools::UuidHash()};
execute_result result{"", 100};
thread device_thread([&]() {

View File

@@ -21,7 +21,7 @@ namespace
using namespace std;
using namespace fair::mq;
void control(FairMQDevice& device)
void control(Device& device)
{
thread t([&] {
device.ChangeState(Transition::InitDevice);
@@ -62,7 +62,7 @@ class MultipleDevices : public ::testing::Test {
test::Sender sender("data");
sender.SetTransport("zeromq");
FairMQChannel channel("push", "connect", "ipc://multiple-devices-test");
Channel channel("push", "connect", "ipc://multiple-devices-test");
channel.UpdateRateLogging(0);
sender.AddChannel("data", std::move(channel));
@@ -75,7 +75,7 @@ class MultipleDevices : public ::testing::Test {
test::Receiver receiver("data");
receiver.SetTransport("zeromq");
FairMQChannel channel("pull", "bind", "ipc://multiple-devices-test");
Channel channel("pull", "bind", "ipc://multiple-devices-test");
channel.UpdateRateLogging(0);
receiver.AddChannel("data", std::move(channel));

View File

@@ -6,8 +6,8 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <FairMQDevice.h>
#include <FairMQLogger.h>
#include <fairmq/Device.h>
#include <fairlogger/Logger.h>
#include <gtest/gtest.h>
@@ -20,7 +20,7 @@ namespace
using namespace std;
using namespace fair::mq;
class SlowDevice : public FairMQDevice
class SlowDevice : public Device
{
public:
SlowDevice() = default;
@@ -34,7 +34,7 @@ class SlowDevice : public FairMQDevice
void transitionTo(const vector<State>& states, int numExpectedStates)
{
FairMQDevice device;
Device device;
thread t([&] {
for (const auto& s : states) {

View File

@@ -6,7 +6,7 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <FairMQDevice.h>
#include <fairmq/Device.h>
#include <fairmq/tools/Version.h>

View File

@@ -9,8 +9,8 @@
#ifndef FAIR_MQ_TEST_ERROR_STATE_H
#define FAIR_MQ_TEST_ERROR_STATE_H
#include <FairMQDevice.h>
#include <FairMQLogger.h>
#include <fairmq/Device.h>
#include <fairlogger/Logger.h>
#include <iostream>

View File

@@ -9,8 +9,8 @@
#ifndef FAIR_MQ_TEST_EXCEPTIONS_H
#define FAIR_MQ_TEST_EXCEPTIONS_H
#include <FairMQDevice.h>
#include <FairMQLogger.h>
#include <fairmq/Device.h>
#include <fairlogger/Logger.h>
#include <iostream>
#include <stdexcept>

View File

@@ -9,7 +9,7 @@
#ifndef FAIR_MQ_TEST_PAIRLEFT_H
#define FAIR_MQ_TEST_PAIRLEFT_H
#include <FairMQDevice.h>
#include <fairmq/Device.h>
#include <cstddef>
#include <thread>

View File

@@ -9,7 +9,7 @@
#ifndef FAIR_MQ_TEST_PAIRRIGHT_H
#define FAIR_MQ_TEST_PAIRRIGHT_H
#include <FairMQDevice.h>
#include <fairmq/Device.h>
#include <cstddef>
#include <string>
#include <thread>

View File

@@ -9,17 +9,20 @@
#ifndef FAIR_MQ_TEST_POLLIN_H
#define FAIR_MQ_TEST_POLLIN_H
#include <FairMQDevice.h>
#include <FairMQLogger.h>
#include <fairmq/Device.h>
#include <fairmq/ProgOptions.h>
#include <fairlogger/Logger.h>
#include <thread>
namespace fair::mq::test
{
using namespace std;
using namespace fair::mq;
class PollIn : public FairMQDevice
class PollIn : public Device
{
public:
PollIn()
@@ -35,12 +38,12 @@ class PollIn : public FairMQDevice
auto Run() -> void override
{
vector<FairMQChannel*> chans;
vector<Channel*> chans;
chans.push_back(&GetChannel("data1", 0));
chans.push_back(&GetChannel("data2", 0));
FairMQPollerPtr poller = nullptr;
PollerPtr poller = nullptr;
if (fPollType == 0)
{
@@ -59,8 +62,8 @@ class PollIn : public FairMQDevice
bool arrived2 = false;
bool bothArrived = false;
FairMQMessagePtr msg1(NewMessage());
FairMQMessagePtr msg2(NewMessage());
MessagePtr msg1(NewMessage());
MessagePtr msg2(NewMessage());
while (!bothArrived)
{

View File

@@ -9,7 +9,7 @@
#ifndef FAIR_MQ_TEST_POLLOUT_H
#define FAIR_MQ_TEST_POLLOUT_H
#include <FairMQDevice.h>
#include <fairmq/Device.h>
#include <thread>
namespace fair::mq::test

Some files were not shown because too many files have changed in this diff Show More