Extend DDS Example to use command interface

This commit is contained in:
Alexey Rybalchenko 2015-11-23 11:28:15 +01:00 committed by Mohammad Al-Turany
parent 837e035457
commit c9c881c33c
13 changed files with 181 additions and 238 deletions

View File

@ -6,7 +6,6 @@
# copied verbatim in the file "LICENSE" # # copied verbatim in the file "LICENSE" #
################################################################################ ################################################################################
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/3-dds/ex3-devices.json ${CMAKE_BINARY_DIR}/bin/config/ex3-devices.json)
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/3-dds/ex3-dds-topology.xml ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-topology.xml) configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/3-dds/ex3-dds-topology.xml ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-topology.xml)
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/3-dds/ex3-dds-hosts.cfg ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-hosts.cfg COPYONLY) configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/3-dds/ex3-dds-hosts.cfg ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-hosts.cfg COPYONLY)
@ -68,6 +67,7 @@ Set(DEPENDENCIES
${DEPENDENCIES} ${DEPENDENCIES}
FairMQ FairMQ
dds-key-value-lib dds-key-value-lib
dds-custom-cmd-lib
) )
set(LIBRARY_NAME FairMQExample3) set(LIBRARY_NAME FairMQExample3)
@ -79,6 +79,7 @@ Set(Exe_Names
ex3-sampler-dds ex3-sampler-dds
ex3-processor-dds ex3-processor-dds
ex3-sink-dds ex3-sink-dds
ex3-dds-command-ui
) )
Set(Exe_Source Set(Exe_Source
@ -86,6 +87,7 @@ Set(Exe_Source
runExample3Sampler.cxx runExample3Sampler.cxx
runExample3Processor.cxx runExample3Processor.cxx
runExample3Sink.cxx runExample3Sink.cxx
runDDSCommandUI.cxx
) )
list(LENGTH Exe_Names _length) list(LENGTH Exe_Names _length)

View File

@ -21,7 +21,6 @@
using namespace std; using namespace std;
FairMQExample3Processor::FairMQExample3Processor() FairMQExample3Processor::FairMQExample3Processor()
: fTaskIndex(0)
{ {
} }
@ -32,7 +31,6 @@ void FairMQExample3Processor::CustomCleanup(void *data, void *object)
void FairMQExample3Processor::Run() void FairMQExample3Processor::Run()
{ {
// Check if we are still in the RUNNING state
while (CheckCurrentState(RUNNING)) while (CheckCurrentState(RUNNING))
{ {
// Create empty message to hold the input // Create empty message to hold the input
@ -46,7 +44,7 @@ void FairMQExample3Processor::Run()
// Modify the received string // Modify the received string
string* text = new string(static_cast<char*>(input->GetData()), input->GetSize()); string* text = new string(static_cast<char*>(input->GetData()), input->GetSize());
*text += " (modified by " + fId + to_string(fTaskIndex) + ")"; *text += " (modified by " + fId + ")";
// Create output message // Create output message
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text)); unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text));
@ -57,50 +55,6 @@ void FairMQExample3Processor::Run()
} }
} }
void FairMQExample3Processor::SetProperty(const int key, const string& value)
{
switch (key)
{
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
string FairMQExample3Processor::GetProperty(const int key, const string& default_ /*= ""*/)
{
switch (key)
{
default:
return FairMQDevice::GetProperty(key, default_);
}
}
void FairMQExample3Processor::SetProperty(const int key, const int value)
{
switch (key)
{
case TaskIndex:
fTaskIndex = value;
break;
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
int FairMQExample3Processor::GetProperty(const int key, const int default_ /*= 0*/)
{
switch (key)
{
case TaskIndex:
return fTaskIndex;
break;
default:
return FairMQDevice::GetProperty(key, default_);
}
}
FairMQExample3Processor::~FairMQExample3Processor() FairMQExample3Processor::~FairMQExample3Processor()
{ {
} }

View File

@ -22,25 +22,12 @@
class FairMQExample3Processor : public FairMQDevice class FairMQExample3Processor : public FairMQDevice
{ {
public: public:
enum
{
Text = FairMQDevice::Last,
TaskIndex,
Last
};
FairMQExample3Processor(); FairMQExample3Processor();
virtual ~FairMQExample3Processor(); virtual ~FairMQExample3Processor();
static void CustomCleanup(void* data, void* hint); static void CustomCleanup(void* data, void* hint);
virtual void SetProperty(const int key, const std::string& value);
virtual std::string GetProperty(const int key, const std::string& default_ = "");
virtual void SetProperty(const int key, const int value);
virtual int GetProperty(const int key, const int default_ = 0);
protected: protected:
int fTaskIndex;
virtual void Run(); virtual void Run();
}; };

View File

@ -21,7 +21,6 @@
using namespace std; using namespace std;
FairMQExample3Sampler::FairMQExample3Sampler() FairMQExample3Sampler::FairMQExample3Sampler()
: fText()
{ {
} }
@ -32,16 +31,15 @@ void FairMQExample3Sampler::CustomCleanup(void *data, void *object)
void FairMQExample3Sampler::Run() void FairMQExample3Sampler::Run()
{ {
// Check if we are still in the RUNNING state
while (CheckCurrentState(RUNNING)) while (CheckCurrentState(RUNNING))
{ {
boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
string* text = new string(fText); string* text = new string("Data");
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text)); unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text));
LOG(INFO) << "Sending \"" << fText << "\""; LOG(INFO) << "Sending \"Data\"";
fChannels.at("data-out").at(0).Send(msg); fChannels.at("data-out").at(0).Send(msg);
} }
@ -50,47 +48,3 @@ void FairMQExample3Sampler::Run()
FairMQExample3Sampler::~FairMQExample3Sampler() FairMQExample3Sampler::~FairMQExample3Sampler()
{ {
} }
void FairMQExample3Sampler::SetProperty(const int key, const string& value)
{
switch (key)
{
case Text:
fText = value;
break;
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
string FairMQExample3Sampler::GetProperty(const int key, const string& default_ /*= ""*/)
{
switch (key)
{
case Text:
return fText;
break;
default:
return FairMQDevice::GetProperty(key, default_);
}
}
void FairMQExample3Sampler::SetProperty(const int key, const int value)
{
switch (key)
{
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
int FairMQExample3Sampler::GetProperty(const int key, const int default_ /*= 0*/)
{
switch (key)
{
default:
return FairMQDevice::GetProperty(key, default_);
}
}

View File

@ -22,24 +22,12 @@
class FairMQExample3Sampler : public FairMQDevice class FairMQExample3Sampler : public FairMQDevice
{ {
public: public:
enum
{
Text = FairMQDevice::Last,
Last
};
FairMQExample3Sampler(); FairMQExample3Sampler();
virtual ~FairMQExample3Sampler(); virtual ~FairMQExample3Sampler();
static void CustomCleanup(void* data, void* hint); static void CustomCleanup(void* data, void* hint);
virtual void SetProperty(const int key, const std::string& value);
virtual std::string GetProperty(const int key, const std::string& default_ = "");
virtual void SetProperty(const int key, const int value);
virtual int GetProperty(const int key, const int default_ = 0);
protected: protected:
std::string fText;
virtual void Run(); virtual void Run();
}; };

View File

@ -113,7 +113,13 @@ dds-topology --activate
After activation, agents will execute the defined tasks on the worker nodes. Output of the tasks will be stored in the directory that was specified in the hosts file. After activation, agents will execute the defined tasks on the worker nodes. Output of the tasks will be stored in the directory that was specified in the hosts file.
##### 10. Stop DDS server/topology. ##### 10. (optional) Use example command UI to check state of the devices
This example includes a simple utility to send command to devices and receive replies from them. The code in `runDDSCommandUI.cxx` (compiled as ex3-dds-command-ui) uses the DDSCustomCmd library to send "check-state" string to all devices, to which they reply with their ID and state they are in. This can be used as an example of sending/receiving commands or other information to devices.
To see it in action, start the ex3-dds-command-ui while the topology is running.
##### 11. Stop DDS server/topology.
The execution of tasks can be stopped with: The execution of tasks can be stopped with:
```bash ```bash

View File

@ -3,4 +3,6 @@ echo "DBG: SSH ENV Script"
#source setup.sh #source setup.sh
@bash_end@ @bash_end@
wn0, username@localhost, , /tmp/, 12 sampler, username@localhost, , /tmp/, 1
processor, username@localhost, , /tmp/, 10
sink, username@localhost, , /tmp/, 1

View File

@ -3,15 +3,29 @@
<property id="SamplerOutputAddress" /> <property id="SamplerOutputAddress" />
<property id="SinkInputAddress" /> <property id="SinkInputAddress" />
<declrequirement id="SamplerWorker">
<hostPattern type="wnname" value="sampler"/>
</declrequirement>
<declrequirement id="ProcessorWorker">
<hostPattern type="wnname" value="processor"/>
</declrequirement>
<declrequirement id="SinkWorker">
<hostPattern type="wnname" value="sink"/>
</declrequirement>
<decltask id="Sampler"> <decltask id="Sampler">
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sampler-dds --id sampler --config-json-file @CMAKE_BINARY_DIR@/bin/config/ex3-devices.json</exe> <exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sampler-dds --id sampler0 --log-color-format false</exe>
<requirement>SamplerWorker</requirement>
<properties> <properties>
<id access="write">SamplerOutputAddress</id> <id access="write">SamplerOutputAddress</id>
</properties> </properties>
</decltask> </decltask>
<decltask id="Processor"> <decltask id="Processor">
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-processor-dds --id processor --index %taskIndex% --config-json-file @CMAKE_BINARY_DIR@/bin/config/ex3-devices.json</exe> <exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-processor-dds --id processor%taskIndex% --log-color-format false</exe>
<requirement>ProcessorWorker</requirement>
<properties> <properties>
<id access="read">SamplerOutputAddress</id> <id access="read">SamplerOutputAddress</id>
<id access="read">SinkInputAddress</id> <id access="read">SinkInputAddress</id>
@ -19,7 +33,8 @@
</decltask> </decltask>
<decltask id="Sink"> <decltask id="Sink">
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sink-dds --id sink --config-json-file @CMAKE_BINARY_DIR@/bin/config/ex3-devices.json</exe> <exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sink-dds --id sink0 --log-color-format false</exe>
<requirement>SinkWorker</requirement>
<properties> <properties>
<id access="write">SinkInputAddress</id> <id access="write">SinkInputAddress</id>
</properties> </properties>

View File

@ -1,59 +0,0 @@
{
"fairMQOptions":
{
"device":
{
"id": "sampler",
"channel":
{
"name": "data-out",
"socket":
{
"type": "push",
"method": "bind",
"address": ""
}
}
},
"device":
{
"id": "processor",
"channel":
{
"name": "data-in",
"socket":
{
"type": "pull",
"method": "connect",
"address": ""
}
},
"channel":
{
"name": "data-out",
"socket":
{
"type": "push",
"method": "connect",
"address": ""
}
}
},
"device":
{
"id": "sink",
"channel":
{
"name": "data-in",
"socket":
{
"type": "pull",
"method": "bind",
"address": ""
}
}
}
}
}

View File

@ -0,0 +1,44 @@
// DDS
#include "CustomCmd.h"
// STD
#include <iostream>
#include <exception>
#include <sstream>
#include <thread>
#include <atomic>
using namespace std;
using namespace dds;
using namespace custom_cmd;
int main(int argc, char* argv[])
{
try
{
CCustomCmd ddsCustomCmd;
ddsCustomCmd.subscribeCmd([](const string& command, const string& condition, uint64_t senderId)
{
cout << "Received: \"" << command << "\"" << endl;
});
while (true)
{
int result = ddsCustomCmd.sendCmd("check-state", "");
if (result == 1)
{
cerr << "Error sending custom command" << endl;
}
this_thread::sleep_for(chrono::seconds(1));
}
}
catch (exception& _e)
{
cerr << "Error: " << _e.what() << endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}

View File

@ -20,7 +20,6 @@
#include <boost/asio.hpp> // for DDS #include <boost/asio.hpp> // for DDS
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h" #include "FairMQProgOptions.h"
#include "FairMQExample3Processor.h" #include "FairMQExample3Processor.h"
#include "FairMQTools.h" #include "FairMQTools.h"
@ -31,8 +30,10 @@
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
#endif #endif
#include "KeyValue.h" // DDS #include "KeyValue.h" // DDS Key Value
#include "CustomCmd.h" // DDS Custom Commands
using namespace std;
using namespace boost::program_options; using namespace boost::program_options;
int main(int argc, char** argv) int main(int argc, char** argv)
@ -44,28 +45,35 @@ int main(int argc, char** argv)
try try
{ {
int ddsTaskIndex = 0;
options_description processorOptions("Processor options");
processorOptions.add_options()
("index", value<int>(&ddsTaskIndex)->default_value(0), "DDS task index");
config.AddToCmdLineOptions(processorOptions);
if (config.ParseAll(argc, argv)) if (config.ParseAll(argc, argv))
{ {
return 0; return 0;
} }
std::string filename = config.GetValue<std::string>("config-json-file"); string id = config.GetValue<string>("id");
std::string id = config.GetValue<std::string>("id");
config.UserParser<FairMQParser::JSON>(filename, id);
processor.fChannels = config.GetFairMQMap();
LOG(INFO) << "PID: " << getpid(); LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
processor.SetTransport(transportFactory);
processor.SetProperty(FairMQExample3Processor::Id, id);
// configure data output channel
FairMQChannel dataInChannel("pull", "connect", "");
dataInChannel.UpdateRateLogging(0);
processor.fChannels["data-in"].push_back(dataInChannel);
// configure data output channel
FairMQChannel dataOutChannel("push", "connect", "");
dataOutChannel.UpdateRateLogging(0);
processor.fChannels["data-out"].push_back(dataOutChannel);
// Waiting for DDS properties // Waiting for DDS properties
dds::key_value::CKeyValue ddsKeyValue; dds::key_value::CKeyValue ddsKeyValue;
// Sampler properties // Sampler properties
@ -104,23 +112,30 @@ int main(int argc, char** argv)
processor.fChannels.at("data-in").at(0).UpdateAddress(samplerValues.begin()->second); processor.fChannels.at("data-in").at(0).UpdateAddress(samplerValues.begin()->second);
processor.fChannels.at("data-out").at(0).UpdateAddress(sinkValues.begin()->second); processor.fChannels.at("data-out").at(0).UpdateAddress(sinkValues.begin()->second);
#ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
processor.SetTransport(transportFactory);
processor.SetProperty(FairMQExample3Processor::Id, id);
processor.SetProperty(FairMQExample3Processor::TaskIndex, ddsTaskIndex);
processor.ChangeState("INIT_DEVICE"); processor.ChangeState("INIT_DEVICE");
processor.WaitForEndOfState("INIT_DEVICE"); processor.WaitForEndOfState("INIT_DEVICE");
processor.ChangeState("INIT_TASK"); processor.ChangeState("INIT_TASK");
processor.WaitForEndOfState("INIT_TASK"); processor.WaitForEndOfState("INIT_TASK");
dds::custom_cmd::CCustomCmd ddsCustomCmd;
// Subscribe on custom commands
ddsCustomCmd.subscribeCmd([&](const string& command, const string& condition, uint64_t senderId)
{
LOG(INFO) << "Received custom command: " << command << " condition: " << condition << " senderId: " << senderId;
if (command == "check-state")
{
ddsCustomCmd.sendCmd(id + ": " + processor.GetCurrentStateName(), to_string(senderId));
}
else
{
LOG(WARN) << "Received unknown command: " << command;
LOG(WARN) << "Origin: " << senderId;
LOG(WARN) << "Destination: " << condition;
}
});
processor.ChangeState("RUN"); processor.ChangeState("RUN");
processor.WaitForEndOfState("RUN"); processor.WaitForEndOfState("RUN");
@ -132,7 +147,7 @@ int main(int argc, char** argv)
processor.ChangeState("END"); processor.ChangeState("END");
} }
catch (std::exception& e) catch (exception& e)
{ {
LOG(ERROR) << e.what(); LOG(ERROR) << e.what();
LOG(INFO) << "Command line options are the following: "; LOG(INFO) << "Command line options are the following: ";

View File

@ -21,7 +21,6 @@
#include <boost/asio.hpp> // for DDS #include <boost/asio.hpp> // for DDS
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h" #include "FairMQProgOptions.h"
#include "FairMQExample3Sampler.h" #include "FairMQExample3Sampler.h"
#include "FairMQTools.h" #include "FairMQTools.h"
@ -32,8 +31,10 @@
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
#endif #endif
#include "KeyValue.h" // DDS #include "KeyValue.h" // DDS Key Value
#include "CustomCmd.h" // DDS Custom Commands
using namespace std;
using namespace boost::program_options; using namespace boost::program_options;
int main(int argc, char** argv) int main(int argc, char** argv)
@ -45,13 +46,11 @@ int main(int argc, char** argv)
try try
{ {
std::string text; // text to be sent for processing. string interfaceName; // name of the network interface to use for communication.
std::string interfaceName; // name of the network interface to use for communication.
options_description samplerOptions("Sampler options"); options_description samplerOptions("Sampler options");
samplerOptions.add_options() samplerOptions.add_options()
("text", value<std::string>(&text)->default_value("Hello"), "Text to send out") ("network-interface", value<string>(&interfaceName)->default_value("eth0"), "Name of the network interface to use (e.g. eth0, ib0, wlan0, en0...)");
("network-interface", value<std::string>(&interfaceName)->default_value("eth0"), "Name of the network interface to use (e.g. eth0, ib0, wlan0, en0...)");
config.AddToCmdLineOptions(samplerOptions); config.AddToCmdLineOptions(samplerOptions);
@ -60,12 +59,7 @@ int main(int argc, char** argv)
return 0; return 0;
} }
std::string filename = config.GetValue<std::string>("config-json-file"); string id = config.GetValue<string>("id");
std::string id = config.GetValue<std::string>("id");
config.UserParser<FairMQParser::JSON>(filename, id);
sampler.fChannels = config.GetFairMQMap();
LOG(INFO) << "PID: " << getpid(); LOG(INFO) << "PID: " << getpid();
@ -78,7 +72,11 @@ int main(int argc, char** argv)
sampler.SetTransport(transportFactory); sampler.SetTransport(transportFactory);
sampler.SetProperty(FairMQExample3Sampler::Id, id); sampler.SetProperty(FairMQExample3Sampler::Id, id);
sampler.SetProperty(FairMQExample3Sampler::Text, text);
// configure data output channel
FairMQChannel dataOutChannel("push", "bind", "");
dataOutChannel.UpdateRateLogging(0);
sampler.fChannels["data-out"].push_back(dataOutChannel);
// Get the IP of the current host and store it for binding. // Get the IP of the current host and store it for binding.
map<string,string> IPs; map<string,string> IPs;
@ -114,6 +112,24 @@ int main(int argc, char** argv)
sampler.ChangeState("INIT_TASK"); sampler.ChangeState("INIT_TASK");
sampler.WaitForEndOfState("INIT_TASK"); sampler.WaitForEndOfState("INIT_TASK");
dds::custom_cmd::CCustomCmd ddsCustomCmd;
// Subscribe on custom commands
ddsCustomCmd.subscribeCmd([&](const string& command, const string& condition, uint64_t senderId)
{
LOG(INFO) << "Received custom command: " << command << " condition: " << condition << " senderId: " << senderId;
if (command == "check-state")
{
ddsCustomCmd.sendCmd(id + ": " + sampler.GetCurrentStateName(), to_string(senderId));
}
else
{
LOG(WARN) << "Received unknown command: " << command;
LOG(WARN) << "Origin: " << senderId;
LOG(WARN) << "Destination: " << condition;
}
});
sampler.ChangeState("RUN"); sampler.ChangeState("RUN");
sampler.WaitForEndOfState("RUN"); sampler.WaitForEndOfState("RUN");
@ -125,7 +141,7 @@ int main(int argc, char** argv)
sampler.ChangeState("END"); sampler.ChangeState("END");
} }
catch (std::exception& e) catch (exception& e)
{ {
LOG(ERROR) << e.what(); LOG(ERROR) << e.what();
LOG(INFO) << "Command line options are the following: "; LOG(INFO) << "Command line options are the following: ";

View File

@ -21,7 +21,6 @@
#include <boost/asio.hpp> // for DDS #include <boost/asio.hpp> // for DDS
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h" #include "FairMQProgOptions.h"
#include "FairMQExample3Sink.h" #include "FairMQExample3Sink.h"
#include "FairMQTools.h" #include "FairMQTools.h"
@ -32,8 +31,10 @@
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
#endif #endif
#include "KeyValue.h" // DDS #include "KeyValue.h" // DDS Key Value
#include "CustomCmd.h" // DDS Custom Commands
using namespace std;
using namespace boost::program_options; using namespace boost::program_options;
int main(int argc, char** argv) int main(int argc, char** argv)
@ -45,11 +46,11 @@ int main(int argc, char** argv)
try try
{ {
std::string interfaceName; // name of the network interface to use for communication. string interfaceName; // name of the network interface to use for communication.
options_description sinkOptions("Sink options"); options_description sinkOptions("Sink options");
sinkOptions.add_options() sinkOptions.add_options()
("network-interface", value<std::string>(&interfaceName)->default_value("eth0"), "Name of the network interface to use (e.g. eth0, ib0, wlan0, en0...)"); ("network-interface", value<string>(&interfaceName)->default_value("eth0"), "Name of the network interface to use (e.g. eth0, ib0, wlan0, en0...)");
config.AddToCmdLineOptions(sinkOptions); config.AddToCmdLineOptions(sinkOptions);
@ -58,12 +59,7 @@ int main(int argc, char** argv)
return 0; return 0;
} }
std::string filename = config.GetValue<std::string>("config-json-file"); string id = config.GetValue<string>("id");
std::string id = config.GetValue<std::string>("id");
config.UserParser<FairMQParser::JSON>(filename, id);
sink.fChannels = config.GetFairMQMap();
LOG(INFO) << "PID: " << getpid(); LOG(INFO) << "PID: " << getpid();
@ -77,6 +73,11 @@ int main(int argc, char** argv)
sink.SetProperty(FairMQExample3Sink::Id, id); sink.SetProperty(FairMQExample3Sink::Id, id);
// configure data output channel
FairMQChannel dataInChannel("pull", "bind", "");
dataInChannel.UpdateRateLogging(0);
sink.fChannels["data-in"].push_back(dataInChannel);
// Get the IP of the current host and store it for binding. // Get the IP of the current host and store it for binding.
map<string,string> IPs; map<string,string> IPs;
FairMQ::tools::getHostIPs(IPs); FairMQ::tools::getHostIPs(IPs);
@ -111,6 +112,24 @@ int main(int argc, char** argv)
sink.ChangeState("INIT_TASK"); sink.ChangeState("INIT_TASK");
sink.WaitForEndOfState("INIT_TASK"); sink.WaitForEndOfState("INIT_TASK");
dds::custom_cmd::CCustomCmd ddsCustomCmd;
// Subscribe on custom commands
ddsCustomCmd.subscribeCmd([&](const string& command, const string& condition, uint64_t senderId)
{
LOG(INFO) << "Received custom command: " << command << " condition: " << condition << " senderId: " << senderId;
if (command == "check-state")
{
ddsCustomCmd.sendCmd(id + ": " + sink.GetCurrentStateName(), to_string(senderId));
}
else
{
LOG(WARN) << "Received unknown command: " << command;
LOG(WARN) << "Origin: " << senderId;
LOG(WARN) << "Destination: " << condition;
}
});
sink.ChangeState("RUN"); sink.ChangeState("RUN");
sink.WaitForEndOfState("RUN"); sink.WaitForEndOfState("RUN");
@ -122,7 +141,7 @@ int main(int argc, char** argv)
sink.ChangeState("END"); sink.ChangeState("END");
} }
catch (std::exception& e) catch (exception& e)
{ {
LOG(ERROR) << e.what(); LOG(ERROR) << e.what();
LOG(INFO) << "Command line options are the following: "; LOG(INFO) << "Command line options are the following: ";