mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
Add DDS and Copy+Push examples.
This commit is contained in:
parent
96cd2afac7
commit
105e734808
|
@ -7,9 +7,15 @@
|
|||
################################################################################
|
||||
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/options/ProgOptionTest/macro/bsampler-sink.json ${CMAKE_BINARY_DIR}/bin/config/bsampler-sink.json)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/1-sampler-sink/sampler-sink.json ${CMAKE_BINARY_DIR}/bin/config/ex1-sampler-sink.json)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/2-sampler-processor-sink/sampler-processor-sink.json ${CMAKE_BINARY_DIR}/bin/config/ex2-sampler-processor-sink.json)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/1-sampler-sink/ex1-sampler-sink.json ${CMAKE_BINARY_DIR}/bin/config/ex1-sampler-sink.json)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/2-sampler-processor-sink/ex2-sampler-processor-sink.json ${CMAKE_BINARY_DIR}/bin/config/ex2-sampler-processor-sink.json)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds/ex3-devices.json ${CMAKE_BINARY_DIR}/bin/config/ex3-devices.json)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds/ex3-dds-topology.xml ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-topology.xml)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds/ex3-dds-hosts.cfg ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-hosts.cfg COPYONLY)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/4-copypush/ex4-copypush.json ${CMAKE_BINARY_DIR}/bin/config/ex4-copypush.json)
|
||||
|
||||
add_subdirectory(logger)
|
||||
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${CMAKE_SOURCE_DIR}/fairmq
|
||||
${CMAKE_SOURCE_DIR}/fairmq/devices
|
||||
|
@ -18,14 +24,29 @@ Set(INCLUDE_DIRECTORIES
|
|||
${CMAKE_SOURCE_DIR}/fairmq/logger
|
||||
${CMAKE_SOURCE_DIR}/fairmq/examples/1-sampler-sink
|
||||
${CMAKE_SOURCE_DIR}/fairmq/examples/2-sampler-processor-sink
|
||||
${CMAKE_SOURCE_DIR}/fairmq/examples/req-rep
|
||||
${CMAKE_SOURCE_DIR}/fairmq/examples/4-copypush
|
||||
${CMAKE_SOURCE_DIR}/fairmq/examples/5-req-rep
|
||||
${CMAKE_CURRENT_BINARY_DIR}
|
||||
)
|
||||
|
||||
if(DDS_PATH)
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${INCLUDE_DIRECTORIES}
|
||||
${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds
|
||||
)
|
||||
endif(DDS_PATH)
|
||||
|
||||
Set(SYSTEM_INCLUDE_DIRECTORIES
|
||||
${Boost_INCLUDE_DIR}
|
||||
)
|
||||
|
||||
If(DDS_PATH)
|
||||
Set(SYSTEM_INCLUDE_DIRECTORIES
|
||||
${SYSTEM_INCLUDE_DIRECTORIES}
|
||||
${DDS_PATH}/include
|
||||
)
|
||||
EndIf(DDS_PATH)
|
||||
|
||||
If(PROTOBUF_FOUND)
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${INCLUDE_DIRECTORIES}
|
||||
|
@ -65,6 +86,13 @@ Set(LINK_DIRECTORIES
|
|||
${Boost_LIBRARY_DIRS}
|
||||
)
|
||||
|
||||
if(DDS_PATH)
|
||||
set(LINK_DIRECTORIES
|
||||
${LINK_DIRECTORIES}
|
||||
${DDS_PATH}/lib
|
||||
)
|
||||
endif(DDS_PATH)
|
||||
|
||||
Link_Directories(${LINK_DIRECTORIES})
|
||||
|
||||
set(SRCS
|
||||
|
@ -96,10 +124,26 @@ set(SRCS
|
|||
"examples/2-sampler-processor-sink/FairMQExample2Processor.cxx"
|
||||
"examples/2-sampler-processor-sink/FairMQExample2Sink.cxx"
|
||||
|
||||
"examples/req-rep/FairMQExampleClient.cxx"
|
||||
"examples/req-rep/FairMQExampleServer.cxx"
|
||||
"examples/4-copypush/FairMQExample4Sampler.cxx"
|
||||
"examples/4-copypush/FairMQExample4Sink.cxx"
|
||||
|
||||
"examples/5-req-rep/FairMQExample5Client.cxx"
|
||||
"examples/5-req-rep/FairMQExample5Server.cxx"
|
||||
)
|
||||
|
||||
if(DDS_PATH)
|
||||
set(SRCS
|
||||
${SRCS}
|
||||
"examples/3-dds/FairMQExample3Sampler.cxx"
|
||||
"examples/3-dds/FairMQExample3Processor.cxx"
|
||||
"examples/3-dds/FairMQExample3Sink.cxx"
|
||||
)
|
||||
set(DEPENDENCIES
|
||||
${DEPENDENCIES}
|
||||
dds-key-value-lib
|
||||
)
|
||||
endif(DDS_PATH)
|
||||
|
||||
if(PROTOBUF_FOUND)
|
||||
# following source files are only for protobuf tests and are not essential part of FairMQ
|
||||
# add_custom_command(
|
||||
|
@ -152,7 +196,6 @@ else(NANOMSG_FOUND)
|
|||
)
|
||||
endif(NANOMSG_FOUND)
|
||||
|
||||
|
||||
# to copy src that are header-only files (e.g. c++ template) for FairRoot external installation
|
||||
# manual install (globbing add not recommended)
|
||||
Set(FAIRMQHEADERS
|
||||
|
@ -165,8 +208,16 @@ Set(FAIRMQHEADERS
|
|||
install(FILES ${FAIRMQHEADERS} DESTINATION include)
|
||||
|
||||
set(DEPENDENCIES
|
||||
${DEPENDENCIES} fairmq_logger
|
||||
boost_thread boost_timer boost_system boost_filesystem boost_program_options boost_random boost_chrono boost_exception
|
||||
${DEPENDENCIES}
|
||||
boost_thread
|
||||
fairmq_logger
|
||||
boost_timer
|
||||
boost_system
|
||||
boost_filesystem
|
||||
boost_program_options
|
||||
boost_random
|
||||
boost_chrono
|
||||
boost_exception
|
||||
)
|
||||
|
||||
set(LIBRARY_NAME FairMQ)
|
||||
|
@ -185,10 +236,21 @@ set(Exe_Names
|
|||
ex2-sampler
|
||||
ex2-processor
|
||||
ex2-sink
|
||||
example-client
|
||||
example-server
|
||||
ex4-sampler
|
||||
ex4-sink
|
||||
ex5-client
|
||||
ex5-server
|
||||
)
|
||||
|
||||
if(DDS_PATH)
|
||||
set(Exe_Names
|
||||
${Exe_Names}
|
||||
ex3-sampler-dds
|
||||
ex3-processor-dds
|
||||
ex3-sink-dds
|
||||
)
|
||||
endif(DDS_PATH)
|
||||
|
||||
# following executables are only for protobuf tests and are not essential part of FairMQ
|
||||
# if(PROTOBUF_FOUND)
|
||||
# set(Exe_Names
|
||||
|
@ -212,10 +274,21 @@ set(Exe_Source
|
|||
examples/2-sampler-processor-sink/runExample2Sampler.cxx
|
||||
examples/2-sampler-processor-sink/runExample2Processor.cxx
|
||||
examples/2-sampler-processor-sink/runExample2Sink.cxx
|
||||
examples/req-rep/runExampleClient.cxx
|
||||
examples/req-rep/runExampleServer.cxx
|
||||
examples/4-copypush/runExample4Sampler.cxx
|
||||
examples/4-copypush/runExample4Sink.cxx
|
||||
examples/5-req-rep/runExample5Client.cxx
|
||||
examples/5-req-rep/runExample5Server.cxx
|
||||
)
|
||||
|
||||
if(DDS_PATH)
|
||||
set(Exe_Source
|
||||
${Exe_Source}
|
||||
examples/3-dds/runExample3Sampler.cxx
|
||||
examples/3-dds/runExample3Processor.cxx
|
||||
examples/3-dds/runExample3Sink.cxx
|
||||
)
|
||||
endif(DDS_PATH)
|
||||
|
||||
# following source files are only for protobuf tests and are not essential part of FairMQ
|
||||
# if(PROTOBUF_FOUND)
|
||||
# set(Exe_Source
|
||||
|
|
|
@ -32,7 +32,7 @@ using namespace std;
|
|||
boost::function<void(int)> sigHandler;
|
||||
static void CallSignalHandler(int signal)
|
||||
{
|
||||
sigHandler(signal);
|
||||
sigHandler(signal);
|
||||
}
|
||||
|
||||
FairMQDevice::FairMQDevice()
|
||||
|
|
|
@ -65,7 +65,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
|
|||
|
||||
virtual ~FairMQDevice();
|
||||
|
||||
std::unordered_map< std::string,std::vector<FairMQChannel> > fChannels;
|
||||
std::unordered_map<std::string, std::vector<FairMQChannel>> fChannels;
|
||||
|
||||
protected:
|
||||
std::string fId;
|
||||
|
|
|
@ -26,17 +26,18 @@ FairMQExample1Sink::FairMQExample1Sink()
|
|||
|
||||
void FairMQExample1Sink::Run()
|
||||
{
|
||||
while (GetCurrentState() == RUNNING)
|
||||
while (CheckCurrentState(RUNNING))
|
||||
{
|
||||
FairMQMessage* msg = fTransportFactory->CreateMessage();
|
||||
|
||||
fChannels.at("data-in").at(0).Receive(msg);
|
||||
if (fChannels.at("data-in").at(0).Receive(msg) > 0)
|
||||
{
|
||||
LOG(INFO) << "Received message: \""
|
||||
<< string(static_cast<char*>(msg->GetData()), msg->GetSize())
|
||||
<< "\"";
|
||||
|
||||
LOG(INFO) << "Received message: \""
|
||||
<< string(static_cast<char*>(msg->GetData()), msg->GetSize())
|
||||
<< "\"";
|
||||
|
||||
delete msg;
|
||||
delete msg;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
8
fairmq/examples/1-sampler-sink/README.md
Normal file
8
fairmq/examples/1-sampler-sink/README.md
Normal file
|
@ -0,0 +1,8 @@
|
|||
Example 1: Sampler -> Sink
|
||||
===============
|
||||
|
||||
A simple topology of two devices - **Sampler** and **Sink**. **Sampler** sends data to **Sink** via the **PUSH-PULL** pattern.
|
||||
|
||||
`runExample1Sampler.cxx` and `runExample1Sink.cxx` configure and run the devices in their main function.
|
||||
|
||||
The executables take two required command line parameters: `--id` and `--config-json-file`. The value of `--id` should be a unique identifier and the value for `-config-json-file` a path to a config file. The config file for this example is `ex1-sampler-sink.json` and it contains configuration for the communication channels of the devices. The mapping between a specific device and the configuration (which can contain multiple devices) is done based on the **id**.
|
|
@ -6,7 +6,7 @@
|
|||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* runExampleSampler.cxx
|
||||
* runExample1Sampler.cxx
|
||||
*
|
||||
* @since 2013-04-23
|
||||
* @author D. Klein, A. Rybalchenko
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* runExampleSink.cxx
|
||||
* runExample1Sink.cxx
|
||||
*
|
||||
* @since 2013-04-23
|
||||
* @author D. Klein, A. Rybalchenko
|
||||
|
|
|
@ -30,7 +30,7 @@ void FairMQExample2Processor::CustomCleanup(void *data, void *object)
|
|||
|
||||
void FairMQExample2Processor::Run()
|
||||
{
|
||||
while (GetCurrentState() == RUNNING)
|
||||
while (CheckCurrentState(RUNNING))
|
||||
{
|
||||
FairMQMessage* input = fTransportFactory->CreateMessage();
|
||||
fChannels.at("data-in").at(0).Receive(input);
|
||||
|
|
|
@ -30,7 +30,7 @@ void FairMQExample2Sampler::CustomCleanup(void *data, void *object)
|
|||
|
||||
void FairMQExample2Sampler::Run()
|
||||
{
|
||||
while (GetCurrentState() == RUNNING)
|
||||
while (CheckCurrentState(RUNNING))
|
||||
{
|
||||
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@ FairMQExample2Sink::FairMQExample2Sink()
|
|||
|
||||
void FairMQExample2Sink::Run()
|
||||
{
|
||||
while (GetCurrentState() == RUNNING)
|
||||
while (CheckCurrentState(RUNNING))
|
||||
{
|
||||
FairMQMessage* msg = fTransportFactory->CreateMessage();
|
||||
|
||||
|
|
10
fairmq/examples/2-sampler-processor-sink/README.md
Normal file
10
fairmq/examples/2-sampler-processor-sink/README.md
Normal file
|
@ -0,0 +1,10 @@
|
|||
Example 2: Sampler -> Processor -> Sink
|
||||
===============
|
||||
|
||||
A simple topology of three devices - **Sampler**, **Processor** and **Sink**. **Sampler** sends data to one or more **Processor**s, who modify the data and send it to one **Sink**. Transport with the **PUSH-PULL** pattern.
|
||||
|
||||
In this example the Sampler is configured to **bind** its output and the Sink is configured to also **bind** its input. This allows us run any number of processors with the same configuration, because they all connect to same Sampler and Sink addresses. Furthermore, it allows adding of processors dynamically during run-time. The PUSH and PULL sockets will handle the data distribution to/from the new devices according to their distribution strategies ([Round-robin output for PUSH](http://api.zeromq.org/4-0:zmq-socket#toc14) and [Fair-queued input for PULL](http://api.zeromq.org/4-0:zmq-socket#toc15)).
|
||||
|
||||
The Sampler sends out a simple text string (its content configurable with `--text` command line parameter, defaul is "Hello"). Each Processor modifies the string by appending its ID to it and send it to the Sink.
|
||||
|
||||
The provided configuration file contains two Processors. To add more Processors, you can either extend the configuration file, or create a separate file only for new processors.
|
96
fairmq/examples/3-dds/FairMQExample3Processor.cxx
Normal file
96
fairmq/examples/3-dds/FairMQExample3Processor.cxx
Normal file
|
@ -0,0 +1,96 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQExample3Processor.cpp
|
||||
*
|
||||
* @since 2014-10-10
|
||||
* @author A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include <boost/thread.hpp>
|
||||
#include <boost/bind.hpp>
|
||||
|
||||
#include "FairMQExample3Processor.h"
|
||||
#include "FairMQLogger.h"
|
||||
|
||||
FairMQExample3Processor::FairMQExample3Processor()
|
||||
: fTaskIndex(0)
|
||||
{
|
||||
}
|
||||
|
||||
void FairMQExample3Processor::CustomCleanup(void *data, void *object)
|
||||
{
|
||||
delete (std::string*)object;
|
||||
}
|
||||
|
||||
void FairMQExample3Processor::Run()
|
||||
{
|
||||
while (CheckCurrentState(RUNNING))
|
||||
{
|
||||
FairMQMessage* input = fTransportFactory->CreateMessage();
|
||||
fChannels.at("data-in").at(0).Receive(input);
|
||||
|
||||
LOG(INFO) << "Received data, processing...";
|
||||
|
||||
std::string* text = new std::string(static_cast<char*>(input->GetData()), input->GetSize());
|
||||
*text += " (modified by " + fId + std::to_string(fTaskIndex) + ")";
|
||||
|
||||
delete input;
|
||||
|
||||
FairMQMessage* msg = fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text);
|
||||
fChannels.at("data-out").at(0).Send(msg);
|
||||
}
|
||||
}
|
||||
|
||||
void FairMQExample3Processor::SetProperty(const int key, const std::string& value)
|
||||
{
|
||||
switch (key)
|
||||
{
|
||||
default:
|
||||
FairMQDevice::SetProperty(key, value);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
std::string FairMQExample3Processor::GetProperty(const int key, const std::string& default_ /*= ""*/)
|
||||
{
|
||||
switch (key)
|
||||
{
|
||||
default:
|
||||
return FairMQDevice::GetProperty(key, default_);
|
||||
}
|
||||
}
|
||||
|
||||
void FairMQExample3Processor::SetProperty(const int key, const int value)
|
||||
{
|
||||
switch (key)
|
||||
{
|
||||
case TaskIndex:
|
||||
fTaskIndex = value;
|
||||
break;
|
||||
default:
|
||||
FairMQDevice::SetProperty(key, value);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
int FairMQExample3Processor::GetProperty(const int key, const int default_ /*= 0*/)
|
||||
{
|
||||
switch (key)
|
||||
{
|
||||
case TaskIndex:
|
||||
return fTaskIndex;
|
||||
break;
|
||||
default:
|
||||
return FairMQDevice::GetProperty(key, default_);
|
||||
}
|
||||
}
|
||||
|
||||
FairMQExample3Processor::~FairMQExample3Processor()
|
||||
{
|
||||
}
|
47
fairmq/examples/3-dds/FairMQExample3Processor.h
Normal file
47
fairmq/examples/3-dds/FairMQExample3Processor.h
Normal file
|
@ -0,0 +1,47 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQExample3Processor.h
|
||||
*
|
||||
* @since 2014-10-10
|
||||
* @author A. Rybalchenko
|
||||
*/
|
||||
|
||||
#ifndef FAIRMQEXAMPLE3PROCESSOR_H_
|
||||
#define FAIRMQEXAMPLE3PROCESSOR_H_
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "FairMQDevice.h"
|
||||
|
||||
class FairMQExample3Processor : public FairMQDevice
|
||||
{
|
||||
public:
|
||||
enum
|
||||
{
|
||||
Text = FairMQDevice::Last,
|
||||
TaskIndex,
|
||||
Last
|
||||
};
|
||||
FairMQExample3Processor();
|
||||
virtual ~FairMQExample3Processor();
|
||||
|
||||
static void CustomCleanup(void* data, void* hint);
|
||||
|
||||
virtual void SetProperty(const int key, const std::string& value);
|
||||
virtual std::string GetProperty(const int key, const std::string& default_ = "");
|
||||
virtual void SetProperty(const int key, const int value);
|
||||
virtual int GetProperty(const int key, const int default_ = 0);
|
||||
|
||||
protected:
|
||||
int fTaskIndex;
|
||||
|
||||
virtual void Run();
|
||||
};
|
||||
|
||||
#endif /* FAIRMQEXAMPLE3PROCESSOR_H_ */
|
93
fairmq/examples/3-dds/FairMQExample3Sampler.cxx
Normal file
93
fairmq/examples/3-dds/FairMQExample3Sampler.cxx
Normal file
|
@ -0,0 +1,93 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQExample3Sampler.cpp
|
||||
*
|
||||
* @since 2014-10-10
|
||||
* @author A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include <boost/thread.hpp>
|
||||
#include <boost/bind.hpp>
|
||||
|
||||
#include "FairMQExample3Sampler.h"
|
||||
#include "FairMQLogger.h"
|
||||
|
||||
FairMQExample3Sampler::FairMQExample3Sampler()
|
||||
: fText()
|
||||
{
|
||||
}
|
||||
|
||||
void FairMQExample3Sampler::CustomCleanup(void *data, void *object)
|
||||
{
|
||||
delete (std::string*)object;
|
||||
}
|
||||
|
||||
void FairMQExample3Sampler::Run()
|
||||
{
|
||||
while (CheckCurrentState(RUNNING))
|
||||
{
|
||||
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
|
||||
|
||||
std::string* text = new std::string(fText);
|
||||
|
||||
FairMQMessage* msg = fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text);
|
||||
|
||||
LOG(INFO) << "Sending \"" << fText << "\"";
|
||||
|
||||
fChannels.at("data-out").at(0).Send(msg);
|
||||
}
|
||||
}
|
||||
|
||||
FairMQExample3Sampler::~FairMQExample3Sampler()
|
||||
{
|
||||
}
|
||||
|
||||
void FairMQExample3Sampler::SetProperty(const int key, const std::string& value)
|
||||
{
|
||||
switch (key)
|
||||
{
|
||||
case Text:
|
||||
fText = value;
|
||||
break;
|
||||
default:
|
||||
FairMQDevice::SetProperty(key, value);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
std::string FairMQExample3Sampler::GetProperty(const int key, const std::string& default_ /*= ""*/)
|
||||
{
|
||||
switch (key)
|
||||
{
|
||||
case Text:
|
||||
return fText;
|
||||
break;
|
||||
default:
|
||||
return FairMQDevice::GetProperty(key, default_);
|
||||
}
|
||||
}
|
||||
|
||||
void FairMQExample3Sampler::SetProperty(const int key, const int value)
|
||||
{
|
||||
switch (key)
|
||||
{
|
||||
default:
|
||||
FairMQDevice::SetProperty(key, value);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
int FairMQExample3Sampler::GetProperty(const int key, const int default_ /*= 0*/)
|
||||
{
|
||||
switch (key)
|
||||
{
|
||||
default:
|
||||
return FairMQDevice::GetProperty(key, default_);
|
||||
}
|
||||
}
|
46
fairmq/examples/3-dds/FairMQExample3Sampler.h
Normal file
46
fairmq/examples/3-dds/FairMQExample3Sampler.h
Normal file
|
@ -0,0 +1,46 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQExample3Sampler.h
|
||||
*
|
||||
* @since 2014-10-10
|
||||
* @author A. Rybalchenko
|
||||
*/
|
||||
|
||||
#ifndef FAIRMQEXAMPLE3SAMPLER_H_
|
||||
#define FAIRMQEXAMPLE3SAMPLER_H_
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "FairMQDevice.h"
|
||||
|
||||
class FairMQExample3Sampler : public FairMQDevice
|
||||
{
|
||||
public:
|
||||
enum
|
||||
{
|
||||
Text = FairMQDevice::Last,
|
||||
Last
|
||||
};
|
||||
FairMQExample3Sampler();
|
||||
virtual ~FairMQExample3Sampler();
|
||||
|
||||
static void CustomCleanup(void* data, void* hint);
|
||||
|
||||
virtual void SetProperty(const int key, const std::string& value);
|
||||
virtual std::string GetProperty(const int key, const std::string& default_ = "");
|
||||
virtual void SetProperty(const int key, const int value);
|
||||
virtual int GetProperty(const int key, const int default_ = 0);
|
||||
|
||||
protected:
|
||||
std::string fText;
|
||||
|
||||
virtual void Run();
|
||||
};
|
||||
|
||||
#endif /* FAIRMQEXAMPLE3SAMPLER_H_ */
|
45
fairmq/examples/3-dds/FairMQExample3Sink.cxx
Normal file
45
fairmq/examples/3-dds/FairMQExample3Sink.cxx
Normal file
|
@ -0,0 +1,45 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQExample3Sink.cxx
|
||||
*
|
||||
* @since 2014-10-10
|
||||
* @author A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include <boost/thread.hpp>
|
||||
#include <boost/bind.hpp>
|
||||
|
||||
#include "FairMQExample3Sink.h"
|
||||
#include "FairMQLogger.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
FairMQExample3Sink::FairMQExample3Sink()
|
||||
{
|
||||
}
|
||||
|
||||
void FairMQExample3Sink::Run()
|
||||
{
|
||||
while (CheckCurrentState(RUNNING))
|
||||
{
|
||||
FairMQMessage* msg = fTransportFactory->CreateMessage();
|
||||
|
||||
fChannels.at("data-in").at(0).Receive(msg);
|
||||
|
||||
LOG(INFO) << "Received message: \""
|
||||
<< string(static_cast<char*>(msg->GetData()), msg->GetSize())
|
||||
<< "\"";
|
||||
|
||||
delete msg;
|
||||
}
|
||||
}
|
||||
|
||||
FairMQExample3Sink::~FairMQExample3Sink()
|
||||
{
|
||||
}
|
30
fairmq/examples/3-dds/FairMQExample3Sink.h
Normal file
30
fairmq/examples/3-dds/FairMQExample3Sink.h
Normal file
|
@ -0,0 +1,30 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQExample3Sink.h
|
||||
*
|
||||
* @since 2014-10-10
|
||||
* @author A. Rybalchenko
|
||||
*/
|
||||
|
||||
#ifndef FAIRMQEXAMPLE3SINK_H_
|
||||
#define FAIRMQEXAMPLE3SINK_H_
|
||||
|
||||
#include "FairMQDevice.h"
|
||||
|
||||
class FairMQExample3Sink : public FairMQDevice
|
||||
{
|
||||
public:
|
||||
FairMQExample3Sink();
|
||||
virtual ~FairMQExample3Sink();
|
||||
|
||||
protected:
|
||||
virtual void Run();
|
||||
};
|
||||
|
||||
#endif /* FAIRMQEXAMPLE3SINK_H_ */
|
8
fairmq/examples/3-dds/README.md
Normal file
8
fairmq/examples/3-dds/README.md
Normal file
|
@ -0,0 +1,8 @@
|
|||
Example 3: DDS
|
||||
===============
|
||||
|
||||
This example demonstrates usage of the Dynamic Deployment System ([DDS](http://dds.gsi.de/)) to dynamically deploy and configure a topology of devices. The topology is similar to those of Example 2, but now it can be easily distributed on different computing nodes without the need for manual reconfiguration of the devices.
|
||||
|
||||
The description below outlines the minimal steps needed to run the example. For more detailed DDS documentation please visit [DDS Website](http://dds.gsi.de/).
|
||||
|
||||
The topology run by DDS is defined in `ex3-dds-topology.xml` and the hosts to run it on are configured in `ex3-dds-hosts.cfg`. The topology starts one Sampler, one Sink and a group of 10 Processors.
|
6
fairmq/examples/3-dds/ex3-dds-hosts.cfg
Normal file
6
fairmq/examples/3-dds/ex3-dds-hosts.cfg
Normal file
|
@ -0,0 +1,6 @@
|
|||
@bash_begin@
|
||||
echo "DBG: SSH ENV Script"
|
||||
#source setup.sh
|
||||
@bash_end@
|
||||
|
||||
worker, username@localhost, , /tmp/, 12
|
36
fairmq/examples/3-dds/ex3-dds-topology.xml
Normal file
36
fairmq/examples/3-dds/ex3-dds-topology.xml
Normal file
|
@ -0,0 +1,36 @@
|
|||
<topology id="ExampleDDS">
|
||||
|
||||
<property id="SamplerOutputAddress" />
|
||||
<property id="SinkInputAddress" />
|
||||
|
||||
<decltask id="Sampler">
|
||||
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sampler-dds --id sampler --config-json-file @CMAKE_BINARY_DIR@/bin/config/ex3-devices.json</exe>
|
||||
<properties>
|
||||
<id access="write">SamplerOutputAddress</id>
|
||||
</properties>
|
||||
</decltask>
|
||||
|
||||
<decltask id="Processor">
|
||||
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-processor-dds --id processor --index %taskIndex% --config-json-file @CMAKE_BINARY_DIR@/bin/config/ex3-devices.json</exe>
|
||||
<properties>
|
||||
<id access="read">SamplerOutputAddress</id>
|
||||
<id access="read">SinkInputAddress</id>
|
||||
</properties>
|
||||
</decltask>
|
||||
|
||||
<decltask id="Sink">
|
||||
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sink-dds --id sink --config-json-file @CMAKE_BINARY_DIR@/bin/config/ex3-devices.json</exe>
|
||||
<properties>
|
||||
<id access="write">SinkInputAddress</id>
|
||||
</properties>
|
||||
</decltask>
|
||||
|
||||
<main id="main">
|
||||
<task>Sampler</task>
|
||||
<task>Sink</task>
|
||||
<group id="ProcessorGroup" n="10">
|
||||
<task>Processor</task>
|
||||
</group>
|
||||
</main>
|
||||
|
||||
</topology>
|
59
fairmq/examples/3-dds/ex3-devices.json
Normal file
59
fairmq/examples/3-dds/ex3-devices.json
Normal file
|
@ -0,0 +1,59 @@
|
|||
{
|
||||
"fairMQOptions":
|
||||
{
|
||||
"device":
|
||||
{
|
||||
"id": "sampler",
|
||||
"channel":
|
||||
{
|
||||
"name": "data-out",
|
||||
"socket":
|
||||
{
|
||||
"type": "push",
|
||||
"method": "bind",
|
||||
"address": ""
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
"device":
|
||||
{
|
||||
"id": "processor",
|
||||
"channel":
|
||||
{
|
||||
"name": "data-in",
|
||||
"socket":
|
||||
{
|
||||
"type": "pull",
|
||||
"method": "connect",
|
||||
"address": ""
|
||||
}
|
||||
},
|
||||
"channel":
|
||||
{
|
||||
"name": "data-out",
|
||||
"socket":
|
||||
{
|
||||
"type": "push",
|
||||
"method": "connect",
|
||||
"address": ""
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
"device":
|
||||
{
|
||||
"id": "sink",
|
||||
"channel":
|
||||
{
|
||||
"name": "data-in",
|
||||
"socket":
|
||||
{
|
||||
"type": "pull",
|
||||
"method": "bind",
|
||||
"address": ""
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
144
fairmq/examples/3-dds/runExample3Processor.cxx
Normal file
144
fairmq/examples/3-dds/runExample3Processor.cxx
Normal file
|
@ -0,0 +1,144 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* runExample2Processor.cxx
|
||||
*
|
||||
* @since 2013-04-23
|
||||
* @author D. Klein, A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
|
||||
#include "boost/program_options.hpp"
|
||||
#include <boost/asio.hpp> // for DDS
|
||||
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQParser.h"
|
||||
#include "FairMQProgOptions.h"
|
||||
#include "FairMQExample3Processor.h"
|
||||
#include "FairMQTools.h"
|
||||
|
||||
#ifdef NANOMSG
|
||||
#include "FairMQTransportFactoryNN.h"
|
||||
#else
|
||||
#include "FairMQTransportFactoryZMQ.h"
|
||||
#endif
|
||||
|
||||
#include "KeyValue.h" // DDS
|
||||
|
||||
using namespace boost::program_options;
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
FairMQExample3Processor processor;
|
||||
processor.CatchSignals();
|
||||
|
||||
FairMQProgOptions config;
|
||||
|
||||
try
|
||||
{
|
||||
int ddsTaskIndex = 0;
|
||||
|
||||
options_description samplerOptions("Processor options");
|
||||
samplerOptions.add_options()
|
||||
("index", value<int>(&ddsTaskIndex)->default_value(0), "Store DDS task index");
|
||||
|
||||
config.AddToCmdLineOptions(samplerOptions);
|
||||
|
||||
if (config.ParseAll(argc, argv))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
std::string filename = config.GetValue<std::string>("config-json-file");
|
||||
std::string id = config.GetValue<std::string>("id");
|
||||
|
||||
config.UserParser<FairMQParser::JSON>(filename, id);
|
||||
|
||||
processor.fChannels = config.GetFairMQMap();
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
// Waiting for DDS properties
|
||||
dds::key_value::CKeyValue ddsKeyValue;
|
||||
// Sampler properties
|
||||
dds::key_value::CKeyValue::valuesMap_t samplerValues;
|
||||
{
|
||||
mutex keyMutex;
|
||||
condition_variable keyCondition;
|
||||
|
||||
LOG(INFO) << "Subscribing and waiting for sampler output address.";
|
||||
ddsKeyValue.subscribe([&keyCondition](const string& /*_key*/, const string& /*_value*/) { keyCondition.notify_all(); });
|
||||
ddsKeyValue.getValues("SamplerOutputAddress", &samplerValues);
|
||||
while (samplerValues.empty())
|
||||
{
|
||||
unique_lock<mutex> lock(keyMutex);
|
||||
keyCondition.wait_until(lock, chrono::system_clock::now() + chrono::milliseconds(1000));
|
||||
ddsKeyValue.getValues("SamplerOutputAddress", &samplerValues);
|
||||
}
|
||||
}
|
||||
// Sink properties
|
||||
dds::key_value::CKeyValue::valuesMap_t sinkValues;
|
||||
{
|
||||
mutex keyMutex;
|
||||
condition_variable keyCondition;
|
||||
|
||||
LOG(INFO) << "Subscribing and waiting for sink input address.";
|
||||
ddsKeyValue.subscribe([&keyCondition](const string& /*_key*/, const string& /*_value*/) { keyCondition.notify_all(); });
|
||||
ddsKeyValue.getValues("SinkInputAddress", &sinkValues);
|
||||
while (sinkValues.empty())
|
||||
{
|
||||
unique_lock<mutex> lock(keyMutex);
|
||||
keyCondition.wait_until(lock, chrono::system_clock::now() + chrono::milliseconds(1000));
|
||||
ddsKeyValue.getValues("SinkInputAddress", &sinkValues);
|
||||
}
|
||||
}
|
||||
|
||||
processor.fChannels.at("data-in").at(0).UpdateAddress(samplerValues.begin()->second);
|
||||
processor.fChannels.at("data-out").at(0).UpdateAddress(sinkValues.begin()->second);
|
||||
|
||||
#ifdef NANOMSG
|
||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
|
||||
#else
|
||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
|
||||
#endif
|
||||
|
||||
processor.SetTransport(transportFactory);
|
||||
|
||||
processor.SetProperty(FairMQExample3Processor::Id, id);
|
||||
processor.SetProperty(FairMQExample3Processor::TaskIndex, ddsTaskIndex);
|
||||
|
||||
processor.ChangeState("INIT_DEVICE");
|
||||
processor.WaitForEndOfState("INIT_DEVICE");
|
||||
|
||||
processor.ChangeState("INIT_TASK");
|
||||
processor.WaitForEndOfState("INIT_TASK");
|
||||
|
||||
processor.ChangeState("RUN");
|
||||
processor.WaitForEndOfState("RUN");
|
||||
|
||||
processor.ChangeState("RESET_TASK");
|
||||
processor.WaitForEndOfState("RESET_TASK");
|
||||
|
||||
processor.ChangeState("RESET_DEVICE");
|
||||
processor.WaitForEndOfState("RESET_DEVICE");
|
||||
|
||||
processor.ChangeState("END");
|
||||
}
|
||||
catch (std::exception& e)
|
||||
{
|
||||
LOG(ERROR) << e.what();
|
||||
LOG(INFO) << "Command line options are the following: ";
|
||||
config.PrintHelp();
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
136
fairmq/examples/3-dds/runExample3Sampler.cxx
Normal file
136
fairmq/examples/3-dds/runExample3Sampler.cxx
Normal file
|
@ -0,0 +1,136 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* runExample2Sampler.cxx
|
||||
*
|
||||
* @since 2013-04-23
|
||||
* @author D. Klein, A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include <map>
|
||||
|
||||
#include "boost/program_options.hpp"
|
||||
#include <boost/asio.hpp> // for DDS
|
||||
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQParser.h"
|
||||
#include "FairMQProgOptions.h"
|
||||
#include "FairMQExample3Sampler.h"
|
||||
#include "FairMQTools.h"
|
||||
|
||||
#ifdef NANOMSG
|
||||
#include "FairMQTransportFactoryNN.h"
|
||||
#else
|
||||
#include "FairMQTransportFactoryZMQ.h"
|
||||
#endif
|
||||
|
||||
#include "KeyValue.h" // DDS
|
||||
|
||||
using namespace boost::program_options;
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
FairMQExample3Sampler sampler;
|
||||
sampler.CatchSignals();
|
||||
|
||||
FairMQProgOptions config;
|
||||
|
||||
try
|
||||
{
|
||||
std::string text;
|
||||
|
||||
options_description samplerOptions("Sampler options");
|
||||
samplerOptions.add_options()
|
||||
("text", value<std::string>(&text)->default_value("Hello"), "Text to send out");
|
||||
|
||||
config.AddToCmdLineOptions(samplerOptions);
|
||||
|
||||
if (config.ParseAll(argc, argv))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
std::string filename = config.GetValue<std::string>("config-json-file");
|
||||
std::string id = config.GetValue<std::string>("id");
|
||||
|
||||
config.UserParser<FairMQParser::JSON>(filename, id);
|
||||
|
||||
sampler.fChannels = config.GetFairMQMap();
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
#ifdef NANOMSG
|
||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
|
||||
#else
|
||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
|
||||
#endif
|
||||
|
||||
sampler.SetTransport(transportFactory);
|
||||
|
||||
sampler.SetProperty(FairMQExample3Sampler::Id, id);
|
||||
sampler.SetProperty(FairMQExample3Sampler::Text, text);
|
||||
|
||||
// Get the IP of the current host and store it for binding.
|
||||
map<string,string> IPs;
|
||||
FairMQ::tools::getHostIPs(IPs);
|
||||
stringstream ss;
|
||||
// Check if ib0 (infiniband) interface is available, otherwise try eth0 or wlan0.
|
||||
if (IPs.count("ib0")) {
|
||||
ss << "tcp://" << IPs["ib0"] << ":1";
|
||||
} else if (IPs.count("eth0")) {
|
||||
ss << "tcp://" << IPs["eth0"] << ":1";
|
||||
} else if (IPs.count("wlan0")) {
|
||||
ss << "tcp://" << IPs["wlan0"] << ":1";
|
||||
} else {
|
||||
LOG(INFO) << ss.str();
|
||||
LOG(ERROR) << "Could not find ib0, eth0 or wlan0";
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
string initialOutputAddress = ss.str();
|
||||
|
||||
// Configure the found host IP for the channel.
|
||||
// TCP port will be chosen randomly during the initialization (binding).
|
||||
sampler.fChannels.at("data-out").at(0).UpdateAddress(initialOutputAddress);
|
||||
|
||||
sampler.ChangeState("INIT_DEVICE");
|
||||
sampler.WaitForInitialValidation();
|
||||
|
||||
// Advertise the bound addresses via DDS property
|
||||
LOG(INFO) << "Giving sampler output address to DDS.";
|
||||
dds::key_value::CKeyValue ddsKeyValue;
|
||||
ddsKeyValue.putValue("SamplerOutputAddress", sampler.fChannels.at("data-out").at(0).GetAddress());
|
||||
|
||||
sampler.WaitForEndOfState("INIT_DEVICE");
|
||||
|
||||
sampler.ChangeState("INIT_TASK");
|
||||
sampler.WaitForEndOfState("INIT_TASK");
|
||||
|
||||
sampler.ChangeState("RUN");
|
||||
sampler.WaitForEndOfState("RUN");
|
||||
|
||||
sampler.ChangeState("RESET_TASK");
|
||||
sampler.WaitForEndOfState("RESET_TASK");
|
||||
|
||||
sampler.ChangeState("RESET_DEVICE");
|
||||
sampler.WaitForEndOfState("RESET_DEVICE");
|
||||
|
||||
sampler.ChangeState("END");
|
||||
}
|
||||
catch (std::exception& e)
|
||||
{
|
||||
LOG(ERROR) << e.what();
|
||||
LOG(INFO) << "Command line options are the following: ";
|
||||
config.PrintHelp();
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
127
fairmq/examples/3-dds/runExample3Sink.cxx
Normal file
127
fairmq/examples/3-dds/runExample3Sink.cxx
Normal file
|
@ -0,0 +1,127 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* runExample2Sink.cxx
|
||||
*
|
||||
* @since 2013-04-23
|
||||
* @author D. Klein, A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include <map>
|
||||
|
||||
#include "boost/program_options.hpp"
|
||||
#include <boost/asio.hpp> // for DDS
|
||||
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQParser.h"
|
||||
#include "FairMQProgOptions.h"
|
||||
#include "FairMQExample3Sink.h"
|
||||
#include "FairMQTools.h"
|
||||
|
||||
#ifdef NANOMSG
|
||||
#include "FairMQTransportFactoryNN.h"
|
||||
#else
|
||||
#include "FairMQTransportFactoryZMQ.h"
|
||||
#endif
|
||||
|
||||
#include "KeyValue.h" // DDS
|
||||
|
||||
using namespace boost::program_options;
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
FairMQExample3Sink sink;
|
||||
sink.CatchSignals();
|
||||
|
||||
FairMQProgOptions config;
|
||||
|
||||
try
|
||||
{
|
||||
if (config.ParseAll(argc, argv))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
std::string filename = config.GetValue<std::string>("config-json-file");
|
||||
std::string id = config.GetValue<std::string>("id");
|
||||
|
||||
config.UserParser<FairMQParser::JSON>(filename, id);
|
||||
|
||||
sink.fChannels = config.GetFairMQMap();
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
#ifdef NANOMSG
|
||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
|
||||
#else
|
||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
|
||||
#endif
|
||||
|
||||
sink.SetTransport(transportFactory);
|
||||
|
||||
sink.SetProperty(FairMQExample3Sink::Id, id);
|
||||
|
||||
// Get the IP of the current host and store it for binding.
|
||||
map<string,string> IPs;
|
||||
FairMQ::tools::getHostIPs(IPs);
|
||||
stringstream ss;
|
||||
// Check if ib0 (infiniband) interface is available, otherwise try eth0 or wlan0.
|
||||
if (IPs.count("ib0")) {
|
||||
ss << "tcp://" << IPs["ib0"] << ":1";
|
||||
} else if (IPs.count("eth0")) {
|
||||
ss << "tcp://" << IPs["eth0"] << ":1";
|
||||
} else if (IPs.count("wlan0")) {
|
||||
ss << "tcp://" << IPs["wlan0"] << ":1";
|
||||
} else {
|
||||
LOG(INFO) << ss.str();
|
||||
LOG(ERROR) << "Could not find ib0, eth0 or wlan0";
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
string initialInputAddress = ss.str();
|
||||
|
||||
// Configure the found host IP for the channel.
|
||||
// TCP port will be chosen randomly during the initialization (binding).
|
||||
sink.fChannels.at("data-in").at(0).UpdateAddress(initialInputAddress);
|
||||
|
||||
sink.ChangeState("INIT_DEVICE");
|
||||
sink.WaitForInitialValidation();
|
||||
|
||||
// Advertise the bound address via DDS property
|
||||
LOG(INFO) << "Giving sink input address to DDS.";
|
||||
dds::key_value::CKeyValue ddsKeyValue;
|
||||
ddsKeyValue.putValue("SinkInputAddress", sink.fChannels.at("data-in").at(0).GetAddress());
|
||||
|
||||
sink.WaitForEndOfState("INIT_DEVICE");
|
||||
|
||||
sink.ChangeState("INIT_TASK");
|
||||
sink.WaitForEndOfState("INIT_TASK");
|
||||
|
||||
sink.ChangeState("RUN");
|
||||
sink.WaitForEndOfState("RUN");
|
||||
|
||||
sink.ChangeState("RESET_TASK");
|
||||
sink.WaitForEndOfState("RESET_TASK");
|
||||
|
||||
sink.ChangeState("RESET_DEVICE");
|
||||
sink.WaitForEndOfState("RESET_DEVICE");
|
||||
|
||||
sink.ChangeState("END");
|
||||
}
|
||||
catch (std::exception& e)
|
||||
{
|
||||
LOG(ERROR) << e.what();
|
||||
LOG(INFO) << "Command line options are the following: ";
|
||||
config.PrintHelp();
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
62
fairmq/examples/4-copypush/FairMQExample4Sampler.cxx
Normal file
62
fairmq/examples/4-copypush/FairMQExample4Sampler.cxx
Normal file
|
@ -0,0 +1,62 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQExample4Sampler.cpp
|
||||
*
|
||||
* @since 2014-10-10
|
||||
* @author A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include <boost/thread.hpp>
|
||||
#include <boost/bind.hpp>
|
||||
|
||||
#include "FairMQExample4Sampler.h"
|
||||
#include "FairMQLogger.h"
|
||||
|
||||
FairMQExample4Sampler::FairMQExample4Sampler()
|
||||
{
|
||||
}
|
||||
|
||||
void FairMQExample4Sampler::Run()
|
||||
{
|
||||
uint64_t counter = 0;
|
||||
|
||||
while (CheckCurrentState(RUNNING))
|
||||
{
|
||||
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
|
||||
|
||||
std::unique_ptr<uint64_t> number(new uint64_t(counter));
|
||||
|
||||
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage(number.release(), sizeof(uint64_t)));
|
||||
|
||||
LOG(INFO) << "Sending \"" << counter << "\"";
|
||||
|
||||
if (fChannels.at("data-out").size() > 1)
|
||||
{
|
||||
for (int i = 1; i < fChannels.at("data-out").size(); ++i)
|
||||
{
|
||||
std::unique_ptr<FairMQMessage> msgCopy(fTransportFactory->CreateMessage());
|
||||
msgCopy->Copy(msg);
|
||||
fChannels.at("data-out").at(i).Send(msgCopy);
|
||||
}
|
||||
fChannels.at("data-out").at(0).Send(msg);
|
||||
}
|
||||
else
|
||||
{
|
||||
fChannels.at("data-out").at(0).Send(msg);
|
||||
}
|
||||
|
||||
++counter;
|
||||
}
|
||||
}
|
||||
|
||||
FairMQExample4Sampler::~FairMQExample4Sampler()
|
||||
{
|
||||
}
|
32
fairmq/examples/4-copypush/FairMQExample4Sampler.h
Normal file
32
fairmq/examples/4-copypush/FairMQExample4Sampler.h
Normal file
|
@ -0,0 +1,32 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQExample4Sampler.h
|
||||
*
|
||||
* @since 2014-10-10
|
||||
* @author A. Rybalchenko
|
||||
*/
|
||||
|
||||
#ifndef FAIRMQEXAMPLESAMPLER_H_
|
||||
#define FAIRMQEXAMPLESAMPLER_H_
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "FairMQDevice.h"
|
||||
|
||||
class FairMQExample4Sampler : public FairMQDevice
|
||||
{
|
||||
public:
|
||||
FairMQExample4Sampler();
|
||||
virtual ~FairMQExample4Sampler();
|
||||
|
||||
protected:
|
||||
virtual void Run();
|
||||
};
|
||||
|
||||
#endif /* FAIRMQEXAMPLE4SAMPLER_H_ */
|
41
fairmq/examples/4-copypush/FairMQExample4Sink.cxx
Normal file
41
fairmq/examples/4-copypush/FairMQExample4Sink.cxx
Normal file
|
@ -0,0 +1,41 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQExample4Sink.cxx
|
||||
*
|
||||
* @since 2014-10-10
|
||||
* @author A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include <boost/thread.hpp>
|
||||
#include <boost/bind.hpp>
|
||||
|
||||
#include "FairMQExample4Sink.h"
|
||||
#include "FairMQLogger.h"
|
||||
|
||||
FairMQExample4Sink::FairMQExample4Sink()
|
||||
{
|
||||
}
|
||||
|
||||
void FairMQExample4Sink::Run()
|
||||
{
|
||||
while (CheckCurrentState(RUNNING))
|
||||
{
|
||||
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
|
||||
|
||||
fChannels.at("data-in").at(0).Receive(msg);
|
||||
|
||||
LOG(INFO) << "Received message: \"" << *(static_cast<int*>(msg->GetData())) << "\"";
|
||||
}
|
||||
}
|
||||
|
||||
FairMQExample4Sink::~FairMQExample4Sink()
|
||||
{
|
||||
}
|
30
fairmq/examples/4-copypush/FairMQExample4Sink.h
Normal file
30
fairmq/examples/4-copypush/FairMQExample4Sink.h
Normal file
|
@ -0,0 +1,30 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQExample4Sink.h
|
||||
*
|
||||
* @since 2014-10-10
|
||||
* @author A. Rybalchenko
|
||||
*/
|
||||
|
||||
#ifndef FAIRMQEXAMPLE4SINK_H_
|
||||
#define FAIRMQEXAMPLE1SINK_H_
|
||||
|
||||
#include "FairMQDevice.h"
|
||||
|
||||
class FairMQExample4Sink : public FairMQDevice
|
||||
{
|
||||
public:
|
||||
FairMQExample4Sink();
|
||||
virtual ~FairMQExample4Sink();
|
||||
|
||||
protected:
|
||||
virtual void Run();
|
||||
};
|
||||
|
||||
#endif /* FAIRMQEXAMPLE4SINK_H_ */
|
4
fairmq/examples/4-copypush/README.md
Normal file
4
fairmq/examples/4-copypush/README.md
Normal file
|
@ -0,0 +1,4 @@
|
|||
Example 4: Copy & Push
|
||||
===============
|
||||
|
||||
A topology consisting of one **Sampler** and two **Sink**s. The **Sampler** uses the `Copy` method to send the same data to both sinks with the **PUSH-PULL** pattern. In contrary to the **PUB-PATTERN** pattern, this ensures that all receivers are connected and no data is lost, but requires additional sockets.
|
68
fairmq/examples/4-copypush/ex4-copypush.json
Normal file
68
fairmq/examples/4-copypush/ex4-copypush.json
Normal file
|
@ -0,0 +1,68 @@
|
|||
{
|
||||
"fairMQOptions":
|
||||
{
|
||||
"device":
|
||||
{
|
||||
"id": "sampler1",
|
||||
"channel":
|
||||
{
|
||||
"name": "data-out",
|
||||
"socket":
|
||||
{
|
||||
"type": "push",
|
||||
"method": "bind",
|
||||
"address": "tcp://*:5555",
|
||||
"sndBufSize": "1000",
|
||||
"rcvBufSize": "1000",
|
||||
"rateLogging": "0"
|
||||
},
|
||||
"socket":
|
||||
{
|
||||
"type": "push",
|
||||
"method": "bind",
|
||||
"address": "tcp://*:5556",
|
||||
"sndBufSize": "1000",
|
||||
"rcvBufSize": "1000",
|
||||
"rateLogging": "0"
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
"device":
|
||||
{
|
||||
"id": "sink1",
|
||||
"channel":
|
||||
{
|
||||
"name": "data-in",
|
||||
"socket":
|
||||
{
|
||||
"type": "pull",
|
||||
"method": "connect",
|
||||
"address": "tcp://localhost:5555",
|
||||
"sndBufSize": "1000",
|
||||
"rcvBufSize": "1000",
|
||||
"rateLogging": "0"
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
"device":
|
||||
{
|
||||
"id": "sink2",
|
||||
"channel":
|
||||
{
|
||||
"name": "data-in",
|
||||
"socket":
|
||||
{
|
||||
"type": "pull",
|
||||
"method": "connect",
|
||||
"address": "tcp://localhost:5556",
|
||||
"sndBufSize": "1000",
|
||||
"rcvBufSize": "1000",
|
||||
"rateLogging": "0"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
83
fairmq/examples/4-copypush/runExample4Sampler.cxx
Normal file
83
fairmq/examples/4-copypush/runExample4Sampler.cxx
Normal file
|
@ -0,0 +1,83 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* runExample4Sampler.cxx
|
||||
*
|
||||
* @since 2013-04-23
|
||||
* @author D. Klein, A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include <iostream>
|
||||
|
||||
#include "boost/program_options.hpp"
|
||||
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQParser.h"
|
||||
#include "FairMQProgOptions.h"
|
||||
#include "FairMQExample4Sampler.h"
|
||||
|
||||
#ifdef NANOMSG
|
||||
#include "FairMQTransportFactoryNN.h"
|
||||
#else
|
||||
#include "FairMQTransportFactoryZMQ.h"
|
||||
#endif
|
||||
|
||||
using namespace boost::program_options;
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
FairMQExample4Sampler sampler;
|
||||
sampler.CatchSignals();
|
||||
|
||||
FairMQProgOptions config;
|
||||
|
||||
try
|
||||
{
|
||||
if (config.ParseAll(argc, argv))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
std::string filename = config.GetValue<std::string>("config-json-file");
|
||||
std::string id = config.GetValue<std::string>("id");
|
||||
|
||||
config.UserParser<FairMQParser::JSON>(filename, id);
|
||||
|
||||
sampler.fChannels = config.GetFairMQMap();
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
#ifdef NANOMSG
|
||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
|
||||
#else
|
||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
|
||||
#endif
|
||||
|
||||
sampler.SetTransport(transportFactory);
|
||||
|
||||
sampler.SetProperty(FairMQExample4Sampler::Id, id);
|
||||
|
||||
sampler.ChangeState("INIT_DEVICE");
|
||||
sampler.WaitForEndOfState("INIT_DEVICE");
|
||||
|
||||
sampler.ChangeState("INIT_TASK");
|
||||
sampler.WaitForEndOfState("INIT_TASK");
|
||||
|
||||
sampler.ChangeState("RUN");
|
||||
sampler.InteractiveStateLoop();
|
||||
}
|
||||
catch (std::exception& e)
|
||||
{
|
||||
LOG(ERROR) << e.what();
|
||||
LOG(INFO) << "Command line options are the following: ";
|
||||
config.PrintHelp();
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
83
fairmq/examples/4-copypush/runExample4Sink.cxx
Normal file
83
fairmq/examples/4-copypush/runExample4Sink.cxx
Normal file
|
@ -0,0 +1,83 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* runExample4Sink.cxx
|
||||
*
|
||||
* @since 2013-04-23
|
||||
* @author D. Klein, A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include <iostream>
|
||||
|
||||
#include "boost/program_options.hpp"
|
||||
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQParser.h"
|
||||
#include "FairMQProgOptions.h"
|
||||
#include "FairMQExample4Sink.h"
|
||||
|
||||
#ifdef NANOMSG
|
||||
#include "FairMQTransportFactoryNN.h"
|
||||
#else
|
||||
#include "FairMQTransportFactoryZMQ.h"
|
||||
#endif
|
||||
|
||||
using namespace boost::program_options;
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
FairMQExample4Sink sink;
|
||||
sink.CatchSignals();
|
||||
|
||||
FairMQProgOptions config;
|
||||
|
||||
try
|
||||
{
|
||||
if (config.ParseAll(argc, argv))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
std::string filename = config.GetValue<std::string>("config-json-file");
|
||||
std::string id = config.GetValue<std::string>("id");
|
||||
|
||||
config.UserParser<FairMQParser::JSON>(filename, id);
|
||||
|
||||
sink.fChannels = config.GetFairMQMap();
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
#ifdef NANOMSG
|
||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
|
||||
#else
|
||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
|
||||
#endif
|
||||
|
||||
sink.SetTransport(transportFactory);
|
||||
|
||||
sink.SetProperty(FairMQExample4Sink::Id, id);
|
||||
|
||||
sink.ChangeState("INIT_DEVICE");
|
||||
sink.WaitForEndOfState("INIT_DEVICE");
|
||||
|
||||
sink.ChangeState("INIT_TASK");
|
||||
sink.WaitForEndOfState("INIT_TASK");
|
||||
|
||||
sink.ChangeState("RUN");
|
||||
sink.InteractiveStateLoop();
|
||||
}
|
||||
catch (std::exception& e)
|
||||
{
|
||||
LOG(ERROR) << e.what();
|
||||
LOG(INFO) << "Command line options are the following: ";
|
||||
config.PrintHelp();
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -6,7 +6,7 @@
|
|||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQExampleClient.cpp
|
||||
* FairMQExample5Client.cpp
|
||||
*
|
||||
* @since 2014-10-10
|
||||
* @author A. Rybalchenko
|
||||
|
@ -15,28 +15,28 @@
|
|||
#include <boost/thread.hpp>
|
||||
#include <boost/bind.hpp>
|
||||
|
||||
#include "FairMQExampleClient.h"
|
||||
#include "FairMQExample5Client.h"
|
||||
#include "FairMQLogger.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
FairMQExampleClient::FairMQExampleClient()
|
||||
FairMQExample5Client::FairMQExample5Client()
|
||||
: fText()
|
||||
{
|
||||
}
|
||||
|
||||
FairMQExampleClient::~FairMQExampleClient()
|
||||
FairMQExample5Client::~FairMQExample5Client()
|
||||
{
|
||||
}
|
||||
|
||||
void FairMQExampleClient::CustomCleanup(void *data, void *hint)
|
||||
void FairMQExample5Client::CustomCleanup(void *data, void *hint)
|
||||
{
|
||||
delete (string*)hint;
|
||||
}
|
||||
|
||||
void FairMQExampleClient::Run()
|
||||
void FairMQExample5Client::Run()
|
||||
{
|
||||
while (GetCurrentState() == RUNNING)
|
||||
while (CheckCurrentState(RUNNING))
|
||||
{
|
||||
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
|
||||
|
||||
|
@ -47,8 +47,10 @@ void FairMQExampleClient::Run()
|
|||
|
||||
LOG(INFO) << "Sending \"" << fText << "\" to server.";
|
||||
|
||||
fChannels.at("data").at(0).Send(request);
|
||||
fChannels.at("data").at(0).Receive(reply);
|
||||
if (fChannels.at("data").at(0).Send(request) > 0)
|
||||
{
|
||||
fChannels.at("data").at(0).Receive(reply);
|
||||
}
|
||||
|
||||
LOG(INFO) << "Received reply from server: \"" << string(static_cast<char*>(reply->GetData()), reply->GetSize()) << "\"";
|
||||
|
||||
|
@ -57,7 +59,7 @@ void FairMQExampleClient::Run()
|
|||
}
|
||||
|
||||
|
||||
void FairMQExampleClient::SetProperty(const int key, const string& value)
|
||||
void FairMQExample5Client::SetProperty(const int key, const string& value)
|
||||
{
|
||||
switch (key)
|
||||
{
|
||||
|
@ -70,7 +72,7 @@ void FairMQExampleClient::SetProperty(const int key, const string& value)
|
|||
}
|
||||
}
|
||||
|
||||
string FairMQExampleClient::GetProperty(const int key, const string& default_ /*= ""*/)
|
||||
string FairMQExample5Client::GetProperty(const int key, const string& default_ /*= ""*/)
|
||||
{
|
||||
switch (key)
|
||||
{
|
||||
|
@ -82,7 +84,7 @@ string FairMQExampleClient::GetProperty(const int key, const string& default_ /*
|
|||
}
|
||||
}
|
||||
|
||||
void FairMQExampleClient::SetProperty(const int key, const int value)
|
||||
void FairMQExample5Client::SetProperty(const int key, const int value)
|
||||
{
|
||||
switch (key)
|
||||
{
|
||||
|
@ -92,7 +94,7 @@ void FairMQExampleClient::SetProperty(const int key, const int value)
|
|||
}
|
||||
}
|
||||
|
||||
int FairMQExampleClient::GetProperty(const int key, const int default_ /*= 0*/)
|
||||
int FairMQExample5Client::GetProperty(const int key, const int default_ /*= 0*/)
|
||||
{
|
||||
switch (key)
|
||||
{
|
|
@ -6,20 +6,20 @@
|
|||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQExampleClient.h
|
||||
* FairMQExample5Client.h
|
||||
*
|
||||
* @since 2014-10-10
|
||||
* @author A. Rybalchenko
|
||||
*/
|
||||
|
||||
#ifndef FAIRMQEXAMPLECLIENT_H_
|
||||
#define FAIRMQEXAMPLECLIENT_H_
|
||||
#ifndef FAIRMQEXAMPLE5CLIENT_H_
|
||||
#define FAIRMQEXAMPLE5CLIENT_H_
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "FairMQDevice.h"
|
||||
|
||||
class FairMQExampleClient : public FairMQDevice
|
||||
class FairMQExample5Client : public FairMQDevice
|
||||
{
|
||||
public:
|
||||
enum
|
||||
|
@ -27,8 +27,8 @@ class FairMQExampleClient : public FairMQDevice
|
|||
Text = FairMQDevice::Last,
|
||||
Last
|
||||
};
|
||||
FairMQExampleClient();
|
||||
virtual ~FairMQExampleClient();
|
||||
FairMQExample5Client();
|
||||
virtual ~FairMQExample5Client();
|
||||
|
||||
static void CustomCleanup(void* data, void* hint);
|
||||
|
|
@ -6,7 +6,7 @@
|
|||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQExampleServer.cxx
|
||||
* FairMQExample5Server.cxx
|
||||
*
|
||||
* @since 2014-10-10
|
||||
* @author A. Rybalchenko
|
||||
|
@ -15,44 +15,45 @@
|
|||
#include <boost/thread.hpp>
|
||||
#include <boost/bind.hpp>
|
||||
|
||||
#include "FairMQExampleServer.h"
|
||||
#include "FairMQExample5Server.h"
|
||||
#include "FairMQLogger.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
FairMQExampleServer::FairMQExampleServer()
|
||||
FairMQExample5Server::FairMQExample5Server()
|
||||
{
|
||||
}
|
||||
|
||||
void FairMQExampleServer::CustomCleanup(void *data, void *hint)
|
||||
void FairMQExample5Server::CustomCleanup(void *data, void *hint)
|
||||
{
|
||||
delete (string*)hint;
|
||||
}
|
||||
|
||||
void FairMQExampleServer::Run()
|
||||
void FairMQExample5Server::Run()
|
||||
{
|
||||
while (GetCurrentState() == RUNNING)
|
||||
while (CheckCurrentState(RUNNING))
|
||||
{
|
||||
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
|
||||
|
||||
FairMQMessage* request = fTransportFactory->CreateMessage();
|
||||
|
||||
fChannels.at("data").at(0).Receive(request);
|
||||
if (fChannels.at("data").at(0).Receive(request) > 0)
|
||||
{
|
||||
LOG(INFO) << "Received request from client: \"" << string(static_cast<char*>(request->GetData()), request->GetSize()) << "\"";
|
||||
|
||||
LOG(INFO) << "Received request from client: \"" << string(static_cast<char*>(request->GetData()), request->GetSize()) << "\"";
|
||||
string* text = new string("Thank you for the \"" + string(static_cast<char*>(request->GetData()), request->GetSize()) + "\"!");
|
||||
|
||||
string* text = new string("Thank you for the \"" + string(static_cast<char*>(request->GetData()), request->GetSize()) + "\"!");
|
||||
delete request;
|
||||
|
||||
delete request;
|
||||
LOG(INFO) << "Sending reply to client.";
|
||||
|
||||
LOG(INFO) << "Sending reply to client.";
|
||||
FairMQMessage* reply = fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text);
|
||||
|
||||
FairMQMessage* reply = fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text);
|
||||
|
||||
fChannels.at("data").at(0).Send(reply);
|
||||
fChannels.at("data").at(0).Send(reply);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
FairMQExampleServer::~FairMQExampleServer()
|
||||
FairMQExample5Server::~FairMQExample5Server()
|
||||
{
|
||||
}
|
|
@ -6,22 +6,22 @@
|
|||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQExampleServer.h
|
||||
* FairMQExample5Server.h
|
||||
*
|
||||
* @since 2014-10-10
|
||||
* @author A. Rybalchenko
|
||||
*/
|
||||
|
||||
#ifndef FAIRMQEXAMPLESERVER_H_
|
||||
#define FAIRMQEXAMPLESERVER_H_
|
||||
#ifndef FAIRMQEXAMPLE5SERVER_H_
|
||||
#define FAIRMQEXAMPLE5SERVER_H_
|
||||
|
||||
#include "FairMQDevice.h"
|
||||
|
||||
class FairMQExampleServer : public FairMQDevice
|
||||
class FairMQExample5Server : public FairMQDevice
|
||||
{
|
||||
public:
|
||||
FairMQExampleServer();
|
||||
virtual ~FairMQExampleServer();
|
||||
FairMQExample5Server();
|
||||
virtual ~FairMQExample5Server();
|
||||
|
||||
static void CustomCleanup(void *data, void* hint);
|
||||
|
||||
|
@ -29,4 +29,4 @@ class FairMQExampleServer : public FairMQDevice
|
|||
virtual void Run();
|
||||
};
|
||||
|
||||
#endif /* FAIRMQEXAMPLESERVER_H_ */
|
||||
#endif /* FAIRMQEXAMPLE5SERVER_H_ */
|
4
fairmq/examples/5-req-rep/README.md
Normal file
4
fairmq/examples/5-req-rep/README.md
Normal file
|
@ -0,0 +1,4 @@
|
|||
Example 5: Request & Reply
|
||||
===============
|
||||
|
||||
This topology contains two devices that communicate with each other via the **REQUEST-REPLY** pettern. Bidirectional communication via a single socket.
|
|
@ -17,7 +17,7 @@
|
|||
#include "boost/program_options.hpp"
|
||||
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQExampleClient.h"
|
||||
#include "FairMQExample5Client.h"
|
||||
|
||||
#ifdef NANOMSG
|
||||
#include "FairMQTransportFactoryNN.h"
|
||||
|
@ -65,7 +65,7 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options)
|
|||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
FairMQExampleClient client;
|
||||
FairMQExample5Client client;
|
||||
client.CatchSignals();
|
||||
|
||||
DeviceOptions_t options;
|
||||
|
@ -90,8 +90,8 @@ int main(int argc, char** argv)
|
|||
|
||||
client.SetTransport(transportFactory);
|
||||
|
||||
client.SetProperty(FairMQExampleClient::Id, "client");
|
||||
client.SetProperty(FairMQExampleClient::NumIoThreads, 1);
|
||||
client.SetProperty(FairMQExample5Client::Id, "client");
|
||||
client.SetProperty(FairMQExample5Client::NumIoThreads, 1);
|
||||
|
||||
FairMQChannel requestChannel("req", "connect", "tcp://localhost:5005");
|
||||
requestChannel.UpdateSndBufSize(10000);
|
|
@ -15,7 +15,7 @@
|
|||
#include <iostream>
|
||||
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQExampleServer.h"
|
||||
#include "FairMQExample5Server.h"
|
||||
|
||||
#ifdef NANOMSG
|
||||
#include "FairMQTransportFactoryNN.h"
|
||||
|
@ -27,7 +27,7 @@ using namespace std;
|
|||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
FairMQExampleServer server;
|
||||
FairMQExample5Server server;
|
||||
server.CatchSignals();
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
@ -40,8 +40,8 @@ int main(int argc, char** argv)
|
|||
|
||||
server.SetTransport(transportFactory);
|
||||
|
||||
server.SetProperty(FairMQExampleServer::Id, "server");
|
||||
server.SetProperty(FairMQExampleServer::NumIoThreads, 1);
|
||||
server.SetProperty(FairMQExample5Server::Id, "server");
|
||||
server.SetProperty(FairMQExample5Server::NumIoThreads, 1);
|
||||
|
||||
FairMQChannel replyChannel("rep", "bind", "tcp://*:5005");
|
||||
replyChannel.UpdateSndBufSize(10000);
|
24
fairmq/examples/README.md
Normal file
24
fairmq/examples/README.md
Normal file
|
@ -0,0 +1,24 @@
|
|||
FairMQ Examples
|
||||
===============
|
||||
|
||||
Set of simple FairMQ examples.
|
||||
|
||||
Example 1: Sampler -> Sink
|
||||
--------------------------
|
||||
A simple topology of two devices - **Sampler** and **Sink**. **Sampler** sends data to **Sink** with the **PUSH-PULL** pattern.
|
||||
|
||||
Example 2: Sampler -> Processor -> Sink
|
||||
---------------------------------------
|
||||
A simple topology of three devices - **Sampler**, **Processor** and **Sink**. **Sampler** sends data to one or more **Processor**s, who modify the data and send it to one **Sink**. Transport with the **PUSH-PULL** pattern.
|
||||
|
||||
Example 3: DDS
|
||||
--------------
|
||||
This example demonstrates usage of the Dynamic Deployment System ([DDS](http://dds.gsi.de/)) to dynamically deploy and configure a topology of devices. The topology is similar to those of Example 2, but now it can be easily distributed on different computing nodes without the need for manual reconfiguration of the devices.
|
||||
|
||||
Example 4: Copy & Push
|
||||
----------------------
|
||||
A topology consisting of one **Sampler** and two **Sink**s. The **Sampler** uses the `Copy` method to send the same data to both sinks with the **PUSH_PULL** pattern. In countrary to the **PUB-PATTERN** pattern, this insures that all receivers are connected and no data is lost, but requires additional sockets.
|
||||
|
||||
Example 5: Request & Reply
|
||||
--------------------------
|
||||
This topology contains two devices that communicate with each other via the **REQUEST-REPLY** pettern. Bidirectional communication via a single socket.
|
|
@ -1,3 +0,0 @@
|
|||
# FairMQ Request-Reply Example
|
||||
|
||||
This example demonstrates usage of the request-reply pattern together with FairMQ. Two processes - example_client and example_server communicate. The client sends a text string and the server respondes by returning the string back to the client. The communication happens over a **single** REP-REP socket. Works both with ZeroMQ and with nanomsg transport.
|
|
@ -42,7 +42,7 @@ namespace FairMQParser
|
|||
FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string& deviceId, const std::string& rootNode, const std::string& formatFlag)
|
||||
{
|
||||
// Create fair mq map
|
||||
FairMQMap MQChannelMap;
|
||||
FairMQMap channelMap;
|
||||
|
||||
// variables to create key for the mq map. Note: maybe device name and id useless here
|
||||
std::string deviceIdKey;
|
||||
|
@ -69,7 +69,7 @@ FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string&
|
|||
LOG(DEBUG) << "Found device id '"<< deviceIdKey << "' in JSON input";
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Extract value from boost::property_tree
|
||||
// For each device in fairMQOptions
|
||||
for(const auto& p : pt.get_child(rootNode))
|
||||
|
@ -110,7 +110,7 @@ FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string&
|
|||
continue;
|
||||
}
|
||||
|
||||
//get name attribute to form key
|
||||
// get name attribute to form key
|
||||
if (formatFlag == "xml")
|
||||
{
|
||||
channelKey = q.second.get<std::string>("<xmlattr>.name");
|
||||
|
@ -165,11 +165,11 @@ FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string&
|
|||
}// end socket loop
|
||||
|
||||
//fill mq map option
|
||||
MQChannelMap.insert(std::make_pair(channelKey,std::move(channelList)));
|
||||
channelMap.insert(std::make_pair(channelKey,std::move(channelList)));
|
||||
}
|
||||
}
|
||||
|
||||
if (MQChannelMap.size() > 0)
|
||||
if (channelMap.size() > 0)
|
||||
{
|
||||
LOG(DEBUG) << "---- Channel-keys found are :";
|
||||
for (const auto& p : MQChannelMap)
|
||||
|
@ -182,7 +182,7 @@ FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string&
|
|||
LOG(WARN) << "---- No channel-keys found for device-id " << deviceId;
|
||||
LOG(WARN) << "---- Check the "<< formatFlag << " inputs and/or command line inputs";
|
||||
}
|
||||
return MQChannelMap;
|
||||
return channelMap;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -200,7 +200,6 @@ FairMQMap JSON::UserParser(std::stringstream& input, const std::string& deviceId
|
|||
return ptreeToMQMap(pt, deviceId, rootNode,"json");
|
||||
}
|
||||
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
FairMQMap XML::UserParser(const std::string& filename, const std::string& deviceId, const std::string& rootNode)
|
||||
{
|
||||
|
@ -216,6 +215,4 @@ FairMQMap XML::UserParser(std::stringstream& input, const std::string& deviceId,
|
|||
return ptreeToMQMap(pt,deviceId,rootNode,"xml");
|
||||
}
|
||||
|
||||
|
||||
|
||||
} // end FairMQParser namespace
|
|
@ -6,7 +6,7 @@
|
|||
*/
|
||||
|
||||
#ifndef FAIRMQPARSER_H
|
||||
#define FAIRMQPARSER_H
|
||||
#define FAIRMQPARSER_H
|
||||
|
||||
// std
|
||||
#include <string>
|
||||
|
@ -33,7 +33,7 @@ struct JSON
|
|||
FairMQMap UserParser(std::stringstream& input, const std::string& deviceId, const std::string& rootNode = "fairMQOptions");
|
||||
};
|
||||
|
||||
struct XML
|
||||
struct XML
|
||||
{
|
||||
FairMQMap UserParser(const std::string& filename, const std::string& deviceId, const std::string& root_node="fairMQOptions");
|
||||
FairMQMap UserParser(std::stringstream& input, const std::string& deviceId, const std::string& rootNode="fairMQOptions");
|
||||
|
|
|
@ -59,84 +59,65 @@ int getHostIPs(map<string, string>& addressMap)
|
|||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
||||
// below are SFINAE template functions to check for function member signatures of class
|
||||
namespace details
|
||||
namespace details
|
||||
{
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
// test, at compile time, whether T has BindSendPart member function with returned type R and argument ...Args type
|
||||
template<class T, class Sig, class=void>
|
||||
struct has_BindSendPart:std::false_type{};
|
||||
// test, at compile time, whether T has BindSendPart member function with returned type R and argument ...Args type
|
||||
template<class T, class Sig, class=void>
|
||||
struct has_BindSendPart : false_type {};
|
||||
|
||||
template<class T, class R, class... Args>
|
||||
struct has_BindSendPart
|
||||
<T, R(Args...),
|
||||
typename std::enable_if
|
||||
<
|
||||
std::is_convertible< decltype(std::declval<T>().BindSendPart(std::declval<Args>()...)), R >::value
|
||||
|| std::is_same<R, void>::value
|
||||
>::type
|
||||
>:std::true_type{};
|
||||
template<class T, class R, class... Args>
|
||||
struct has_BindSendPart
|
||||
<T, R(Args...), typename enable_if<
|
||||
is_convertible<decltype(declval<T>().BindSendPart(declval<Args>()...)), R>::value || is_same<R, void>::value>::type
|
||||
>:true_type {};
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
// test, at compile time, whether T has BindGetSocketNumber member function with returned type R and argument ...Args type
|
||||
template<class T, class Sig, class=void>
|
||||
struct has_BindGetSocketNumber:std::false_type{};
|
||||
// test, at compile time, whether T has BindGetSocketNumber member function with returned type R and argument ...Args type
|
||||
template<class T, class Sig, class=void>
|
||||
struct has_BindGetSocketNumber : false_type {};
|
||||
|
||||
template<class T, class R, class... Args>
|
||||
struct has_BindGetSocketNumber
|
||||
<T, R(Args...),
|
||||
typename std::enable_if
|
||||
<
|
||||
std::is_convertible< decltype(std::declval<T>().BindGetSocketNumber(std::declval<Args>()...)), R >::value
|
||||
|| std::is_same<R, void>::value
|
||||
>::type
|
||||
>:std::true_type{};
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
// test, at compile time, whether T has BindGetCurrentIndex member function with returned type R and argument ...Args type
|
||||
template<class T, class Sig, class=void>
|
||||
struct has_BindGetCurrentIndex:std::false_type{};
|
||||
template<class T, class R, class... Args>
|
||||
struct has_BindGetSocketNumber
|
||||
<T, R(Args...), typename enable_if<
|
||||
is_convertible<decltype(declval<T>().BindGetSocketNumber(declval<Args>()...)), R>::value || is_same<R, void>::value>::type
|
||||
>:true_type {};
|
||||
|
||||
template<class T, class R, class... Args>
|
||||
struct has_BindGetCurrentIndex
|
||||
<T, R(Args...),
|
||||
typename std::enable_if
|
||||
<
|
||||
std::is_convertible< decltype(std::declval<T>().BindGetCurrentIndex(std::declval<Args>()...)), R >::value
|
||||
|| std::is_same<R, void>::value
|
||||
>::type
|
||||
>:std::true_type{};
|
||||
// test, at compile time, whether T has BindGetCurrentIndex member function with returned type R and argument ...Args type
|
||||
template<class T, class Sig, class=void>
|
||||
struct has_BindGetCurrentIndex : false_type {};
|
||||
|
||||
}// end namespace details
|
||||
template<class T, class R, class... Args>
|
||||
struct has_BindGetCurrentIndex
|
||||
<T, R(Args...), typename enable_if<
|
||||
is_convertible<decltype(declval<T>().BindGetCurrentIndex(declval<Args>()...)), R>::value || is_same<R, void>::value>::type
|
||||
>:true_type {};
|
||||
} // end namespace details
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
// Alias template of the above structs
|
||||
template<class T, class Sig>
|
||||
using has_BindSendPart = std::integral_constant<bool, details::has_BindSendPart<T, Sig>::value>;
|
||||
using has_BindSendPart = integral_constant<bool, details::has_BindSendPart<T, Sig>::value>;
|
||||
|
||||
template<class T, class Sig>
|
||||
using has_BindGetSocketNumber = std::integral_constant<bool, details::has_BindGetSocketNumber<T, Sig>::value>;
|
||||
using has_BindGetSocketNumber = integral_constant<bool, details::has_BindGetSocketNumber<T, Sig>::value>;
|
||||
|
||||
template<class T, class Sig>
|
||||
using has_BindGetCurrentIndex = std::integral_constant<bool, details::has_BindGetCurrentIndex<T, Sig>::value>;
|
||||
using has_BindGetCurrentIndex = integral_constant<bool, details::has_BindGetCurrentIndex<T, Sig>::value>;
|
||||
|
||||
// enable_if Alias template
|
||||
template<typename T>
|
||||
using enable_if_has_BindSendPart = typename std::enable_if<has_BindSendPart<T,void(int)>::value,int>::type;
|
||||
using enable_if_has_BindSendPart = typename enable_if<has_BindSendPart<T, void(int)>::value, int>::type;
|
||||
template<typename T>
|
||||
using enable_if_hasNot_BindSendPart = typename std::enable_if<!has_BindSendPart<T,void(int)>::value,int>::type;
|
||||
using enable_if_hasNot_BindSendPart = typename enable_if<!has_BindSendPart<T, void(int)>::value, int>::type;
|
||||
|
||||
template<typename T>
|
||||
using enable_if_has_BindGetSocketNumber = typename std::enable_if<has_BindGetSocketNumber<T,int()>::value,int>::type;
|
||||
using enable_if_has_BindGetSocketNumber = typename enable_if<has_BindGetSocketNumber<T, int()>::value, int>::type;
|
||||
template<typename T>
|
||||
using enable_if_hasNot_BindGetSocketNumber = typename std::enable_if<!has_BindGetSocketNumber<T,int()>::value,int>::type;
|
||||
using enable_if_hasNot_BindGetSocketNumber = typename enable_if<!has_BindGetSocketNumber<T, int()>::value, int>::type;
|
||||
|
||||
template<typename T>
|
||||
using enable_if_has_BindGetCurrentIndex = typename std::enable_if<has_BindGetCurrentIndex<T,int()>::value,int>::type;
|
||||
using enable_if_has_BindGetCurrentIndex = typename enable_if<has_BindGetCurrentIndex<T, int()>::value, int>::type;
|
||||
template<typename T>
|
||||
using enable_if_hasNot_BindGetCurrentIndex = typename std::enable_if<!has_BindGetCurrentIndex<T,int()>::value,int>::type;
|
||||
using enable_if_hasNot_BindGetCurrentIndex = typename enable_if<!has_BindGetCurrentIndex<T, int()>::value, int>::type;
|
||||
|
||||
} // namespace tools
|
||||
} // namespace FairMQ
|
||||
|
|
Loading…
Reference in New Issue
Block a user