mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 09:31:45 +00:00
Configuration and DDS example/tools updates
- Update DDS example command UI and extract it from example. - Unify address handling via DDS properties for dynamic deployment. - Update DDS docs with the new approach. - Allow `--config-key` to be used to access common config in JSON. - Allow common channel properties to be specified for all sockets. - Update MQ examples and Tuto3 with new config options. - Add start scripts to MQ examples for easier use.
This commit is contained in:
committed by
Mohammad Al-Turany
parent
8317d440db
commit
13453354c8
@@ -8,8 +8,7 @@
|
||||
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/3-dds/ex3-dds-topology.xml ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-topology.xml)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/3-dds/ex3-dds-hosts.cfg ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-hosts.cfg COPYONLY)
|
||||
|
||||
add_definitions(-DENABLE_DDS)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/3-dds/ex3-dds.json ${CMAKE_BINARY_DIR}/bin/config/ex3-dds.json COPYONLY)
|
||||
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${CMAKE_SOURCE_DIR}/fairmq
|
||||
@@ -18,6 +17,7 @@ Set(INCLUDE_DIRECTORIES
|
||||
${CMAKE_SOURCE_DIR}/fairmq/devices
|
||||
${CMAKE_SOURCE_DIR}/fairmq/tools
|
||||
${CMAKE_SOURCE_DIR}/fairmq/options
|
||||
${CMAKE_SOURCE_DIR}/fairmq/deployment
|
||||
${CMAKE_SOURCE_DIR}/examples/MQ/3-dds
|
||||
${CMAKE_CURRENT_BINARY_DIR}
|
||||
)
|
||||
@@ -69,7 +69,6 @@ Set(Exe_Names
|
||||
ex3-sampler
|
||||
ex3-processor
|
||||
ex3-sink
|
||||
ex3-command-ui
|
||||
)
|
||||
|
||||
Set(Exe_Source
|
||||
@@ -77,7 +76,6 @@ Set(Exe_Source
|
||||
runExample3Sampler.cxx
|
||||
runExample3Processor.cxx
|
||||
runExample3Sink.cxx
|
||||
runDDSCommandUI.cxx
|
||||
)
|
||||
|
||||
list(LENGTH Exe_Names _length)
|
||||
|
@@ -1,7 +1,7 @@
|
||||
Example 3: DDS
|
||||
===============
|
||||
|
||||
This example demonstrates usage of the Dynamic Deployment System ([DDS](http://dds.gsi.de/)) to dynamically deploy and configure a topology of devices. The topology is similar to those of Example 2, but now it can be easily distributed on different computing nodes without the need for manual reconfiguration of the devices.
|
||||
This example demonstrates usage of the Dynamic Deployment System ([DDS](http://dds.gsi.de/)) to dynamically deploy and configure a topology of devices. The topology is similar to those of Example 2, but now it can be easily distributed on different computing nodes without the need for manual socket reconfiguration of the devices.
|
||||
|
||||
This example is compiled only if the DDS is found by CMake. Custom DDS installation location can be given to CMake like this:
|
||||
|
||||
@@ -11,52 +11,17 @@ cmake -DDDS_PATH="/path/to/dds/install/dir/" ..
|
||||
|
||||
The description below outlines the minimal steps needed to run the example with DDS. For more details please refer to DDS documentation on [DDS Website](http://dds.gsi.de/).
|
||||
|
||||
##### 1. The devices that bind their sockets need to advertise their bound addresses to DDS by writing a property.
|
||||
##### 1. After beginning the initialization, the device handles the socket addresses and ports distribution via DDS.
|
||||
|
||||
In our example Sampler and Sink bind their sockets. The bound addresses are available after the initial validation. The following code takes the address value and gives it to DDS:
|
||||
The binding channels give their bound addresses to devices interested in connecting to it and connecting sockets wait to receive these addresses. This match happens via the properties specified in the JSON file, which replace addresses in the DDS run. This is done behind the scenes after the initialization has been started and can be called with a single method call:
|
||||
|
||||
```C++
|
||||
sampler.ChangeState("INIT_DEVICE");
|
||||
sampler.WaitForInitialValidation();
|
||||
|
||||
CKeyValue ddsKeyValue;
|
||||
ddsKeyValue.putValue("SamplerOutputAddress", sampler.fChannels.at("data-out").at(0).GetAddress());
|
||||
|
||||
HandleConfigViaDDS(sampler);
|
||||
sampler.WaitForEndOfState("INIT_DEVICE");
|
||||
```
|
||||
|
||||
Same approach for the Sink.
|
||||
|
||||
##### 2. The devices that connect their sockets need to read the addresses from DDS.
|
||||
|
||||
The Processors in our example need the addresses of Sampler and Sink. They receive these from DDS via properties (sent in the step above):
|
||||
|
||||
```C++
|
||||
CKeyValue ddsKeyValue;
|
||||
// Sampler properties
|
||||
CKeyValue::valuesMap_t samplerValues;
|
||||
{
|
||||
mutex keyMutex;
|
||||
condition_variable keyCondition;
|
||||
|
||||
LOG(INFO) << "Subscribing and waiting for sampler output address.";
|
||||
ddsKeyValue.subscribe([&keyCondition](const string& /*_key*/, const string& /*_value*/) { keyCondition.notify_all(); });
|
||||
ddsKeyValue.getValues("SamplerOutputAddress", &samplerValues);
|
||||
while (samplerValues.empty())
|
||||
{
|
||||
unique_lock<mutex> lock(keyMutex);
|
||||
keyCondition.wait_until(lock, chrono::system_clock::now() + chrono::milliseconds(1000));
|
||||
ddsKeyValue.getValues("SamplerOutputAddress", &samplerValues);
|
||||
}
|
||||
}
|
||||
// Sink properties
|
||||
// ... same as above, but for sinkValues ...
|
||||
|
||||
processor.fChannels.at("data-in").at(0).UpdateAddress(samplerValues.begin()->second);
|
||||
processor.fChannels.at("data-out").at(0).UpdateAddress(sinkValues.begin()->second);
|
||||
```
|
||||
|
||||
After this step each device will have the necessary connection information.
|
||||
In most cases a device will land on a random node and all the addresses and ports are configured dynamicaly. The JSON file does not contain any address information for a DDS run. Instead, addresses are exchanged between the devices dynamically based on the provided property names. E.g. here a processor communicates with the sampler via the *data1* channel. Sampler (binding) communicates its address to the processor(s) (connecting) via the "samplerAddr" property (see `ex3-dds.json` file).
|
||||
|
||||
##### 3. Write DDS hosts file that contains a list of worker nodes to run the topology on (When deploying using the SSH plug-in).
|
||||
|
||||
@@ -80,6 +45,8 @@ wn0, username@localhost, , /tmp/, 12
|
||||
|
||||
Take a look at `ex3-dds-topology.xml`. It consists of a definition part (properties, tasks, collections and more) and execution part (main). In our example Sampler, Processor and Sink tasks are defines, containing their executables and exchanged properties. The `<main>` of the topology uses the defined tasks. Besides one Sampler and one Sink task, a group containing Processor task is defined. The group has a multiplicity of 10, meaninig 10 Processors will be executed. Each of the Processors will receive the properties with Sampler and Sink addresses.
|
||||
|
||||
If `eth0` network interface (default for binding) is not available on your system, specify another one in the topology file for each task. For example: `--network-interface lo0`.
|
||||
|
||||
##### 5. Start DDS server.
|
||||
|
||||
The DDS server is started with:
|
||||
@@ -115,7 +82,7 @@ After activation, agents will execute the defined tasks on the worker nodes. Out
|
||||
|
||||
##### 10. (optional) Use example command UI to check state of the devices
|
||||
|
||||
This example includes a simple utility to send command to devices and receive replies from them. The code in `runDDSCommandUI.cxx` (compiled as ex3-dds-command-ui) uses the DDS intercom library to send "check-state" string to all devices, to which they reply with their ID and state they are in. This can be used as an example of sending/receiving commands or other information to devices.
|
||||
This example includes a simple utility to send commands to devices and receive replies from them. The code in `runDDSCommandUI.cxx` (compiled as ex3-dds-command-ui) uses the DDS intercom library to send "check-state" string to all devices, to which they reply with their ID and state they are in. The utility also allows requesting state changes from devices. This can be used as an example of sending/receiving commands or other information to devices.
|
||||
|
||||
To see it in action, start the ex3-dds-command-ui while the topology is running.
|
||||
|
||||
|
@@ -2,6 +2,6 @@
|
||||
# source setup.sh
|
||||
@bash_end@
|
||||
|
||||
sampler, username@localhost, , /tmp/, 1
|
||||
processor, username@localhost, , /tmp/, 10
|
||||
sink, username@localhost, , /tmp/, 1
|
||||
sampler, username@localhost, , /path/to/dds-work-dir/, 1
|
||||
processor, username@localhost, , /path/to/dds-work-dir/, 10
|
||||
sink, username@localhost, , /path/to/dds-work-dir/, 1
|
||||
|
@@ -1,7 +1,7 @@
|
||||
<topology id="ExampleDDS">
|
||||
|
||||
<property id="SamplerAddress" />
|
||||
<property id="SinkAddress" />
|
||||
<property id="samplerAddr" />
|
||||
<property id="sinkAddr" />
|
||||
|
||||
<declrequirement id="SamplerWorker">
|
||||
<hostPattern type="wnname" value="sampler"/>
|
||||
@@ -16,27 +16,27 @@
|
||||
</declrequirement>
|
||||
|
||||
<decltask id="Sampler">
|
||||
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sampler --id sampler0 --log-color false</exe>
|
||||
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sampler --id sampler --mq-config @CMAKE_BINARY_DIR@/bin/config/ex3-dds.json</exe>
|
||||
<requirement>SamplerWorker</requirement>
|
||||
<properties>
|
||||
<id access="write">SamplerAddress</id>
|
||||
<id access="write">samplerAddr</id>
|
||||
</properties>
|
||||
</decltask>
|
||||
|
||||
<decltask id="Processor">
|
||||
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-processor --id processor%taskIndex% --log-color false</exe>
|
||||
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-processor --id processor_%taskIndex% --config-key processor --mq-config @CMAKE_BINARY_DIR@/bin/config/ex3-dds.json</exe>
|
||||
<requirement>ProcessorWorker</requirement>
|
||||
<properties>
|
||||
<id access="read">SamplerAddress</id>
|
||||
<id access="read">SinkAddress</id>
|
||||
<id access="read">samplerAddr</id>
|
||||
<id access="read">sinkAddr</id>
|
||||
</properties>
|
||||
</decltask>
|
||||
|
||||
<decltask id="Sink">
|
||||
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sink --id sink0 --log-color false</exe>
|
||||
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sink --id sink --mq-config @CMAKE_BINARY_DIR@/bin/config/ex3-dds.json</exe>
|
||||
<requirement>SinkWorker</requirement>
|
||||
<properties>
|
||||
<id access="write">SinkAddress</id>
|
||||
<id access="write">sinkAddr</id>
|
||||
</properties>
|
||||
</decltask>
|
||||
|
||||
|
42
examples/MQ/3-dds/ex3-dds.json
Normal file
42
examples/MQ/3-dds/ex3-dds.json
Normal file
@@ -0,0 +1,42 @@
|
||||
{
|
||||
"fairMQOptions":
|
||||
{
|
||||
"devices":
|
||||
[{
|
||||
"id": "sampler",
|
||||
"channel":
|
||||
{
|
||||
"name": "data1",
|
||||
"property": "samplerAddr",
|
||||
"type": "push",
|
||||
"method": "bind"
|
||||
}
|
||||
},
|
||||
{
|
||||
"key": "processor",
|
||||
"channels":
|
||||
[{
|
||||
"name": "data1",
|
||||
"property": "samplerAddr",
|
||||
"type": "pull",
|
||||
"method": "connect"
|
||||
},
|
||||
{
|
||||
"name": "data2",
|
||||
"property": "sinkAddr",
|
||||
"type": "push",
|
||||
"method": "connect"
|
||||
}]
|
||||
},
|
||||
{
|
||||
"id": "sink",
|
||||
"channel":
|
||||
{
|
||||
"name": "data2",
|
||||
"property": "sinkAddr",
|
||||
"type": "pull",
|
||||
"method": "bind"
|
||||
}
|
||||
}]
|
||||
}
|
||||
}
|
@@ -1,43 +0,0 @@
|
||||
#include "dds_intercom.h" // DDS
|
||||
|
||||
// STD
|
||||
#include <iostream>
|
||||
#include <exception>
|
||||
#include <sstream>
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
|
||||
using namespace std;
|
||||
using namespace dds::intercom_api;
|
||||
|
||||
int main(int argc, char* argv[])
|
||||
{
|
||||
try
|
||||
{
|
||||
CCustomCmd ddsCustomCmd;
|
||||
|
||||
ddsCustomCmd.subscribe([](const string& command, const string& condition, uint64_t senderId)
|
||||
{
|
||||
cout << "Received: \"" << command << "\"" << endl;
|
||||
});
|
||||
|
||||
while (true)
|
||||
{
|
||||
int result = ddsCustomCmd.send("check-state", "");
|
||||
|
||||
if (result == 1)
|
||||
{
|
||||
cerr << "Error sending custom command" << endl;
|
||||
}
|
||||
|
||||
this_thread::sleep_for(chrono::seconds(1));
|
||||
}
|
||||
}
|
||||
catch (exception& e)
|
||||
{
|
||||
cerr << "Error: " << e.what() << endl;
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
|
||||
return EXIT_SUCCESS;
|
||||
}
|
@@ -12,23 +12,13 @@
|
||||
* @author D. Klein, A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
|
||||
#include "boost/program_options.hpp"
|
||||
#include <boost/asio.hpp> // for DDS
|
||||
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQDDSTools.h"
|
||||
#include "FairMQProgOptions.h"
|
||||
#include "FairMQExample3Processor.h"
|
||||
#include "FairMQTools.h"
|
||||
|
||||
#include "dds_intercom.h" // DDS
|
||||
|
||||
using namespace std;
|
||||
using namespace boost::program_options;
|
||||
using namespace dds::intercom_api;
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
@@ -44,96 +34,16 @@ int main(int argc, char** argv)
|
||||
return 0;
|
||||
}
|
||||
|
||||
string id = config.GetValue<string>("id");
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
processor.SetTransport(config.GetValue<std::string>("transport"));
|
||||
|
||||
processor.SetProperty(FairMQExample3Processor::Id, id);
|
||||
|
||||
// configure data output channel
|
||||
FairMQChannel dataInChannel("pull", "connect", "");
|
||||
dataInChannel.UpdateRateLogging(0);
|
||||
processor.fChannels["data1"].push_back(dataInChannel);
|
||||
|
||||
// configure data output channel
|
||||
FairMQChannel dataOutChannel("push", "connect", "");
|
||||
dataOutChannel.UpdateRateLogging(0);
|
||||
processor.fChannels["data2"].push_back(dataOutChannel);
|
||||
|
||||
// Waiting for DDS properties
|
||||
CKeyValue ddsKeyValue;
|
||||
// Sampler properties
|
||||
CKeyValue::valuesMap_t samplerValues;
|
||||
{
|
||||
mutex keyMutex;
|
||||
condition_variable keyCondition;
|
||||
|
||||
LOG(INFO) << "Subscribing and waiting for sampler output address.";
|
||||
ddsKeyValue.subscribe([&keyCondition](const string& /*_key*/, const string& /*_value*/) { keyCondition.notify_all(); });
|
||||
ddsKeyValue.getValues("SamplerAddress", &samplerValues);
|
||||
while (samplerValues.empty())
|
||||
{
|
||||
unique_lock<mutex> lock(keyMutex);
|
||||
keyCondition.wait_until(lock, chrono::system_clock::now() + chrono::milliseconds(1000));
|
||||
ddsKeyValue.getValues("SamplerAddress", &samplerValues);
|
||||
}
|
||||
}
|
||||
// Sink properties
|
||||
CKeyValue::valuesMap_t sinkValues;
|
||||
{
|
||||
mutex keyMutex;
|
||||
condition_variable keyCondition;
|
||||
|
||||
LOG(INFO) << "Subscribing and waiting for sink input address.";
|
||||
ddsKeyValue.subscribe([&keyCondition](const string& /*_key*/, const string& /*_value*/) { keyCondition.notify_all(); });
|
||||
ddsKeyValue.getValues("SinkAddress", &sinkValues);
|
||||
while (sinkValues.empty())
|
||||
{
|
||||
unique_lock<mutex> lock(keyMutex);
|
||||
keyCondition.wait_until(lock, chrono::system_clock::now() + chrono::milliseconds(1000));
|
||||
ddsKeyValue.getValues("SinkAddress", &sinkValues);
|
||||
}
|
||||
}
|
||||
|
||||
processor.fChannels.at("data1").at(0).UpdateAddress(samplerValues.begin()->second);
|
||||
processor.fChannels.at("data2").at(0).UpdateAddress(sinkValues.begin()->second);
|
||||
processor.SetConfig(config);
|
||||
|
||||
processor.ChangeState("INIT_DEVICE");
|
||||
HandleConfigViaDDS(processor);
|
||||
processor.WaitForEndOfState("INIT_DEVICE");
|
||||
|
||||
processor.ChangeState("INIT_TASK");
|
||||
processor.WaitForEndOfState("INIT_TASK");
|
||||
|
||||
CCustomCmd ddsCustomCmd;
|
||||
|
||||
// Subscribe on custom commands
|
||||
ddsCustomCmd.subscribe([&](const string& command, const string& condition, uint64_t senderId)
|
||||
{
|
||||
LOG(INFO) << "Received custom command: " << command;
|
||||
if (command == "check-state")
|
||||
{
|
||||
ddsCustomCmd.send(id + ": " + processor.GetCurrentStateName(), to_string(senderId));
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(WARN) << "Received unknown command: " << command;
|
||||
LOG(WARN) << "Origin: " << senderId;
|
||||
LOG(WARN) << "Destination: " << condition;
|
||||
}
|
||||
});
|
||||
|
||||
processor.ChangeState("RUN");
|
||||
processor.WaitForEndOfState("RUN");
|
||||
|
||||
processor.ChangeState("RESET_TASK");
|
||||
processor.WaitForEndOfState("RESET_TASK");
|
||||
|
||||
processor.ChangeState("RESET_DEVICE");
|
||||
processor.WaitForEndOfState("RESET_DEVICE");
|
||||
|
||||
processor.ChangeState("END");
|
||||
runDDSStateHandler(processor);
|
||||
}
|
||||
catch (exception& e)
|
||||
{
|
||||
|
@@ -12,24 +12,13 @@
|
||||
* @author D. Klein, A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include <map>
|
||||
|
||||
#include "boost/program_options.hpp"
|
||||
#include <boost/asio.hpp> // for DDS
|
||||
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQDDSTools.h"
|
||||
#include "FairMQProgOptions.h"
|
||||
#include "FairMQExample3Sampler.h"
|
||||
#include "FairMQTools.h"
|
||||
|
||||
#include "dds_intercom.h" // DDS
|
||||
|
||||
using namespace std;
|
||||
using namespace boost::program_options;
|
||||
using namespace dds::intercom_api;
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
@@ -40,94 +29,21 @@ int main(int argc, char** argv)
|
||||
|
||||
try
|
||||
{
|
||||
string interfaceName; // name of the network interface to use for communication.
|
||||
|
||||
options_description samplerOptions("Sampler options");
|
||||
samplerOptions.add_options()
|
||||
("network-interface", value<string>(&interfaceName)->default_value("eth0"), "Name of the network interface to use (e.g. eth0, ib0, wlan0, en0...)");
|
||||
|
||||
config.AddToCmdLineOptions(samplerOptions);
|
||||
|
||||
if (config.ParseAll(argc, argv))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
string id = config.GetValue<string>("id");
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
sampler.SetTransport(config.GetValue<std::string>("transport"));
|
||||
|
||||
sampler.SetProperty(FairMQExample3Sampler::Id, id);
|
||||
|
||||
// configure data output channel
|
||||
FairMQChannel dataOutChannel("push", "bind", "");
|
||||
dataOutChannel.UpdateRateLogging(0);
|
||||
sampler.fChannels["data1"].push_back(dataOutChannel);
|
||||
|
||||
// Get the IP of the current host and store it for binding.
|
||||
map<string,string> IPs;
|
||||
FairMQ::tools::getHostIPs(IPs);
|
||||
stringstream ss;
|
||||
// Check if ib0 (infiniband) interface is available, otherwise try eth0 or wlan0.
|
||||
if (IPs.count(interfaceName))
|
||||
{
|
||||
ss << "tcp://" << IPs[interfaceName] << ":1";
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(INFO) << ss.str();
|
||||
LOG(ERROR) << "Could not find provided network interface: \"" << interfaceName << "\"!, exiting.";
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
string initialOutputAddress = ss.str();
|
||||
|
||||
// Configure the found host IP for the channel.
|
||||
// TCP port will be chosen randomly during the initialization (binding).
|
||||
sampler.fChannels.at("data1").at(0).UpdateAddress(initialOutputAddress);
|
||||
sampler.SetConfig(config);
|
||||
|
||||
sampler.ChangeState("INIT_DEVICE");
|
||||
sampler.WaitForInitialValidation();
|
||||
|
||||
// Advertise the bound addresses via DDS property
|
||||
LOG(INFO) << "Giving sampler output address to DDS.";
|
||||
CKeyValue ddsKeyValue;
|
||||
ddsKeyValue.putValue("SamplerAddress", sampler.fChannels.at("data1").at(0).GetAddress());
|
||||
|
||||
HandleConfigViaDDS(sampler);
|
||||
sampler.WaitForEndOfState("INIT_DEVICE");
|
||||
|
||||
sampler.ChangeState("INIT_TASK");
|
||||
sampler.WaitForEndOfState("INIT_TASK");
|
||||
|
||||
CCustomCmd ddsCustomCmd;
|
||||
|
||||
// Subscribe on custom commands
|
||||
ddsCustomCmd.subscribe([&](const string& command, const string& condition, uint64_t senderId)
|
||||
{
|
||||
LOG(INFO) << "Received custom command: " << command;
|
||||
if (command == "check-state")
|
||||
{
|
||||
ddsCustomCmd.send(id + ": " + sampler.GetCurrentStateName(), to_string(senderId));
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(WARN) << "Received unknown command: " << command;
|
||||
LOG(WARN) << "Origin: " << senderId;
|
||||
LOG(WARN) << "Destination: " << condition;
|
||||
}
|
||||
});
|
||||
|
||||
sampler.ChangeState("RUN");
|
||||
sampler.WaitForEndOfState("RUN");
|
||||
|
||||
sampler.ChangeState("RESET_TASK");
|
||||
sampler.WaitForEndOfState("RESET_TASK");
|
||||
|
||||
sampler.ChangeState("RESET_DEVICE");
|
||||
sampler.WaitForEndOfState("RESET_DEVICE");
|
||||
|
||||
sampler.ChangeState("END");
|
||||
runDDSStateHandler(sampler);
|
||||
}
|
||||
catch (exception& e)
|
||||
{
|
||||
|
@@ -12,24 +12,13 @@
|
||||
* @author D. Klein, A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include <map>
|
||||
|
||||
#include "boost/program_options.hpp"
|
||||
#include <boost/asio.hpp> // for DDS
|
||||
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQDDSTools.h"
|
||||
#include "FairMQProgOptions.h"
|
||||
#include "FairMQExample3Sink.h"
|
||||
#include "FairMQTools.h"
|
||||
|
||||
#include "dds_intercom.h" // DDS
|
||||
|
||||
using namespace std;
|
||||
using namespace boost::program_options;
|
||||
using namespace dds::intercom_api;
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
@@ -40,94 +29,21 @@ int main(int argc, char** argv)
|
||||
|
||||
try
|
||||
{
|
||||
string interfaceName; // name of the network interface to use for communication.
|
||||
|
||||
options_description sinkOptions("Sink options");
|
||||
sinkOptions.add_options()
|
||||
("network-interface", value<string>(&interfaceName)->default_value("eth0"), "Name of the network interface to use (e.g. eth0, ib0, wlan0, en0...)");
|
||||
|
||||
config.AddToCmdLineOptions(sinkOptions);
|
||||
|
||||
if (config.ParseAll(argc, argv))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
string id = config.GetValue<string>("id");
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
sink.SetTransport(config.GetValue<std::string>("transport"));
|
||||
|
||||
sink.SetProperty(FairMQExample3Sink::Id, id);
|
||||
|
||||
// configure data output channel
|
||||
FairMQChannel dataInChannel("pull", "bind", "");
|
||||
dataInChannel.UpdateRateLogging(0);
|
||||
sink.fChannels["data2"].push_back(dataInChannel);
|
||||
|
||||
// Get the IP of the current host and store it for binding.
|
||||
map<string,string> IPs;
|
||||
FairMQ::tools::getHostIPs(IPs);
|
||||
stringstream ss;
|
||||
// Check if ib0 (infiniband) interface is available, otherwise try eth0 or wlan0.
|
||||
if (IPs.count(interfaceName))
|
||||
{
|
||||
ss << "tcp://" << IPs[interfaceName] << ":1";
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(INFO) << ss.str();
|
||||
LOG(ERROR) << "Could not find provided network interface: \"" << interfaceName << "\"!, exiting.";
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
string initialInputAddress = ss.str();
|
||||
|
||||
// Configure the found host IP for the channel.
|
||||
// TCP port will be chosen randomly during the initialization (binding).
|
||||
sink.fChannels.at("data2").at(0).UpdateAddress(initialInputAddress);
|
||||
sink.SetConfig(config);
|
||||
|
||||
sink.ChangeState("INIT_DEVICE");
|
||||
sink.WaitForInitialValidation();
|
||||
|
||||
// Advertise the bound address via DDS property
|
||||
LOG(INFO) << "Giving sink input address to DDS.";
|
||||
CKeyValue ddsKeyValue;
|
||||
ddsKeyValue.putValue("SinkAddress", sink.fChannels.at("data2").at(0).GetAddress());
|
||||
|
||||
HandleConfigViaDDS(sink);
|
||||
sink.WaitForEndOfState("INIT_DEVICE");
|
||||
|
||||
sink.ChangeState("INIT_TASK");
|
||||
sink.WaitForEndOfState("INIT_TASK");
|
||||
|
||||
CCustomCmd ddsCustomCmd;
|
||||
|
||||
// Subscribe on custom commands
|
||||
ddsCustomCmd.subscribe([&](const string& command, const string& condition, uint64_t senderId)
|
||||
{
|
||||
LOG(INFO) << "Received custom command: " << command;
|
||||
if (command == "check-state")
|
||||
{
|
||||
ddsCustomCmd.send(id + ": " + sink.GetCurrentStateName(), to_string(senderId));
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(WARN) << "Received unknown command: " << command;
|
||||
LOG(WARN) << "Origin: " << senderId;
|
||||
LOG(WARN) << "Destination: " << condition;
|
||||
}
|
||||
});
|
||||
|
||||
sink.ChangeState("RUN");
|
||||
sink.WaitForEndOfState("RUN");
|
||||
|
||||
sink.ChangeState("RESET_TASK");
|
||||
sink.WaitForEndOfState("RESET_TASK");
|
||||
|
||||
sink.ChangeState("RESET_DEVICE");
|
||||
sink.WaitForEndOfState("RESET_DEVICE");
|
||||
|
||||
sink.ChangeState("END");
|
||||
runDDSStateHandler(sink);
|
||||
}
|
||||
catch (exception& e)
|
||||
{
|
||||
|
Reference in New Issue
Block a user