mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
Support multiple endpoints per socket
Sent messages will be scheduled among the endpoints according to socket type: PUB will send the same data to all endpoints simultaneously, PUSH will do round robin transfer. Incoming data is fair queued between endpoints. This is a feature of at least zeromq and nanomsg. _____________ To use: in the device configuration, instead of specifying just one address, specify a comma separated list e.g. tcp://localhost:123,ipc:///tmp/socket the connection method (bind/connect) applies to all endpoints in this case. ______________ Mixing binding and connecting endpoints is supported: prefix "@" means "bind", "+" (or ">") means connect, e.g. +tcp://localhost:123,@ipc:///tmp/socket,ipc:///tmp/asd (in case of missing prefix, the default channel method is used for that endpoint).
This commit is contained in:
parent
0a3f14c0e3
commit
c2d7c49cf5
|
@ -411,28 +411,40 @@ bool FairMQChannel::ValidateChannel()
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
//TODO: maybe cache fEndpoints as a class member? not really needed as tokenizing is
|
||||||
|
//fast, and only happens during (re-)configure
|
||||||
|
std::vector<std::string> fEndpoints;
|
||||||
|
Tokenize(fEndpoints, fAddress);
|
||||||
|
for (const auto endpoint : fEndpoints)
|
||||||
|
{
|
||||||
|
std::string address;
|
||||||
|
if (endpoint[0]=='@'||endpoint[0]=='+'||endpoint[0]=='>') {
|
||||||
|
address = endpoint.substr(1);
|
||||||
|
} else {
|
||||||
|
address = endpoint;
|
||||||
|
}
|
||||||
// check if address is a tcp or ipc address
|
// check if address is a tcp or ipc address
|
||||||
if (fAddress.compare(0, 6, "tcp://") == 0)
|
if (address.compare(0, 6, "tcp://") == 0)
|
||||||
{
|
{
|
||||||
// check if TCP address contains port delimiter
|
// check if TCP address contains port delimiter
|
||||||
string addressString = fAddress.substr(6);
|
string addressString = address.substr(6);
|
||||||
if (addressString.find(":") == string::npos)
|
if (addressString.find(":") == string::npos)
|
||||||
{
|
{
|
||||||
ss << "INVALID";
|
ss << "INVALID";
|
||||||
LOG(DEBUG) << ss.str();
|
LOG(DEBUG) << ss.str();
|
||||||
LOG(ERROR) << "invalid channel address: \"" << fAddress << "\" (missing port?)";
|
LOG(ERROR) << "invalid channel address: \"" << address << "\" (missing port?)";
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (fAddress.compare(0, 6, "ipc://") == 0)
|
else if (address.compare(0, 6, "ipc://") == 0)
|
||||||
{
|
{
|
||||||
// check if IPC address is not empty
|
// check if IPC address is not empty
|
||||||
string addressString = fAddress.substr(6);
|
string addressString = address.substr(6);
|
||||||
if (addressString == "")
|
if (addressString == "")
|
||||||
{
|
{
|
||||||
ss << "INVALID";
|
ss << "INVALID";
|
||||||
LOG(DEBUG) << ss.str();
|
LOG(DEBUG) << ss.str();
|
||||||
LOG(ERROR) << "invalid channel address: \"" << fAddress << "\" (empty IPC address?)";
|
LOG(ERROR) << "invalid channel address: \"" << address << "\" (empty IPC address?)";
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -441,10 +453,11 @@ bool FairMQChannel::ValidateChannel()
|
||||||
// if neither TCP or IPC is specified, return invalid
|
// if neither TCP or IPC is specified, return invalid
|
||||||
ss << "INVALID";
|
ss << "INVALID";
|
||||||
LOG(DEBUG) << ss.str();
|
LOG(DEBUG) << ss.str();
|
||||||
LOG(ERROR) << "invalid channel address: \"" << fAddress << "\" (missing protocol specifier?)";
|
LOG(ERROR) << "invalid channel address: \"" << address << "\" (missing protocol specifier?)";
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// validate socket buffer size for sending
|
// validate socket buffer size for sending
|
||||||
if (fSndBufSize < 0)
|
if (fSndBufSize < 0)
|
||||||
|
@ -770,3 +783,27 @@ FairMQChannel::~FairMQChannel()
|
||||||
delete fCmdSocket;
|
delete fCmdSocket;
|
||||||
delete fPoller;
|
delete fPoller;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void FairMQChannel::Tokenize(std::vector<std::string>& output,
|
||||||
|
const std::string& input,
|
||||||
|
const std::string delimiters)
|
||||||
|
{
|
||||||
|
using namespace std;
|
||||||
|
size_t start = 0;
|
||||||
|
size_t end = input.find_first_of(delimiters);
|
||||||
|
if (end == string::npos)
|
||||||
|
{
|
||||||
|
output.push_back(input.substr(start, input.length()));
|
||||||
|
}
|
||||||
|
else do
|
||||||
|
{
|
||||||
|
output.push_back(input.substr(start, end-start));
|
||||||
|
start = ++end;
|
||||||
|
end = input.find_first_of(delimiters, start);
|
||||||
|
if (end == string::npos)
|
||||||
|
{
|
||||||
|
output.push_back(input.substr(start, input.length()));
|
||||||
|
}
|
||||||
|
} while (end != string::npos);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -248,6 +248,11 @@ class FairMQChannel
|
||||||
int Receive(FairMQMessage* msg, const std::string& flag = "", int rcvTimeoutInMs = -1) const;
|
int Receive(FairMQMessage* msg, const std::string& flag = "", int rcvTimeoutInMs = -1) const;
|
||||||
int Receive(FairMQMessage* msg, const int flags, int rcvTimeoutInMs = -1) const;
|
int Receive(FairMQMessage* msg, const int flags, int rcvTimeoutInMs = -1) const;
|
||||||
|
|
||||||
|
// TODO: this might go to some base utility library
|
||||||
|
static void Tokenize(std::vector<std::string>& output,
|
||||||
|
const std::string& input,
|
||||||
|
const std::string delimiters = ",");
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::string fType;
|
std::string fType;
|
||||||
std::string fMethod;
|
std::string fMethod;
|
||||||
|
|
|
@ -118,7 +118,7 @@ void FairMQDevice::ConnectChannels(list<FairMQChannel*>& chans)
|
||||||
{
|
{
|
||||||
if ((*itr)->ValidateChannel())
|
if ((*itr)->ValidateChannel())
|
||||||
{
|
{
|
||||||
if (ConnectChannel(**itr))
|
if (AttachChannel(**itr))
|
||||||
{
|
{
|
||||||
(*itr)->InitCommandInterface(fTransportFactory, fNumIoThreads);
|
(*itr)->InitCommandInterface(fTransportFactory, fNumIoThreads);
|
||||||
chans.erase(itr++);
|
chans.erase(itr++);
|
||||||
|
@ -144,7 +144,7 @@ void FairMQDevice::BindChannels(list<FairMQChannel*>& chans)
|
||||||
{
|
{
|
||||||
if ((*itr)->ValidateChannel())
|
if ((*itr)->ValidateChannel())
|
||||||
{
|
{
|
||||||
if (BindChannel(**itr))
|
if (AttachChannel(**itr))
|
||||||
{
|
{
|
||||||
(*itr)->InitCommandInterface(fTransportFactory, fNumIoThreads);
|
(*itr)->InitCommandInterface(fTransportFactory, fNumIoThreads);
|
||||||
chans.erase(itr++);
|
chans.erase(itr++);
|
||||||
|
@ -273,37 +273,9 @@ bool FairMQDevice::BindChannel(FairMQChannel& ch)
|
||||||
ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize));
|
ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize));
|
||||||
ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize));
|
ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize));
|
||||||
|
|
||||||
// number of attempts when choosing a random port
|
|
||||||
int maxAttempts = 1000;
|
|
||||||
int numAttempts = 0;
|
|
||||||
|
|
||||||
// initialize random generator
|
|
||||||
std::default_random_engine generator(std::chrono::system_clock::now().time_since_epoch().count());
|
|
||||||
std::uniform_int_distribution<int> randomPort(fPortRangeMin, fPortRangeMax);
|
|
||||||
|
|
||||||
LOG(DEBUG) << "Binding channel " << ch.fChannelName << " on " << ch.fAddress;
|
LOG(DEBUG) << "Binding channel " << ch.fChannelName << " on " << ch.fAddress;
|
||||||
|
|
||||||
// try to bind to the saved port. In case of failure, try random one.
|
return BindEndpoint(*ch.fSocket, ch.fAddress);
|
||||||
while (!ch.fSocket->Bind(ch.fAddress))
|
|
||||||
{
|
|
||||||
LOG(DEBUG) << "Could not bind to configured (TCP) port, trying random port in range " << fPortRangeMin << "-" << fPortRangeMax;
|
|
||||||
++numAttempts;
|
|
||||||
|
|
||||||
if (numAttempts > maxAttempts)
|
|
||||||
{
|
|
||||||
LOG(ERROR) << "could not bind to any (TCP) port in the given range after " << maxAttempts << " attempts";
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t pos = ch.fAddress.rfind(":");
|
|
||||||
stringstream newPort;
|
|
||||||
newPort << static_cast<int>(randomPort(generator));
|
|
||||||
ch.fAddress = ch.fAddress.substr(0, pos + 1) + newPort.str();
|
|
||||||
|
|
||||||
LOG(DEBUG) << "Binding channel " << ch.fChannelName << " on " << ch.fAddress;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool FairMQDevice::ConnectChannel(FairMQChannel& ch)
|
bool FairMQDevice::ConnectChannel(FairMQChannel& ch)
|
||||||
|
@ -316,7 +288,100 @@ bool FairMQDevice::ConnectChannel(FairMQChannel& ch)
|
||||||
ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize));
|
ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize));
|
||||||
// connect
|
// connect
|
||||||
LOG(DEBUG) << "Connecting channel " << ch.fChannelName << " to " << ch.fAddress;
|
LOG(DEBUG) << "Connecting channel " << ch.fChannelName << " to " << ch.fAddress;
|
||||||
ch.fSocket->Connect(ch.fAddress);
|
ConnectEndpoint(*ch.fSocket, ch.fAddress);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool FairMQDevice::AttachChannel(FairMQChannel& ch)
|
||||||
|
{
|
||||||
|
std::vector<std::string> endpoints;
|
||||||
|
FairMQChannel::Tokenize(endpoints, ch.fAddress);
|
||||||
|
for (auto& endpoint : endpoints)
|
||||||
|
{
|
||||||
|
//(re-)init socket
|
||||||
|
if (!ch.fSocket) {
|
||||||
|
ch.fSocket = fTransportFactory->CreateSocket(ch.fType, ch.fChannelName, fNumIoThreads, fId);
|
||||||
|
}
|
||||||
|
|
||||||
|
// set high water marks
|
||||||
|
ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize));
|
||||||
|
ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize));
|
||||||
|
|
||||||
|
// attach
|
||||||
|
bool bind = (ch.fMethod=="bind");
|
||||||
|
bool connectionModifier = false;
|
||||||
|
std::string address = endpoint;
|
||||||
|
|
||||||
|
// check if the default fMethod is overridden by a modifier
|
||||||
|
if (endpoint[0]=='+' || endpoint[0]=='>') {
|
||||||
|
connectionModifier = true;
|
||||||
|
bind = false;
|
||||||
|
address = endpoint.substr(1);
|
||||||
|
} else if (endpoint[0]=='@') {
|
||||||
|
connectionModifier = true;
|
||||||
|
bind = true;
|
||||||
|
address = endpoint.substr(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool rc = true;
|
||||||
|
// make the connection
|
||||||
|
if (bind) {
|
||||||
|
rc = BindEndpoint(*ch.fSocket, address);
|
||||||
|
} else {
|
||||||
|
rc = ConnectEndpoint(*ch.fSocket, address);
|
||||||
|
}
|
||||||
|
|
||||||
|
// bind might bind to an address different than requested,
|
||||||
|
// put the actual address back in the config
|
||||||
|
endpoint.clear();
|
||||||
|
if (connectionModifier) endpoint.push_back(bind?'@':'+');
|
||||||
|
endpoint += address;
|
||||||
|
|
||||||
|
LOG(DEBUG) << "Attached channel " << ch.fChannelName << " to " << endpoint
|
||||||
|
<< (bind?" (bind) ":" (connect) ");
|
||||||
|
|
||||||
|
// after the book keeping is done, exit in case of errors
|
||||||
|
if (!rc) return rc;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool FairMQDevice::ConnectEndpoint(FairMQSocket& socket, std::string& endpoint)
|
||||||
|
{
|
||||||
|
socket.Connect(endpoint);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool FairMQDevice::BindEndpoint(FairMQSocket& socket, std::string& endpoint)
|
||||||
|
{
|
||||||
|
// number of attempts when choosing a random port
|
||||||
|
int maxAttempts = 1000;
|
||||||
|
int numAttempts = 0;
|
||||||
|
|
||||||
|
// initialize random generator
|
||||||
|
std::default_random_engine generator(std::chrono::system_clock::now().time_since_epoch().count());
|
||||||
|
std::uniform_int_distribution<int> randomPort(fPortRangeMin, fPortRangeMax);
|
||||||
|
|
||||||
|
// try to bind to the saved port. In case of failure, try random one.
|
||||||
|
while (!socket.Bind(endpoint))
|
||||||
|
{
|
||||||
|
LOG(DEBUG) << "Could not bind to configured (TCP) port, trying random port in range "
|
||||||
|
<< fPortRangeMin << "-" << fPortRangeMax;
|
||||||
|
++numAttempts;
|
||||||
|
|
||||||
|
if (numAttempts > maxAttempts)
|
||||||
|
{
|
||||||
|
LOG(ERROR) << "could not bind to any (TCP) port in the given range after "
|
||||||
|
<< maxAttempts << " attempts";
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t pos = endpoint.rfind(":");
|
||||||
|
stringstream newPort;
|
||||||
|
newPort << static_cast<int>(randomPort(generator));
|
||||||
|
// TODO: thread safety? (this comes in as a reference and DOES get changed in this case).
|
||||||
|
endpoint = endpoint.substr(0, pos + 1) + newPort.str();
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -400,6 +400,16 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
|
||||||
/// Connects a single channel (used in InitWrapper)
|
/// Connects a single channel (used in InitWrapper)
|
||||||
bool ConnectChannel(FairMQChannel& ch);
|
bool ConnectChannel(FairMQChannel& ch);
|
||||||
|
|
||||||
|
/// Sets up and connects/binds a socket to an endpoint
|
||||||
|
/// return a string with the actual endpoint if it happens
|
||||||
|
//to stray from default.
|
||||||
|
bool ConnectEndpoint(FairMQSocket& socket, std::string& endpoint);
|
||||||
|
bool BindEndpoint(FairMQSocket& socket, std::string& endpoint);
|
||||||
|
/// Attaches the channel to all listed endpoints
|
||||||
|
/// the list is comma separated; the default method (bind/connect) is used.
|
||||||
|
/// to override default: prepend "@" to bind, "+" or ">" to connect endpoint.
|
||||||
|
bool AttachChannel(FairMQChannel& ch);
|
||||||
|
|
||||||
/// Signal handler
|
/// Signal handler
|
||||||
void SignalHandler(int signal);
|
void SignalHandler(int signal);
|
||||||
bool fCatchingSignals;
|
bool fCatchingSignals;
|
||||||
|
|
|
@ -11,3 +11,53 @@
|
||||||
* @since 2012-12-05
|
* @since 2012-12-05
|
||||||
* @author D. Klein, A. Rybalchenko
|
* @author D. Klein, A. Rybalchenko
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include "FairMQSocket.h"
|
||||||
|
|
||||||
|
bool FairMQSocket::Attach(const std::string& config, bool serverish)
|
||||||
|
{
|
||||||
|
if (config.empty())
|
||||||
|
return false;
|
||||||
|
if (config.size()<2)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
const char* endpoints = config.c_str();
|
||||||
|
|
||||||
|
// We hold each individual endpoint here
|
||||||
|
char endpoint [256];
|
||||||
|
while (*endpoints) {
|
||||||
|
const char *delimiter = strchr (endpoints, ',');
|
||||||
|
if (!delimiter)
|
||||||
|
delimiter = endpoints + strlen (endpoints);
|
||||||
|
if (delimiter - endpoints > 255)
|
||||||
|
return false;
|
||||||
|
memcpy (endpoint, endpoints, delimiter - endpoints);
|
||||||
|
endpoint [delimiter - endpoints] = 0;
|
||||||
|
|
||||||
|
bool rc;
|
||||||
|
if (endpoint [0] == '@') {
|
||||||
|
rc = Bind(endpoint + 1);
|
||||||
|
}
|
||||||
|
else if (endpoint [0] == '>' || endpoint [0] == '-' || endpoint [0] == '+' ) {
|
||||||
|
Connect(endpoint + 1);
|
||||||
|
}
|
||||||
|
else if (serverish) {
|
||||||
|
rc = Bind(endpoint);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
Connect(endpoint);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!rc) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (*delimiter == 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
endpoints = delimiter + 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
|
@ -37,6 +37,7 @@ class FairMQSocket
|
||||||
|
|
||||||
virtual bool Bind(const std::string& address) = 0;
|
virtual bool Bind(const std::string& address) = 0;
|
||||||
virtual void Connect(const std::string& address) = 0;
|
virtual void Connect(const std::string& address) = 0;
|
||||||
|
virtual bool Attach(const std::string& address, bool serverish = false);
|
||||||
|
|
||||||
virtual int Send(FairMQMessage* msg, const std::string& flag = "") = 0;
|
virtual int Send(FairMQMessage* msg, const std::string& flag = "") = 0;
|
||||||
virtual int Send(FairMQMessage* msg, const int flags = 0) = 0;
|
virtual int Send(FairMQMessage* msg, const int flags = 0) = 0;
|
||||||
|
|
Loading…
Reference in New Issue
Block a user