Compare commits

...

17 Commits

Author SHA1 Message Date
Alexey Rybalchenko
ee24144d61 Remove previously deprecated Copy method (since 1yr) 2018-11-28 21:16:29 +01:00
Alexey Rybalchenko
a9619a06d0 Add test for FairMQMessage::Rebuild 2018-11-28 21:16:29 +01:00
Alexey Rybalchenko
c605cbc3f6 Fix bug in shmem CloseMessage 2018-11-28 21:16:29 +01:00
Dennis Klein
ffd31aa09a Add coverity badge 2018-11-28 21:08:01 +01:00
Dennis Klein
a3fdfcad9e Adapt nightly build pipeline to new CI environments 2018-11-28 17:03:29 +01:00
Alexey Rybalchenko
44c0fe5abf Set EXTRA_FLAGS in CI script 2018-11-28 16:55:34 +01:00
Alexey Rybalchenko
66d6d0e07b Fix warning 2018-11-28 16:55:34 +01:00
Alexey Rybalchenko
ffbe90b638 Update to new DDS 2.2 API
- require DDS 2.2
 - fix regressions in automatic port binding
 - fix regression in channel API
 - update DDS example readme
2018-11-28 16:55:34 +01:00
Dennis Klein
dc1d7a23c1 Adapt CI script to new environments 2018-11-28 16:28:26 +01:00
Alexey Rybalchenko
33f5590626 Fix -Wsign-compare warning 2018-11-28 11:49:17 +01:00
mkrzewic
489bea5a51 Add simple test for factory pointer setting at receive 2018-11-27 14:36:35 +01:00
mkrzewic
cc0c525e0d Set pointer to factory also when receiving multi-part 2018-11-27 14:36:35 +01:00
Alexey Rybalchenko
25fcf13985 Move Bind/Connect/Attach to FairMQChannel 2018-11-13 11:08:48 +01:00
Alexey Rybalchenko
3ca0d7236a Add safety checks for process tools 2018-11-06 11:14:01 +01:00
Teo Mrnjavac
227a302903 Avoid boost::uuids::entropy_error on some systems 2018-11-05 13:18:08 +01:00
Alexey Rybalchenko
bd899a2806 Add test for channel validation 2018-11-01 15:43:40 +01:00
Alexey Rybalchenko
0b199e779a Add test for interface IP detection tools 2018-11-01 15:43:40 +01:00
53 changed files with 974 additions and 977 deletions

View File

@@ -67,7 +67,7 @@ if(BUILD_OFI_TRANSPORT)
endif()
if(BUILD_DDS_PLUGIN)
find_package2(PRIVATE DDS VERSION 2.0 REQUIRED)
find_package2(PRIVATE DDS VERSION 2.2 REQUIRED)
endif()
if(BUILD_TESTING)

View File

@@ -4,6 +4,7 @@ Eulisse, Giulio
Karabowicz, Radoslaw
Kretz, Matthias <kretz@kde.org>
Krzewicki, Mikolaj
Mrnjavac, Teo <teo.m@cern.ch>
Neskovic, Gvozden
Richter, Matthias
Uhlig, Florian

22
Jenkinsfile vendored
View File

