Rename /example to /examples and move MQ examples in it

This commit is contained in:
Alexey Rybalchenko
2015-11-09 10:36:24 +01:00
committed by Mohammad Al-Turany
parent 736d61830c
commit f0a878a43a
73 changed files with 4652 additions and 0 deletions

View File

@@ -0,0 +1,101 @@
################################################################################
# Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence version 3 (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/3-dds/ex3-devices.json ${CMAKE_BINARY_DIR}/bin/config/ex3-devices.json)
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/3-dds/ex3-dds-topology.xml ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-topology.xml)
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/3-dds/ex3-dds-hosts.cfg ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-hosts.cfg COPYONLY)
add_definitions(-DENABLE_DDS)
Set(INCLUDE_DIRECTORIES
${CMAKE_SOURCE_DIR}/fairmq
${CMAKE_SOURCE_DIR}/fairmq/devices
${CMAKE_SOURCE_DIR}/fairmq/tools
${CMAKE_SOURCE_DIR}/fairmq/options
${CMAKE_SOURCE_DIR}/examples/MQ/3-dds
${CMAKE_CURRENT_BINARY_DIR}
)
Set(SYSTEM_INCLUDE_DIRECTORIES
${SYSTEM_INCLUDE_DIRECTORIES}
${DDS_INCLUDE_DIR}
)
If(NANOMSG_FOUND)
Set(INCLUDE_DIRECTORIES
${INCLUDE_DIRECTORIES}
${CMAKE_SOURCE_DIR}/fairmq/nanomsg
)
Set(SYSTEM_INCLUDE_DIRECTORIES
${SYSTEM_INCLUDE_DIRECTORIES}
${NANOMSG_INCLUDE_DIR}
)
Else(NANOMSG_FOUND)
Set(INCLUDE_DIRECTORIES
${INCLUDE_DIRECTORIES}
${CMAKE_SOURCE_DIR}/fairmq/zeromq
)
Set(SYSTEM_INCLUDE_DIRECTORIES
${SYSTEM_INCLUDE_DIRECTORIES}
${ZMQ_INCLUDE_DIR}
)
EndIf(NANOMSG_FOUND)
Include_Directories(${INCLUDE_DIRECTORIES})
Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES})
Set(LINK_DIRECTORIES
${LINK_DIRECTORIES}
${Boost_LIBRARY_DIRS}
${DDS_LIBRARY_DIR}
)
Link_Directories(${LINK_DIRECTORIES})
Set(SRCS
${SRCS}
"FairMQExample3Sampler.cxx"
"FairMQExample3Processor.cxx"
"FairMQExample3Sink.cxx"
)
Set(DEPENDENCIES
${DEPENDENCIES}
FairMQ
dds-key-value-lib
)
set(LIBRARY_NAME FairMQExample3)
GENERATE_LIBRARY()
Set(Exe_Names
${Exe_Names}
ex3-sampler-dds
ex3-processor-dds
ex3-sink-dds
)
Set(Exe_Source
${Exe_Source}
runExample3Sampler.cxx
runExample3Processor.cxx
runExample3Sink.cxx
)
list(LENGTH Exe_Names _length)
math(EXPR _length ${_length}-1)
ForEach(_file RANGE 0 ${_length})
list(GET Exe_Names ${_file} _name)
list(GET Exe_Source ${_file} _src)
set(EXE_NAME ${_name})
set(SRCS ${_src})
set(DEPENDENCIES FairMQExample3)
GENERATE_EXECUTABLE()
EndForEach(_file RANGE 0 ${_length})

View File

