Update DDS example to use new library and names

This commit is contained in:
Alexey Rybalchenko 2016-03-30 09:55:49 +02:00 committed by Mohammad Al-Turany
parent 830ba948f0
commit e818174126
7 changed files with 39 additions and 42 deletions

View File

@ -57,8 +57,7 @@ Set(SRCS
Set(DEPENDENCIES Set(DEPENDENCIES
${DEPENDENCIES} ${DEPENDENCIES}
FairMQ FairMQ
dds-key-value-lib dds_intercom_lib
dds-custom-cmd-lib
) )
set(LIBRARY_NAME FairMQExample3) set(LIBRARY_NAME FairMQExample3)

View File

@ -19,7 +19,7 @@ In our example Sampler and Sink bind their sockets. The bound addresses are avai
sampler.ChangeState("INIT_DEVICE"); sampler.ChangeState("INIT_DEVICE");
sampler.WaitForInitialValidation(); sampler.WaitForInitialValidation();
dds::key_value::CKeyValue ddsKeyValue; CKeyValue ddsKeyValue;
ddsKeyValue.putValue("SamplerOutputAddress", sampler.fChannels.at("data-out").at(0).GetAddress()); ddsKeyValue.putValue("SamplerOutputAddress", sampler.fChannels.at("data-out").at(0).GetAddress());
sampler.WaitForEndOfState("INIT_DEVICE"); sampler.WaitForEndOfState("INIT_DEVICE");
@ -32,9 +32,9 @@ Same approach for the Sink.
The Processors in our example need the addresses of Sampler and Sink. They receive these from DDS via properties (sent in the step above): The Processors in our example need the addresses of Sampler and Sink. They receive these from DDS via properties (sent in the step above):
```C++ ```C++
dds::key_value::CKeyValue ddsKeyValue; CKeyValue ddsKeyValue;
// Sampler properties // Sampler properties
dds::key_value::CKeyValue::valuesMap_t samplerValues; CKeyValue::valuesMap_t samplerValues;
{ {
mutex keyMutex; mutex keyMutex;
condition_variable keyCondition; condition_variable keyCondition;
@ -92,9 +92,9 @@ dds-server start -s
Agents are submitted with: Agents are submitted with:
```bash ```bash
dds-submit --rms ssh --ssh-rms-cfg ex3-dds-hosts.cfg dds-submit --rms ssh --config ex3-dds-hosts.cfg
``` ```
The `--rms` option defines a destination resource management system. The `--ssh-rms-cfg` specifies an SSH plug-in resource definition file. The `--rms` option defines a destination resource management system. The `--config` specifies an SSH plug-in resource definition file.
##### 7. Set the topology file. ##### 7. Set the topology file.
@ -115,7 +115,7 @@ After activation, agents will execute the defined tasks on the worker nodes. Out
##### 10. (optional) Use example command UI to check state of the devices ##### 10. (optional) Use example command UI to check state of the devices
This example includes a simple utility to send command to devices and receive replies from them. The code in `runDDSCommandUI.cxx` (compiled as ex3-dds-command-ui) uses the DDSCustomCmd library to send "check-state" string to all devices, to which they reply with their ID and state they are in. This can be used as an example of sending/receiving commands or other information to devices. This example includes a simple utility to send command to devices and receive replies from them. The code in `runDDSCommandUI.cxx` (compiled as ex3-dds-command-ui) uses the DDS intercom library to send "check-state" string to all devices, to which they reply with their ID and state they are in. This can be used as an example of sending/receiving commands or other information to devices.
To see it in action, start the ex3-dds-command-ui while the topology is running. To see it in action, start the ex3-dds-command-ui while the topology is running.

View File

@ -1,8 +1,7 @@
@bash_begin@ @bash_begin@
echo "DBG: SSH ENV Script" # source setup.sh
#source setup.sh
@bash_end@ @bash_end@
sampler, orybalch@localhost, , /tmp/, 1 sampler, username@localhost, , /tmp/, 1
processor, orybalch@localhost, , /tmp/, 10 processor, username@localhost, , /tmp/, 10
sink, orybalch@localhost, , /tmp/, 1 sink, username@localhost, , /tmp/, 1

View File

@ -1,5 +1,5 @@
// DDS #include "dds_intercom.h" // DDS
#include "CustomCmd.h"
// STD // STD
#include <iostream> #include <iostream>
#include <exception> #include <exception>
@ -8,8 +8,7 @@
#include <atomic> #include <atomic>
using namespace std; using namespace std;
using namespace dds; using namespace dds::intercom_api;
using namespace custom_cmd;
int main(int argc, char* argv[]) int main(int argc, char* argv[])
{ {
@ -17,14 +16,14 @@ int main(int argc, char* argv[])
{ {
CCustomCmd ddsCustomCmd; CCustomCmd ddsCustomCmd;
ddsCustomCmd.subscribeCmd([](const string& command, const string& condition, uint64_t senderId) ddsCustomCmd.subscribe([](const string& command, const string& condition, uint64_t senderId)
{ {
cout << "Received: \"" << command << "\"" << endl; cout << "Received: \"" << command << "\"" << endl;
}); });
while (true) while (true)
{ {
int result = ddsCustomCmd.sendCmd("check-state", ""); int result = ddsCustomCmd.send("check-state", "");
if (result == 1) if (result == 1)
{ {
@ -34,11 +33,11 @@ int main(int argc, char* argv[])
this_thread::sleep_for(chrono::seconds(1)); this_thread::sleep_for(chrono::seconds(1));
} }
} }
catch (exception& _e) catch (exception& e)
{ {
cerr << "Error: " << _e.what() << endl; cerr << "Error: " << e.what() << endl;
return EXIT_FAILURE; return EXIT_FAILURE;
} }
return EXIT_SUCCESS; return EXIT_SUCCESS;
} }

View File

@ -24,11 +24,11 @@
#include "FairMQExample3Processor.h" #include "FairMQExample3Processor.h"
#include "FairMQTools.h" #include "FairMQTools.h"
#include "KeyValue.h" // DDS Key Value #include "dds_intercom.h" // DDS
#include "CustomCmd.h" // DDS Custom Commands
using namespace std; using namespace std;
using namespace boost::program_options; using namespace boost::program_options;
using namespace dds::intercom_api;
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
@ -63,9 +63,9 @@ int main(int argc, char** argv)
processor.fChannels["data2"].push_back(dataOutChannel); processor.fChannels["data2"].push_back(dataOutChannel);
// Waiting for DDS properties // Waiting for DDS properties
dds::key_value::CKeyValue ddsKeyValue; CKeyValue ddsKeyValue;
// Sampler properties // Sampler properties
dds::key_value::CKeyValue::valuesMap_t samplerValues; CKeyValue::valuesMap_t samplerValues;
{ {
mutex keyMutex; mutex keyMutex;
condition_variable keyCondition; condition_variable keyCondition;
@ -81,7 +81,7 @@ int main(int argc, char** argv)
} }
} }
// Sink properties // Sink properties
dds::key_value::CKeyValue::valuesMap_t sinkValues; CKeyValue::valuesMap_t sinkValues;
{ {
mutex keyMutex; mutex keyMutex;
condition_variable keyCondition; condition_variable keyCondition;
@ -106,15 +106,15 @@ int main(int argc, char** argv)
processor.ChangeState("INIT_TASK"); processor.ChangeState("INIT_TASK");
processor.WaitForEndOfState("INIT_TASK"); processor.WaitForEndOfState("INIT_TASK");
dds::custom_cmd::CCustomCmd ddsCustomCmd; CCustomCmd ddsCustomCmd;
// Subscribe on custom commands // Subscribe on custom commands
ddsCustomCmd.subscribeCmd([&](const string& command, const string& condition, uint64_t senderId) ddsCustomCmd.subscribe([&](const string& command, const string& condition, uint64_t senderId)
{ {
LOG(INFO) << "Received custom command: " << command; LOG(INFO) << "Received custom command: " << command;
if (command == "check-state") if (command == "check-state")
{ {
ddsCustomCmd.sendCmd(id + ": " + processor.GetCurrentStateName(), to_string(senderId)); ddsCustomCmd.send(id + ": " + processor.GetCurrentStateName(), to_string(senderId));
} }
else else
{ {

View File

@ -25,11 +25,11 @@
#include "FairMQExample3Sampler.h" #include "FairMQExample3Sampler.h"
#include "FairMQTools.h" #include "FairMQTools.h"
#include "KeyValue.h" // DDS Key Value #include "dds_intercom.h" // DDS
#include "CustomCmd.h" // DDS Custom Commands
using namespace std; using namespace std;
using namespace boost::program_options; using namespace boost::program_options;
using namespace dds::intercom_api;
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
@ -92,7 +92,7 @@ int main(int argc, char** argv)
// Advertise the bound addresses via DDS property // Advertise the bound addresses via DDS property
LOG(INFO) << "Giving sampler output address to DDS."; LOG(INFO) << "Giving sampler output address to DDS.";
dds::key_value::CKeyValue ddsKeyValue; CKeyValue ddsKeyValue;
ddsKeyValue.putValue("SamplerAddress", sampler.fChannels.at("data1").at(0).GetAddress()); ddsKeyValue.putValue("SamplerAddress", sampler.fChannels.at("data1").at(0).GetAddress());
sampler.WaitForEndOfState("INIT_DEVICE"); sampler.WaitForEndOfState("INIT_DEVICE");
@ -100,15 +100,15 @@ int main(int argc, char** argv)
sampler.ChangeState("INIT_TASK"); sampler.ChangeState("INIT_TASK");
sampler.WaitForEndOfState("INIT_TASK"); sampler.WaitForEndOfState("INIT_TASK");
dds::custom_cmd::CCustomCmd ddsCustomCmd; CCustomCmd ddsCustomCmd;
// Subscribe on custom commands // Subscribe on custom commands
ddsCustomCmd.subscribeCmd([&](const string& command, const string& condition, uint64_t senderId) ddsCustomCmd.subscribe([&](const string& command, const string& condition, uint64_t senderId)
{ {
LOG(INFO) << "Received custom command: " << command; LOG(INFO) << "Received custom command: " << command;
if (command == "check-state") if (command == "check-state")
{ {
ddsCustomCmd.sendCmd(id + ": " + sampler.GetCurrentStateName(), to_string(senderId)); ddsCustomCmd.send(id + ": " + sampler.GetCurrentStateName(), to_string(senderId));
} }
else else
{ {

View File

@ -25,11 +25,11 @@
#include "FairMQExample3Sink.h" #include "FairMQExample3Sink.h"
#include "FairMQTools.h" #include "FairMQTools.h"
#include "KeyValue.h" // DDS Key Value #include "dds_intercom.h" // DDS
#include "CustomCmd.h" // DDS Custom Commands
using namespace std; using namespace std;
using namespace boost::program_options; using namespace boost::program_options;
using namespace dds::intercom_api;
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
@ -92,7 +92,7 @@ int main(int argc, char** argv)
// Advertise the bound address via DDS property // Advertise the bound address via DDS property
LOG(INFO) << "Giving sink input address to DDS."; LOG(INFO) << "Giving sink input address to DDS.";
dds::key_value::CKeyValue ddsKeyValue; CKeyValue ddsKeyValue;
ddsKeyValue.putValue("SinkAddress", sink.fChannels.at("data2").at(0).GetAddress()); ddsKeyValue.putValue("SinkAddress", sink.fChannels.at("data2").at(0).GetAddress());
sink.WaitForEndOfState("INIT_DEVICE"); sink.WaitForEndOfState("INIT_DEVICE");
@ -100,15 +100,15 @@ int main(int argc, char** argv)
sink.ChangeState("INIT_TASK"); sink.ChangeState("INIT_TASK");
sink.WaitForEndOfState("INIT_TASK"); sink.WaitForEndOfState("INIT_TASK");
dds::custom_cmd::CCustomCmd ddsCustomCmd; CCustomCmd ddsCustomCmd;
// Subscribe on custom commands // Subscribe on custom commands
ddsCustomCmd.subscribeCmd([&](const string& command, const string& condition, uint64_t senderId) ddsCustomCmd.subscribe([&](const string& command, const string& condition, uint64_t senderId)
{ {
LOG(INFO) << "Received custom command: " << command; LOG(INFO) << "Received custom command: " << command;
if (command == "check-state") if (command == "check-state")
{ {
ddsCustomCmd.sendCmd(id + ": " + sink.GetCurrentStateName(), to_string(senderId)); ddsCustomCmd.send(id + ": " + sink.GetCurrentStateName(), to_string(senderId));
} }
else else
{ {