@@ -15,12 +15,26 @@ def jobMatrix(String prefix, List specs, Closure callback) {
deleteDir()
checkout scm
sh """\
echo "export SIMPATH=\${SIMPATH_PREFIX}${spec.fairsoft}" >> Dart.cfg
echo "export FAIRSOFT_VERSION=${spec.fairsoft}" >> Dart.cfg
"""
if ((spec.os == 'Debian8') && (spec.compiler == 'gcc8.1')) {
sh '''\
echo "source /etc/profile.d/modules.sh" >> Dart.cfg
echo "module use /cvmfs/it.gsi.de/modulefiles" >> Dart.cfg
echo "module load compiler/gcc/8" >> Dart.cfg
'''
}
sh '''\
echo "export BUILDDIR=$PWD/build" >> Dart.cfg
echo "export SOURCEDIR=$PWD" >> Dart.cfg
echo "export PATH=$SIMPATH/bin:$PATH" >> Dart.cfg
echo "export PATH=\\\$SIMPATH/bin:\\\$PATH" >> Dart.cfg
echo "export GIT_BRANCH=$JOB_BASE_NAME" >> Dart.cfg
echo "export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=g++'" >> Dart.cfg
echo "echo \\\$PATH" >> Dart.cfg
'''
sh 'cat Dart.cfg'
callback.call(spec, label)
@@ -44,14 +58,14 @@ pipeline{
steps{
script {
def build_jobs = jobMatrix('alfa-ci/build', [
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc4.9', fairsoft: 'may18'],
[os: 'MacOS10.13', arch: 'x86_64', compiler: 'AppleLLVM9.0.0', fairsoft: 'may18'],
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc8.1', fairsoft: 'fairmq_dev'],
//[os: 'MacOS10.13', arch: 'x86_64', compiler: 'AppleLLVM10.0.0', fairsoft: 'may18'],
]) { spec, label ->
sh './Dart.sh alfa_ci Dart.cfg'
}
def profile_jobs = jobMatrix('alfa-ci/codecov', [
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc4.9', fairsoft: 'may18'],
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc8.1', fairsoft: 'fairmq_dev'],
]) { spec, label ->
withCredentials([string(credentialsId: 'fairmq_codecov_token', variable: 'CODECOV_TOKEN')]) {
sh './Dart.sh codecov Dart.cfg'

View File

@@ -14,6 +14,27 @@ def buildMatrix(List specs, Closure callback) {
deleteDir()
checkout scm
sh """\
echo "export SIMPATH=\${SIMPATH_PREFIX}${spec.fairsoft}" >> Dart.cfg
echo "export FAIRSOFT_VERSION=${spec.fairsoft}" >> Dart.cfg
"""
if ((spec.os == 'Debian8') && (spec.compiler == 'gcc8.1')) {
sh '''\
echo "source /etc/profile.d/modules.sh" >> Dart.cfg
echo "module use /cvmfs/it.gsi.de/modulefiles" >> Dart.cfg
echo "module load compiler/gcc/8" >> Dart.cfg
'''
}
sh '''\
echo "export BUILDDIR=$PWD/build" >> Dart.cfg
echo "export SOURCEDIR=$PWD" >> Dart.cfg
echo "export PATH=\\\$SIMPATH/bin:\\\$PATH" >> Dart.cfg
echo "export GIT_BRANCH=dev" >> Dart.cfg
echo "export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=g++'" >> Dart.cfg
echo "echo \\\$PATH" >> Dart.cfg
'''
sh 'cat Dart.cfg'
callback.call(spec, label)
deleteDir()
@@ -35,16 +56,9 @@ pipeline{
steps{
script {
parallel(buildMatrix([
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc4.9', fairsoft: 'may18'],
[os: 'MacOS10.11', arch: 'x86_64', compiler: 'AppleLLVM8.0.0', fairsoft: 'may18'],
[os: 'MacOS10.13', arch: 'x86_64', compiler: 'AppleLLVM9.0.0', fairsoft: 'may18'],
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc8.1', fairsoft: 'fairmq_dev'],
//[os: 'MacOS10.13', arch: 'x86_64', compiler: 'AppleLLVM10.0.0', fairsoft: 'may18'],
]) { spec, label ->
sh '''\
echo "export BUILDDIR=$PWD/build" >> Dart.cfg
echo "export SOURCEDIR=$PWD" >> Dart.cfg
echo "export PATH=$SIMPATH/bin:$PATH" >> Dart.cfg
echo "export GIT_BRANCH=dev" >> Dart.cfg
'''
sh './Dart.sh Nightly Dart.cfg'
sh './Dart.sh Profile Dart.cfg'
})

View File

@@ -1,5 +1,5 @@
<!-- {#mainpage} -->
# FairMQ [![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT) [![build status](https://alfa-ci.gsi.de/buildStatus/icon?job=FairRootGroup/FairMQ/master)](https://alfa-ci.gsi.de/blue/organizations/jenkins/FairRootGroup%2FFairMQ/branches) [![test coverage master branch](https://codecov.io/gh/FairRootGroup/FairMQ/branch/master/graph/badge.svg)](https://codecov.io/gh/FairRootGroup/FairMQ/branch/master) [![Codacy Badge](https://api.codacy.com/project/badge/Grade/6b648d95d68d4c4eae833b84f84d299c)](https://www.codacy.com/app/dennisklein/FairMQ?utm_source=github.com&amp;utm_medium=referral&amp;utm_content=FairRootGroup/FairMQ&amp;utm_campaign=Badge_Grade)
# FairMQ [![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT) [![build status](https://alfa-ci.gsi.de/buildStatus/icon?job=FairRootGroup/FairMQ/master)](https://alfa-ci.gsi.de/blue/organizations/jenkins/FairRootGroup%2FFairMQ/branches) [![test coverage master branch](https://codecov.io/gh/FairRootGroup/FairMQ/branch/master/graph/badge.svg)](https://codecov.io/gh/FairRootGroup/FairMQ/branch/master) [![Coverity Badge](https://alfa-ci.gsi.de/shields/coverity/scan/fairrootgroup-fairmq.svg)](https://scan.coverity.com/projects/fairrootgroup-fairmq) [![Codacy Badge](https://api.codacy.com/project/badge/Grade/6b648d95d68d4c4eae833b84f84d299c)](https://www.codacy.com/app/dennisklein/FairMQ?utm_source=github.com&amp;utm_medium=referral&amp;utm_content=FairRootGroup/FairMQ&amp;utm_campaign=Badge_Grade)
C++ Message Queuing Library and Framework

View File

@@ -47,7 +47,7 @@ The configuration of the channel connection addresses is done by the DDS plugin
Note that the attributes `value` contain a different value.
##### 4. Start DDS server.
##### 4. Start DDS session.
First you need to initialize DDS environment:
@@ -55,10 +55,10 @@ First you need to initialize DDS environment:
source DDS_env.sh # this script is located in the DDS installation directory
```
The DDS server is started with:
The DDS session is started with:
```bash
dds-server start -s
dds-session start
```
##### 5. Submit DDS Agents (configured in the hosts file).
@@ -91,7 +91,7 @@ A simple utility (fairmq-dds-command-ui) is included with FairMQ to send command
To see it in action, start the fairmq-dds-command-ui while the topology is running. Run the utility with `-h` to see everything that it can do.
The utility requires a session parameter to connect to appropriate DDS session. The session value is given when starting dds-server.
The utility requires a session parameter to connect to appropriate DDS session. The session value is given when starting dds-session.
By default the command UI sends commands to all tasks. This can be further refined by giving a specific topology path via `-p` argument.
Given our topology file, here are some examples of valid paths:
@@ -108,15 +108,15 @@ Given our topology file, here are some examples of valid paths:
./fairmq/plugins/DDS/fairmq-dds-command-ui -s 937ffbca-b524-44d8-9898-1d69aedc3751 -c c -p main/ProcessorGroup/Processor_9
```
##### 9. Stop DDS server/topology.
##### 9. Stop DDS session/topology.
The execution of tasks can be stopped with:
```bash
dds-topology --stop
```
Or by stopping the DDS server:
Or by stopping the DDS session:
```bash
dds-server stop
dds-session stop
```
For general DDS documentation please refer to [DDS Website](http://dds.gsi.de/).

View File

@@ -18,90 +18,65 @@
#include <boost/algorithm/string.hpp> // join/split
#include <set>
#include <utility> // std::move
#include <random>
using namespace std;
mutex FairMQChannel::fChannelMutex;
FairMQChannel::FairMQChannel()
: fSocket(nullptr)
, fType("unspecified")
, fMethod("unspecified")
, fAddress("unspecified")
, fTransportType(fair::mq::Transport::DEFAULT)
, fSndBufSize(1000)
, fRcvBufSize(1000)
, fSndKernelSize(0)
, fRcvKernelSize(0)
, fLinger(500)
, fRateLogging(1)
, fName("")
, fIsValid(false)
, fTransportFactory(nullptr)
, fMultipart(false)
, fModified(true)
, fReset(false)
{
}
: FairMQChannel("", "unspecified", "unspecified", "unspecified", nullptr)
{}
FairMQChannel::FairMQChannel(const string& type, const string& method, const string& address)
: fSocket(nullptr)
: FairMQChannel("", type, method, address, nullptr)
{}
FairMQChannel::FairMQChannel(const string& name, const string& type, shared_ptr<FairMQTransportFactory> factory)
: FairMQChannel(name, type, "unspecified", "unspecified", factory)
{}
FairMQChannel::FairMQChannel(const string& name, const string& type, const string& method, const string& address, shared_ptr<FairMQTransportFactory> factory)
: fTransportFactory(factory)
, fTransportType(factory ? factory->GetType() : fair::mq::Transport::DEFAULT)
, fSocket(factory ? factory->CreateSocket(type, name) : nullptr)
, fType(type)
, fMethod(method)
, fAddress(address)
, fTransportType(fair::mq::Transport::DEFAULT)
, fSndBufSize(1000)
, fRcvBufSize(1000)
, fSndKernelSize(0)
, fRcvKernelSize(0)
, fLinger(500)
, fRateLogging(1)
, fName("")
, fIsValid(false)
, fTransportFactory(nullptr)
, fMultipart(false)
, fModified(true)
, fReset(false)
{
}
FairMQChannel::FairMQChannel(const string& name, const string& type, shared_ptr<FairMQTransportFactory> factory)
: fSocket(factory->CreateSocket(type, name))
, fType(type)
, fMethod("unspecified")
, fAddress("unspecified")
, fTransportType(factory->GetType())
, fSndBufSize(1000)
, fRcvBufSize(1000)
, fSndKernelSize(0)
, fRcvKernelSize(0)
, fLinger(500)
, fRateLogging(1)
, fPortRangeMin(22000)
, fPortRangeMax(23000)
, fAutoBind(true)
, fName(name)
, fIsValid(false)
, fTransportFactory(factory)
, fMultipart(false)
, fModified(true)
, fReset(false)
{
}
{}
FairMQChannel::FairMQChannel(const FairMQChannel& chan)
: fSocket(nullptr)
: fTransportFactory(nullptr)
, fTransportType(chan.fTransportType)
, fSocket(nullptr)
, fType(chan.fType)
, fMethod(chan.fMethod)
, fAddress(chan.fAddress)
, fTransportType(chan.fTransportType)
, fSndBufSize(chan.fSndBufSize)
, fRcvBufSize(chan.fRcvBufSize)
, fSndKernelSize(chan.fSndKernelSize)
, fRcvKernelSize(chan.fRcvKernelSize)
, fLinger(chan.fLinger)
, fRateLogging(chan.fRateLogging)
, fPortRangeMin(chan.fPortRangeMin)
, fPortRangeMax(chan.fPortRangeMax)
, fAutoBind(chan.fAutoBind)
, fName(chan.fName)
, fIsValid(false)
, fTransportFactory(nullptr)
, fMultipart(chan.fMultipart)
, fModified(chan.fModified)
, fReset(false)
@@ -109,20 +84,23 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan)
FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
{
fTransportFactory = nullptr;
fTransportType = chan.fTransportType;
fSocket = nullptr;
fType = chan.fType;
fMethod = chan.fMethod;
fAddress = chan.fAddress;
fTransportType = chan.fTransportType;
fSndBufSize = chan.fSndBufSize;
fRcvBufSize = chan.fRcvBufSize;
fSndKernelSize = chan.fSndKernelSize;
fRcvKernelSize = chan.fRcvKernelSize;
fLinger = chan.fLinger;
fRateLogging = chan.fRateLogging;
fPortRangeMin = chan.fPortRangeMin;
fPortRangeMax = chan.fPortRangeMax;
fAutoBind = chan.fAutoBind;
fName = chan.fName;
fIsValid = false;
fTransportFactory = nullptr;
fMultipart = chan.fMultipart;
fModified = chan.fModified;
fReset = false;
@@ -136,20 +114,23 @@ FairMQSocket & FairMQChannel::GetSocket() const
return *fSocket;
}
string FairMQChannel::GetChannelName() const
string FairMQChannel::GetName() const
{
lock_guard<mutex> lock(fChannelMutex);
return fName;
}
string FairMQChannel::GetChannelPrefix() const
string FairMQChannel::GetPrefix() const
{
lock_guard<mutex> lock(fChannelMutex);
string prefix = fName;
prefix = prefix.erase(fName.rfind('['));
return prefix;
}
string FairMQChannel::GetChannelIndex() const
string FairMQChannel::GetIndex() const
{
lock_guard<mutex> lock(fChannelMutex);
string indexStr = fName;
indexStr.erase(indexStr.rfind(']'));
indexStr.erase(0, indexStr.rfind('[') + 1);
@@ -246,6 +227,33 @@ try {
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
int FairMQChannel::GetPortRangeMin() const
try {
lock_guard<mutex> lock(fChannelMutex);
return fPortRangeMin;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetPortRangeMin: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
int FairMQChannel::GetPortRangeMax() const
try {
lock_guard<mutex> lock(fChannelMutex);
return fPortRangeMax;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetPortRangeMax: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
bool FairMQChannel::GetAutoBind() const
try {
lock_guard<mutex> lock(fChannelMutex);
return fAutoBind;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetAutoBind: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateType(const string& type)
try {
lock_guard<mutex> lock(fChannelMutex);
@@ -356,6 +364,39 @@ try {
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdatePortRangeMin(const int minPort)
try {
lock_guard<mutex> lock(fChannelMutex);
fIsValid = false;
fPortRangeMin = minPort;
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdatePortRangeMin: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdatePortRangeMax(const int maxPort)
try {
lock_guard<mutex> lock(fChannelMutex);
fIsValid = false;
fPortRangeMax = maxPort;
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdatePortRangeMax: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateAutoBind(const bool autobind)
try {
lock_guard<mutex> lock(fChannelMutex);
fIsValid = false;
fAutoBind = autobind;
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateAutoBind: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
auto FairMQChannel::SetModified(const bool modified) -> void
try {
lock_guard<mutex> lock(fChannelMutex);
@@ -365,14 +406,14 @@ try {
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateChannelName(const string& name)
void FairMQChannel::UpdateName(const string& name)
try {
lock_guard<mutex> lock(fChannelMutex);
fIsValid = false;
fName = name;
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateChannelName: " << e.what();
LOG(error) << "Exception caught in FairMQChannel::UpdateName: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
@@ -385,24 +426,21 @@ try {
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
bool FairMQChannel::ValidateChannel()
bool FairMQChannel::Validate()
try {
lock_guard<mutex> lock(fChannelMutex);
stringstream ss;
ss << "Validating channel '" << fName << "'... ";
if (fIsValid)
{
if (fIsValid) {
ss << "ALREADY VALID";
LOG(debug) << ss.str();
return true;
}
// validate socket type
const string socketTypeNames[] = { "sub", "pub", "pull", "push", "req", "rep", "xsub", "xpub", "dealer", "router", "pair" };
const set<string> socketTypes(socketTypeNames, socketTypeNames + sizeof(socketTypeNames) / sizeof(string));
if (socketTypes.find(fType) == socketTypes.end())
{
const set<string> socketTypes{ "sub", "pub", "pull", "push", "req", "rep", "xsub", "xpub", "dealer", "router", "pair" };
if (socketTypes.find(fType) == socketTypes.end()) {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "Invalid channel type: '" << fType << "'";
@@ -410,31 +448,22 @@ try {
}
// validate socket address
if (fAddress == "unspecified" || fAddress == "")
{
if (fAddress == "unspecified" || fAddress == "") {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(debug) << "invalid channel address: '" << fAddress << "'";
return false;
}
else
{
} else {
vector<string> endpoints;
boost::algorithm::split(endpoints, fAddress, boost::algorithm::is_any_of(";"));
for (const auto endpoint : endpoints)
{
for (const auto endpoint : endpoints) {
string address;
if (endpoint[0] == '@' || endpoint[0] == '+' || endpoint[0] == '>')
{
if (endpoint[0] == '@' || endpoint[0] == '+' || endpoint[0] == '>') {
address = endpoint.substr(1);
}
else
{
} else {
// we don't have a method modifier, check if the default method is set
const string socketMethodNames[] = { "bind", "connect" };
const set<string> socketMethods(socketMethodNames, socketMethodNames + sizeof(socketMethodNames) / sizeof(string));
if (socketMethods.find(fMethod) == socketMethods.end())
{
const set<string> socketMethods{ "bind", "connect" };
if (socketMethods.find(fMethod) == socketMethods.end()) {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "Invalid endpoint connection method: '" << fMethod << "' for " << endpoint;
@@ -443,56 +472,43 @@ try {
address = endpoint;
}
// check if address is a tcp or ipc address
if (address.compare(0, 6, "tcp://") == 0)
{
if (address.compare(0, 6, "tcp://") == 0) {
// check if TCP address contains port delimiter
string addressString = address.substr(6);
if (addressString.find(':') == string::npos)
{
if (addressString.find(':') == string::npos) {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel address: '" << address << "' (missing port?)";
return false;
}
}
else if (address.compare(0, 6, "ipc://") == 0)
{
} else if (address.compare(0, 6, "ipc://") == 0) {
// check if IPC address is not empty
string addressString = address.substr(6);
if (addressString == "")
{
if (addressString == "") {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel address: '" << address << "' (empty IPC address?)";
return false;
}
}
else if (address.compare(0, 9, "inproc://") == 0)
{
} else if (address.compare(0, 9, "inproc://") == 0) {
// check if IPC address is not empty
string addressString = address.substr(9);
if (addressString == "")
{
if (addressString == "") {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel address: '" << address << "' (empty inproc address?)";
return false;
}
}
else if (address.compare(0, 8, "verbs://") == 0)
{
} else if (address.compare(0, 8, "verbs://") == 0) {
// check if IPC address is not empty
string addressString = address.substr(9);
if (addressString == "")
{
string addressString = address.substr(8);
if (addressString == "") {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel address: '" << address << "' (empty verbs address?)";
return false;
}
}
else
{
} else {
// if neither TCP or IPC is specified, return invalid
ss << "INVALID";
LOG(debug) << ss.str();
@@ -503,8 +519,7 @@ try {
}
// validate socket buffer size for sending
if (fSndBufSize < 0)
{
if (fSndBufSize < 0) {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel send buffer size (cannot be negative): '" << fSndBufSize << "'";
@@ -512,8 +527,7 @@ try {
}
// validate socket buffer size for receiving
if (fRcvBufSize < 0)
{
if (fRcvBufSize < 0) {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel receive buffer size (cannot be negative): '" << fRcvBufSize << "'";
@@ -521,8 +535,7 @@ try {
}
// validate socket kernel transmit size for sending
if (fSndKernelSize < 0)
{
if (fSndKernelSize < 0) {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel send kernel transmit size (cannot be negative): '" << fSndKernelSize << "'";
@@ -530,8 +543,7 @@ try {
}
// validate socket kernel transmit size for receiving
if (fRcvKernelSize < 0)
{
if (fRcvKernelSize < 0) {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel receive kernel transmit size (cannot be negative): '" << fRcvKernelSize << "'";
@@ -539,8 +551,7 @@ try {
}
// validate socket rate logging interval
if (fRateLogging < 0)
{
if (fRateLogging < 0) {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid socket rate logging interval (cannot be negative): '" << fRateLogging << "'";
@@ -556,10 +567,71 @@ try {
throw ChannelConfigurationError(fair::mq::tools::ToString(e.what()));
}
void FairMQChannel::InitTransport(shared_ptr<FairMQTransportFactory> factory)
void FairMQChannel::Init()
{
fTransportFactory = factory;
fTransportType = factory->GetType();
lock_guard<mutex> lock(fChannelMutex);
fSocket = fTransportFactory->CreateSocket(fType, fName);
// set linger duration (how long socket should wait for outstanding transfers before shutdown)
fSocket->SetLinger(fLinger);
// set high water marks
fSocket->SetSndBufSize(fSndBufSize);
fSocket->SetRcvBufSize(fRcvBufSize);
// set kernel transmit size (set it only if value is not the default value)
if (fSndKernelSize != 0) {
fSocket->SetSndKernelSize(fSndKernelSize);
}
if (fRcvKernelSize != 0) {
fSocket->SetRcvKernelSize(fRcvKernelSize);
}
}
bool FairMQChannel::ConnectEndpoint(const string& endpoint)
{
lock_guard<mutex> lock(fChannelMutex);
return fSocket->Connect(endpoint);
}
bool FairMQChannel::BindEndpoint(string& endpoint)
{
lock_guard<mutex> lock(fChannelMutex);
// try to bind to the configured port. If it fails, try random one (if AutoBind is on).
if (fSocket->Bind(endpoint)) {
return true;
} else {
if (fAutoBind) {
// number of attempts when choosing a random port
int numAttempts = 0;
int maxAttempts = 1000;
// initialize random generator
default_random_engine generator(chrono::system_clock::now().time_since_epoch().count());
uniform_int_distribution<int> randomPort(fPortRangeMin, fPortRangeMax);
do {
LOG(debug) << "Could not bind to configured (TCP) port (" << endpoint << "), trying random port in range " << fPortRangeMin << "-" << fPortRangeMax;
++numAttempts;
if (numAttempts > maxAttempts) {
LOG(error) << "could not bind to any (TCP) port in the given range after " << maxAttempts << " attempts";
return false;
}
size_t pos = endpoint.rfind(':');
endpoint = endpoint.substr(0, pos + 1) + fair::mq::tools::ToString(static_cast<int>(randomPort(generator)));
} while (!fSocket->Bind(endpoint));
return true;
} else {
return false;
}
}
}
void FairMQChannel::ResetChannel()
@@ -568,131 +640,3 @@ void FairMQChannel::ResetChannel()
fIsValid = false;
// TODO: implement channel resetting
}
int FairMQChannel::Send(unique_ptr<FairMQMessage>& msg, int sndTimeoutInMs)
{
CheckSendCompatibility(msg);
return fSocket->Send(msg, sndTimeoutInMs);
}
int FairMQChannel::Receive(unique_ptr<FairMQMessage>& msg, int rcvTimeoutInMs)
{
CheckReceiveCompatibility(msg);
return fSocket->Receive(msg, rcvTimeoutInMs);
}
int FairMQChannel::SendAsync(unique_ptr<FairMQMessage>& msg)
{
CheckSendCompatibility(msg);
return fSocket->Send(msg, 0);
}
int FairMQChannel::ReceiveAsync(unique_ptr<FairMQMessage>& msg)
{
CheckReceiveCompatibility(msg);
return fSocket->Receive(msg, 0);
}
int64_t FairMQChannel::Send(vector<unique_ptr<FairMQMessage>>& msgVec, int sndTimeoutInMs)
{
CheckSendCompatibility(msgVec);
return fSocket->Send(msgVec, sndTimeoutInMs);
}
int64_t FairMQChannel::Receive(vector<unique_ptr<FairMQMessage>>& msgVec, int rcvTimeoutInMs)
{
CheckReceiveCompatibility(msgVec);
return fSocket->Receive(msgVec, rcvTimeoutInMs);
}
int64_t FairMQChannel::SendAsync(vector<unique_ptr<FairMQMessage>>& msgVec)
{
CheckSendCompatibility(msgVec);
return fSocket->Send(msgVec, 0);
}
int64_t FairMQChannel::ReceiveAsync(vector<unique_ptr<FairMQMessage>>& msgVec)
{
CheckReceiveCompatibility(msgVec);
return fSocket->Receive(msgVec, 0);
}
FairMQChannel::~FairMQChannel()
{
}
unsigned long FairMQChannel::GetBytesTx() const
{
return fSocket->GetBytesTx();
}
unsigned long FairMQChannel::GetBytesRx() const
{
return fSocket->GetBytesRx();
}
unsigned long FairMQChannel::GetMessagesTx() const
{
return fSocket->GetMessagesTx();
}
unsigned long FairMQChannel::GetMessagesRx() const
{
return fSocket->GetMessagesRx();
}
void FairMQChannel::CheckSendCompatibility(FairMQMessagePtr& msg)
{
if (fTransportType != msg->GetType())
{
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr msgWrapper(NewMessage(msg->GetData(),
msg->GetSize(),
[](void* /*data*/, void* _msg) { delete static_cast<FairMQMessage*>(_msg); },
msg.get()
));
msg.release();
msg = move(msgWrapper);
}
}
void FairMQChannel::CheckSendCompatibility(vector<FairMQMessagePtr>& msgVec)
{
for (auto& msg : msgVec)
{
if (fTransportType != msg->GetType())
{
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr msgWrapper(NewMessage(msg->GetData(),
msg->GetSize(),
[](void* /*data*/, void* _msg) { delete static_cast<FairMQMessage*>(_msg); },
msg.get()
));
msg.release();
msg = move(msgWrapper);
}
}
}
void FairMQChannel::CheckReceiveCompatibility(FairMQMessagePtr& msg)
{
if (fTransportType != msg->GetType())
{
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr newMsg(NewMessage());
msg = move(newMsg);
}
}
void FairMQChannel::CheckReceiveCompatibility(vector<FairMQMessagePtr>& msgVec)
{
for (auto& msg : msgVec)
{
if (fTransportType != msg->GetType())
{
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr newMsg(NewMessage());
msg = move(newMsg);
}
}
}

View File

@@ -10,11 +10,12 @@
#define FAIRMQCHANNEL_H_
#include <string>
#include <memory> // unique_ptr
#include <memory> // unique_ptr, shared_ptr
#include <vector>
#include <atomic>
#include <mutex>
#include <stdexcept>
#include <utility> // std::move
#include <FairMQTransportFactory.h>
#include <FairMQSocket.h>
@@ -43,6 +44,14 @@ class FairMQChannel
/// @param factory TransportFactory
FairMQChannel(const std::string& name, const std::string& type, std::shared_ptr<FairMQTransportFactory> factory);
/// Constructor
/// @param name Channel name
/// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/)
/// @param method Socket method (bind/connect)
/// @param address Network address to bind/connect to (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")
/// @param factory TransportFactory
FairMQChannel(const std::string& name, const std::string& type, const std::string& method, const std::string& address, std::shared_ptr<FairMQTransportFactory> factory);
/// Copy Constructor
FairMQChannel(const FairMQChannel&);
@@ -50,20 +59,20 @@ class FairMQChannel
FairMQChannel& operator=(const FairMQChannel&);
/// Default destructor
virtual ~FairMQChannel();
virtual ~FairMQChannel() {}
struct ChannelConfigurationError : std::runtime_error { using std::runtime_error::runtime_error; };
FairMQSocket& GetSocket() const;
auto Bind(const std::string& address) -> bool
bool Bind(const std::string& address)
{
fMethod = "bind";
fAddress = address;
return fSocket->Bind(address);
}
auto Connect(const std::string& address) -> void
bool Connect(const std::string& address)
{
fMethod = "connect";
fAddress = address;
@@ -72,15 +81,18 @@ class FairMQChannel
/// Get channel name
/// @return Returns full channel name (e.g. "data[0]")
std::string GetChannelName() const;
std::string GetChannelName() const { return GetName(); } // TODO: deprecate this in favor of following
std::string GetName() const;
/// Get channel prefix
/// @return Returns channel prefix (e.g. "data" in "data[0]")
std::string GetChannelPrefix() const;
std::string GetChannelPrefix() const { return GetPrefix(); } // TODO: deprecate this in favor of following
std::string GetPrefix() const;
/// Get channel index
/// @return Returns channel index (e.g. 0 in "data[0]")
std::string GetChannelIndex() const;
std::string GetChannelIndex() const { return GetIndex(); } // TODO: deprecate this in favor of following
std::string GetIndex() const;
/// Get socket type
/// @return Returns socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/)
@@ -122,6 +134,18 @@ class FairMQChannel
/// @return Returns socket rate logging interval (in seconds)
int GetRateLogging() const;
/// Get start of the port range for automatic binding
/// @return start of the port range
int GetPortRangeMin() const;
/// Get end of the port range for automatic binding
/// @return end of the port range
int GetPortRangeMax() const;
/// Set automatic binding (pick random port if bind fails)
/// @return true/false, true if automatic binding is enabled
bool GetAutoBind() const;
/// Set socket type
/// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/)
void UpdateType(const std::string& type);
@@ -162,9 +186,22 @@ class FairMQChannel
/// @param rateLogging Socket rate logging interval (in seconds)
void UpdateRateLogging(const int rateLogging);
/// Set start of the port range for automatic binding
/// @param minPort start of the port range
void UpdatePortRangeMin(const int minPort);
/// Set end of the port range for automatic binding
/// @param maxPort end of the port range
void UpdatePortRangeMax(const int maxPort);
/// Set automatic binding (pick random port if bind fails)
/// @param autobind true/false, true to enable automatic binding
void UpdateAutoBind(const bool autobind);
/// Set channel name
/// @param name Arbitrary channel name
void UpdateChannelName(const std::string& name);
void UpdateChannelName(const std::string& name) { UpdateName(name); } // TODO: deprecate this in favor of following
void UpdateName(const std::string& name);
/// Checks if the configured channel settings are valid (checks the validity parameter, without running full validation (as oposed to ValidateChannel()))
/// @return true if channel settings are valid, false otherwise.
@@ -172,7 +209,20 @@ class FairMQChannel
/// Validates channel configuration
/// @return true if channel settings are valid, false otherwise.
bool ValidateChannel();
bool ValidateChannel() // TODO: deprecate this
{
return Validate();
}
/// Validates channel configuration
/// @return true if channel settings are valid, false otherwise.
bool Validate();
void Init();
bool ConnectEndpoint(const std::string& endpoint);
bool BindEndpoint(std::string& endpoint);
/// Resets the channel (requires validation to be used again).
void ResetChannel();
@@ -181,31 +231,63 @@ class FairMQChannel
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
int Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1);
int Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msg);
return fSocket->Send(msg, sndTimeoutInMs);
}
/// Receives a message from the socket queue.
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1);
int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msg);
return fSocket->Receive(msg, rcvTimeoutInMs);
}
int SendAsync(FairMQMessagePtr& msg) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msg, timeout);")));
int ReceiveAsync(FairMQMessagePtr& msg) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msg, timeout);")));
int SendAsync(FairMQMessagePtr& msg) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msg, timeout);")))
{
CheckSendCompatibility(msg);
return fSocket->Send(msg, 0);
}
int ReceiveAsync(FairMQMessagePtr& msg) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msg, timeout);")))
{
CheckReceiveCompatibility(msg);
return fSocket->Receive(msg, 0);
}
/// Send a vector of messages
/// @param msgVec message vector reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
int64_t Send(std::vector<FairMQMessagePtr>& msgVec, int sndTimeoutInMs = -1);
int64_t Send(std::vector<FairMQMessagePtr>& msgVec, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msgVec);
return fSocket->Send(msgVec, sndTimeoutInMs);
}
/// Receive a vector of messages
/// @param msgVec message vector reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
int64_t Receive(std::vector<FairMQMessagePtr>& msgVec, int rcvTimeoutInMs = -1);
int64_t Receive(std::vector<FairMQMessagePtr>& msgVec, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msgVec);
return fSocket->Receive(msgVec, rcvTimeoutInMs);
}
int64_t SendAsync(std::vector<FairMQMessagePtr>& msgVec) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msgVec, timeout);")));
int64_t ReceiveAsync(std::vector<FairMQMessagePtr>& msgVec) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msgVec, timeout);")));
int64_t SendAsync(std::vector<FairMQMessagePtr>& msgVec) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msgVec, timeout);")))
{
CheckSendCompatibility(msgVec);
return fSocket->Send(msgVec, 0);
}
int64_t ReceiveAsync(std::vector<FairMQMessagePtr>& msgVec) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msgVec, timeout);")))
{
CheckReceiveCompatibility(msgVec);
return fSocket->Receive(msgVec, 0);
}
/// Send FairMQParts
/// @param parts FairMQParts reference
@@ -235,10 +317,10 @@ class FairMQChannel
return Receive(parts.fParts, 0);
}
unsigned long GetBytesTx() const;
unsigned long GetBytesRx() const;
unsigned long GetMessagesTx() const;
unsigned long GetMessagesRx() const;
unsigned long GetBytesTx() const { return fSocket->GetBytesTx(); }
unsigned long GetBytesRx() const { return fSocket->GetBytesRx(); }
unsigned long GetMessagesTx() const { return fSocket->GetMessagesTx(); }
unsigned long GetMessagesRx() const { return fSocket->GetMessagesRx(); }
auto Transport() -> FairMQTransportFactory*
{
@@ -264,31 +346,26 @@ class FairMQChannel
}
private:
std::shared_ptr<FairMQTransportFactory> fTransportFactory;
fair::mq::Transport fTransportType;
std::unique_ptr<FairMQSocket> fSocket;
std::string fType;
std::string fMethod;
std::string fAddress;
fair::mq::Transport fTransportType;
int fSndBufSize;
int fRcvBufSize;
int fSndKernelSize;
int fRcvKernelSize;
int fLinger;
int fRateLogging;
int fPortRangeMin;
int fPortRangeMax;
bool fAutoBind;
std::string fName;
std::atomic<bool> fIsValid;
std::shared_ptr<FairMQTransportFactory> fTransportFactory;
void CheckSendCompatibility(FairMQMessagePtr& msg);
void CheckSendCompatibility(std::vector<FairMQMessagePtr>& msgVec);
void CheckReceiveCompatibility(FairMQMessagePtr& msg);
void CheckReceiveCompatibility(std::vector<FairMQMessagePtr>& msgVec);
void InitTransport(std::shared_ptr<FairMQTransportFactory> factory);
// use static mutex to make the class easily copyable
// implication: same mutex is used for all instances of the class
// this does not hurt much, because mutex is used only during initialization with very low contention
@@ -297,8 +374,66 @@ class FairMQChannel
bool fMultipart;
bool fModified;
auto SetModified(const bool modified) -> void;
bool fReset;
void CheckSendCompatibility(FairMQMessagePtr& msg)
{
if (fTransportType != msg->GetType()) {
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr msgWrapper(NewMessage(
msg->GetData(),
msg->GetSize(),
[](void* /*data*/, void* _msg) { delete static_cast<FairMQMessage*>(_msg); },
msg.get()
));
msg.release();
msg = move(msgWrapper);
}
}
void CheckSendCompatibility(std::vector<FairMQMessagePtr>& msgVec)
{
for (auto& msg : msgVec) {
if (fTransportType != msg->GetType()) {
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr msgWrapper(NewMessage(
msg->GetData(),
msg->GetSize(),
[](void* /*data*/, void* _msg) { delete static_cast<FairMQMessage*>(_msg); },
msg.get()
));
msg.release();
msg = move(msgWrapper);
}
}
}
void CheckReceiveCompatibility(FairMQMessagePtr& msg)
{
if (fTransportType != msg->GetType()) {
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr newMsg(NewMessage());
msg = move(newMsg);
}
}
void CheckReceiveCompatibility(std::vector<FairMQMessagePtr>& msgVec)
{
for (auto& msg : msgVec) {
if (fTransportType != msg->GetType()) {
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr newMsg(NewMessage());
msg = move(newMsg);
}
}
}
void InitTransport(std::shared_ptr<FairMQTransportFactory> factory)
{
fTransportFactory = factory;
fTransportType = factory->GetType();
}
auto SetModified(const bool modified) -> void;
};
#endif /* FAIRMQCHANNEL_H_ */

View File

@@ -16,7 +16,6 @@
#include <list>
#include <cstdlib>
#include <random>
#include <chrono>
#include <mutex>
#include <thread>
@@ -55,8 +54,6 @@ FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Ver
, fInternalConfig(config ? nullptr : fair::mq::tools::make_unique<FairMQProgOptions>())
, fConfig(config ? config : fInternalConfig.get())
, fId()
, fPortRangeMin(22000)
, fPortRangeMax(32000)
, fDefaultTransportType(fair::mq::Transport::DEFAULT)
, fDataCallbacks(false)
, fMsgInputs()
@@ -80,8 +77,6 @@ void FairMQDevice::InitWrapper()
{
fId = fConfig->GetValue<string>("id");
fRate = fConfig->GetValue<float>("rate");
fPortRangeMin = fConfig->GetValue<int>("port-range-min");
fPortRangeMax = fConfig->GetValue<int>("port-range-max");
try {
fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue<string>("transport"));
@@ -91,15 +86,11 @@ void FairMQDevice::InitWrapper()
throw;
}
for (auto& c : fConfig->GetFairMQMap())
{
if (fChannels.find(c.first) == fChannels.end())
{
for (auto& c : fConfig->GetFairMQMap()) {
if (fChannels.find(c.first) == fChannels.end()) {
LOG(debug) << "Inserting new device channel from config: " << c.first;
fChannels.insert(c);
}
else
{
} else {
LOG(debug) << "Updating existing device channel from config: " << c.first;
fChannels[c.first] = c.second;
}
@@ -115,50 +106,44 @@ void FairMQDevice::InitWrapper()
string networkInterface = fConfig->GetValue<string>("network-interface");
// Fill the uninitialized channel containers
for (auto& mi : fChannels)
{
for (auto vi = mi.second.begin(); vi != mi.second.end(); ++vi)
{
// if (vi->fModified)
// {
// if (vi->fReset)
// {
// vi->fSocket.reset();
// }
// set channel name: name + vector index
vi->fName = fair::mq::tools::ToString(mi.first, "[", vi - (mi.second).begin(), "]");
for (auto& mi : fChannels) {
int subChannelIndex = 0;
for (auto& vi : mi.second) {
// set channel name: name + vector index
vi.fName = fair::mq::tools::ToString(mi.first, "[", subChannelIndex, "]");
if (vi->fMethod == "bind")
{
// if binding address is not specified, try getting it from the configured network interface
if (vi->fAddress == "unspecified" || vi->fAddress == "")
{
// if the configured network interface is default, get its name from the default route
if (networkInterface == "default")
{
networkInterface = fair::mq::tools::getDefaultRouteNetworkInterface();
}
vi->fAddress = "tcp://" + fair::mq::tools::getInterfaceIP(networkInterface) + ":1";
// set channel transport
if (vi.fTransportType == fair::mq::Transport::DEFAULT || vi.fTransportType == fTransportFactory->GetType()) {
LOG(debug) << vi.fName << ": using default transport";
vi.InitTransport(fTransportFactory);
} else {
LOG(debug) << vi.fName << ": channel transport (" << fair::mq::TransportNames.at(fDefaultTransportType) << ") overriden to " << fair::mq::TransportNames.at(vi.fTransportType);
vi.InitTransport(AddTransport(vi.fTransportType));
}
if (vi.fMethod == "bind") {
// if binding address is not specified, try getting it from the configured network interface
if (vi.fAddress == "unspecified" || vi.fAddress == "") {
// if the configured network interface is default, get its name from the default route
if (networkInterface == "default") {
networkInterface = fair::mq::tools::getDefaultRouteNetworkInterface();
}
// fill the uninitialized list
uninitializedBindingChannels.push_back(&(*vi));
vi.fAddress = "tcp://" + fair::mq::tools::getInterfaceIP(networkInterface) + ":1";
}
else if (vi->fMethod == "connect")
{
// fill the uninitialized list
uninitializedConnectingChannels.push_back(&(*vi));
}
else if (vi->fAddress.find_first_of("@+>") != string::npos)
{
// fill the uninitialized list
uninitializedConnectingChannels.push_back(&(*vi));
}
else
{
LOG(error) << "Cannot update configuration. Socket method (bind/connect) for channel '" << vi->fName << "' not specified.";
throw runtime_error(fair::mq::tools::ToString("Cannot update configuration. Socket method (bind/connect) for channel ", vi->fName, " not specified."));
}
// }
// fill the uninitialized list
uninitializedBindingChannels.push_back(&vi);
} else if (vi.fMethod == "connect") {
// fill the uninitialized list
uninitializedConnectingChannels.push_back(&vi);
} else if (vi.fAddress.find_first_of("@+>") != string::npos) {
// fill the uninitialized list
uninitializedConnectingChannels.push_back(&vi);
} else {
LOG(error) << "Cannot update configuration. Socket method (bind/connect) for channel '" << vi.fName << "' not specified.";
throw runtime_error(fair::mq::tools::ToString("Cannot update configuration. Socket method (bind/connect) for channel ", vi.fName, " not specified."));
}
subChannelIndex++;
}
}
@@ -166,8 +151,7 @@ void FairMQDevice::InitWrapper()
// If necessary this could be handled in the same way as the connecting channels
AttachChannels(uninitializedBindingChannels);
if (!uninitializedBindingChannels.empty())
{
if (!uninitializedBindingChannels.empty()) {
LOG(error) << uninitializedBindingChannels.size() << " of the binding channels could not initialize. Initial configuration incomplete.";
throw runtime_error(fair::mq::tools::ToString(uninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete."));
}
@@ -183,22 +167,18 @@ void FairMQDevice::InitWrapper()
// first attempt
AttachChannels(uninitializedConnectingChannels);
// if not all channels could be connected, update their address values from config and retry
while (!uninitializedConnectingChannels.empty())
{
while (!uninitializedConnectingChannels.empty()) {
this_thread::sleep_for(chrono::milliseconds(sleepTimeInMS));
for (auto& chan : uninitializedConnectingChannels)
{
for (auto& chan : uninitializedConnectingChannels) {
string key{"chans." + chan->GetChannelPrefix() + "." + chan->GetChannelIndex() + ".address"};
string newAddress = fConfig->GetValue<string>(key);
if (newAddress != chan->GetAddress())
{
if (newAddress != chan->GetAddress()) {
chan->UpdateAddress(newAddress);
}
}
if (numAttempts++ > maxAttempts)
{
if (numAttempts++ > maxAttempts) {
LOG(error) << "could not connect all channels after " << initializationTimeoutInS << " attempts";
throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", initializationTimeoutInS, " attempts"));
}
@@ -223,106 +203,54 @@ void FairMQDevice::AttachChannels(vector<FairMQChannel*>& chans)
{
auto itr = chans.begin();
while (itr != chans.end())
{
if ((*itr)->ValidateChannel())
{
if (AttachChannel(**itr))
{
while (itr != chans.end()) {
if ((*itr)->ValidateChannel()) {
(*itr)->Init();
if (AttachChannel(**itr)) {
(*itr)->SetModified(false);
// remove the channel from the uninitialized container
itr = chans.erase(itr);
}
else
{
} else {
LOG(error) << "failed to attach channel " << (*itr)->fName << " (" << (*itr)->fMethod << ")";
++itr;
}
}
else
{
} else {
++itr;
}
}
}
bool FairMQDevice::AttachChannel(FairMQChannel& ch)
bool FairMQDevice::AttachChannel(FairMQChannel& chan)
{
if (ch.fTransportType == fair::mq::Transport::DEFAULT || ch.fTransportType == fTransportFactory->GetType())
{
LOG(debug) << ch.fName << ": using default transport";
ch.InitTransport(fTransportFactory);
}
else
{
LOG(debug) << ch.fName << ": channel transport (" << fair::mq::TransportNames.at(fDefaultTransportType) << ") overriden to " << fair::mq::TransportNames.at(ch.fTransportType);
ch.InitTransport(AddTransport(ch.fTransportType));
}
vector<string> endpoints;
boost::algorithm::split(endpoints, ch.fAddress, boost::algorithm::is_any_of(","));
for (auto& endpoint : endpoints)
{
//(re-)init socket
if (!ch.fSocket)
{
try
{
ch.fSocket = ch.fTransportFactory->CreateSocket(ch.fType, ch.fName);
}
catch (fair::mq::SocketError& se)
{
LOG(error) << se.what();
return false;
}
}
// set linger duration (how long socket should wait for outstanding transfers before shutdown)
ch.fSocket->SetLinger(ch.fLinger);
// set high water marks
ch.fSocket->SetSndBufSize(ch.fSndBufSize);
ch.fSocket->SetRcvBufSize(ch.fRcvBufSize);
// set kernel transmit size (set it only if value is not the default value)
if (ch.fSndKernelSize != 0)
{
ch.fSocket->SetSndKernelSize(ch.fSndKernelSize);
}
if (ch.fRcvKernelSize != 0)
{
ch.fSocket->SetRcvKernelSize(ch.fRcvKernelSize);
}
string chanAddress = chan.GetAddress();
boost::algorithm::split(endpoints, chanAddress, boost::algorithm::is_any_of(","));
for (auto& endpoint : endpoints) {
// attach
bool bind = (ch.fMethod == "bind");
bool bind = (chan.GetMethod() == "bind");
bool connectionModifier = false;
string address = endpoint;
// check if the default fMethod is overridden by a modifier
if (endpoint[0] == '+' || endpoint[0] == '>')
{
if (endpoint[0] == '+' || endpoint[0] == '>') {
connectionModifier = true;
bind = false;
address = endpoint.substr(1);
}
else if (endpoint[0] == '@')
{
} else if (endpoint[0] == '@') {
connectionModifier = true;
bind = true;
address = endpoint.substr(1);
}
if (address.compare(0, 6, "tcp://") == 0)
{
if (address.compare(0, 6, "tcp://") == 0) {
string addressString = address.substr(6);
auto pos = addressString.find(':');
string hostPart = addressString.substr(0, pos);
if (!(bind && hostPart == "*"))
{
if (!(bind && hostPart == "*")) {
string portPart = addressString.substr(pos + 1);
string resolvedHost = fair::mq::tools::getIpFromHostname(hostPart);
if (resolvedHost == "")
{
if (resolvedHost == "") {
return false;
}
address.assign("tcp://" + resolvedHost + ":" + portPart);
@@ -331,76 +259,35 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch)
bool success = true;
// make the connection
if (bind)
{
success = BindEndpoint(*ch.fSocket, address);
}
else
{
success = ConnectEndpoint(*ch.fSocket, address);
if (bind) {
success = chan.BindEndpoint(address);
} else {
success = chan.ConnectEndpoint(address);
}
// bind might bind to an address different than requested,
// put the actual address back in the config
endpoint.clear();
if (connectionModifier)
{
if (connectionModifier) {
endpoint.push_back(bind?'@':'+');
}
endpoint += address;
LOG(debug) << "Attached channel " << ch.fName << " to " << endpoint << (bind ? " (bind) " : " (connect) ") << "(" << ch.fType << ")";
// after the book keeping is done, exit in case of errors
if (!success)
{
if (!success) {
return success;
} else {
LOG(debug) << "Attached channel " << chan.GetName() << " to " << endpoint << (bind ? " (bind) " : " (connect) ") << "(" << chan.GetType() << ")";
}
}
// put the (possibly) modified address back in the channel object and config
string newAddress{boost::algorithm::join(endpoints, ",")};
if (newAddress != ch.fAddress)
{
ch.UpdateAddress(newAddress);
string key{"chans." + ch.GetChannelPrefix() + "." + ch.GetChannelIndex() + ".address"};
fConfig->SetValue(key, newAddress);
}
string newAddress(boost::algorithm::join(endpoints, ","));
if (newAddress != chanAddress) {
chan.UpdateAddress(newAddress);
return true;
}
bool FairMQDevice::ConnectEndpoint(FairMQSocket& socket, string& endpoint)
{
socket.Connect(endpoint);
return true;
}
bool FairMQDevice::BindEndpoint(FairMQSocket& socket, string& endpoint)
{
// number of attempts when choosing a random port
int maxAttempts = 1000;
int numAttempts = 0;
// initialize random generator
default_random_engine generator(chrono::system_clock::now().time_since_epoch().count());
uniform_int_distribution<int> randomPort(fPortRangeMin, fPortRangeMax);
// try to bind to the saved port. In case of failure, try random one.
while (!socket.Bind(endpoint))
{
LOG(debug) << "Could not bind to configured (TCP) port, trying random port in range " << fPortRangeMin << "-" << fPortRangeMax;
++numAttempts;
if (numAttempts > maxAttempts)
{
LOG(error) << "could not bind to any (TCP) port in the given range after " << maxAttempts << " attempts";
return false;
}
size_t pos = endpoint.rfind(':');
endpoint = endpoint.substr(0, pos + 1) + fair::mq::tools::ToString(static_cast<int>(randomPort(generator)));
// update address in the config, it could have been modified during binding
fConfig->SetValue({"chans." + chan.GetPrefix() + "." + chan.GetIndex() + ".address"}, newAddress);
}
return true;
@@ -445,27 +332,6 @@ void FairMQDevice::SortChannel(const string& name, const bool reindex)
}
}
void FairMQDevice::PrintChannel(const string& name)
{
if (fChannels.find(name) != fChannels.end())
{
for (const auto& vi : fChannels[name])
{
LOG(info) << vi.fName << ": "
<< vi.fType << " | "
<< vi.fMethod << " | "
<< vi.fAddress << " | "
<< vi.fSndBufSize << " | "
<< vi.fRcvBufSize << " | "
<< vi.fRateLogging;
}
}
else
{
LOG(error) << "Printing failed: no channel with the name \"" << name << "\".";
}
}
void FairMQDevice::RunWrapper()
{
CallStateChangeCallbacks(RUNNING);

View File

@@ -81,10 +81,6 @@ class FairMQDevice : public FairMQStateMachine
/// @param reindex Should reindexing be done
void SortChannel(const std::string& name, const bool reindex = true);
/// Prints channel configuration
/// @param name Name of the channel
void PrintChannel(const std::string& name);
template<typename Serializer, typename DataType, typename... Args>
void Serialize(FairMQMessage& msg, DataType&& data, Args&&... args) const
{
@@ -373,12 +369,6 @@ class FairMQDevice : public FairMQStateMachine
void SetNumIoThreads(int numIoThreads) { fConfig->SetValue<int>("io-threads", numIoThreads);}
int GetNumIoThreads() const { return fConfig->GetValue<int>("io-threads"); }
void SetPortRangeMin(int portRangeMin) { fConfig->SetValue<int>("port-range-min", portRangeMin); }
int GetPortRangeMin() const { return fConfig->GetValue<int>("port-range-min"); }
void SetPortRangeMax(int portRangeMax) { fConfig->SetValue<int>("port-range-max", portRangeMax); }
int GetPortRangeMax() const { return fConfig->GetValue<int>("port-range-max"); }
void SetNetworkInterface(const std::string& networkInterface) { fConfig->SetValue<std::string>("network-interface", networkInterface); }
std::string GetNetworkInterface() const { return fConfig->GetValue<std::string>("network-interface"); }
@@ -458,9 +448,6 @@ class FairMQDevice : public FairMQStateMachine
virtual void Reset();
private:
int fPortRangeMin; ///< Minimum value for the port range (if dynamic)
int fPortRangeMax; ///< Maximum value for the port range (if dynamic)
fair::mq::Transport fDefaultTransportType; ///< Default transport for the device
/// Handles the initialization and the Init() method
@@ -484,15 +471,6 @@ class FairMQDevice : public FairMQStateMachine
/// Attach (bind/connect) channels in the list
void AttachChannels(std::vector<FairMQChannel*>& chans);
/// Sets up and connects/binds a socket to an endpoint
/// return a string with the actual endpoint if it happens
/// to stray from default.
bool ConnectEndpoint(FairMQSocket& socket, std::string& endpoint);
bool BindEndpoint(FairMQSocket& socket, std::string& endpoint);
/// Attaches the channel to all listed endpoints
/// the list is comma separated; the default method (bind/connect) is used.
/// to override default: prepend "@" to bind, "+" or ">" to connect endpoint.
bool AttachChannel(FairMQChannel& ch);
void HandleSingleChannelInput();

View File

@@ -35,7 +35,6 @@ class FairMQMessage
FairMQTransportFactory* GetTransport() { return fTransport; }
//void SetTransport(FairMQTransportFactory* transport) { fTransport = transport; }
virtual void Copy(const std::unique_ptr<FairMQMessage>& msg) __attribute__((deprecated("Use 'Copy(const FairMQMessage& msg)'"))) = 0;
virtual void Copy(const FairMQMessage& msg) = 0;
virtual ~FairMQMessage() {};

View File

@@ -14,16 +14,18 @@
#include <memory>
#include "FairMQMessage.h"
class FairMQTransportFactory;
class FairMQSocket
{
public:
FairMQSocket() {}
FairMQSocket(FairMQTransportFactory* fac): fTransport(fac) {}
virtual std::string GetId() = 0;
virtual bool Bind(const std::string& address) = 0;
virtual void Connect(const std::string& address) = 0;
virtual bool Connect(const std::string& address) = 0;
virtual int Send(FairMQMessagePtr& msg, int timeout = -1) = 0;
virtual int Receive(FairMQMessagePtr& msg, int timeout = -1) = 0;
@@ -51,7 +53,13 @@ class FairMQSocket
virtual unsigned long GetMessagesTx() const = 0;
virtual unsigned long GetMessagesRx() const = 0;
FairMQTransportFactory* GetTransport() { return fTransport; }
void SetTransport(FairMQTransportFactory* transport) { fTransport=transport; }
virtual ~FairMQSocket() {};
private:
FairMQTransportFactory* fTransport{nullptr};
};
using FairMQSocketPtr = std::unique_ptr<FairMQSocket>;

View File

@@ -62,7 +62,7 @@ class FairMQTransportFactory
virtual FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& unmanagedRegion, void* data, const size_t size, void* hint = 0) = 0;
/// Create a socket
virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const = 0;
virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) = 0;
/// Create a poller for a single channel (all subchannels)
virtual FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const = 0;

View File

@@ -205,30 +205,6 @@ void FairMQMessageNN::Copy(const FairMQMessage& msg)
}
}
void FairMQMessageNN::Copy(const FairMQMessagePtr& msg)
{
if (fMessage)
{
if (nn_freemsg(fMessage) < 0)
{
LOG(error) << "failed freeing message, reason: " << nn_strerror(errno);
}
}
size_t size = msg->GetSize();
fMessage = nn_allocmsg(size, 0);
if (!fMessage)
{
LOG(error) << "failed allocating message, reason: " << nn_strerror(errno);
}
else
{
memcpy(fMessage, static_cast<FairMQMessageNN*>(msg.get())->GetMessage(), size);
fSize = size;
}
}
void FairMQMessageNN::CloseMessage()
{
if (nn_freemsg(fMessage) < 0)

View File

@@ -49,7 +49,6 @@ class FairMQMessageNN final : public FairMQMessage
fair::mq::Transport GetType() const override;
void Copy(const FairMQMessage& msg) override;
void Copy(const FairMQMessagePtr& msg) override;
~FairMQMessageNN() override;

View File

@@ -32,8 +32,9 @@ using namespace fair::mq;
atomic<bool> FairMQSocketNN::fInterrupted(false);
FairMQSocketNN::FairMQSocketNN(const string& type, const string& name, const string& id /*= ""*/)
: fSocket(-1)
FairMQSocketNN::FairMQSocketNN(const string& type, const string& name, const string& id /*= ""*/, FairMQTransportFactory* fac /*=nullptr*/)
: FairMQSocket{fac}
, fSocket(-1)
, fId(id + "." + name + "." + type)
, fBytesTx(0)
, fBytesRx(0)
@@ -87,7 +88,7 @@ FairMQSocketNN::FairMQSocketNN(const string& type, const string& name, const str
}
#endif
// LOG(info) << "created socket " << fId;
LOG(debug) << "Created socket " << GetId();
}
string FairMQSocketNN::GetId()
@@ -99,24 +100,26 @@ bool FairMQSocketNN::Bind(const string& address)
{
// LOG(info) << "bind socket " << fId << " on " << address;
int eid = nn_bind(fSocket, address.c_str());
if (eid < 0)
if (nn_bind(fSocket, address.c_str()) < 0)
{
LOG(error) << "failed binding socket " << fId << ", reason: " << nn_strerror(errno);
return false;
}
return true;
}
void FairMQSocketNN::Connect(const string& address)
bool FairMQSocketNN::Connect(const string& address)
{
// LOG(info) << "connect socket " << fId << " to " << address;
int eid = nn_connect(fSocket, address.c_str());
if (eid < 0)
if (nn_connect(fSocket, address.c_str()) < 0)
{
LOG(error) << "failed connecting socket " << fId << ", reason: " << nn_strerror(errno);
return false;
}
return true;
}
int FairMQSocketNN::Send(FairMQMessagePtr& msg, const int timeout)
@@ -366,7 +369,7 @@ int64_t FairMQSocketNN::Receive(vector<FairMQMessagePtr>& msgVec, const int time
object.convert(buf);
// get the single message size
size_t size = buf.size() * sizeof(char);
FairMQMessagePtr part(new FairMQMessageNN(size));
FairMQMessagePtr part(new FairMQMessageNN(size, GetTransport()));
static_cast<FairMQMessageNN*>(part.get())->fReceiving = true;
memcpy(part->GetData(), buf.data(), size);
msgVec.push_back(move(part));

View File

@@ -14,18 +14,19 @@
#include "FairMQSocket.h"
#include "FairMQMessage.h"
class FairMQTransportFactory;
class FairMQSocketNN final : public FairMQSocket
{
public:
FairMQSocketNN(const std::string& type, const std::string& name, const std::string& id = "");
FairMQSocketNN(const std::string& type, const std::string& name, const std::string& id = "", FairMQTransportFactory* fac = nullptr);
FairMQSocketNN(const FairMQSocketNN&) = delete;
FairMQSocketNN operator=(const FairMQSocketNN&) = delete;
std::string GetId() override;
bool Bind(const std::string& address) override;
void Connect(const std::string& address) override;
bool Connect(const std::string& address) override;
int Send(FairMQMessagePtr& msg, const int timeout = -1) override;
int Receive(FairMQMessagePtr& msg, const int timeout = -1) override;

View File

@@ -43,9 +43,9 @@ FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(FairMQUnmanagedRegionPt
return unique_ptr<FairMQMessage>(new FairMQMessageNN(region, data, size, hint, this));
}
FairMQSocketPtr FairMQTransportFactoryNN::CreateSocket(const string& type, const string& name) const
FairMQSocketPtr FairMQTransportFactoryNN::CreateSocket(const string& type, const string& name)
{
unique_ptr<FairMQSocket> socket(new FairMQSocketNN(type, name, GetId()));
unique_ptr<FairMQSocket> socket(new FairMQSocketNN(type, name, GetId(), this));
fSockets.push_back(socket.get());
return socket;
}

View File

@@ -30,7 +30,7 @@ class FairMQTransportFactoryNN final : public FairMQTransportFactory
FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override;
FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) override;
FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override;
FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) override;
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override;
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override;

View File

@@ -138,11 +138,6 @@ auto Message::Copy(const fair::mq::Message& /*msg*/) -> void
throw MessageError{"Not yet implemented."};
}
auto Message::Copy(const fair::mq::MessagePtr& /*msg*/) -> void
{
throw MessageError{"Not yet implemented."};
}
Message::~Message()
{
if (fFreeFunction) {

View File

@@ -53,7 +53,6 @@ class Message final : public fair::mq::Message
auto GetType() const -> fair::mq::Transport override { return fair::mq::Transport::OFI; }
auto Copy(const fair::mq::Message& msg) -> void override;
auto Copy(const fair::mq::MessagePtr& msg) -> void override;
~Message() override;

View File

@@ -31,8 +31,9 @@ namespace ofi
using namespace std;
Socket::Socket(Context& context, const string& type, const string& name, const string& id /*= ""*/)
: fDataEndpoint(nullptr)
Socket::Socket(Context& context, const string& type, const string& name, const string& id /*= ""*/, FairMQTransportFactory* fac)
: FairMQSocket{fac}
, fDataEndpoint(nullptr)
, fDataCompletionQueueTx(nullptr)
, fDataCompletionQueueRx(nullptr)
, fId(id + "." + name + "." + type)
@@ -107,13 +108,14 @@ catch (const SocketError& e)
return false;
}
auto Socket::Connect(const string& address) -> void
auto Socket::Connect(const string& address) -> bool
{
auto addr = Context::VerifyAddress(address);
ConnectControlSocket(addr);
fContext.InitOfi(ConnectionType::Connect, addr);
InitDataEndpoint();
fWaitingForControlPeer = true;
return true;
}
auto Socket::BindControlSocket(Context::Address address) -> void
@@ -514,7 +516,7 @@ auto Socket::ReceiveImpl(vector<FairMQMessagePtr>& msgVec, const int flags, cons
//
// do
// {
// FairMQMessagePtr part(new FairMQMessageSHM(fManager));
// FairMQMessagePtr part(new FairMQMessageSHM(fManager, GetTransport()));
// zmq_msg_t* msgPtr = static_cast<FairMQMessageSHM*>(part.get())->GetMessage();
//
// int nbytes = zmq_msg_recv(msgPtr, fSocket, flags);

View File

@@ -18,6 +18,7 @@
#include <memory> // unique_ptr
#include <netinet/in.h>
#include <rdma/fabric.h>
class FairMQTransportFactory;
namespace fair
{
@@ -35,14 +36,14 @@ namespace ofi
class Socket final : public fair::mq::Socket
{
public:
Socket(Context& factory, const std::string& type, const std::string& name, const std::string& id = "");
Socket(Context& factory, const std::string& type, const std::string& name, const std::string& id = "", FairMQTransportFactory* fac);
Socket(const Socket&) = delete;
Socket operator=(const Socket&) = delete;
auto GetId() -> std::string { return fId; }
auto Bind(const std::string& address) -> bool override;
auto Connect(const std::string& address) -> void override;
auto Connect(const std::string& address) -> bool override;
auto Send(MessagePtr& msg, int timeout = 0) -> int override;
auto Receive(MessagePtr& msg, int timeout = 0) -> int override;

View File

@@ -56,9 +56,9 @@ auto TransportFactory::CreateMessage(UnmanagedRegionPtr& region, void* data, con
return MessagePtr{new Message(region, data, size, hint)};
}
auto TransportFactory::CreateSocket(const string& type, const string& name) const -> SocketPtr
auto TransportFactory::CreateSocket(const string& type, const string& name) -> SocketPtr
{
return SocketPtr{new Socket(fContext, type, name, GetId())};
return SocketPtr{new Socket(fContext, type, name, GetId(), this)};
}
auto TransportFactory::CreatePoller(const vector<FairMQChannel>& channels) const -> PollerPtr

View File

@@ -38,7 +38,7 @@ class TransportFactory final : public FairMQTransportFactory
auto CreateMessage(void* data, const std::size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const -> MessagePtr override;
auto CreateMessage(UnmanagedRegionPtr& region, void* data, const std::size_t size, void* hint = nullptr) const -> MessagePtr override;
auto CreateSocket(const std::string& type, const std::string& name) const -> SocketPtr override;
auto CreateSocket(const std::string& type, const std::string& name) -> SocketPtr override;
auto CreatePoller(const std::vector<FairMQChannel>& channels) const -> PollerPtr override;
auto CreatePoller(const std::vector<const FairMQChannel*>& channels) const -> PollerPtr override;

View File

@@ -32,8 +32,7 @@ namespace parser
// function that convert property tree (given the json structure) to FairMQChannelMap
FairMQChannelMap ptreeToMQMap(const boost::property_tree::ptree& pt, const string& id, const string& rootNode)
{
if (id == "")
{
if (id == "") {
throw ParserError("no device ID provided. Provide with `--id` cmd option");
}
@@ -44,8 +43,7 @@ FairMQChannelMap ptreeToMQMap(const boost::property_tree::ptree& pt, const strin
// Extract value from boost::property_tree
Helper::DeviceParser(pt.get_child(rootNode), channelMap, id);
if (channelMap.empty())
{
if (channelMap.empty()) {
LOG(warn) << "---- No channel keys found for " << id;
LOG(warn) << "---- Check the JSON inputs and/or command line inputs";
}
@@ -68,20 +66,14 @@ void PrintDeviceList(const boost::property_tree::ptree& tree)
string deviceIdKey;
// do a first loop just to print the device-id in json input
for (const auto& p : tree)
{
if (p.first == "devices")
{
for (const auto& q : p.second.get_child(""))
{
for (const auto& p : tree) {
if (p.first == "devices") {
for (const auto& q : p.second.get_child("")) {
string key = q.second.get<string>("key", "");
if (key != "")
{
if (key != "") {
deviceIdKey = key;
LOG(debug) << "Found config for device key '" << deviceIdKey << "' in JSON input";
}
else
{
} else {
deviceIdKey = q.second.get<string>("id");
LOG(debug) << "Found config for device id '" << deviceIdKey << "' in JSON input";
}
@@ -95,33 +87,26 @@ void DeviceParser(const boost::property_tree::ptree& tree, FairMQChannelMap& cha
string deviceIdKey;
// For each node in fairMQOptions
for (const auto& p : tree)
{
if (p.first == "devices")
{
for (const auto& q : p.second)
{
for (const auto& p : tree) {
if (p.first == "devices") {
for (const auto& q : p.second) {
// check if key is provided, otherwise use id
string key = q.second.get<string>("key", "");
if (key != "")
{
if (key != "") {
deviceIdKey = key;
// LOG(debug) << "Found config for device key '" << deviceIdKey << "' in JSON input";
}
else
{
// LOG(trace) << "Found config for device key '" << deviceIdKey << "' in JSON input";
} else {
deviceIdKey = q.second.get<string>("id");
// LOG(debug) << "Found config for device id '" << deviceIdKey << "' in JSON input";
// LOG(trace) << "Found config for device id '" << deviceIdKey << "' in JSON input";
}
// if not correct device id, do not fill MQMap
if (deviceId != deviceIdKey)
{
if (deviceId != deviceIdKey) {
continue;
}
LOG(debug) << "Found following channels for device ID '" << deviceId << "' :";
LOG(trace) << "Found following channels for device ID '" << deviceId << "' :";
ChannelParser(q.second, channelMap);
}
@@ -133,12 +118,9 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQChannelMap& ch
{
string channelKey;
for (const auto& p : tree)
{
if (p.first == "channels")
{
for (const auto& q : p.second)
{
for (const auto& p : tree) {
if (p.first == "channels") {
for (const auto& q : p.second) {
channelKey = q.second.get<string>("name");
int numSockets = q.second.get<int>("numSockets", 0);
@@ -154,36 +136,37 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQChannelMap& ch
commonChannel.UpdateSndKernelSize(q.second.get<int>("sndKernelSize", commonChannel.GetSndKernelSize()));
commonChannel.UpdateRcvKernelSize(q.second.get<int>("rcvKernelSize", commonChannel.GetRcvKernelSize()));
commonChannel.UpdateLinger(q.second.get<int>("linger", commonChannel.GetLinger()));
commonChannel.UpdateRateLogging(q.second.get<int>("rateLogging", commonChannel.GetRateLogging()));
commonChannel.UpdatePortRangeMin(q.second.get<int>("portRangeMin", commonChannel.GetPortRangeMin()));
commonChannel.UpdatePortRangeMax(q.second.get<int>("portRangeMax", commonChannel.GetPortRangeMax()));
commonChannel.UpdateAutoBind(q.second.get<bool>("autoBind", commonChannel.GetAutoBind()));
// temporary FairMQChannel container
vector<FairMQChannel> channelList;
if (numSockets > 0)
{
LOG(debug) << "" << channelKey << ":";
LOG(debug) << "\tnumSockets of " << numSockets << " specified,";
LOG(debug) << "\tapplying common settings to each:";
if (numSockets > 0) {
LOG(trace) << "" << channelKey << ":";
LOG(trace) << "\tnumSockets of " << numSockets << " specified,";
LOG(trace) << "\tapplying common settings to each:";
LOG(debug) << "\ttype = " << commonChannel.GetType();
LOG(debug) << "\tmethod = " << commonChannel.GetMethod();
LOG(debug) << "\taddress = " << commonChannel.GetAddress();
LOG(debug) << "\ttransport = " << commonChannel.GetTransportName();
LOG(debug) << "\tsndBufSize = " << commonChannel.GetSndBufSize();
LOG(debug) << "\trcvBufSize = " << commonChannel.GetRcvBufSize();
LOG(debug) << "\tsndKernelSize = " << commonChannel.GetSndKernelSize();
LOG(debug) << "\trcvKernelSize = " << commonChannel.GetRcvKernelSize();
LOG(debug) << "\tlinger = " << commonChannel.GetLinger();
LOG(debug) << "\trateLogging = " << commonChannel.GetRateLogging();
LOG(trace) << "\ttype = " << commonChannel.GetType();
LOG(trace) << "\tmethod = " << commonChannel.GetMethod();
LOG(trace) << "\taddress = " << commonChannel.GetAddress();
LOG(trace) << "\ttransport = " << commonChannel.GetTransportName();
LOG(trace) << "\tsndBufSize = " << commonChannel.GetSndBufSize();
LOG(trace) << "\trcvBufSize = " << commonChannel.GetRcvBufSize();
LOG(trace) << "\tsndKernelSize = " << commonChannel.GetSndKernelSize();
LOG(trace) << "\trcvKernelSize = " << commonChannel.GetRcvKernelSize();
LOG(trace) << "\tlinger = " << commonChannel.GetLinger();
LOG(trace) << "\trateLogging = " << commonChannel.GetRateLogging();
LOG(trace) << "\tportRangeMin = " << commonChannel.GetPortRangeMin();
LOG(trace) << "\tportRangeMax = " << commonChannel.GetPortRangeMax();
LOG(trace) << "\tautoBind = " << commonChannel.GetAutoBind();
for (int i = 0; i < numSockets; ++i)
{
for (int i = 0; i < numSockets; ++i) {
FairMQChannel channel(commonChannel);
channelList.push_back(channel);
}
}
else
{
} else {
SocketParser(q.second.get_child(""), channelList, channelKey, commonChannel);
}
@@ -198,12 +181,9 @@ void SocketParser(const boost::property_tree::ptree& tree, vector<FairMQChannel>
// for each socket in channel
int socketCounter = 0;
for (const auto& p : tree)
{
if (p.first == "sockets")
{
for (const auto& q : p.second)
{
for (const auto& p : tree) {
if (p.first == "sockets") {
for (const auto& q : p.second) {
// create new channel and apply setting from the common channel
FairMQChannel channel(commonChannel);
@@ -218,18 +198,24 @@ void SocketParser(const boost::property_tree::ptree& tree, vector<FairMQChannel>
channel.UpdateRcvKernelSize(q.second.get<int>("rcvKernelSize", channel.GetRcvKernelSize()));
channel.UpdateLinger(q.second.get<int>("linger", channel.GetLinger()));
channel.UpdateRateLogging(q.second.get<int>("rateLogging", channel.GetRateLogging()));
channel.UpdatePortRangeMin(q.second.get<int>("portRangeMin", channel.GetPortRangeMin()));
channel.UpdatePortRangeMax(q.second.get<int>("portRangeMax", channel.GetPortRangeMax()));
channel.UpdateAutoBind(q.second.get<bool>("autoBind", channel.GetAutoBind()));
LOG(debug) << "" << channelName << "[" << socketCounter << "]:";
LOG(debug) << "\ttype = " << channel.GetType();
LOG(debug) << "\tmethod = " << channel.GetMethod();
LOG(debug) << "\taddress = " << channel.GetAddress();
LOG(debug) << "\ttransport = " << channel.GetTransportName();
LOG(debug) << "\tsndBufSize = " << channel.GetSndBufSize();
LOG(debug) << "\trcvBufSize = " << channel.GetRcvBufSize();
LOG(debug) << "\tsndKernelSize = " << channel.GetSndKernelSize();
LOG(debug) << "\trcvKernelSize = " << channel.GetRcvKernelSize();
LOG(debug) << "\tlinger = " << channel.GetLinger();
LOG(debug) << "\trateLogging = " << channel.GetRateLogging();
LOG(trace) << "" << channelName << "[" << socketCounter << "]:";
LOG(trace) << "\ttype = " << channel.GetType();
LOG(trace) << "\tmethod = " << channel.GetMethod();
LOG(trace) << "\taddress = " << channel.GetAddress();
LOG(trace) << "\ttransport = " << channel.GetTransportName();
LOG(trace) << "\tsndBufSize = " << channel.GetSndBufSize();
LOG(trace) << "\trcvBufSize = " << channel.GetRcvBufSize();
LOG(trace) << "\tsndKernelSize = " << channel.GetSndKernelSize();
LOG(trace) << "\trcvKernelSize = " << channel.GetRcvKernelSize();
LOG(trace) << "\tlinger = " << channel.GetLinger();
LOG(trace) << "\trateLogging = " << channel.GetRateLogging();
LOG(trace) << "\tportRangeMin = " << channel.GetPortRangeMin();
LOG(trace) << "\tportRangeMax = " << channel.GetPortRangeMax();
LOG(trace) << "\tautoBind = " << channel.GetAutoBind();
channelList.push_back(channel);
++socketCounter;
@@ -237,28 +223,28 @@ void SocketParser(const boost::property_tree::ptree& tree, vector<FairMQChannel>
}
} // end socket loop
if (socketCounter)
{
LOG(debug) << "Found " << socketCounter << " socket(s) in channel.";
}
else
{
LOG(debug) << "" << channelName << ":";
LOG(debug) << "\tNo sockets specified,";
LOG(debug) << "\tapplying common settings to the channel:";
if (socketCounter) {
LOG(trace) << "Found " << socketCounter << " socket(s) in channel.";
} else {
LOG(trace) << "" << channelName << ":";
LOG(trace) << "\tNo sockets specified,";
LOG(trace) << "\tapplying common settings to the channel:";
FairMQChannel channel(commonChannel);
LOG(debug) << "\ttype = " << channel.GetType();
LOG(debug) << "\tmethod = " << channel.GetMethod();
LOG(debug) << "\taddress = " << channel.GetAddress();
LOG(debug) << "\ttransport = " << channel.GetTransportName();
LOG(debug) << "\tsndBufSize = " << channel.GetSndBufSize();
LOG(debug) << "\trcvBufSize = " << channel.GetRcvBufSize();
LOG(debug) << "\tsndKernelSize = " << channel.GetSndKernelSize();
LOG(debug) << "\trcvKernelSize = " << channel.GetRcvKernelSize();
LOG(debug) << "\tlinger = " << channel.GetLinger();
LOG(debug) << "\trateLogging = " << channel.GetRateLogging();
LOG(trace) << "\ttype = " << channel.GetType();
LOG(trace) << "\tmethod = " << channel.GetMethod();
LOG(trace) << "\taddress = " << channel.GetAddress();
LOG(trace) << "\ttransport = " << channel.GetTransportName();
LOG(trace) << "\tsndBufSize = " << channel.GetSndBufSize();
LOG(trace) << "\trcvBufSize = " << channel.GetRcvBufSize();
LOG(trace) << "\tsndKernelSize = " << channel.GetSndKernelSize();
LOG(trace) << "\trcvKernelSize = " << channel.GetRcvKernelSize();
LOG(trace) << "\tlinger = " << channel.GetLinger();
LOG(trace) << "\trateLogging = " << channel.GetRateLogging();
LOG(trace) << "\tportRangeMin = " << channel.GetPortRangeMin();
LOG(trace) << "\tportRangeMax = " << channel.GetPortRangeMax();
LOG(trace) << "\tautoBind = " << channel.GetAutoBind();
channelList.push_back(channel);
}

View File

@@ -19,10 +19,9 @@
#include "FairMQParser.h"
#include "FairMQSuboptParser.h"
#include "tools/Unique.h"
#include <boost/algorithm/string.hpp> // join/split
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <algorithm>
#include <iomanip>
@@ -63,8 +62,6 @@ FairMQProgOptions::FairMQProgOptions()
("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.")
("initialization-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
("port-range-min", po::value<int >()->default_value(22000), "Start of the port range for dynamic initialization.")
("port-range-max", po::value<int >()->default_value(32000), "End of the port range for dynamic initialization.")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t>()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).")
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
@@ -171,7 +168,6 @@ int FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool al
{
LOG(warn) << "--" << p->canonical_display_name();
}
LOG(warn) << "No channels will be created (You can create them manually).";
}
}
catch (exception& e)
@@ -227,7 +223,7 @@ void FairMQProgOptions::ParseCmdLine(const int argc, char const* const* argv, bo
void FairMQProgOptions::ParseDefaults()
{
vector<string> emptyArgs = {"dummy", "--id", boost::uuids::to_string(boost::uuids::random_generator()())};
vector<string> emptyArgs = {"dummy", "--id", tools::Uuid()};
vector<const char*> argv(emptyArgs.size());
@@ -271,12 +267,10 @@ void FairMQProgOptions::UpdateChannelInfo()
// create key for variable map as follow : channelName.index.memberName
void FairMQProgOptions::UpdateMQValues()
{
for (const auto& p : fFairMQChannelMap)
{
for (const auto& p : fFairMQChannelMap) {
int index = 0;
for (const auto& channel : p.second)
{
for (const auto& channel : p.second) {
string typeKey = "chans." + p.first + "." + to_string(index) + ".type";
string methodKey = "chans." + p.first + "." + to_string(index) + ".method";
string addressKey = "chans." + p.first + "." + to_string(index) + ".address";
@@ -287,6 +281,9 @@ void FairMQProgOptions::UpdateMQValues()
string rcvKernelSizeKey = "chans." + p.first + "." + to_string(index) + ".rcvKernelSize";
string lingerKey = "chans." + p.first + "." + to_string(index) + ".linger";
string rateLoggingKey = "chans." + p.first + "." + to_string(index) + ".rateLogging";
string portRangeMinKey = "chans." + p.first + "." + to_string(index) + ".portRangeMin";
string portRangeMaxKey = "chans." + p.first + "." + to_string(index) + ".portRangeMax";
string autoBindKey = "chans." + p.first + "." + to_string(index) + ".autoBind";
fChannelKeyMap[typeKey] = ChannelKey{p.first, index, "type"};
fChannelKeyMap[methodKey] = ChannelKey{p.first, index, "method"};
@@ -298,6 +295,9 @@ void FairMQProgOptions::UpdateMQValues()
fChannelKeyMap[rcvKernelSizeKey] = ChannelKey{p.first, index, "rcvkernelSize"};
fChannelKeyMap[lingerKey] = ChannelKey{p.first, index, "linger"};
fChannelKeyMap[rateLoggingKey] = ChannelKey{p.first, index, "rateLogging"};
fChannelKeyMap[portRangeMinKey] = ChannelKey{p.first, index, "portRangeMin"};
fChannelKeyMap[portRangeMaxKey] = ChannelKey{p.first, index, "portRangeMax"};
fChannelKeyMap[autoBindKey] = ChannelKey{p.first, index, "autoBind"};
UpdateVarMap<string>(typeKey, channel.GetType());
UpdateVarMap<string>(methodKey, channel.GetMethod());
@@ -309,86 +309,67 @@ void FairMQProgOptions::UpdateMQValues()
UpdateVarMap<int>(rcvKernelSizeKey, channel.GetRcvKernelSize());
UpdateVarMap<int>(lingerKey, channel.GetLinger());
UpdateVarMap<int>(rateLoggingKey, channel.GetRateLogging());
UpdateVarMap<int>(portRangeMinKey, channel.GetPortRangeMin());
UpdateVarMap<int>(portRangeMaxKey, channel.GetPortRangeMax());
UpdateVarMap<bool>(autoBindKey, channel.GetAutoBind());
index++;
}
UpdateVarMap<int>("chans." + p.first + ".numSockets", index);
}
}
int FairMQProgOptions::UpdateChannelValue(const string& channelName, int index, const string& member, const string& val)
{
if (member == "type")
{
if (member == "type") {
fFairMQChannelMap.at(channelName).at(index).UpdateType(val);
return 0;
}
if (member == "method")
{
} else if (member == "method") {
fFairMQChannelMap.at(channelName).at(index).UpdateMethod(val);
return 0;
}
if (member == "address")
{
} else if (member == "address") {
fFairMQChannelMap.at(channelName).at(index).UpdateAddress(val);
return 0;
}
if (member == "transport")
{
} else if (member == "transport") {
fFairMQChannelMap.at(channelName).at(index).UpdateTransport(val);
return 0;
}
else
{
//if we get there it means something is wrong
} else {
LOG(error) << "update of FairMQChannel map failed for the following key: " << channelName << "." << index << "." << member;
return 1;
}
return 0;
}
int FairMQProgOptions::UpdateChannelValue(const string& channelName, int index, const string& member, int val)
{
if (member == "sndBufSize")
{
if (member == "sndBufSize") {
fFairMQChannelMap.at(channelName).at(index).UpdateSndBufSize(val);
return 0;
}
if (member == "rcvBufSize")
{
} else if (member == "rcvBufSize") {
fFairMQChannelMap.at(channelName).at(index).UpdateRcvBufSize(val);
return 0;
}
if (member == "sndKernelSize")
{
} else if (member == "sndKernelSize") {
fFairMQChannelMap.at(channelName).at(index).UpdateSndKernelSize(val);
return 0;
}
if (member == "rcvKernelSize")
{
} else if (member == "rcvKernelSize") {
fFairMQChannelMap.at(channelName).at(index).UpdateRcvKernelSize(val);
return 0;
}
if (member == "linger")
{
} else if (member == "linger") {
fFairMQChannelMap.at(channelName).at(index).UpdateLinger(val);
return 0;
} else if (member == "rateLogging") {
fFairMQChannelMap.at(channelName).at(index).UpdateRateLogging(val);
} else if (member == "portRangeMin") {
fFairMQChannelMap.at(channelName).at(index).UpdatePortRangeMin(val);
} else if (member == "portRangeMax") {
fFairMQChannelMap.at(channelName).at(index).UpdatePortRangeMax(val);
} else {
LOG(error) << "update of FairMQChannel map failed for the following key: " << channelName << "." << index << "." << member;
return 1;
}
if (member == "rateLogging")
{
fFairMQChannelMap.at(channelName).at(index).UpdateRateLogging(val);
return 0;
}
int FairMQProgOptions::UpdateChannelValue(const string& channelName, int index, const string& member, bool val)
{
if (member == "autoBind") {
fFairMQChannelMap.at(channelName).at(index).UpdateAutoBind(val);
return 0;
}
else
{
// if we get there it means something is wrong
} else {
LOG(error) << "update of FairMQChannel map failed for the following key: " << channelName << "." << index << "." << member;
return 1;
}

View File

@@ -218,6 +218,7 @@ class FairMQProgOptions
}
int UpdateChannelValue(const std::string& channelName, int index, const std::string& member, const std::string& val);
int UpdateChannelValue(const std::string& channelName, int index, const std::string& member, int val);
int UpdateChannelValue(const std::string& channelName, int index, const std::string& member, bool val);
void UpdateChannelInfo();

View File

@@ -58,6 +58,9 @@ struct SUBOPT
RCVKERNELSIZE,
LINGER,
RATELOGGING, // logging rate
PORTRANGEMIN,
PORTRANGEMAX,
AUTOBIND,
NUMSOCKETS,
lastsocketkey
};
@@ -74,6 +77,9 @@ struct SUBOPT
/*[RCVKERNELSIZE] = */ "rcvKernelSize",
/*[LINGER] = */ "linger",
/*[RATELOGGING] = */ "rateLogging",
/*[PORTRANGEMIN] = */ "portRangeMin",
/*[PORTRANGEMAX] = */ "portRangeMax",
/*[AUTOBIND] = */ "autoBind",
/*[NUMSOCKETS] = */ "numSockets",
nullptr
};

View File

@@ -48,26 +48,20 @@ DDS::DDS(const string& name, const Plugin::Version version, const string& mainta
, fDeviceTerminationRequested(false)
, fHeartbeatInterval{100}
{
try
{
try {
TakeDeviceControl();
fControllerThread = thread(&DDS::HandleControl, this);
fHeartbeatThread = thread(&DDS::HeartbeatSender, this);
}
catch (PluginServices::DeviceControlError& e)
{
} catch (PluginServices::DeviceControlError& e) {
LOG(debug) << e.what();
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Error in plugin initialization: " << e.what();
}
}
auto DDS::HandleControl() -> void
{
try
{
try {
// subscribe for state changes from DDS (subscriptions start firing after fService.start() is called)
SubscribeForCustomCommands();
@@ -80,23 +74,20 @@ auto DDS::HandleControl() -> void
SubscribeForConnectingChannels();
// subscribe to device state changes, pushing new state chenges into the event queue
SubscribeToDeviceStateChange([&](DeviceState newState)
{
SubscribeToDeviceStateChange([&](DeviceState newState) {
{
lock_guard<mutex> lock{fEventsMutex};
fEvents.push(newState);
}
fNewEvent.notify_one();
if (newState == DeviceState::Exiting)
{
if (newState == DeviceState::Exiting) {
fDeviceTerminationRequested = true;
}
{
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
string id = GetProperty<string>("id");
for (auto subscriberId : fStateChangeSubscribers)
{
for (auto subscriberId : fStateChangeSubscribers) {
LOG(debug) << "Publishing state-change: " << newState << " to " << subscriberId;
fDDSCustomCmd.send("state-change: " + id + "," + ToStr(newState), to_string(subscriberId));
}
@@ -131,14 +122,11 @@ auto DDS::HandleControl() -> void
// wait until stop signal
unique_lock<mutex> lock(fStopMutex);
while (!fDeviceTerminationRequested)
{
while (!fDeviceTerminationRequested) {
fStopCondition.wait_for(lock, chrono::seconds(1));
}
LOG(debug) << "Stopping DDS control plugin";
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Error: " << e.what() << endl;
return;
}
@@ -146,13 +134,10 @@ auto DDS::HandleControl() -> void
fDDSKeyValue.unsubscribe();
fDDSCustomCmd.unsubscribe();
try
{
try {
UnsubscribeFromDeviceStateChange();
ReleaseDeviceControl();
}
catch (fair::mq::PluginServices::DeviceControlError& e)
{
} catch (fair::mq::PluginServices::DeviceControlError& e) {
LOG(error) << e.what();
}
}
@@ -194,7 +179,7 @@ auto DDS::FillChannelContainers() -> void
for (const auto& vi : iValues) {
size_t pos = vi.find(":");
string chanName = vi.substr(0, pos );
string chanName = vi.substr(0, pos);
// check if provided name is a valid channel name
if (fConnectingChans.find(chanName) == fConnectingChans.end()) {
@@ -229,11 +214,9 @@ auto DDS::FillChannelContainers() -> void
auto DDS::SubscribeForConnectingChannels() -> void
{
fDDSKeyValue.subscribe([&] (const string& propertyId, const string& key, const string& value)
{
try
{
LOG(debug) << "Received update for " << propertyId << ": key=" << key << " value=" << value;
fDDSKeyValue.subscribe([&] (const string& propertyId, const string& value, uint64_t senderTaskID) {
try {
LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID;
string val = value;
// check if it is to handle as one out of multiple values
auto it = fIofN.find(propertyId);
@@ -254,20 +237,18 @@ auto DDS::SubscribeForConnectingChannels() -> void
auto it2 = fI.find(propertyId);
if (it2 != fI.end()) {
LOG(debug) << "adding connecting channel " << propertyId << " : " << connectionStrings.at(it2->second);
fConnectingChans.at(propertyId).fDDSValues.insert(make_pair<string, string>(key.c_str(), connectionStrings.at(it2->second).c_str()));
fConnectingChans.at(propertyId).fDDSValues.insert({senderTaskID, connectionStrings.at(it2->second).c_str()});
} else {
LOG(error) << "multiple bound channels received, but no task index specified, only assigning the first";
fConnectingChans.at(propertyId).fDDSValues.insert(make_pair<string, string>(key.c_str(), connectionStrings.at(0).c_str()));
fConnectingChans.at(propertyId).fDDSValues.insert({senderTaskID, connectionStrings.at(0).c_str()});
}
} else { // only one bound channel received
fConnectingChans.at(propertyId).fDDSValues.insert(make_pair<string, string>(key.c_str(), val.c_str()));
fConnectingChans.at(propertyId).fDDSValues.insert({senderTaskID, val.c_str()});
}
// update channels and remove them from unfinished container
for (auto mi = fConnectingChans.begin(); mi != fConnectingChans.end(); /* no increment */)
{
if (mi->second.fSubChannelAddresses.size() == mi->second.fDDSValues.size())
{
for (auto mi = fConnectingChans.begin(); mi != fConnectingChans.end(); /* no increment */) {
if (mi->second.fSubChannelAddresses.size() == mi->second.fDDSValues.size()) {
// when multiple subChannels are used, their order on every device should be the same, irregardless of arrival order from DDS.
sort(mi->second.fSubChannelAddresses.begin(), mi->second.fSubChannelAddresses.end());
auto it3 = mi->second.fDDSValues.begin();
@@ -277,24 +258,19 @@ auto DDS::SubscribeForConnectingChannels() -> void
++it3;
}
fConnectingChans.erase(mi++);
}
else
{
} else {
++mi;
}
}
}
catch (const exception& e)
{
LOG(error) << "Error on handling DDS property update for " << propertyId << ": key=" << key << " value=" << value << ": " << e.what();
} catch (const exception& e) {
LOG(error) << "Error on handling DDS property update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID << ": " << e.what();
}
});
}
auto DDS::PublishBoundChannels() -> void
{
for (const auto& chan : fBindingChans)
{
for (const auto& chan : fBindingChans) {
string joined = boost::algorithm::join(chan.second, ",");
LOG(debug) << "Publishing " << chan.first << " bound addresses (" << chan.second.size() << ") to DDS under '" << chan.first << "' property name.";
fDDSKeyValue.putValue(chan.first, joined);
@@ -306,13 +282,11 @@ auto DDS::HeartbeatSender() -> void
string id = GetProperty<string>("id");
string pid(to_string(getpid()));
while (!fDeviceTerminationRequested)
{
while (!fDeviceTerminationRequested) {
{
lock_guard<mutex> lock{fHeartbeatSubscriberMutex};
for (const auto subscriberId : fHeartbeatSubscribers)
{
for (const auto subscriberId : fHeartbeatSubscribers) {
fDDSCustomCmd.send("heartbeat: " + id + "," + pid, to_string(subscriberId));
}
}
@@ -326,58 +300,42 @@ auto DDS::SubscribeForCustomCommands() -> void
string id = GetProperty<string>("id");
string pid(to_string(getpid()));
fDDSCustomCmd.subscribe([id, pid, this](const string& cmd, const string& cond, uint64_t senderId)
{
fDDSCustomCmd.subscribe([id, pid, this](const string& cmd, const string& cond, uint64_t senderId) {
LOG(info) << "Received command: " << cmd;
if (cmd == "check-state")
{
if (cmd == "check-state") {
fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()) + " (pid: " + pid + ")", to_string(senderId));
}
else if (fCommands.find(cmd) != fCommands.end())
{
} else if (fCommands.find(cmd) != fCommands.end()) {
fDDSCustomCmd.send(id + ": attempting to " + cmd, to_string(senderId));
ChangeDeviceState(ToDeviceStateTransition(cmd));
}
else if (cmd == "END")
{
} else if (cmd == "END") {
fDDSCustomCmd.send(id + ": attempting to " + cmd, to_string(senderId));
ChangeDeviceState(ToDeviceStateTransition(cmd));
fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()), to_string(senderId));
if (ToStr(GetCurrentDeviceState()) == "EXITING")
{
if (ToStr(GetCurrentDeviceState()) == "EXITING") {
unique_lock<mutex> lock(fStopMutex);
fStopCondition.notify_one();
}
}
else if (cmd == "dump-config")
{
} else if (cmd == "dump-config") {
stringstream ss;
for (const auto pKey: GetPropertyKeys())
{
for (const auto pKey: GetPropertyKeys()) {
ss << id << ": " << pKey << " -> " << GetPropertyAsString(pKey) << endl;
}
fDDSCustomCmd.send(ss.str(), to_string(senderId));
}
else if (cmd == "subscribe-to-heartbeats")
{
} else if (cmd == "subscribe-to-heartbeats") {
{
// auto size = fHeartbeatSubscribers.size();
lock_guard<mutex> lock{fHeartbeatSubscriberMutex};
fHeartbeatSubscribers.insert(senderId);
}
fDDSCustomCmd.send("heartbeat-subscription: " + id + ",OK", to_string(senderId));
}
else if (cmd == "unsubscribe-from-heartbeats")
{
} else if (cmd == "unsubscribe-from-heartbeats") {
{
lock_guard<mutex> lock{fHeartbeatSubscriberMutex};
fHeartbeatSubscribers.erase(senderId);
}
fDDSCustomCmd.send("heartbeat-unsubscription: " + id + ",OK", to_string(senderId));
}
else if (cmd == "subscribe-to-state-changes")
{
} else if (cmd == "subscribe-to-state-changes") {
{
// auto size = fStateChangeSubscribers.size();
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
@@ -387,17 +345,13 @@ auto DDS::SubscribeForCustomCommands() -> void
auto state = GetCurrentDeviceState();
LOG(debug) << "Publishing state-change: " << state << " to " << senderId;
fDDSCustomCmd.send("state-change: " + id + "," + ToStr(state), to_string(senderId));
}
else if (cmd == "unsubscribe-from-state-changes")
{
} else if (cmd == "unsubscribe-from-state-changes") {
{
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
fStateChangeSubscribers.erase(senderId);
}
fDDSCustomCmd.send("state-changes-unsubscription: " + id + ",OK", to_string(senderId));
}
else
{
} else {
LOG(warn) << "Unknown command: " << cmd;
LOG(warn) << "Origin: " << senderId;
LOG(warn) << "Destination: " << cond;
@@ -408,8 +362,7 @@ auto DDS::SubscribeForCustomCommands() -> void
auto DDS::WaitForNextState() -> DeviceState
{
unique_lock<mutex> lock{fEventsMutex};
while (fEvents.empty())
{
while (fEvents.empty()) {
fNewEvent.wait(lock);
}
@@ -420,13 +373,11 @@ auto DDS::WaitForNextState() -> DeviceState
DDS::~DDS()
{
if (fControllerThread.joinable())
{
if (fControllerThread.joinable()) {
fControllerThread.join();
}
if (fHeartbeatThread.joinable())
{
if (fHeartbeatThread.joinable()) {
fHeartbeatThread.join();
}
}

View File

@@ -41,7 +41,7 @@ struct DDSConfig
// container of sub channel addresses
std::vector<std::string> fSubChannelAddresses;
// dds values for the channel
std::unordered_map<std::string, std::string> fDDSValues;
std::unordered_map<uint64_t, std::string> fDDSValues;
};
struct IofN

View File

@@ -316,29 +316,6 @@ void FairMQMessageSHM::Copy(const FairMQMessage& msg)
}
}
void FairMQMessageSHM::Copy(const FairMQMessagePtr& msg)
{
if (fHandle < 0)
{
bipc::managed_shared_memory::handle_t otherHandle = static_cast<FairMQMessageSHM*>(msg.get())->fHandle;
if (otherHandle)
{
if (InitializeChunk(msg->GetSize()))
{
memcpy(GetData(), msg->GetData(), msg->GetSize());
}
}
else
{
LOG(error) << "copy fail: source message not initialized!";
}
}
else
{
LOG(error) << "copy fail: target message already initialized!";
}
}
void FairMQMessageSHM::CloseMessage()
{
if (fHandle >= 0 && !fQueued)
@@ -346,7 +323,7 @@ void FairMQMessageSHM::CloseMessage()
if (fRegionId == 0)
{
fManager.Segment().deallocate(fManager.Segment().get_address_from_handle(fHandle));
fHandle = 0;
fHandle = -1;
}
else
{
@@ -402,6 +379,7 @@ void FairMQMessageSHM::CloseMessage()
{
LOG(error) << "failed closing message, reason: " << zmq_strerror(errno);
}
fMetaCreated = false;
}
}

View File

@@ -47,7 +47,6 @@ class FairMQMessageSHM final : public FairMQMessage
fair::mq::Transport GetType() const override;
void Copy(const FairMQMessage& msg) override;
void Copy(const FairMQMessagePtr& msg) override;
~FairMQMessageSHM() override;

View File

@@ -23,8 +23,9 @@ using namespace fair::mq;
atomic<bool> FairMQSocketSHM::fInterrupted(false);
FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const string& name, const string& id /*= ""*/, void* context)
: fSocket(nullptr)
FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const string& name, const string& id /*= ""*/, void* context, FairMQTransportFactory* fac /*=nullptr*/)
: FairMQSocket{fac}
, fSocket(nullptr)
, fManager(manager)
, fId(id + "." + name + "." + type)
, fBytesTx(0)
@@ -80,7 +81,7 @@ FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const str
throw fair::mq::SocketError("PUB/SUB socket type is not supported for shared memory transport");
}
// LOG(info) << "created socket " << fId;
LOG(debug) << "Created socket " << GetId();
}
bool FairMQSocketSHM::Bind(const string& address)
@@ -99,16 +100,17 @@ bool FairMQSocketSHM::Bind(const string& address)
return true;
}
void FairMQSocketSHM::Connect(const string& address)
bool FairMQSocketSHM::Connect(const string& address)
{
// LOG(info) << "connect socket " << fId << " on " << address;
if (zmq_connect(fSocket, address.c_str()) != 0)
{
LOG(error) << "Failed connecting socket " << fId << ", reason: " << zmq_strerror(errno);
// error here means incorrect configuration. exit if it happens.
exit(EXIT_FAILURE);
return false;
}
return true;
}
int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int timeout)
@@ -284,7 +286,7 @@ int64_t FairMQSocketSHM::Send(vector<FairMQMessagePtr>& msgVec, const int timeou
}
else if (nbytes > 0)
{
assert(nbytes == (vecSize * sizeof(MetaHeader))); // all or nothing
assert(static_cast<unsigned int>(nbytes) == (vecSize * sizeof(MetaHeader))); // all or nothing
for (auto& msg : msgVec)
{
@@ -376,7 +378,7 @@ int64_t FairMQSocketSHM::Receive(vector<FairMQMessagePtr>& msgVec, const int tim
MetaHeader metaHeader;
memcpy(&metaHeader, &hdrVec[m], sizeof(MetaHeader));
msgVec.emplace_back(fair::mq::tools::make_unique<FairMQMessageSHM>(fManager));
msgVec.emplace_back(fair::mq::tools::make_unique<FairMQMessageSHM>(fManager, GetTransport()));
FairMQMessageSHM* msg = static_cast<FairMQMessageSHM*>(msgVec.back().get());
MetaHeader* msgHdr = static_cast<MetaHeader*>(zmq_msg_data(msg->GetMessage()));

View File

@@ -15,18 +15,19 @@
#include <atomic>
#include <memory> // unique_ptr
class FairMQTransportFactory;
class FairMQSocketSHM final : public FairMQSocket
{
public:
FairMQSocketSHM(fair::mq::shmem::Manager& manager, const std::string& type, const std::string& name, const std::string& id = "", void* context = nullptr);
FairMQSocketSHM(fair::mq::shmem::Manager& manager, const std::string& type, const std::string& name, const std::string& id = "", void* context = nullptr, FairMQTransportFactory* fac = nullptr);
FairMQSocketSHM(const FairMQSocketSHM&) = delete;
FairMQSocketSHM operator=(const FairMQSocketSHM&) = delete;
std::string GetId() override { return fId; }
bool Bind(const std::string& address) override;
void Connect(const std::string& address) override;
bool Connect(const std::string& address) override;
int Send(FairMQMessagePtr& msg, const int timeout = -1) override;
int Receive(FairMQMessagePtr& msg, const int timeout = -1) override;

View File

@@ -233,10 +233,10 @@ FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(FairMQUnmanagedRegionP
return unique_ptr<FairMQMessage>(new FairMQMessageSHM(*fManager, region, data, size, hint, this));
}
FairMQSocketPtr FairMQTransportFactorySHM::CreateSocket(const string& type, const string& name) const
FairMQSocketPtr FairMQTransportFactorySHM::CreateSocket(const string& type, const string& name)
{
assert(fContext);
return unique_ptr<FairMQSocket>(new FairMQSocketSHM(*fManager, type, name, GetId(), fContext));
return unique_ptr<FairMQSocket>(new FairMQSocketSHM(*fManager, type, name, GetId(), fContext, this));
}
FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const vector<FairMQChannel>& channels) const

View File

@@ -38,7 +38,7 @@ class FairMQTransportFactorySHM final : public FairMQTransportFactory
FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override;
FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) override;
FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override;
FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) override;
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override;
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override;

View File

@@ -28,6 +28,7 @@
#include <iostream>
#include <array>
#include <exception>
#include <stdexcept>
#include <algorithm>
using namespace std;
@@ -40,54 +41,52 @@ namespace tools
{
// returns a map with network interface names as keys and their IP addresses as values
int getHostIPs(map<string, string>& addressMap)
map<string, string> getHostIPs()
{
map<string, string> addressMap;
struct ifaddrs *ifaddr, *ifa;
int s;
char host[NI_MAXHOST];
if (getifaddrs(&ifaddr) == -1)
{
if (getifaddrs(&ifaddr) == -1) {
perror("getifaddrs");
return -1;
throw runtime_error("getifaddrs failed");
}
for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next)
{
if (ifa->ifa_addr == NULL)
{
for (ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next) {
if (ifa->ifa_addr == nullptr) {
continue;
}
if (ifa->ifa_addr->sa_family == AF_INET)
{
s = getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST);
if (s != 0)
{
if (ifa->ifa_addr->sa_family == AF_INET) {
s = getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in), host, NI_MAXHOST, nullptr, 0, NI_NUMERICHOST);
if (s != 0) {
cout << "getnameinfo() failed: " << gai_strerror(s) << endl;
return -1;
throw runtime_error("getnameinfo() failed");
}
addressMap.insert(pair<string, string>(ifa->ifa_name, host));
}
}
freeifaddrs(ifaddr);
return 0;
return addressMap;
}
// get IP address of a given interface name
string getInterfaceIP(const string& interface)
{
map<string, string> IPs;
getHostIPs(IPs);
if (IPs.count(interface))
{
return IPs[interface];
}
else
{
LOG(error) << "Could not find provided network interface: \"" << interface << "\"!, exiting.";
try {
auto IPs = getHostIPs();
if (IPs.count(interface)) {
return IPs[interface];
} else {
LOG(error) << "Could not find provided network interface: \"" << interface << "\"!, exiting.";
return "";
}
} catch (runtime_error& re) {
cout << "could not get interface IP: " << re.what();
return "";
}
}
@@ -104,28 +103,22 @@ string getDefaultRouteNetworkInterface()
unique_ptr<FILE, decltype(pclose) *> file(popen("ip route | grep default | cut -d \" \" -f 5 | head -n 1", "r"), pclose);
#endif
if (!file)
{
if (!file) {
LOG(error) << "Could not detect default route network interface name - popen() failed!";
return "";
}
while (!feof(file.get()))
{
if (fgets(buffer.data(), 128, file.get()) != NULL)
{
while (!feof(file.get())) {
if (fgets(buffer.data(), 128, file.get()) != nullptr) {
interfaceName += buffer.data();
}
}
boost::algorithm::trim(interfaceName);
if (interfaceName == "")
{
if (interfaceName == "") {
LOG(error) << "Could not detect default route network interface name";
}
else
{
} else {
LOG(debug) << "Detected network interface name for the default route: " << interfaceName;
}
@@ -134,30 +127,8 @@ string getDefaultRouteNetworkInterface()
string getIpFromHostname(const string& hostname)
{
try {
namespace bai = boost::asio::ip;
boost::asio::io_service ios;
bai::tcp::resolver resolver(ios);
bai::tcp::resolver::query query(hostname, "");
bai::tcp::resolver::iterator end;
auto it = find_if(static_cast<bai::basic_resolver_iterator<bai::tcp>>(resolver.resolve(query)), end, [](const bai::tcp::endpoint& ep) {
return ep.address().is_v4();
});
if (it != end) {
stringstream ss;
ss << static_cast<bai::tcp::endpoint>(*it).address();
return ss.str();
}
LOG(warn) << "could not find ipv4 address for hostname '" << hostname << "'";
return "";
} catch (exception& e) {
LOG(error) << "could not resolve hostname '" << hostname << "', reason: " << e.what();
return "";
}
boost::asio::io_service ios;
return getIpFromHostname(hostname, ios);
}
string getIpFromHostname(const string& hostname, boost::asio::io_service& ios)

View File

@@ -32,7 +32,7 @@ namespace tools
{
// returns a map with network interface names as keys and their IP addresses as values
int getHostIPs(std::map<std::string, std::string>& addressMap);
std::map<std::string, std::string> getHostIPs();
// get IP address of a given interface name
std::string getInterfaceIP(const std::string& interface);

View File

@@ -13,8 +13,10 @@
#include <iostream>
#include <sstream>
#include <thread>
#include <stdexcept>
using namespace std;
namespace bp = boost::process;
namespace fair
{
@@ -44,10 +46,17 @@ execute_result execute(const string& cmd, const string& prefix, const string& in
out << prefix << cmd << endl;
// Execute command and capture stdout, add prefix line by line
boost::process::ipstream c_stdout;
boost::process::opstream c_stdin;
boost::process::child c(
cmd, boost::process::std_out > c_stdout, boost::process::std_in < c_stdin);
bp::ipstream c_stdout;
bp::opstream c_stdin;
bp::child c(cmd, bp::std_out > c_stdout, bp::std_in < c_stdin);
while (c.valid() && !c.running()) {
;
}
if (!c.valid()) {
throw runtime_error("Can't execute the given process.");
}
// Optionally, write to stdin of the child
if (input != "") {
@@ -57,8 +66,7 @@ execute_result execute(const string& cmd, const string& prefix, const string& in
}
string line;
while (getline(c_stdout, line))
{
while (c.running() && getline(c_stdout, line)) {
// print full line thread-safe
stringstream printLine;
printLine << prefix << line << "\n";

View File

@@ -8,6 +8,10 @@
#include <fairmq/tools/Unique.h>
// We have to force boost::uuids to rely on /dev/*random instead of getrandom(2) or getentropy(3)
// otherwise on some systems we'd get boost::uuids::entropy_error
#define BOOST_UUID_RANDOM_PROVIDER_FORCE_POSIX
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>

View File

@@ -220,24 +220,6 @@ void FairMQMessageZMQ::Copy(const FairMQMessage& msg)
}
}
void FairMQMessageZMQ::Copy(const FairMQMessagePtr& msg)
{
FairMQMessageZMQ* msgPtr = static_cast<FairMQMessageZMQ*>(msg.get());
// Shares the message buffer between msg and this fMsg.
if (zmq_msg_copy(fMsg.get(), msgPtr->GetMessage()) != 0)
{
LOG(error) << "failed copying message, reason: " << zmq_strerror(errno);
return;
}
// if the target message has been resized, apply same to this message also
if (msgPtr->fUsedSizeModified)
{
fUsedSizeModified = true;
fUsedSize = msgPtr->fUsedSize;
}
}
void FairMQMessageZMQ::CloseMessage()
{
if (!fViewMsg)

View File

@@ -49,7 +49,6 @@ class FairMQMessageZMQ final : public FairMQMessage
fair::mq::Transport GetType() const override;
void Copy(const FairMQMessagePtr& msg) override;
void Copy(const FairMQMessage& msg) override;
~FairMQMessageZMQ() override;

View File

@@ -20,8 +20,9 @@ using namespace fair::mq;
atomic<bool> FairMQSocketZMQ::fInterrupted(false);
FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const string& id /*= ""*/, void* context)
: fSocket(nullptr)
FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const string& id /*= ""*/, void* context, FairMQTransportFactory* fac)
: FairMQSocket{fac}
, fSocket(nullptr)
, fId(id + "." + name + "." + type)
, fBytesTx(0)
, fBytesRx(0)
@@ -70,7 +71,7 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const s
}
}
// LOG(info) << "created socket " << fId;
LOG(debug) << "Created socket " << GetId();
}
string FairMQSocketZMQ::GetId()
@@ -91,19 +92,21 @@ bool FairMQSocketZMQ::Bind(const string& address)
LOG(error) << "Failed binding socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
}
return true;
}
void FairMQSocketZMQ::Connect(const string& address)
bool FairMQSocketZMQ::Connect(const string& address)
{
// LOG(info) << "connect socket " << fId << " on " << address;
if (zmq_connect(fSocket, address.c_str()) != 0)
{
LOG(error) << "Failed connecting socket " << fId << ", reason: " << zmq_strerror(errno);
// error here means incorrect configuration. exit if it happens.
exit(EXIT_FAILURE);
return false;
}
return true;
}
int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int timeout)
@@ -312,7 +315,7 @@ int64_t FairMQSocketZMQ::Receive(vector<FairMQMessagePtr>& msgVec, const int tim
do
{
unique_ptr<FairMQMessage> part(new FairMQMessageZMQ());
unique_ptr<FairMQMessage> part(new FairMQMessageZMQ(GetTransport()));
int nbytes = zmq_msg_recv(static_cast<FairMQMessageZMQ*>(part.get())->GetMessage(), fSocket, flags);
if (nbytes >= 0)

View File

@@ -15,18 +15,19 @@
#include "FairMQSocket.h"
#include "FairMQMessage.h"
class FairMQTransportFactory;
class FairMQSocketZMQ final : public FairMQSocket
{
public:
FairMQSocketZMQ(const std::string& type, const std::string& name, const std::string& id = "", void* context = nullptr);
FairMQSocketZMQ(const std::string& type, const std::string& name, const std::string& id = "", void* context = nullptr, FairMQTransportFactory* factory = nullptr);
FairMQSocketZMQ(const FairMQSocketZMQ&) = delete;
FairMQSocketZMQ operator=(const FairMQSocketZMQ&) = delete;
std::string GetId() override;
bool Bind(const std::string& address) override;
void Connect(const std::string& address) override;
bool Connect(const std::string& address) override;
int Send(FairMQMessagePtr& msg, const int timeout = -1) override;
int Receive(FairMQMessagePtr& msg, const int timeout = -1) override;

View File

@@ -69,10 +69,10 @@ FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(FairMQUnmanagedRegionP
return unique_ptr<FairMQMessage>(new FairMQMessageZMQ(region, data, size, hint, this));
}
FairMQSocketPtr FairMQTransportFactoryZMQ::CreateSocket(const string& type, const string& name) const
FairMQSocketPtr FairMQTransportFactoryZMQ::CreateSocket(const string& type, const string& name)
{
assert(fContext);
return unique_ptr<FairMQSocket>(new FairMQSocketZMQ(type, name, GetId(), fContext));
return unique_ptr<FairMQSocket>(new FairMQSocketZMQ(type, name, GetId(), fContext, this));
}
FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const vector<FairMQChannel>& channels) const

View File

@@ -39,7 +39,7 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory
FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override;
FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) override;
FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override;
FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) override;
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override;
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override;

View File

@@ -74,14 +74,14 @@ add_testsuite(FairMQ.Parts
TIMEOUT 5
)
add_testsuite(FairMQ.MessageResize
add_testsuite(FairMQ.Message
SOURCES
${CMAKE_CURRENT_BINARY_DIR}/runner.cxx
message_resize/_message_resize.cxx
message/_message.cxx
LINKS FairMQ
INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/message_resize
${CMAKE_CURRENT_SOURCE_DIR}/message
${CMAKE_CURRENT_BINARY_DIR}
TIMEOUT 5
${definitions}
@@ -198,6 +198,28 @@ add_testsuite(FairMQ.StateMachine
TIMEOUT 10
)
add_testsuite(FairMQ.Tools
SOURCES
${CMAKE_CURRENT_BINARY_DIR}/runner.cxx
tools/_network.cxx
LINKS FairMQ
INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR}
TIMEOUT 10
)
add_testsuite(FairMQ.Channel
SOURCES
${CMAKE_CURRENT_BINARY_DIR}/runner.cxx
channel/_channel.cxx
LINKS FairMQ
INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR}
TIMEOUT 10
)
add_testsuite(FairMQ.Transport
SOURCES
${CMAKE_CURRENT_BINARY_DIR}/runner.cxx

96
test/channel/_channel.cxx Normal file
View File

@@ -0,0 +1,96 @@
/********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <FairMQChannel.h>
#include <gtest/gtest.h>
#include <string>
namespace
{
using namespace std;
using namespace fair::mq;
TEST(Channel, Validation)
{
FairMQChannel channel;
ASSERT_THROW(channel.Validate(), FairMQChannel::ChannelConfigurationError);
channel.UpdateType("pair");
ASSERT_EQ(channel.Validate(), false);
ASSERT_EQ(channel.IsValid(), false);
channel.UpdateAddress("bla");
ASSERT_THROW(channel.Validate(), FairMQChannel::ChannelConfigurationError);
channel.UpdateMethod("connect");
ASSERT_EQ(channel.Validate(), false);
ASSERT_EQ(channel.IsValid(), false);
channel.UpdateAddress("ipc://");
ASSERT_EQ(channel.Validate(), false);
ASSERT_EQ(channel.IsValid(), false);
channel.UpdateAddress("verbs://");
ASSERT_EQ(channel.Validate(), false);
ASSERT_EQ(channel.IsValid(), false);
channel.UpdateAddress("inproc://");
ASSERT_EQ(channel.Validate(), false);
ASSERT_EQ(channel.IsValid(), false);
channel.UpdateAddress("tcp://");
ASSERT_EQ(channel.Validate(), false);
ASSERT_EQ(channel.IsValid(), false);
channel.UpdateAddress("tcp://localhost:5555");
ASSERT_EQ(channel.Validate(), true);
ASSERT_EQ(channel.IsValid(), true);
channel.UpdateSndBufSize(-1);
ASSERT_THROW(channel.Validate(), FairMQChannel::ChannelConfigurationError);
channel.UpdateSndBufSize(1000);
ASSERT_NO_THROW(channel.Validate());
channel.UpdateRcvBufSize(-1);
ASSERT_THROW(channel.Validate(), FairMQChannel::ChannelConfigurationError);
channel.UpdateRcvBufSize(1000);
ASSERT_NO_THROW(channel.Validate());
channel.UpdateSndKernelSize(-1);
ASSERT_THROW(channel.Validate(), FairMQChannel::ChannelConfigurationError);
channel.UpdateSndKernelSize(1000);
ASSERT_NO_THROW(channel.Validate());
channel.UpdateRcvKernelSize(-1);
ASSERT_THROW(channel.Validate(), FairMQChannel::ChannelConfigurationError);
channel.UpdateRcvKernelSize(1000);
ASSERT_NO_THROW(channel.Validate());
channel.UpdateRateLogging(-1);
ASSERT_THROW(channel.Validate(), FairMQChannel::ChannelConfigurationError);
channel.UpdateRateLogging(1);
ASSERT_NO_THROW(channel.Validate());
FairMQChannel channel2 = channel;
ASSERT_NO_THROW(channel2.Validate());
ASSERT_EQ(channel2.Validate(), true);
ASSERT_EQ(channel2.IsValid(), true);
ASSERT_EQ(channel2.Validate(), true);
channel2.UpdateChannelName("Kanal");
ASSERT_EQ(channel2.GetChannelName(), "Kanal");
channel2.ResetChannel();
ASSERT_EQ(channel2.IsValid(), false);
ASSERT_EQ(channel2.Validate(), true);
}
} /* namespace */

View File

@@ -21,8 +21,8 @@ namespace
using namespace std;
auto RunPushPullWithMsgResize(string transport, string address) -> void {
void RunPushPullWithMsgResize(const string& transport, const string& address)
{
size_t session{fair::mq::tools::UuidHash()};
FairMQProgOptions config;
@@ -53,6 +53,29 @@ auto RunPushPullWithMsgResize(string transport, string address) -> void {
ASSERT_EQ(inMsg->GetSize(), 250);
}
void RunMsgRebuild(const string& transport)
{
size_t session{fair::mq::tools::UuidHash()};
FairMQProgOptions config;
config.SetValue<string>("session", to_string(session));
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
FairMQMessagePtr msg(factory->CreateMessage());
EXPECT_EQ(msg->GetSize(), 0);
msg->Rebuild(100);
EXPECT_EQ(msg->GetSize(), 100);
string* str = new string("asdf");
msg->Rebuild(const_cast<char*>(str->c_str()),
str->length(),
[](void* /*data*/, void* obj) { delete static_cast<string*>(obj); },
str);
EXPECT_NE(msg->GetSize(), 100);
EXPECT_EQ(msg->GetSize(), string("asdf").length());
EXPECT_EQ(string(static_cast<char*>(msg->GetData()), msg->GetSize()), string("asdf"));
}
TEST(MessageResize, ZeroMQ)
{
RunPushPullWithMsgResize("zeromq", "ipc://test_message_resize");
@@ -70,4 +93,21 @@ TEST(MessageResize, nanomsg)
}
#endif /* BUILD_NANOMSG_TRANSPORT */
TEST(MessageRebuild, ZeroMQ)
{
RunMsgRebuild("zeromq");
}
TEST(MessageRebuild, shmem)
{
RunMsgRebuild("shmem");
}
#ifdef BUILD_NANOMSG_TRANSPORT
TEST(MessageRebuild, nanomsg)
{
RunMsgRebuild("nanomsg");
}
#endif /* BUILD_NANOMSG_TRANSPORT */
} // namespace

View File

@@ -32,6 +32,7 @@ auto RunSingleThreadedMultipart(string transport, string address) -> void {
FairMQProgOptions config;
config.SetValue<string>("session", std::to_string(session));
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
FairMQTransportFactory* factoryptr = factory.get();
auto push = FairMQChannel{"Push", "push", factory};
ASSERT_TRUE(push.Bind(address));
auto pull = FairMQChannel{"Pull", "pull", factory};
@@ -39,8 +40,8 @@ auto RunSingleThreadedMultipart(string transport, string address) -> void {
// TODO validate that fTransportFactory is not nullptr
// TODO validate that fSocket is not nullptr
ASSERT_TRUE(push.ValidateChannel());
ASSERT_TRUE(pull.ValidateChannel());
ASSERT_TRUE(push.Validate());
ASSERT_TRUE(pull.Validate());
{
auto sentMsg = FairMQParts{};
@@ -55,8 +56,9 @@ auto RunSingleThreadedMultipart(string transport, string address) -> void {
ASSERT_GE(pull.Receive(receivedMsg), 0);
stringstream out;
for_each(receivedMsg.cbegin(), receivedMsg.cend(), [&out](const FairMQMessagePtr& part) {
for_each(receivedMsg.cbegin(), receivedMsg.cend(), [&out,&factoryptr](const FairMQMessagePtr& part) {
out << string{static_cast<char*>(part->GetData()), part->GetSize()};
ASSERT_EQ(part->GetTransport(),factoryptr);
});
ASSERT_EQ(out.str(), "123");
}
@@ -76,7 +78,7 @@ auto RunMultiThreadedMultipart(string transport, string address) -> void
pull.Connect(address);
auto pusher = thread{[&push](){
ASSERT_TRUE(push.ValidateChannel());
ASSERT_TRUE(push.Validate());
auto sentMsg = FairMQParts{};
sentMsg.AddPart(push.NewSimpleMessage("1"));
@@ -87,7 +89,7 @@ auto RunMultiThreadedMultipart(string transport, string address) -> void
}};
auto puller = thread{[&pull](){
ASSERT_TRUE(pull.ValidateChannel());
ASSERT_TRUE(pull.Validate());
auto receivedMsg = FairMQParts{};
ASSERT_GE(pull.Receive(receivedMsg), 0);

View File

@@ -43,8 +43,9 @@ TEST(StateMachine, RegularFSM)
});
fsm.SubscribeToStateChange("test", [&](S newState, S lastState){
if (newState == S::Idle && lastState == S::ResettingDevice)
if (newState == S::Idle && lastState == S::ResettingDevice) {
ASSERT_NO_THROW(fsm.ChangeState(T::End));
}
});
ASSERT_NO_THROW(fsm.ChangeState(T::ResetDevice));

28
test/tools/_network.cxx Normal file
View File

@@ -0,0 +1,28 @@
/********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <gtest/gtest.h>
#include <fairmq/Tools.h>
#include <string>
namespace
{
using namespace std;
using namespace fair::mq;
TEST(Tools, Network)
{
string interface = fair::mq::tools::getDefaultRouteNetworkInterface();
EXPECT_NE(interface, "");
string interfaceIP = fair::mq::tools::getInterfaceIP(interface);
EXPECT_NE(interfaceIP, "");
}
} /* namespace */