mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
Add a acknowledgement channel to Tutorial 3...
- ...to measure performance of the serialization libraries. - Rename `--log-color-format` cmd option to `--log-color`.
This commit is contained in:
parent
c9c881c33c
commit
71ecbe214d
|
@ -76,10 +76,10 @@ GENERATE_LIBRARY()
|
||||||
|
|
||||||
Set(Exe_Names
|
Set(Exe_Names
|
||||||
${Exe_Names}
|
${Exe_Names}
|
||||||
ex3-sampler-dds
|
ex3-sampler
|
||||||
ex3-processor-dds
|
ex3-processor
|
||||||
ex3-sink-dds
|
ex3-sink
|
||||||
ex3-dds-command-ui
|
ex3-command-ui
|
||||||
)
|
)
|
||||||
|
|
||||||
Set(Exe_Source
|
Set(Exe_Source
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
<topology id="ExampleDDS">
|
<topology id="ExampleDDS">
|
||||||
|
|
||||||
<property id="SamplerOutputAddress" />
|
<property id="SamplerAddress" />
|
||||||
<property id="SinkInputAddress" />
|
<property id="SinkAddress" />
|
||||||
|
|
||||||
<declrequirement id="SamplerWorker">
|
<declrequirement id="SamplerWorker">
|
||||||
<hostPattern type="wnname" value="sampler"/>
|
<hostPattern type="wnname" value="sampler"/>
|
||||||
|
@ -16,27 +16,27 @@
|
||||||
</declrequirement>
|
</declrequirement>
|
||||||
|
|
||||||
<decltask id="Sampler">
|
<decltask id="Sampler">
|
||||||
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sampler-dds --id sampler0 --log-color-format false</exe>
|
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sampler --id sampler0 --log-color false</exe>
|
||||||
<requirement>SamplerWorker</requirement>
|
<requirement>SamplerWorker</requirement>
|
||||||
<properties>
|
<properties>
|
||||||
<id access="write">SamplerOutputAddress</id>
|
<id access="write">SamplerAddress</id>
|
||||||
</properties>
|
</properties>
|
||||||
</decltask>
|
</decltask>
|
||||||
|
|
||||||
<decltask id="Processor">
|
<decltask id="Processor">
|
||||||
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-processor-dds --id processor%taskIndex% --log-color-format false</exe>
|
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-processor --id processor%taskIndex% --log-color false</exe>
|
||||||
<requirement>ProcessorWorker</requirement>
|
<requirement>ProcessorWorker</requirement>
|
||||||
<properties>
|
<properties>
|
||||||
<id access="read">SamplerOutputAddress</id>
|
<id access="read">SamplerAddress</id>
|
||||||
<id access="read">SinkInputAddress</id>
|
<id access="read">SinkAddress</id>
|
||||||
</properties>
|
</properties>
|
||||||
</decltask>
|
</decltask>
|
||||||
|
|
||||||
<decltask id="Sink">
|
<decltask id="Sink">
|
||||||
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sink-dds --id sink0 --log-color-format false</exe>
|
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sink --id sink0 --log-color false</exe>
|
||||||
<requirement>SinkWorker</requirement>
|
<requirement>SinkWorker</requirement>
|
||||||
<properties>
|
<properties>
|
||||||
<id access="write">SinkInputAddress</id>
|
<id access="write">SinkAddress</id>
|
||||||
</properties>
|
</properties>
|
||||||
</decltask>
|
</decltask>
|
||||||
|
|
||||||
|
|
|
@ -84,12 +84,12 @@ int main(int argc, char** argv)
|
||||||
|
|
||||||
LOG(INFO) << "Subscribing and waiting for sampler output address.";
|
LOG(INFO) << "Subscribing and waiting for sampler output address.";
|
||||||
ddsKeyValue.subscribe([&keyCondition](const string& /*_key*/, const string& /*_value*/) { keyCondition.notify_all(); });
|
ddsKeyValue.subscribe([&keyCondition](const string& /*_key*/, const string& /*_value*/) { keyCondition.notify_all(); });
|
||||||
ddsKeyValue.getValues("SamplerOutputAddress", &samplerValues);
|
ddsKeyValue.getValues("SamplerAddress", &samplerValues);
|
||||||
while (samplerValues.empty())
|
while (samplerValues.empty())
|
||||||
{
|
{
|
||||||
unique_lock<mutex> lock(keyMutex);
|
unique_lock<mutex> lock(keyMutex);
|
||||||
keyCondition.wait_until(lock, chrono::system_clock::now() + chrono::milliseconds(1000));
|
keyCondition.wait_until(lock, chrono::system_clock::now() + chrono::milliseconds(1000));
|
||||||
ddsKeyValue.getValues("SamplerOutputAddress", &samplerValues);
|
ddsKeyValue.getValues("SamplerAddress", &samplerValues);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Sink properties
|
// Sink properties
|
||||||
|
@ -100,12 +100,12 @@ int main(int argc, char** argv)
|
||||||
|
|
||||||
LOG(INFO) << "Subscribing and waiting for sink input address.";
|
LOG(INFO) << "Subscribing and waiting for sink input address.";
|
||||||
ddsKeyValue.subscribe([&keyCondition](const string& /*_key*/, const string& /*_value*/) { keyCondition.notify_all(); });
|
ddsKeyValue.subscribe([&keyCondition](const string& /*_key*/, const string& /*_value*/) { keyCondition.notify_all(); });
|
||||||
ddsKeyValue.getValues("SinkInputAddress", &sinkValues);
|
ddsKeyValue.getValues("SinkAddress", &sinkValues);
|
||||||
while (sinkValues.empty())
|
while (sinkValues.empty())
|
||||||
{
|
{
|
||||||
unique_lock<mutex> lock(keyMutex);
|
unique_lock<mutex> lock(keyMutex);
|
||||||
keyCondition.wait_until(lock, chrono::system_clock::now() + chrono::milliseconds(1000));
|
keyCondition.wait_until(lock, chrono::system_clock::now() + chrono::milliseconds(1000));
|
||||||
ddsKeyValue.getValues("SinkInputAddress", &sinkValues);
|
ddsKeyValue.getValues("SinkAddress", &sinkValues);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,7 +123,7 @@ int main(int argc, char** argv)
|
||||||
// Subscribe on custom commands
|
// Subscribe on custom commands
|
||||||
ddsCustomCmd.subscribeCmd([&](const string& command, const string& condition, uint64_t senderId)
|
ddsCustomCmd.subscribeCmd([&](const string& command, const string& condition, uint64_t senderId)
|
||||||
{
|
{
|
||||||
LOG(INFO) << "Received custom command: " << command << " condition: " << condition << " senderId: " << senderId;
|
LOG(INFO) << "Received custom command: " << command;
|
||||||
if (command == "check-state")
|
if (command == "check-state")
|
||||||
{
|
{
|
||||||
ddsCustomCmd.sendCmd(id + ": " + processor.GetCurrentStateName(), to_string(senderId));
|
ddsCustomCmd.sendCmd(id + ": " + processor.GetCurrentStateName(), to_string(senderId));
|
||||||
|
|
|
@ -105,7 +105,7 @@ int main(int argc, char** argv)
|
||||||
// Advertise the bound addresses via DDS property
|
// Advertise the bound addresses via DDS property
|
||||||
LOG(INFO) << "Giving sampler output address to DDS.";
|
LOG(INFO) << "Giving sampler output address to DDS.";
|
||||||
dds::key_value::CKeyValue ddsKeyValue;
|
dds::key_value::CKeyValue ddsKeyValue;
|
||||||
ddsKeyValue.putValue("SamplerOutputAddress", sampler.fChannels.at("data-out").at(0).GetAddress());
|
ddsKeyValue.putValue("SamplerAddress", sampler.fChannels.at("data-out").at(0).GetAddress());
|
||||||
|
|
||||||
sampler.WaitForEndOfState("INIT_DEVICE");
|
sampler.WaitForEndOfState("INIT_DEVICE");
|
||||||
|
|
||||||
|
@ -117,7 +117,7 @@ int main(int argc, char** argv)
|
||||||
// Subscribe on custom commands
|
// Subscribe on custom commands
|
||||||
ddsCustomCmd.subscribeCmd([&](const string& command, const string& condition, uint64_t senderId)
|
ddsCustomCmd.subscribeCmd([&](const string& command, const string& condition, uint64_t senderId)
|
||||||
{
|
{
|
||||||
LOG(INFO) << "Received custom command: " << command << " condition: " << condition << " senderId: " << senderId;
|
LOG(INFO) << "Received custom command: " << command;
|
||||||
if (command == "check-state")
|
if (command == "check-state")
|
||||||
{
|
{
|
||||||
ddsCustomCmd.sendCmd(id + ": " + sampler.GetCurrentStateName(), to_string(senderId));
|
ddsCustomCmd.sendCmd(id + ": " + sampler.GetCurrentStateName(), to_string(senderId));
|
||||||
|
|
|
@ -105,7 +105,7 @@ int main(int argc, char** argv)
|
||||||
// Advertise the bound address via DDS property
|
// Advertise the bound address via DDS property
|
||||||
LOG(INFO) << "Giving sink input address to DDS.";
|
LOG(INFO) << "Giving sink input address to DDS.";
|
||||||
dds::key_value::CKeyValue ddsKeyValue;
|
dds::key_value::CKeyValue ddsKeyValue;
|
||||||
ddsKeyValue.putValue("SinkInputAddress", sink.fChannels.at("data-in").at(0).GetAddress());
|
ddsKeyValue.putValue("SinkAddress", sink.fChannels.at("data-in").at(0).GetAddress());
|
||||||
|
|
||||||
sink.WaitForEndOfState("INIT_DEVICE");
|
sink.WaitForEndOfState("INIT_DEVICE");
|
||||||
|
|
||||||
|
@ -117,7 +117,7 @@ int main(int argc, char** argv)
|
||||||
// Subscribe on custom commands
|
// Subscribe on custom commands
|
||||||
ddsCustomCmd.subscribeCmd([&](const string& command, const string& condition, uint64_t senderId)
|
ddsCustomCmd.subscribeCmd([&](const string& command, const string& condition, uint64_t senderId)
|
||||||
{
|
{
|
||||||
LOG(INFO) << "Received custom command: " << command << " condition: " << condition << " senderId: " << senderId;
|
LOG(INFO) << "Received custom command: " << command;
|
||||||
if (command == "check-state")
|
if (command == "check-state")
|
||||||
{
|
{
|
||||||
ddsCustomCmd.sendCmd(id + ": " + sink.GetCurrentStateName(), to_string(senderId));
|
ddsCustomCmd.sendCmd(id + ": " + sink.GetCurrentStateName(), to_string(senderId));
|
||||||
|
|
Loading…
Reference in New Issue
Block a user