@@ -0,0 +1,106 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample3Processor.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include "FairMQExample3Processor.h"
#include "FairMQLogger.h"
using namespace std;
FairMQExample3Processor::FairMQExample3Processor()
: fTaskIndex(0)
{
}
void FairMQExample3Processor::CustomCleanup(void *data, void *object)
{
delete (string*)object;
}
void FairMQExample3Processor::Run()
{
// Check if we are still in the RUNNING state
while (CheckCurrentState(RUNNING))
{
// Create empty message to hold the input
unique_ptr<FairMQMessage> input(fTransportFactory->CreateMessage());
// Receive the message (blocks until received or interrupted (e.g. by state change)).
// Returns size of the received message or -1 if interrupted.
if (fChannels.at("data-in").at(0).Receive(input) >= 0)
{
LOG(INFO) << "Received data, processing...";
// Modify the received string
string* text = new string(static_cast<char*>(input->GetData()), input->GetSize());
*text += " (modified by " + fId + to_string(fTaskIndex) + ")";
// Create output message
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text));
// Send out the output message
fChannels.at("data-out").at(0).Send(msg);
}
}
}
void FairMQExample3Processor::SetProperty(const int key, const string& value)
{
switch (key)
{
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
string FairMQExample3Processor::GetProperty(const int key, const string& default_ /*= ""*/)
{
switch (key)
{
default:
return FairMQDevice::GetProperty(key, default_);
}
}
void FairMQExample3Processor::SetProperty(const int key, const int value)
{
switch (key)
{
case TaskIndex:
fTaskIndex = value;
break;
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
int FairMQExample3Processor::GetProperty(const int key, const int default_ /*= 0*/)
{
switch (key)
{
case TaskIndex:
return fTaskIndex;
break;
default:
return FairMQDevice::GetProperty(key, default_);
}
}
FairMQExample3Processor::~FairMQExample3Processor()
{
}

View File

@@ -0,0 +1,47 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample3Processor.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE3PROCESSOR_H_
#define FAIRMQEXAMPLE3PROCESSOR_H_
#include <string>
#include "FairMQDevice.h"
class FairMQExample3Processor : public FairMQDevice
{
public:
enum
{
Text = FairMQDevice::Last,
TaskIndex,
Last
};
FairMQExample3Processor();
virtual ~FairMQExample3Processor();
static void CustomCleanup(void* data, void* hint);
virtual void SetProperty(const int key, const std::string& value);
virtual std::string GetProperty(const int key, const std::string& default_ = "");
virtual void SetProperty(const int key, const int value);
virtual int GetProperty(const int key, const int default_ = 0);
protected:
int fTaskIndex;
virtual void Run();
};
#endif /* FAIRMQEXAMPLE3PROCESSOR_H_ */

View File

@@ -0,0 +1,96 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample3Sampler.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include "FairMQExample3Sampler.h"
#include "FairMQLogger.h"
using namespace std;
FairMQExample3Sampler::FairMQExample3Sampler()
: fText()
{
}
void FairMQExample3Sampler::CustomCleanup(void *data, void *object)
{
delete (string*)object;
}
void FairMQExample3Sampler::Run()
{
// Check if we are still in the RUNNING state
while (CheckCurrentState(RUNNING))
{
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
string* text = new string(fText);
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text));
LOG(INFO) << "Sending \"" << fText << "\"";
fChannels.at("data-out").at(0).Send(msg);
}
}
FairMQExample3Sampler::~FairMQExample3Sampler()
{
}
void FairMQExample3Sampler::SetProperty(const int key, const string& value)
{
switch (key)
{
case Text:
fText = value;
break;
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
string FairMQExample3Sampler::GetProperty(const int key, const string& default_ /*= ""*/)
{
switch (key)
{
case Text:
return fText;
break;
default:
return FairMQDevice::GetProperty(key, default_);
}
}
void FairMQExample3Sampler::SetProperty(const int key, const int value)
{
switch (key)
{
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
int FairMQExample3Sampler::GetProperty(const int key, const int default_ /*= 0*/)
{
switch (key)
{
default:
return FairMQDevice::GetProperty(key, default_);
}
}

View File

@@ -0,0 +1,46 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample3Sampler.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE3SAMPLER_H_
#define FAIRMQEXAMPLE3SAMPLER_H_
#include <string>
#include "FairMQDevice.h"
class FairMQExample3Sampler : public FairMQDevice
{
public:
enum
{
Text = FairMQDevice::Last,
Last
};
FairMQExample3Sampler();
virtual ~FairMQExample3Sampler();
static void CustomCleanup(void* data, void* hint);
virtual void SetProperty(const int key, const std::string& value);
virtual std::string GetProperty(const int key, const std::string& default_ = "");
virtual void SetProperty(const int key, const int value);
virtual int GetProperty(const int key, const int default_ = 0);
protected:
std::string fText;
virtual void Run();
};
#endif /* FAIRMQEXAMPLE3SAMPLER_H_ */

View File

@@ -0,0 +1,44 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample3Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include "FairMQExample3Sink.h"
#include "FairMQLogger.h"
using namespace std;
FairMQExample3Sink::FairMQExample3Sink()
{
}
void FairMQExample3Sink::Run()
{
while (CheckCurrentState(RUNNING))
{
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
if (fChannels.at("data-in").at(0).Receive(msg) >= 0)
{
LOG(INFO) << "Received message: \""
<< string(static_cast<char*>(msg->GetData()), msg->GetSize())
<< "\"";
}
}
}
FairMQExample3Sink::~FairMQExample3Sink()
{
}

View File

@@ -0,0 +1,30 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample3Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE3SINK_H_
#define FAIRMQEXAMPLE3SINK_H_
#include "FairMQDevice.h"
class FairMQExample3Sink : public FairMQDevice
{
public:
FairMQExample3Sink();
virtual ~FairMQExample3Sink();
protected:
virtual void Run();
};
#endif /* FAIRMQEXAMPLE3SINK_H_ */

127
examples/MQ/3-dds/README.md Normal file
View File

@@ -0,0 +1,127 @@
Example 3: DDS
===============
This example demonstrates usage of the Dynamic Deployment System ([DDS](http://dds.gsi.de/)) to dynamically deploy and configure a topology of devices. The topology is similar to those of Example 2, but now it can be easily distributed on different computing nodes without the need for manual reconfiguration of the devices.
This example is compiled only if the DDS is found by CMake. Custom DDS installation location can be given to CMake like this:
```bash
cmake -DDDS_PATH="/path/to/dds/install/dir/" ..
```
The description below outlines the minimal steps needed to run the example with DDS. For more details please refer to DDS documentation on [DDS Website](http://dds.gsi.de/).
##### 1. The devices that bind their sockets need to advertise their bound addresses to DDS by writing a property.
In our example Sampler and Sink bind their sockets. The bound addresses are available after the initial validation. The following code takes the address value and gives it to DDS:
```C++
sampler.ChangeState("INIT_DEVICE");
sampler.WaitForInitialValidation();
dds::key_value::CKeyValue ddsKeyValue;
ddsKeyValue.putValue("SamplerOutputAddress", sampler.fChannels.at("data-out").at(0).GetAddress());
sampler.WaitForEndOfState("INIT_DEVICE");
```
Same approach for the Sink.
##### 2. The devices that connect their sockets need to read the addresses from DDS.
The Processors in our example need the addresses of Sampler and Sink. They receive these from DDS via properties (sent in the step above):
```C++
dds::key_value::CKeyValue ddsKeyValue;
// Sampler properties
dds::key_value::CKeyValue::valuesMap_t samplerValues;
{
mutex keyMutex;
condition_variable keyCondition;
LOG(INFO) << "Subscribing and waiting for sampler output address.";
ddsKeyValue.subscribe([&keyCondition](const string& /*_key*/, const string& /*_value*/) { keyCondition.notify_all(); });
ddsKeyValue.getValues("SamplerOutputAddress", &samplerValues);
while (samplerValues.empty())
{
unique_lock<mutex> lock(keyMutex);
keyCondition.wait_until(lock, chrono::system_clock::now() + chrono::milliseconds(1000));
ddsKeyValue.getValues("SamplerOutputAddress", &samplerValues);
}
}
// Sink properties
// ... same as above, but for sinkValues ...
processor.fChannels.at("data-in").at(0).UpdateAddress(samplerValues.begin()->second);
processor.fChannels.at("data-out").at(0).UpdateAddress(sinkValues.begin()->second);
```
After this step each device will have the necessary connection information.
##### 3. Write DDS hosts file that contains a list of worker nodes to run the topology on (When deploying using the SSH plug-in).
We run this example on the local machine for simplicity. The file below defines one worker `wn0` with 12 DDS Agents (thus able to accept 12 tasks). The parameters for each worker node are:
- user-chosen worker ID (must be unique)
- a host name with or without a login, in a form: login@host.fqdn (password-less SSH access to these hosts must be possible)
- additional SSH params (can be empty)
- a remote working directory (most exist on the worker nodes)
- number of DDS Agents for this worker
```bash
@bash_begin@
echo "DBG: SSH ENV Script"
#source setup.sh
@bash_end@
wn0, username@localhost, , /tmp/, 12
```
##### 4. Write DDS topology file that describes which tasks (processes) to run and their topology and configuration.
Take a look at `ex3-dds-topology.xml`. It consists of a definition part (properties, tasks, collections and more) and execution part (main). In our example Sampler, Processor and Sink tasks are defines, containing their executables and exchanged properties. The `<main>` of the topology uses the defined tasks. Besides one Sampler and one Sink task, a group containing Processor task is defined. The group has a multiplicity of 10, meaninig 10 Processors will be executed. Each of the Processors will receive the properties with Sampler and Sink addresses.
##### 5. Start DDS server.
The DDS server is started with:
```bash
dds-server start -s
```
##### 6. Submit DDS Agents (configured in the hosts file).
Agents are submitted with:
```bash
dds-submit --rms ssh --ssh-rms-cfg ex3-dds-hosts.cfg
```
The `--rms` option defines a destination resource management system. The `--ssh-rms-cfg` specifies an SSH plug-in resource definition file.
##### 7. Set the topology file.
Point DDS to the topology file:
```bash
dds-topology --set ex3-dds-topology.xml
```
##### 8. Activate the topology.
```bash
dds-topology --activate
```
##### 9. Run
After activation, agents will execute the defined tasks on the worker nodes. Output of the tasks will be stored in the directory that was specified in the hosts file.
##### 10. Stop DDS server/topology.
The execution of tasks can be stopped with:
```bash
dds-topology --stop
```
Or by stopping the DDS server:
```bash
dds-server stop
```
For a more complete DDS documentation please refer to [DDS Website](http://dds.gsi.de/).

View File

@@ -0,0 +1,6 @@
@bash_begin@
echo "DBG: SSH ENV Script"
#source setup.sh
@bash_end@
wn0, username@localhost, , /tmp/, 12

View File

@@ -0,0 +1,36 @@
<topology id="ExampleDDS">
<property id="SamplerOutputAddress" />
<property id="SinkInputAddress" />
<decltask id="Sampler">
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sampler-dds --id sampler --config-json-file @CMAKE_BINARY_DIR@/bin/config/ex3-devices.json</exe>
<properties>
<id access="write">SamplerOutputAddress</id>
</properties>
</decltask>
<decltask id="Processor">
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-processor-dds --id processor --index %taskIndex% --config-json-file @CMAKE_BINARY_DIR@/bin/config/ex3-devices.json</exe>
<properties>
<id access="read">SamplerOutputAddress</id>
<id access="read">SinkInputAddress</id>
</properties>
</decltask>
<decltask id="Sink">
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sink-dds --id sink --config-json-file @CMAKE_BINARY_DIR@/bin/config/ex3-devices.json</exe>
<properties>
<id access="write">SinkInputAddress</id>
</properties>
</decltask>
<main id="main">
<task>Sampler</task>
<task>Sink</task>
<group id="ProcessorGroup" n="10">
<task>Processor</task>
</group>
</main>
</topology>

View File

@@ -0,0 +1,59 @@
{
"fairMQOptions":
{
"device":
{
"id": "sampler",
"channel":
{
"name": "data-out",
"socket":
{
"type": "push",
"method": "bind",
"address": ""
}
}
},
"device":
{
"id": "processor",
"channel":
{
"name": "data-in",
"socket":
{
"type": "pull",
"method": "connect",
"address": ""
}
},
"channel":
{
"name": "data-out",
"socket":
{
"type": "push",
"method": "connect",
"address": ""
}
}
},
"device":
{
"id": "sink",
"channel":
{
"name": "data-in",
"socket":
{
"type": "pull",
"method": "bind",
"address": ""
}
}
}
}
}

View File

@@ -0,0 +1,144 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runExample2Processor.cxx
*
* @since 2013-04-23
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include <mutex>
#include <condition_variable>
#include "boost/program_options.hpp"
#include <boost/asio.hpp> // for DDS
#include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h"
#include "FairMQExample3Processor.h"
#include "FairMQTools.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
#include "KeyValue.h" // DDS
using namespace boost::program_options;
int main(int argc, char** argv)
{
FairMQExample3Processor processor;
processor.CatchSignals();
FairMQProgOptions config;
try
{
int ddsTaskIndex = 0;
options_description processorOptions("Processor options");
processorOptions.add_options()
("index", value<int>(&ddsTaskIndex)->default_value(0), "DDS task index");
config.AddToCmdLineOptions(processorOptions);
if (config.ParseAll(argc, argv))
{
return 0;
}
std::string filename = config.GetValue<std::string>("config-json-file");
std::string id = config.GetValue<std::string>("id");
config.UserParser<FairMQParser::JSON>(filename, id);
processor.fChannels = config.GetFairMQMap();
LOG(INFO) << "PID: " << getpid();
// Waiting for DDS properties
dds::key_value::CKeyValue ddsKeyValue;
// Sampler properties
dds::key_value::CKeyValue::valuesMap_t samplerValues;
{
mutex keyMutex;
condition_variable keyCondition;
LOG(INFO) << "Subscribing and waiting for sampler output address.";
ddsKeyValue.subscribe([&keyCondition](const string& /*_key*/, const string& /*_value*/) { keyCondition.notify_all(); });
ddsKeyValue.getValues("SamplerOutputAddress", &samplerValues);
while (samplerValues.empty())
{
unique_lock<mutex> lock(keyMutex);
keyCondition.wait_until(lock, chrono::system_clock::now() + chrono::milliseconds(1000));
ddsKeyValue.getValues("SamplerOutputAddress", &samplerValues);
}
}
// Sink properties
dds::key_value::CKeyValue::valuesMap_t sinkValues;
{
mutex keyMutex;
condition_variable keyCondition;
LOG(INFO) << "Subscribing and waiting for sink input address.";
ddsKeyValue.subscribe([&keyCondition](const string& /*_key*/, const string& /*_value*/) { keyCondition.notify_all(); });
ddsKeyValue.getValues("SinkInputAddress", &sinkValues);
while (sinkValues.empty())
{
unique_lock<mutex> lock(keyMutex);
keyCondition.wait_until(lock, chrono::system_clock::now() + chrono::milliseconds(1000));
ddsKeyValue.getValues("SinkInputAddress", &sinkValues);
}
}
processor.fChannels.at("data-in").at(0).UpdateAddress(samplerValues.begin()->second);
processor.fChannels.at("data-out").at(0).UpdateAddress(sinkValues.begin()->second);
#ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
processor.SetTransport(transportFactory);
processor.SetProperty(FairMQExample3Processor::Id, id);
processor.SetProperty(FairMQExample3Processor::TaskIndex, ddsTaskIndex);
processor.ChangeState("INIT_DEVICE");
processor.WaitForEndOfState("INIT_DEVICE");
processor.ChangeState("INIT_TASK");
processor.WaitForEndOfState("INIT_TASK");
processor.ChangeState("RUN");
processor.WaitForEndOfState("RUN");
processor.ChangeState("RESET_TASK");
processor.WaitForEndOfState("RESET_TASK");
processor.ChangeState("RESET_DEVICE");
processor.WaitForEndOfState("RESET_DEVICE");
processor.ChangeState("END");
}
catch (std::exception& e)
{
LOG(ERROR) << e.what();
LOG(INFO) << "Command line options are the following: ";
config.PrintHelp();
return 1;
}
return 0;
}

View File

@@ -0,0 +1,137 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runExample2Sampler.cxx
*
* @since 2013-04-23
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <map>
#include "boost/program_options.hpp"
#include <boost/asio.hpp> // for DDS
#include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h"
#include "FairMQExample3Sampler.h"
#include "FairMQTools.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
#include "KeyValue.h" // DDS
using namespace boost::program_options;
int main(int argc, char** argv)
{
FairMQExample3Sampler sampler;
sampler.CatchSignals();
FairMQProgOptions config;
try
{
std::string text; // text to be sent for processing.
std::string interfaceName; // name of the network interface to use for communication.
options_description samplerOptions("Sampler options");
samplerOptions.add_options()
("text", value<std::string>(&text)->default_value("Hello"), "Text to send out")
("network-interface", value<std::string>(&interfaceName)->default_value("eth0"), "Name of the network interface to use (e.g. eth0, ib0, wlan0, en0...)");
config.AddToCmdLineOptions(samplerOptions);
if (config.ParseAll(argc, argv))
{
return 0;
}
std::string filename = config.GetValue<std::string>("config-json-file");
std::string id = config.GetValue<std::string>("id");
config.UserParser<FairMQParser::JSON>(filename, id);
sampler.fChannels = config.GetFairMQMap();
LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
sampler.SetTransport(transportFactory);
sampler.SetProperty(FairMQExample3Sampler::Id, id);
sampler.SetProperty(FairMQExample3Sampler::Text, text);
// Get the IP of the current host and store it for binding.
map<string,string> IPs;
FairMQ::tools::getHostIPs(IPs);
stringstream ss;
// Check if ib0 (infiniband) interface is available, otherwise try eth0 or wlan0.
if (IPs.count(interfaceName))
{
ss << "tcp://" << IPs[interfaceName] << ":1";
}
else
{
LOG(INFO) << ss.str();
LOG(ERROR) << "Could not find provided network interface: \"" << interfaceName << "\"!, exiting.";
exit(EXIT_FAILURE);
}
string initialOutputAddress = ss.str();
// Configure the found host IP for the channel.
// TCP port will be chosen randomly during the initialization (binding).
sampler.fChannels.at("data-out").at(0).UpdateAddress(initialOutputAddress);
sampler.ChangeState("INIT_DEVICE");
sampler.WaitForInitialValidation();
// Advertise the bound addresses via DDS property
LOG(INFO) << "Giving sampler output address to DDS.";
dds::key_value::CKeyValue ddsKeyValue;
ddsKeyValue.putValue("SamplerOutputAddress", sampler.fChannels.at("data-out").at(0).GetAddress());
sampler.WaitForEndOfState("INIT_DEVICE");
sampler.ChangeState("INIT_TASK");
sampler.WaitForEndOfState("INIT_TASK");
sampler.ChangeState("RUN");
sampler.WaitForEndOfState("RUN");
sampler.ChangeState("RESET_TASK");
sampler.WaitForEndOfState("RESET_TASK");
sampler.ChangeState("RESET_DEVICE");
sampler.WaitForEndOfState("RESET_DEVICE");
sampler.ChangeState("END");
}
catch (std::exception& e)
{
LOG(ERROR) << e.what();
LOG(INFO) << "Command line options are the following: ";
config.PrintHelp();
return 1;
}
return 0;
}

View File

@@ -0,0 +1,134 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runExample2Sink.cxx
*
* @since 2013-04-23
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <map>
#include "boost/program_options.hpp"
#include <boost/asio.hpp> // for DDS
#include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h"
#include "FairMQExample3Sink.h"
#include "FairMQTools.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
#include "KeyValue.h" // DDS
using namespace boost::program_options;
int main(int argc, char** argv)
{
FairMQExample3Sink sink;
sink.CatchSignals();
FairMQProgOptions config;
try
{
std::string interfaceName; // name of the network interface to use for communication.
options_description sinkOptions("Sink options");
sinkOptions.add_options()
("network-interface", value<std::string>(&interfaceName)->default_value("eth0"), "Name of the network interface to use (e.g. eth0, ib0, wlan0, en0...)");
config.AddToCmdLineOptions(sinkOptions);
if (config.ParseAll(argc, argv))
{
return 0;
}
std::string filename = config.GetValue<std::string>("config-json-file");
std::string id = config.GetValue<std::string>("id");
config.UserParser<FairMQParser::JSON>(filename, id);
sink.fChannels = config.GetFairMQMap();
LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
sink.SetTransport(transportFactory);
sink.SetProperty(FairMQExample3Sink::Id, id);
// Get the IP of the current host and store it for binding.
map<string,string> IPs;
FairMQ::tools::getHostIPs(IPs);
stringstream ss;
// Check if ib0 (infiniband) interface is available, otherwise try eth0 or wlan0.
if (IPs.count(interfaceName))
{
ss << "tcp://" << IPs[interfaceName] << ":1";
}
else
{
LOG(INFO) << ss.str();
LOG(ERROR) << "Could not find provided network interface: \"" << interfaceName << "\"!, exiting.";
exit(EXIT_FAILURE);
}
string initialInputAddress = ss.str();
// Configure the found host IP for the channel.
// TCP port will be chosen randomly during the initialization (binding).
sink.fChannels.at("data-in").at(0).UpdateAddress(initialInputAddress);
sink.ChangeState("INIT_DEVICE");
sink.WaitForInitialValidation();
// Advertise the bound address via DDS property
LOG(INFO) << "Giving sink input address to DDS.";
dds::key_value::CKeyValue ddsKeyValue;
ddsKeyValue.putValue("SinkInputAddress", sink.fChannels.at("data-in").at(0).GetAddress());
sink.WaitForEndOfState("INIT_DEVICE");
sink.ChangeState("INIT_TASK");
sink.WaitForEndOfState("INIT_TASK");
sink.ChangeState("RUN");
sink.WaitForEndOfState("RUN");
sink.ChangeState("RESET_TASK");
sink.WaitForEndOfState("RESET_TASK");
sink.ChangeState("RESET_DEVICE");
sink.WaitForEndOfState("RESET_DEVICE");
sink.ChangeState("END");
}
catch (std::exception& e)
{
LOG(ERROR) << e.what();
LOG(INFO) << "Command line options are the following: ";
config.PrintHelp();
return 1;
}
return 0;
}