mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
FairMQChannel: Refactor, moving short methods to header
This commit is contained in:
parent
120760da0a
commit
6699711e17
|
@ -83,26 +83,28 @@ FairMQChannel::FairMQChannel(const string& name, const string& type, const strin
|
||||||
, fAutoBind(DefaultAutoBind)
|
, fAutoBind(DefaultAutoBind)
|
||||||
, fValid(false)
|
, fValid(false)
|
||||||
, fMultipart(false)
|
, fMultipart(false)
|
||||||
{}
|
{
|
||||||
|
// LOG(warn) << "Constructing channel '" << fName << "'";
|
||||||
|
}
|
||||||
|
|
||||||
FairMQChannel::FairMQChannel(const string& name, int index, const fair::mq::Properties& properties)
|
FairMQChannel::FairMQChannel(const string& name, int index, const fair::mq::Properties& properties)
|
||||||
: FairMQChannel(tools::ToString(name, "[", index, "]"), "unspecified", "unspecified", "unspecified", nullptr)
|
: FairMQChannel(tools::ToString(name, "[", index, "]"), "unspecified", "unspecified", "unspecified", nullptr)
|
||||||
{
|
{
|
||||||
string prefix(tools::ToString("chans.", name, ".", index, "."));
|
string prefix(tools::ToString("chans.", name, ".", index, "."));
|
||||||
|
|
||||||
fType = GetPropertyOrDefault(properties, string(prefix + "type"), fType);
|
fType = GetPropertyOrDefault(properties, string(prefix + "type"), std::string(DefaultType));
|
||||||
fMethod = GetPropertyOrDefault(properties, string(prefix + "method"), fMethod);
|
fMethod = GetPropertyOrDefault(properties, string(prefix + "method"), std::string(DefaultMethod));
|
||||||
fAddress = GetPropertyOrDefault(properties, string(prefix + "address"), fAddress);
|
fAddress = GetPropertyOrDefault(properties, string(prefix + "address"), std::string(DefaultAddress));
|
||||||
fTransportType = TransportTypes.at(GetPropertyOrDefault(properties, string(prefix + "transport"), TransportNames.at(fTransportType)));
|
fTransportType = TransportType(GetPropertyOrDefault(properties, string(prefix + "transport"), std::string(DefaultTransportName)));
|
||||||
fSndBufSize = GetPropertyOrDefault(properties, string(prefix + "sndBufSize"), fSndBufSize);
|
fSndBufSize = GetPropertyOrDefault(properties, string(prefix + "sndBufSize"), DefaultSndBufSize);
|
||||||
fRcvBufSize = GetPropertyOrDefault(properties, string(prefix + "rcvBufSize"), fRcvBufSize);
|
fRcvBufSize = GetPropertyOrDefault(properties, string(prefix + "rcvBufSize"), DefaultRcvBufSize);
|
||||||
fSndKernelSize = GetPropertyOrDefault(properties, string(prefix + "sndKernelSize"), fSndKernelSize);
|
fSndKernelSize = GetPropertyOrDefault(properties, string(prefix + "sndKernelSize"), DefaultSndKernelSize);
|
||||||
fRcvKernelSize = GetPropertyOrDefault(properties, string(prefix + "rcvKernelSize"), fRcvKernelSize);
|
fRcvKernelSize = GetPropertyOrDefault(properties, string(prefix + "rcvKernelSize"), DefaultRcvKernelSize);
|
||||||
fLinger = GetPropertyOrDefault(properties, string(prefix + "linger"), fLinger);
|
fLinger = GetPropertyOrDefault(properties, string(prefix + "linger"), DefaultLinger);
|
||||||
fRateLogging = GetPropertyOrDefault(properties, string(prefix + "rateLogging"), fRateLogging);
|
fRateLogging = GetPropertyOrDefault(properties, string(prefix + "rateLogging"), DefaultRateLogging);
|
||||||
fPortRangeMin = GetPropertyOrDefault(properties, string(prefix + "portRangeMin"), fPortRangeMin);
|
fPortRangeMin = GetPropertyOrDefault(properties, string(prefix + "portRangeMin"), DefaultPortRangeMin);
|
||||||
fPortRangeMax = GetPropertyOrDefault(properties, string(prefix + "portRangeMax"), fPortRangeMax);
|
fPortRangeMax = GetPropertyOrDefault(properties, string(prefix + "portRangeMax"), DefaultPortRangeMax);
|
||||||
fAutoBind = GetPropertyOrDefault(properties, string(prefix + "autoBind"), fAutoBind);
|
fAutoBind = GetPropertyOrDefault(properties, string(prefix + "autoBind"), DefaultAutoBind);
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQChannel::FairMQChannel(const FairMQChannel& chan)
|
FairMQChannel::FairMQChannel(const FairMQChannel& chan)
|
||||||
|
@ -158,191 +160,6 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQSocket & FairMQChannel::GetSocket() const
|
|
||||||
{
|
|
||||||
assert(fSocket);
|
|
||||||
return *fSocket;
|
|
||||||
}
|
|
||||||
|
|
||||||
string FairMQChannel::GetName() const
|
|
||||||
{
|
|
||||||
return fName;
|
|
||||||
}
|
|
||||||
|
|
||||||
string FairMQChannel::GetPrefix() const
|
|
||||||
{
|
|
||||||
string prefix = fName;
|
|
||||||
prefix = prefix.erase(fName.rfind('['));
|
|
||||||
return prefix;
|
|
||||||
}
|
|
||||||
|
|
||||||
string FairMQChannel::GetIndex() const
|
|
||||||
{
|
|
||||||
string indexStr = fName;
|
|
||||||
indexStr.erase(indexStr.rfind(']'));
|
|
||||||
indexStr.erase(0, indexStr.rfind('[') + 1);
|
|
||||||
return indexStr;
|
|
||||||
}
|
|
||||||
|
|
||||||
string FairMQChannel::GetType() const
|
|
||||||
{
|
|
||||||
return fType;
|
|
||||||
}
|
|
||||||
|
|
||||||
string FairMQChannel::GetMethod() const
|
|
||||||
{
|
|
||||||
return fMethod;
|
|
||||||
}
|
|
||||||
|
|
||||||
string FairMQChannel::GetAddress() const
|
|
||||||
{
|
|
||||||
return fAddress;
|
|
||||||
}
|
|
||||||
|
|
||||||
string FairMQChannel::GetTransportName() const
|
|
||||||
{
|
|
||||||
return TransportNames.at(fTransportType);
|
|
||||||
}
|
|
||||||
|
|
||||||
Transport FairMQChannel::GetTransportType() const
|
|
||||||
{
|
|
||||||
return fTransportType;
|
|
||||||
}
|
|
||||||
|
|
||||||
int FairMQChannel::GetSndBufSize() const
|
|
||||||
{
|
|
||||||
return fSndBufSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
int FairMQChannel::GetRcvBufSize() const
|
|
||||||
{
|
|
||||||
return fRcvBufSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
int FairMQChannel::GetSndKernelSize() const
|
|
||||||
{
|
|
||||||
return fSndKernelSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
int FairMQChannel::GetRcvKernelSize() const
|
|
||||||
{
|
|
||||||
return fRcvKernelSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
int FairMQChannel::GetLinger() const
|
|
||||||
{
|
|
||||||
return fLinger;
|
|
||||||
}
|
|
||||||
|
|
||||||
int FairMQChannel::GetRateLogging() const
|
|
||||||
{
|
|
||||||
return fRateLogging;
|
|
||||||
}
|
|
||||||
|
|
||||||
int FairMQChannel::GetPortRangeMin() const
|
|
||||||
{
|
|
||||||
return fPortRangeMin;
|
|
||||||
}
|
|
||||||
|
|
||||||
int FairMQChannel::GetPortRangeMax() const
|
|
||||||
{
|
|
||||||
return fPortRangeMax;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool FairMQChannel::GetAutoBind() const
|
|
||||||
{
|
|
||||||
return fAutoBind;
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQChannel::UpdateType(const string& type)
|
|
||||||
{
|
|
||||||
fIsValid = false;
|
|
||||||
fType = type;
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQChannel::UpdateMethod(const string& method)
|
|
||||||
{
|
|
||||||
fIsValid = false;
|
|
||||||
fMethod = method;
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQChannel::UpdateAddress(const string& address)
|
|
||||||
{
|
|
||||||
fIsValid = false;
|
|
||||||
fAddress = address;
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQChannel::UpdateTransport(const string& transport)
|
|
||||||
{
|
|
||||||
fIsValid = false;
|
|
||||||
fTransportType = TransportTypes.at(transport);
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQChannel::UpdateSndBufSize(const int sndBufSize)
|
|
||||||
{
|
|
||||||
fIsValid = false;
|
|
||||||
fSndBufSize = sndBufSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQChannel::UpdateRcvBufSize(const int rcvBufSize)
|
|
||||||
{
|
|
||||||
fIsValid = false;
|
|
||||||
fRcvBufSize = rcvBufSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQChannel::UpdateSndKernelSize(const int sndKernelSize)
|
|
||||||
{
|
|
||||||
fIsValid = false;
|
|
||||||
fSndKernelSize = sndKernelSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQChannel::UpdateRcvKernelSize(const int rcvKernelSize)
|
|
||||||
{
|
|
||||||
fIsValid = false;
|
|
||||||
fRcvKernelSize = rcvKernelSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQChannel::UpdateLinger(const int duration)
|
|
||||||
{
|
|
||||||
fIsValid = false;
|
|
||||||
fLinger = duration;
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQChannel::UpdateRateLogging(const int rateLogging)
|
|
||||||
{
|
|
||||||
fIsValid = false;
|
|
||||||
fRateLogging = rateLogging;
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQChannel::UpdatePortRangeMin(const int minPort)
|
|
||||||
{
|
|
||||||
fIsValid = false;
|
|
||||||
fPortRangeMin = minPort;
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQChannel::UpdatePortRangeMax(const int maxPort)
|
|
||||||
{
|
|
||||||
fIsValid = false;
|
|
||||||
fPortRangeMax = maxPort;
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQChannel::UpdateAutoBind(const bool autobind)
|
|
||||||
{
|
|
||||||
fIsValid = false;
|
|
||||||
fAutoBind = autobind;
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQChannel::UpdateName(const string& name)
|
|
||||||
{
|
|
||||||
fIsValid = false;
|
|
||||||
fName = name;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool FairMQChannel::IsValid() const
|
|
||||||
{
|
|
||||||
return fIsValid;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool FairMQChannel::Validate()
|
bool FairMQChannel::Validate()
|
||||||
try {
|
try {
|
||||||
stringstream ss;
|
stringstream ss;
|
||||||
|
|
|
@ -81,14 +81,11 @@ class FairMQChannel
|
||||||
// FairMQChannel& operator=(FairMQChannel&&) = delete;
|
// FairMQChannel& operator=(FairMQChannel&&) = delete;
|
||||||
|
|
||||||
/// Destructor
|
/// Destructor
|
||||||
virtual ~FairMQChannel()
|
virtual ~FairMQChannel() { /* LOG(warn) << "Destroying channel '" << fName << "'"; */ }
|
||||||
{
|
|
||||||
// LOG(debug) << "Destroying channel " << fName;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct ChannelConfigurationError : std::runtime_error { using std::runtime_error::runtime_error; };
|
struct ChannelConfigurationError : std::runtime_error { using std::runtime_error::runtime_error; };
|
||||||
|
|
||||||
FairMQSocket& GetSocket() const;
|
FairMQSocket& GetSocket() const { assert(fSocket); return *fSocket; }
|
||||||
|
|
||||||
bool Bind(const std::string& address)
|
bool Bind(const std::string& address)
|
||||||
{
|
{
|
||||||
|
@ -106,127 +103,138 @@ class FairMQChannel
|
||||||
|
|
||||||
/// Get channel name
|
/// Get channel name
|
||||||
/// @return Returns full channel name (e.g. "data[0]")
|
/// @return Returns full channel name (e.g. "data[0]")
|
||||||
std::string GetName() const;
|
std::string GetName() const { return fName; }
|
||||||
|
|
||||||
/// Get channel prefix
|
/// Get channel prefix
|
||||||
/// @return Returns channel prefix (e.g. "data" in "data[0]")
|
/// @return Returns channel prefix (e.g. "data" in "data[0]")
|
||||||
std::string GetPrefix() const;
|
std::string GetPrefix() const
|
||||||
|
{
|
||||||
|
std::string prefix = fName;
|
||||||
|
prefix = prefix.erase(fName.rfind('['));
|
||||||
|
return prefix;
|
||||||
|
}
|
||||||
|
|
||||||
/// Get channel index
|
/// Get channel index
|
||||||
/// @return Returns channel index (e.g. 0 in "data[0]")
|
/// @return Returns channel index (e.g. 0 in "data[0]")
|
||||||
std::string GetIndex() const;
|
std::string GetIndex() const
|
||||||
|
{
|
||||||
|
std::string indexStr = fName;
|
||||||
|
indexStr.erase(indexStr.rfind(']'));
|
||||||
|
indexStr.erase(0, indexStr.rfind('[') + 1);
|
||||||
|
return indexStr;
|
||||||
|
}
|
||||||
|
|
||||||
/// Get socket type
|
/// Get socket type
|
||||||
/// @return Returns socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/)
|
/// @return Returns socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/)
|
||||||
std::string GetType() const;
|
std::string GetType() const { return fType; }
|
||||||
|
|
||||||
/// Get socket method
|
/// Get socket method
|
||||||
/// @return Returns socket method (bind/connect)
|
/// @return Returns socket method (bind/connect)
|
||||||
std::string GetMethod() const;
|
std::string GetMethod() const { return fMethod; }
|
||||||
|
|
||||||
/// Get socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")
|
/// Get socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")
|
||||||
/// @return Returns socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")
|
/// @return Returns socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")
|
||||||
std::string GetAddress() const;
|
std::string GetAddress() const { return fAddress; }
|
||||||
|
|
||||||
/// Get channel transport name ("default", "zeromq" or "shmem")
|
/// Get channel transport name ("default", "zeromq" or "shmem")
|
||||||
/// @return Returns channel transport name (e.g. "default", "zeromq" or "shmem")
|
/// @return Returns channel transport name (e.g. "default", "zeromq" or "shmem")
|
||||||
std::string GetTransportName() const;
|
std::string GetTransportName() const { return fair::mq::TransportName(fTransportType); }
|
||||||
|
|
||||||
/// Get channel transport type
|
/// Get channel transport type
|
||||||
/// @return Returns channel transport type
|
/// @return Returns channel transport type
|
||||||
fair::mq::Transport GetTransportType() const;
|
fair::mq::Transport GetTransportType() const { return fTransportType; }
|
||||||
|
|
||||||
/// Get socket send buffer size (in number of messages)
|
/// Get socket send buffer size (in number of messages)
|
||||||
/// @return Returns socket send buffer size (in number of messages)
|
/// @return Returns socket send buffer size (in number of messages)
|
||||||
int GetSndBufSize() const;
|
int GetSndBufSize() const { return fSndBufSize; }
|
||||||
|
|
||||||
/// Get socket receive buffer size (in number of messages)
|
/// Get socket receive buffer size (in number of messages)
|
||||||
/// @return Returns socket receive buffer size (in number of messages)
|
/// @return Returns socket receive buffer size (in number of messages)
|
||||||
int GetRcvBufSize() const;
|
int GetRcvBufSize() const { return fRcvBufSize; }
|
||||||
|
|
||||||
/// Get socket kernel transmit send buffer size (in bytes)
|
/// Get socket kernel transmit send buffer size (in bytes)
|
||||||
/// @return Returns socket kernel transmit send buffer size (in bytes)
|
/// @return Returns socket kernel transmit send buffer size (in bytes)
|
||||||
int GetSndKernelSize() const;
|
int GetSndKernelSize() const { return fSndKernelSize; }
|
||||||
|
|
||||||
/// Get socket kernel transmit receive buffer size (in bytes)
|
/// Get socket kernel transmit receive buffer size (in bytes)
|
||||||
/// @return Returns socket kernel transmit receive buffer size (in bytes)
|
/// @return Returns socket kernel transmit receive buffer size (in bytes)
|
||||||
int GetRcvKernelSize() const;
|
int GetRcvKernelSize() const { return fRcvKernelSize; }
|
||||||
|
|
||||||
/// Get linger duration (in milliseconds)
|
/// Get linger duration (in milliseconds)
|
||||||
/// @return Returns linger duration (in milliseconds)
|
/// @return Returns linger duration (in milliseconds)
|
||||||
int GetLinger() const;
|
int GetLinger() const { return fLinger; }
|
||||||
|
|
||||||
/// Get socket rate logging interval (in seconds)
|
/// Get socket rate logging interval (in seconds)
|
||||||
/// @return Returns socket rate logging interval (in seconds)
|
/// @return Returns socket rate logging interval (in seconds)
|
||||||
int GetRateLogging() const;
|
int GetRateLogging() const { return fRateLogging; }
|
||||||
|
|
||||||
/// Get start of the port range for automatic binding
|
/// Get start of the port range for automatic binding
|
||||||
/// @return start of the port range
|
/// @return start of the port range
|
||||||
int GetPortRangeMin() const;
|
int GetPortRangeMin() const { return fPortRangeMin; }
|
||||||
|
|
||||||
/// Get end of the port range for automatic binding
|
/// Get end of the port range for automatic binding
|
||||||
/// @return end of the port range
|
/// @return end of the port range
|
||||||
int GetPortRangeMax() const;
|
int GetPortRangeMax() const { return fPortRangeMax; }
|
||||||
|
|
||||||
/// Set automatic binding (pick random port if bind fails)
|
/// Set automatic binding (pick random port if bind fails)
|
||||||
/// @return true/false, true if automatic binding is enabled
|
/// @return true/false, true if automatic binding is enabled
|
||||||
bool GetAutoBind() const;
|
bool GetAutoBind() const { return fAutoBind; }
|
||||||
|
|
||||||
/// Set socket type
|
|
||||||
/// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/)
|
|
||||||
void UpdateType(const std::string& type);
|
|
||||||
|
|
||||||
/// Set socket method
|
|
||||||
/// @param method Socket method (bind/connect)
|
|
||||||
void UpdateMethod(const std::string& method);
|
|
||||||
|
|
||||||
/// Set socket address
|
|
||||||
/// @param Socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")
|
|
||||||
void UpdateAddress(const std::string& address);
|
|
||||||
|
|
||||||
/// Set channel transport
|
|
||||||
/// @param transport transport string ("default", "zeromq" or "shmem")
|
|
||||||
void UpdateTransport(const std::string& transport);
|
|
||||||
|
|
||||||
/// Set socket send buffer size
|
|
||||||
/// @param sndBufSize Socket send buffer size (in number of messages)
|
|
||||||
void UpdateSndBufSize(const int sndBufSize);
|
|
||||||
|
|
||||||
/// Set socket receive buffer size
|
|
||||||
/// @param rcvBufSize Socket receive buffer size (in number of messages)
|
|
||||||
void UpdateRcvBufSize(const int rcvBufSize);
|
|
||||||
|
|
||||||
/// Set socket kernel transmit send buffer size (in bytes)
|
|
||||||
/// @param sndKernelSize Socket send buffer size (in bytes)
|
|
||||||
void UpdateSndKernelSize(const int sndKernelSize);
|
|
||||||
|
|
||||||
/// Set socket kernel transmit receive buffer size (in bytes)
|
|
||||||
/// @param rcvKernelSize Socket receive buffer size (in bytes)
|
|
||||||
void UpdateRcvKernelSize(const int rcvKernelSize);
|
|
||||||
|
|
||||||
/// Set linger duration (in milliseconds)
|
|
||||||
/// @param duration linger duration (in milliseconds)
|
|
||||||
void UpdateLinger(const int duration);
|
|
||||||
|
|
||||||
/// Set socket rate logging interval (in seconds)
|
|
||||||
/// @param rateLogging Socket rate logging interval (in seconds)
|
|
||||||
void UpdateRateLogging(const int rateLogging);
|
|
||||||
|
|
||||||
/// Set start of the port range for automatic binding
|
|
||||||
/// @param minPort start of the port range
|
|
||||||
void UpdatePortRangeMin(const int minPort);
|
|
||||||
|
|
||||||
/// Set end of the port range for automatic binding
|
|
||||||
/// @param maxPort end of the port range
|
|
||||||
void UpdatePortRangeMax(const int maxPort);
|
|
||||||
|
|
||||||
/// Set automatic binding (pick random port if bind fails)
|
|
||||||
/// @param autobind true/false, true to enable automatic binding
|
|
||||||
void UpdateAutoBind(const bool autobind);
|
|
||||||
|
|
||||||
/// Set channel name
|
/// Set channel name
|
||||||
/// @param name Arbitrary channel name
|
/// @param name Arbitrary channel name
|
||||||
void UpdateName(const std::string& name);
|
void UpdateName(const std::string& name) { fName = name; Invalidate(); }
|
||||||
|
|
||||||
|
/// Set socket type
|
||||||
|
/// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/)
|
||||||
|
void UpdateType(const std::string& type) { fType = type; Invalidate(); }
|
||||||
|
|
||||||
|
/// Set socket method
|
||||||
|
/// @param method Socket method (bind/connect)
|
||||||
|
void UpdateMethod(const std::string& method) { fMethod = method; Invalidate(); }
|
||||||
|
|
||||||
|
/// Set socket address
|
||||||
|
/// @param Socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")
|
||||||
|
void UpdateAddress(const std::string& address) { fAddress = address; Invalidate(); }
|
||||||
|
|
||||||
|
/// Set channel transport
|
||||||
|
/// @param transport transport string ("default", "zeromq" or "shmem")
|
||||||
|
void UpdateTransport(const std::string& transport) { fTransportType = fair::mq::TransportType(transport); Invalidate(); }
|
||||||
|
|
||||||
|
/// Set socket send buffer size
|
||||||
|
/// @param sndBufSize Socket send buffer size (in number of messages)
|
||||||
|
void UpdateSndBufSize(const int sndBufSize) { fSndBufSize = sndBufSize; Invalidate(); }
|
||||||
|
|
||||||
|
/// Set socket receive buffer size
|
||||||
|
/// @param rcvBufSize Socket receive buffer size (in number of messages)
|
||||||
|
void UpdateRcvBufSize(const int rcvBufSize) { fRcvBufSize = rcvBufSize; Invalidate(); }
|
||||||
|
|
||||||
|
/// Set socket kernel transmit send buffer size (in bytes)
|
||||||
|
/// @param sndKernelSize Socket send buffer size (in bytes)
|
||||||
|
void UpdateSndKernelSize(const int sndKernelSize) { fSndKernelSize = sndKernelSize; Invalidate(); }
|
||||||
|
|
||||||
|
/// Set socket kernel transmit receive buffer size (in bytes)
|
||||||
|
/// @param rcvKernelSize Socket receive buffer size (in bytes)
|
||||||
|
void UpdateRcvKernelSize(const int rcvKernelSize) { fRcvKernelSize = rcvKernelSize; Invalidate(); }
|
||||||
|
|
||||||
|
/// Set linger duration (in milliseconds)
|
||||||
|
/// @param duration linger duration (in milliseconds)
|
||||||
|
void UpdateLinger(const int duration) { fLinger = duration; Invalidate(); }
|
||||||
|
|
||||||
|
/// Set socket rate logging interval (in seconds)
|
||||||
|
/// @param rateLogging Socket rate logging interval (in seconds)
|
||||||
|
void UpdateRateLogging(const int rateLogging) { fRateLogging = rateLogging; Invalidate(); }
|
||||||
|
|
||||||
|
/// Set start of the port range for automatic binding
|
||||||
|
/// @param minPort start of the port range
|
||||||
|
void UpdatePortRangeMin(const int minPort) { fPortRangeMin = minPort; Invalidate(); }
|
||||||
|
|
||||||
|
/// Set end of the port range for automatic binding
|
||||||
|
/// @param maxPort end of the port range
|
||||||
|
void UpdatePortRangeMax(const int maxPort) { fPortRangeMax = maxPort; Invalidate(); }
|
||||||
|
|
||||||
|
/// Set automatic binding (pick random port if bind fails)
|
||||||
|
/// @param autobind true/false, true to enable automatic binding
|
||||||
|
void UpdateAutoBind(const bool autobind) { fAutoBind = autobind; Invalidate(); }
|
||||||
|
|
||||||
/// Checks if the configured channel settings are valid (checks the validity parameter, without running full validation (as oposed to ValidateChannel()))
|
/// Checks if the configured channel settings are valid (checks the validity parameter, without running full validation (as oposed to ValidateChannel()))
|
||||||
/// @return true if channel settings are valid, false otherwise.
|
/// @return true if channel settings are valid, false otherwise.
|
||||||
|
|
|
@ -10,8 +10,10 @@
|
||||||
#define FAIR_MQ_TRANSPORTS_H
|
#define FAIR_MQ_TRANSPORTS_H
|
||||||
|
|
||||||
#include <fairmq/tools/CppSTL.h>
|
#include <fairmq/tools/CppSTL.h>
|
||||||
|
#include <fairmq/tools/Strings.h>
|
||||||
|
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
#include <stdexcept>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
|
|
||||||
|
@ -28,6 +30,8 @@ enum class Transport
|
||||||
OFI
|
OFI
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct TransportError : std::runtime_error { using std::runtime_error::runtime_error; };
|
||||||
|
|
||||||
} /* namespace mq */
|
} /* namespace mq */
|
||||||
} /* namespace fair */
|
} /* namespace fair */
|
||||||
|
|
||||||
|
@ -58,6 +62,18 @@ static std::unordered_map<Transport, std::string> TransportNames {
|
||||||
{ Transport::OFI, "ofi" }
|
{ Transport::OFI, "ofi" }
|
||||||
};
|
};
|
||||||
|
|
||||||
|
inline std::string TransportName(Transport transport)
|
||||||
|
{
|
||||||
|
return TransportNames[transport];
|
||||||
|
}
|
||||||
|
|
||||||
|
inline Transport TransportType(const std::string& transport)
|
||||||
|
try {
|
||||||
|
return TransportTypes.at(transport);
|
||||||
|
} catch (std::out_of_range&) {
|
||||||
|
throw TransportError(tools::ToString("Unknown transport provided: ", transport));
|
||||||
|
}
|
||||||
|
|
||||||
} /* namespace mq */
|
} /* namespace mq */
|
||||||
} /* namespace fair */
|
} /* namespace fair */
|
||||||
|
|
||||||
|
|
|
@ -97,11 +97,6 @@ class Socket final : public fair::mq::Socket
|
||||||
// LOG(error) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno);
|
// LOG(error) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno);
|
||||||
// }
|
// }
|
||||||
// }
|
// }
|
||||||
|
|
||||||
if (type == "sub" || type == "pub") {
|
|
||||||
LOG(error) << "PUB/SUB socket type is not supported for shared memory transport";
|
|
||||||
throw SocketError("PUB/SUB socket type is not supported for shared memory transport");
|
|
||||||
}
|
|
||||||
LOG(debug) << "Created socket " << GetId();
|
LOG(debug) << "Created socket " << GetId();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -88,7 +88,7 @@ TEST(Channel, Validation)
|
||||||
channel2.UpdateName("Kanal");
|
channel2.UpdateName("Kanal");
|
||||||
ASSERT_EQ(channel2.GetName(), "Kanal");
|
ASSERT_EQ(channel2.GetName(), "Kanal");
|
||||||
|
|
||||||
channel2.ResetChannel();
|
channel2.Invalidate();
|
||||||
ASSERT_EQ(channel2.IsValid(), false);
|
ASSERT_EQ(channel2.IsValid(), false);
|
||||||
ASSERT_EQ(channel2.Validate(), true);
|
ASSERT_EQ(channel2.Validate(), true);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user