Add configurable default snd/rcv timeout

This commit is contained in:
Alexey Rybalchenko 2021-11-11 11:16:18 +01:00
parent 856780f88a
commit f24dee33c2
6 changed files with 138 additions and 99 deletions

View File

@ -39,6 +39,8 @@ constexpr int Channel::DefaultSndBufSize;
constexpr int Channel::DefaultRcvBufSize; constexpr int Channel::DefaultRcvBufSize;
constexpr int Channel::DefaultSndKernelSize; constexpr int Channel::DefaultSndKernelSize;
constexpr int Channel::DefaultRcvKernelSize; constexpr int Channel::DefaultRcvKernelSize;
constexpr int Channel::DefaultSndTimeoutMs;
constexpr int Channel::DefaultRcvTimeoutMs;
constexpr int Channel::DefaultLinger; constexpr int Channel::DefaultLinger;
constexpr int Channel::DefaultRateLogging; constexpr int Channel::DefaultRateLogging;
constexpr int Channel::DefaultPortRangeMin; constexpr int Channel::DefaultPortRangeMin;
@ -73,6 +75,8 @@ Channel::Channel(string name, string type, string method, string address, shared
, fRcvBufSize(DefaultRcvBufSize) , fRcvBufSize(DefaultRcvBufSize)
, fSndKernelSize(DefaultSndKernelSize) , fSndKernelSize(DefaultSndKernelSize)
, fRcvKernelSize(DefaultRcvKernelSize) , fRcvKernelSize(DefaultRcvKernelSize)
, fSndTimeoutMs(DefaultSndTimeoutMs)
, fRcvTimeoutMs(DefaultRcvTimeoutMs)
, fLinger(DefaultLinger) , fLinger(DefaultLinger)
, fRateLogging(DefaultRateLogging) , fRateLogging(DefaultRateLogging)
, fPortRangeMin(DefaultPortRangeMin) , fPortRangeMin(DefaultPortRangeMin)
@ -97,6 +101,8 @@ Channel::Channel(const string& name, int index, const Properties& properties)
fRcvBufSize = GetPropertyOrDefault(properties, string(prefix + "rcvBufSize"), DefaultRcvBufSize); fRcvBufSize = GetPropertyOrDefault(properties, string(prefix + "rcvBufSize"), DefaultRcvBufSize);
fSndKernelSize = GetPropertyOrDefault(properties, string(prefix + "sndKernelSize"), DefaultSndKernelSize); fSndKernelSize = GetPropertyOrDefault(properties, string(prefix + "sndKernelSize"), DefaultSndKernelSize);
fRcvKernelSize = GetPropertyOrDefault(properties, string(prefix + "rcvKernelSize"), DefaultRcvKernelSize); fRcvKernelSize = GetPropertyOrDefault(properties, string(prefix + "rcvKernelSize"), DefaultRcvKernelSize);
fSndTimeoutMs = GetPropertyOrDefault(properties, string(prefix + "sndTimeoutMs"), DefaultSndTimeoutMs);
fRcvTimeoutMs = GetPropertyOrDefault(properties, string(prefix + "rcvTimeoutMs"), DefaultRcvTimeoutMs);
fLinger = GetPropertyOrDefault(properties, string(prefix + "linger"), DefaultLinger); fLinger = GetPropertyOrDefault(properties, string(prefix + "linger"), DefaultLinger);
fRateLogging = GetPropertyOrDefault(properties, string(prefix + "rateLogging"), DefaultRateLogging); fRateLogging = GetPropertyOrDefault(properties, string(prefix + "rateLogging"), DefaultRateLogging);
fPortRangeMin = GetPropertyOrDefault(properties, string(prefix + "portRangeMin"), DefaultPortRangeMin); fPortRangeMin = GetPropertyOrDefault(properties, string(prefix + "portRangeMin"), DefaultPortRangeMin);
@ -120,6 +126,8 @@ Channel::Channel(const Channel& chan, string newName)
, fRcvBufSize(chan.fRcvBufSize) , fRcvBufSize(chan.fRcvBufSize)
, fSndKernelSize(chan.fSndKernelSize) , fSndKernelSize(chan.fSndKernelSize)
, fRcvKernelSize(chan.fRcvKernelSize) , fRcvKernelSize(chan.fRcvKernelSize)
, fSndTimeoutMs(chan.fSndTimeoutMs)
, fRcvTimeoutMs(chan.fRcvTimeoutMs)
, fLinger(chan.fLinger) , fLinger(chan.fLinger)
, fRateLogging(chan.fRateLogging) , fRateLogging(chan.fRateLogging)
, fPortRangeMin(chan.fPortRangeMin) , fPortRangeMin(chan.fPortRangeMin)
@ -146,6 +154,8 @@ Channel& Channel::operator=(const Channel& chan)
fRcvBufSize = chan.fRcvBufSize; fRcvBufSize = chan.fRcvBufSize;
fSndKernelSize = chan.fSndKernelSize; fSndKernelSize = chan.fSndKernelSize;
fRcvKernelSize = chan.fRcvKernelSize; fRcvKernelSize = chan.fRcvKernelSize;
fSndTimeoutMs = chan.fSndTimeoutMs;
fRcvTimeoutMs = chan.fRcvTimeoutMs;
fLinger = chan.fLinger; fLinger = chan.fLinger;
fRateLogging = chan.fRateLogging; fRateLogging = chan.fRateLogging;
fPortRangeMin = chan.fPortRangeMin; fPortRangeMin = chan.fPortRangeMin;

View File

@ -166,6 +166,14 @@ class Channel
/// @return Returns socket kernel transmit receive buffer size (in bytes) /// @return Returns socket kernel transmit receive buffer size (in bytes)
int GetRcvKernelSize() const { return fRcvKernelSize; } int GetRcvKernelSize() const { return fRcvKernelSize; }
/// Get socket default send timeout (in ms)
/// @return Returns socket default send timeout (in ms)
int GetSndTimeout() const { return fSndTimeoutMs; }
/// Get socket default receive timeout (in ms)
/// @return Returns socket default receive timeout (in ms)
int GetRcvTimeout() const { return fRcvTimeoutMs; }
/// Get linger duration (in milliseconds) /// Get linger duration (in milliseconds)
/// @return Returns linger duration (in milliseconds) /// @return Returns linger duration (in milliseconds)
int GetLinger() const { return fLinger; } int GetLinger() const { return fLinger; }
@ -230,6 +238,14 @@ class Channel
/// @param rcvKernelSize Socket receive buffer size (in bytes) /// @param rcvKernelSize Socket receive buffer size (in bytes)
void UpdateRcvKernelSize(int rcvKernelSize) { fRcvKernelSize = rcvKernelSize; Invalidate(); } void UpdateRcvKernelSize(int rcvKernelSize) { fRcvKernelSize = rcvKernelSize; Invalidate(); }
/// Set socket default send timeout (in ms)
/// @param sndTimeoutMs Socket default send timeout (in ms)
void UpdateSndTimeout(int sndTimeoutMs) { fSndTimeoutMs = sndTimeoutMs; Invalidate(); }
/// Set socket default receive timeout (in ms)
/// @param rcvTimeoutMs Socket default receive timeout (in ms)
void UpdateRcvTimeout(int rcvTimeoutMs) { fRcvTimeoutMs = rcvTimeoutMs; Invalidate(); }
/// Set linger duration (in milliseconds) /// Set linger duration (in milliseconds)
/// @param duration linger duration (in milliseconds) /// @param duration linger duration (in milliseconds)
void UpdateLinger(int duration) { fLinger = duration; Invalidate(); } void UpdateLinger(int duration) { fLinger = duration; Invalidate(); }
@ -267,62 +283,52 @@ class Channel
/// invalidates the channel (requires validation to be used again). /// invalidates the channel (requires validation to be used again).
void Invalidate() { fValid = false; } void Invalidate() { fValid = false; }
/// Sends a message to the socket queue. /// Send message(s) to the socket queue.
/// @param msg Constant reference of unique_ptr to a Message /// @param m reference to MessagePtr/Parts/vector<MessagePtr>
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) /// @param sndTimeoutMs send timeout in ms.
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) /// -1 will wait forever (or until interrupt (e.g. via state change)),
int64_t Send(MessagePtr& msg, int sndTimeoutInMs = -1) /// 0 will not wait (return immediately if cannot send).
/// If not provided, default timeout will be taken.
/// @return Number of bytes that have been queued,
/// TransferCode::timeout if timed out,
/// TransferCode::error if there was an error,
/// TransferCode::interrupted if interrupted (e.g. by requested state change)
template<typename M, typename... Timeout>
std::enable_if_t<is_transferrable<M>::value, int64_t>
Send(M& m, Timeout&&... sndTimeoutMs)
{ {
CheckSendCompatibility(msg); static_assert(sizeof...(sndTimeoutMs) <= 1, "Send called with too many arguments");
return fSocket->Send(msg, sndTimeoutInMs);
CheckSendCompatibility(m);
int t = fSndTimeoutMs;
if constexpr (sizeof...(sndTimeoutMs) == 1) {
t = {sndTimeoutMs...};
}
return fSocket->Send(m, t);
} }
/// Receives a message from the socket queue. /// Receive message(s) from the socket queue.
/// @param msg Constant reference of unique_ptr to a Message /// @param m reference to MessagePtr/Parts/vector<MessagePtr>
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) /// @param rcvTimeoutMs receive timeout in ms.
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) /// -1 will wait forever (or until interrupt (e.g. via state change)),
int64_t Receive(MessagePtr& msg, int rcvTimeoutInMs = -1) /// 0 will not wait (return immediately if cannot receive).
/// If not provided, default timeout will be taken.
/// @return Number of bytes that have been received,
/// TransferCode::timeout if timed out,
/// TransferCode::error if there was an error,
/// TransferCode::interrupted if interrupted (e.g. by requested state change)
template<typename M, typename... Timeout>
std::enable_if_t<is_transferrable<M>::value, int64_t>
Receive(M& m, Timeout&&... rcvTimeoutMs)
{ {
CheckReceiveCompatibility(msg); static_assert(sizeof...(rcvTimeoutMs) <= 1, "Receive called with too many arguments");
return fSocket->Receive(msg, rcvTimeoutInMs);
}
/// Send a vector of messages CheckReceiveCompatibility(m);
/// @param msgVec message vector reference int t = fRcvTimeoutMs;
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) if constexpr (sizeof...(rcvTimeoutMs) == 1) {
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) t = {rcvTimeoutMs...};
int64_t Send(std::vector<MessagePtr>& msgVec, int sndTimeoutInMs = -1) }
{ return fSocket->Receive(m, t);
CheckSendCompatibility(msgVec);
return fSocket->Send(msgVec, sndTimeoutInMs);
}
/// Receive a vector of messages
/// @param msgVec message vector reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(std::vector<MessagePtr>& msgVec, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msgVec);
return fSocket->Receive(msgVec, rcvTimeoutInMs);
}
/// Send Parts
/// @param parts Parts reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(Parts& parts, int sndTimeoutInMs = -1)
{
return Send(parts.fParts, sndTimeoutInMs);
}
/// Receive Parts
/// @param parts Parts reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(Parts& parts, int rcvTimeoutInMs = -1)
{
return Receive(parts.fParts, rcvTimeoutInMs);
} }
unsigned long GetBytesTx() const { return fSocket->GetBytesTx(); } unsigned long GetBytesTx() const { return fSocket->GetBytesTx(); }
@ -366,6 +372,8 @@ class Channel
static constexpr int DefaultRcvBufSize = 1000; static constexpr int DefaultRcvBufSize = 1000;
static constexpr int DefaultSndKernelSize = 0; static constexpr int DefaultSndKernelSize = 0;
static constexpr int DefaultRcvKernelSize = 0; static constexpr int DefaultRcvKernelSize = 0;
static constexpr int DefaultSndTimeoutMs = -1;
static constexpr int DefaultRcvTimeoutMs = -1;
static constexpr int DefaultLinger = 500; static constexpr int DefaultLinger = 500;
static constexpr int DefaultRateLogging = 1; static constexpr int DefaultRateLogging = 1;
static constexpr int DefaultPortRangeMin = 22000; static constexpr int DefaultPortRangeMin = 22000;
@ -385,6 +393,8 @@ class Channel
int fRcvBufSize; int fRcvBufSize;
int fSndKernelSize; int fSndKernelSize;
int fRcvKernelSize; int fRcvKernelSize;
int fSndTimeoutMs;
int fRcvTimeoutMs;
int fLinger; int fLinger;
int fRateLogging; int fRateLogging;
int fPortRangeMin; int fPortRangeMin;
@ -414,6 +424,7 @@ class Channel
} }
} }
void CheckSendCompatibility(Parts& parts) { CheckSendCompatibility(parts.fParts); }
void CheckSendCompatibility(std::vector<MessagePtr>& msgVec) void CheckSendCompatibility(std::vector<MessagePtr>& msgVec)
{ {
for (auto& msg : msgVec) { for (auto& msg : msgVec) {
@ -443,6 +454,7 @@ class Channel
} }
} }
void CheckReceiveCompatibility(Parts& parts) { CheckReceiveCompatibility(parts.fParts); }
void CheckReceiveCompatibility(std::vector<MessagePtr>& msgVec) void CheckReceiveCompatibility(std::vector<MessagePtr>& msgVec)
{ {
for (auto& msg : msgVec) { for (auto& msg : msgVec) {

View File

@ -81,72 +81,70 @@ class Device
Deserializer().Deserialize(msg, std::forward<DataType>(data), std::forward<Args>(args)...); Deserializer().Deserialize(msg, std::forward<DataType>(data), std::forward<Args>(args)...);
} }
/// Shorthand method to send `msg` on `chan` at index `i` /// Send `m` on `chan` at index `i`
/// @param msg message reference /// @param m reference to MessagePtr/Parts/vector<MessagePtr>
/// @param chan channel name /// @param chan channel name
/// @param i channel index /// @param i channel index
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via /// @return Number of queued bytes,
/// state change)), 0 will not wait (return immediately if cannot send) /// TransferCode::timeout if timed out,
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, /// TransferCode::error if there was an error,
/// TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by /// TransferCode::interrupted if interrupted (e.g. by requested state change)
/// requested state change) template<typename M>
int64_t Send(MessagePtr& msg, std::enable_if_t<is_transferrable<M>::value, int64_t>
const std::string& channel, Send(M& m, const std::string& channel, const int index = 0)
const int index = 0,
int sndTimeoutInMs = -1)
{ {
return GetChannel(channel, index).Send(msg, sndTimeoutInMs); return GetChannel(channel, index).Send(m);
} }
/// Shorthand method to receive `msg` on `chan` at index `i` /// Receive `m` on `chan` at index `i`
/// @param msg message reference /// @param m reference to MessagePtr/Parts/vector<MessagePtr>
/// @param chan channel name /// @param chan channel name
/// @param i channel index /// @param i channel index
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. /// @return Number of received bytes,
/// via state change)), 0 will not wait (return immediately if cannot receive) /// TransferCode::timeout if timed out,
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, /// TransferCode::error if there was an error,
/// TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by /// TransferCode::interrupted if interrupted (e.g. by requested state change)
/// requested state change) template<typename M>
int64_t Receive(MessagePtr& msg, std::enable_if_t<is_transferrable<M>::value, int64_t>
const std::string& channel, Receive(M& m, const std::string& channel, const int index = 0)
const int index = 0,
int rcvTimeoutInMs = -1)
{ {
return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs); return GetChannel(channel, index).Receive(m);
} }
/// Shorthand method to send Parts on `chan` at index `i` /// Send `m` on `chan` at index `i`
/// @param parts parts reference /// @param m reference to MessagePtr/Parts/vector<MessagePtr>
/// @param chan channel name /// @param chan channel name
/// @param i channel index /// @param i channel index
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via /// @param sndTimeoutMs send timeout in ms,
/// state change)), 0 will not wait (return immediately if cannot send) /// -1 will wait forever (or until interrupt (e.g. via state change)),
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, /// 0 will not wait (return immediately if cannot send)
/// TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by /// @return Number of queued bytes,
/// requested state change) /// TransferCode::timeout if timed out,
int64_t Send(Parts& parts, /// TransferCode::error if there was an error,
const std::string& channel, /// TransferCode::interrupted if interrupted (e.g. by requested state change)
const int index = 0, template<typename M>
int sndTimeoutInMs = -1) std::enable_if_t<is_transferrable<M>::value, int64_t>
Send(M& m, const std::string& channel, const int index, int sndTimeoutMs)
{ {
return GetChannel(channel, index).Send(parts.fParts, sndTimeoutInMs); return GetChannel(channel, index).Send(m, sndTimeoutMs);
} }
/// Shorthand method to receive Parts on `chan` at index `i` /// Receive `m` on `chan` at index `i`
/// @param parts parts reference /// @param m reference to MessagePtr/Parts/vector<MessagePtr>
/// @param chan channel name /// @param chan channel name
/// @param i channel index /// @param i channel index
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. /// @param rcvTimeoutMs receive timeout in ms,
/// via state change)), 0 will not wait (return immediately if cannot receive) /// -1 will wait forever (or until interrupt (e.g. via state change),
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, /// 0 will not wait (return immediately if cannot receive)
/// TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by /// @return Number of received bytes,
/// requested state change) /// TransferCode::timeout if timed out,
int64_t Receive(Parts& parts, /// TransferCode::error if there was an error,
const std::string& channel, /// TransferCode::interrupted if interrupted (e.g. by requested state change)
const int index = 0, template<typename M>
int rcvTimeoutInMs = -1) std::enable_if_t<is_transferrable<M>::value, int64_t>
Receive(M& m, const std::string& channel, const int index, int rcvTimeoutMs)
{ {
return GetChannel(channel, index).Receive(parts.fParts, rcvTimeoutInMs); return GetChannel(channel, index).Receive(m, rcvTimeoutMs);
} }
/// @brief Getter for default transport factory /// @brief Getter for default transport factory

View File

@ -96,6 +96,8 @@ void ChannelParser(const ptree& tree, fair::mq::Properties& properties)
commonProperties.emplace("rcvBufSize", cn.second.get<int>("rcvBufSize", FairMQChannel::DefaultRcvBufSize)); commonProperties.emplace("rcvBufSize", cn.second.get<int>("rcvBufSize", FairMQChannel::DefaultRcvBufSize));
commonProperties.emplace("sndKernelSize", cn.second.get<int>("sndKernelSize", FairMQChannel::DefaultSndKernelSize)); commonProperties.emplace("sndKernelSize", cn.second.get<int>("sndKernelSize", FairMQChannel::DefaultSndKernelSize));
commonProperties.emplace("rcvKernelSize", cn.second.get<int>("rcvKernelSize", FairMQChannel::DefaultRcvKernelSize)); commonProperties.emplace("rcvKernelSize", cn.second.get<int>("rcvKernelSize", FairMQChannel::DefaultRcvKernelSize));
commonProperties.emplace("sndTimeoutMs", cn.second.get<int>("sndTimeoutMs", FairMQChannel::DefaultSndTimeoutMs));
commonProperties.emplace("rcvTimeoutMs", cn.second.get<int>("rcvTimeoutMs", FairMQChannel::DefaultRcvTimeoutMs));
commonProperties.emplace("linger", cn.second.get<int>("linger", FairMQChannel::DefaultLinger)); commonProperties.emplace("linger", cn.second.get<int>("linger", FairMQChannel::DefaultLinger));
commonProperties.emplace("rateLogging", cn.second.get<int>("rateLogging", FairMQChannel::DefaultRateLogging)); commonProperties.emplace("rateLogging", cn.second.get<int>("rateLogging", FairMQChannel::DefaultRateLogging));
commonProperties.emplace("portRangeMin", cn.second.get<int>("portRangeMin", FairMQChannel::DefaultPortRangeMin)); commonProperties.emplace("portRangeMin", cn.second.get<int>("portRangeMin", FairMQChannel::DefaultPortRangeMin));
@ -146,6 +148,8 @@ void SubChannelParser(const ptree& channelTree, fair::mq::Properties& properties
newProperties["rcvBufSize"] = sn.second.get<int>("rcvBufSize", boost::any_cast<int>(commonProperties.at("rcvBufSize"))); newProperties["rcvBufSize"] = sn.second.get<int>("rcvBufSize", boost::any_cast<int>(commonProperties.at("rcvBufSize")));
newProperties["sndKernelSize"] = sn.second.get<int>("sndKernelSize", boost::any_cast<int>(commonProperties.at("sndKernelSize"))); newProperties["sndKernelSize"] = sn.second.get<int>("sndKernelSize", boost::any_cast<int>(commonProperties.at("sndKernelSize")));
newProperties["rcvKernelSize"] = sn.second.get<int>("rcvKernelSize", boost::any_cast<int>(commonProperties.at("rcvKernelSize"))); newProperties["rcvKernelSize"] = sn.second.get<int>("rcvKernelSize", boost::any_cast<int>(commonProperties.at("rcvKernelSize")));
newProperties["sndTimeoutMs"] = sn.second.get<int>("sndTimeoutMs", boost::any_cast<int>(commonProperties.at("sndTimeoutMs")));
newProperties["rcvTimeoutMs"] = sn.second.get<int>("rcvTimeoutMs", boost::any_cast<int>(commonProperties.at("rcvTimeoutMs")));
newProperties["linger"] = sn.second.get<int>("linger", boost::any_cast<int>(commonProperties.at("linger"))); newProperties["linger"] = sn.second.get<int>("linger", boost::any_cast<int>(commonProperties.at("linger")));
newProperties["rateLogging"] = sn.second.get<int>("rateLogging", boost::any_cast<int>(commonProperties.at("rateLogging"))); newProperties["rateLogging"] = sn.second.get<int>("rateLogging", boost::any_cast<int>(commonProperties.at("rateLogging")));
newProperties["portRangeMin"] = sn.second.get<int>("portRangeMin", boost::any_cast<int>(commonProperties.at("portRangeMin"))); newProperties["portRangeMin"] = sn.second.get<int>("portRangeMin", boost::any_cast<int>(commonProperties.at("portRangeMin")));

View File

@ -10,9 +10,12 @@
#define FAIR_MQ_SOCKET_H #define FAIR_MQ_SOCKET_H
#include <fairmq/Message.h> #include <fairmq/Message.h>
#include <fairmq/Parts.h>
#include <memory> #include <memory>
#include <stdexcept> #include <stdexcept>
#include <string> #include <string>
#include <type_traits>
#include <vector> #include <vector>
namespace fair::mq { namespace fair::mq {
@ -27,6 +30,12 @@ enum class TransferCode : int
interrupted = -3 interrupted = -3
}; };
template <typename T>
struct is_transferrable : std::disjunction<std::is_same<T, MessagePtr>,
std::is_same<T, std::vector<MessagePtr>>,
std::is_same<T, fair::mq::Parts>>
{};
struct Socket struct Socket
{ {
Socket() = default; Socket() = default;
@ -45,6 +54,8 @@ struct Socket
virtual int64_t Receive(MessagePtr& msg, int timeout = -1) = 0; virtual int64_t Receive(MessagePtr& msg, int timeout = -1) = 0;
virtual int64_t Send(std::vector<std::unique_ptr<Message>>& msgVec, int timeout = -1) = 0; virtual int64_t Send(std::vector<std::unique_ptr<Message>>& msgVec, int timeout = -1) = 0;
virtual int64_t Receive(std::vector<std::unique_ptr<Message>>& msgVec, int timeout = -1) = 0; virtual int64_t Receive(std::vector<std::unique_ptr<Message>>& msgVec, int timeout = -1) = 0;
virtual int64_t Send(Parts& parts, int timeout = -1) { return Send(parts.fParts, timeout); }
virtual int64_t Receive(Parts& parts, int timeout = -1) { return Receive(parts.fParts, timeout); }
[[deprecated("Use Socket::~Socket() instead.")]] [[deprecated("Use Socket::~Socket() instead.")]]
virtual void Close() = 0; virtual void Close() = 0;

View File

@ -38,6 +38,8 @@ enum channelOptionKeyIds
RCVBUFSIZE, // size of the receive queue RCVBUFSIZE, // size of the receive queue
SNDKERNELSIZE, SNDKERNELSIZE,
RCVKERNELSIZE, RCVKERNELSIZE,
SNDTIMEOUTMS,
RCVTIMEOUTMS,
LINGER, LINGER,
RATELOGGING, // logging rate RATELOGGING, // logging rate
PORTRANGEMIN, PORTRANGEMIN,
@ -57,6 +59,8 @@ constexpr static const char* channelOptionKeys[] = {
/*[RCVBUFSIZE] = */ "rcvBufSize", /*[RCVBUFSIZE] = */ "rcvBufSize",
/*[SNDKERNELSIZE] = */ "sndKernelSize", /*[SNDKERNELSIZE] = */ "sndKernelSize",
/*[RCVKERNELSIZE] = */ "rcvKernelSize", /*[RCVKERNELSIZE] = */ "rcvKernelSize",
/*[SNDTIMEOUTMS] = */ "sndTimeoutMs",
/*[RCVTIMEOUTMS] = */ "rcvTimeoutMs",
/*[LINGER] = */ "linger", /*[LINGER] = */ "linger",
/*[RATELOGGING] = */ "rateLogging", /*[RATELOGGING] = */ "rateLogging",
/*[PORTRANGEMIN] = */ "portRangeMin", /*[PORTRANGEMIN] = */ "portRangeMin",