mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
Add setters/getters for socket options
This commit is contained in:
parent
2e7005225e
commit
f8824335a5
|
@ -296,20 +296,20 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
// set linger duration (how long socket should wait for outstanding transfers before shutdown)
|
// set linger duration (how long socket should wait for outstanding transfers before shutdown)
|
||||||
ch.fSocket->SetOption("linger", &(ch.fLinger), sizeof(ch.fLinger));
|
ch.fSocket->SetLinger(ch.fLinger);
|
||||||
|
|
||||||
// set high water marks
|
// set high water marks
|
||||||
ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize));
|
ch.fSocket->SetSndBufSize(ch.fSndBufSize);
|
||||||
ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize));
|
ch.fSocket->SetRcvBufSize(ch.fRcvBufSize);
|
||||||
|
|
||||||
// set kernel transmit size (set it only if value is not the default value)
|
// set kernel transmit size (set it only if value is not the default value)
|
||||||
if (ch.fSndKernelSize != 0)
|
if (ch.fSndKernelSize != 0)
|
||||||
{
|
{
|
||||||
ch.fSocket->SetOption("snd-size", &(ch.fSndKernelSize), sizeof(ch.fSndKernelSize));
|
ch.fSocket->SetSndKernelSize(ch.fSndKernelSize);
|
||||||
}
|
}
|
||||||
if (ch.fRcvKernelSize != 0)
|
if (ch.fRcvKernelSize != 0)
|
||||||
{
|
{
|
||||||
ch.fSocket->SetOption("rcv-size", &(ch.fRcvKernelSize), sizeof(ch.fRcvKernelSize));
|
ch.fSocket->SetRcvKernelSize(ch.fRcvKernelSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
// attach
|
// attach
|
||||||
|
|
|
@ -43,6 +43,17 @@ class FairMQSocket
|
||||||
virtual void SetOption(const std::string& option, const void* value, size_t valueSize) = 0;
|
virtual void SetOption(const std::string& option, const void* value, size_t valueSize) = 0;
|
||||||
virtual void GetOption(const std::string& option, void* value, size_t* valueSize) = 0;
|
virtual void GetOption(const std::string& option, void* value, size_t* valueSize) = 0;
|
||||||
|
|
||||||
|
virtual void SetLinger(const int value) = 0;
|
||||||
|
virtual int GetLinger() const = 0;
|
||||||
|
virtual void SetSndBufSize(const int value) = 0;
|
||||||
|
virtual int GetSndBufSize() const = 0;
|
||||||
|
virtual void SetRcvBufSize(const int value) = 0;
|
||||||
|
virtual int GetRcvBufSize() const = 0;
|
||||||
|
virtual void SetSndKernelSize(const int value) = 0;
|
||||||
|
virtual int GetSndKernelSize() const = 0;
|
||||||
|
virtual void SetRcvKernelSize(const int value) = 0;
|
||||||
|
virtual int GetRcvKernelSize() const = 0;
|
||||||
|
|
||||||
virtual unsigned long GetBytesTx() const = 0;
|
virtual unsigned long GetBytesTx() const = 0;
|
||||||
virtual unsigned long GetBytesRx() const = 0;
|
virtual unsigned long GetBytesRx() const = 0;
|
||||||
virtual unsigned long GetMessagesTx() const = 0;
|
virtual unsigned long GetMessagesTx() const = 0;
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#include "FairMQMessageNN.h"
|
#include "FairMQMessageNN.h"
|
||||||
#include "FairMQLogger.h"
|
#include "FairMQLogger.h"
|
||||||
#include "FairMQUnmanagedRegionNN.h"
|
#include "FairMQUnmanagedRegionNN.h"
|
||||||
|
#include <fairmq/Tools.h>
|
||||||
|
|
||||||
#include <nanomsg/nn.h>
|
#include <nanomsg/nn.h>
|
||||||
#include <nanomsg/pipeline.h>
|
#include <nanomsg/pipeline.h>
|
||||||
|
@ -27,6 +28,7 @@
|
||||||
#include <msgpack.hpp>
|
#include <msgpack.hpp>
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
using namespace fair::mq;
|
||||||
|
|
||||||
atomic<bool> FairMQSocketNN::fInterrupted(false);
|
atomic<bool> FairMQSocketNN::fInterrupted(false);
|
||||||
|
|
||||||
|
@ -431,7 +433,7 @@ void FairMQSocketNN::SetOption(const string& option, const void* value, size_t v
|
||||||
int val = *(static_cast<int*>(const_cast<void*>(value)));
|
int val = *(static_cast<int*>(const_cast<void*>(value)));
|
||||||
if (val <= 0)
|
if (val <= 0)
|
||||||
{
|
{
|
||||||
LOG(warn) << "value for sndKernelSize/rcvKernelSize should be greater than 0, using defaults (128kB).";
|
LOG(warn) << "value for sndKernelSize/rcvKernelSize should be greater than 0, leaving unchanged.";
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -464,6 +466,73 @@ void FairMQSocketNN::GetOption(const string& option, void* value, size_t* valueS
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void FairMQSocketNN::SetLinger(const int value)
|
||||||
|
{
|
||||||
|
fLinger = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
int FairMQSocketNN::GetLinger() const
|
||||||
|
{
|
||||||
|
return fLinger;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQSocketNN::SetSndBufSize(const int /* value */)
|
||||||
|
{
|
||||||
|
// not used in nanomsg
|
||||||
|
}
|
||||||
|
|
||||||
|
int FairMQSocketNN::GetSndBufSize() const
|
||||||
|
{
|
||||||
|
// not used in nanomsg
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQSocketNN::SetRcvBufSize(const int /* value */)
|
||||||
|
{
|
||||||
|
// not used in nanomsg
|
||||||
|
}
|
||||||
|
|
||||||
|
int FairMQSocketNN::GetRcvBufSize() const
|
||||||
|
{
|
||||||
|
// not used in nanomsg
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQSocketNN::SetSndKernelSize(const int value)
|
||||||
|
{
|
||||||
|
if (nn_setsockopt(fSocket, NN_SOL_SOCKET, NN_SNDBUF, &value, sizeof(value)) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed setting NN_SNDBUF, reason: ", nn_strerror(errno)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int FairMQSocketNN::GetSndKernelSize() const
|
||||||
|
{
|
||||||
|
int value = 0;
|
||||||
|
size_t valueSize;
|
||||||
|
if (nn_getsockopt(fSocket, NN_SOL_SOCKET, NN_SNDBUF, &value, &valueSize) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting NN_SNDBUF, reason: ", nn_strerror(errno)));
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQSocketNN::SetRcvKernelSize(const int value)
|
||||||
|
{
|
||||||
|
if (nn_setsockopt(fSocket, NN_SOL_SOCKET, NN_RCVBUF, &value, sizeof(value)) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed setting NN_RCVBUF, reason: ", nn_strerror(errno)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int FairMQSocketNN::GetRcvKernelSize() const
|
||||||
|
{
|
||||||
|
int value = 0;
|
||||||
|
size_t valueSize;
|
||||||
|
if (nn_getsockopt(fSocket, NN_SOL_SOCKET, NN_RCVBUF, &value, &valueSize) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting NN_RCVBUF, reason: ", nn_strerror(errno)));
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
unsigned long FairMQSocketNN::GetBytesTx() const
|
unsigned long FairMQSocketNN::GetBytesTx() const
|
||||||
{
|
{
|
||||||
return fBytesTx;
|
return fBytesTx;
|
||||||
|
|
|
@ -15,12 +15,8 @@
|
||||||
#include "FairMQSocket.h"
|
#include "FairMQSocket.h"
|
||||||
#include "FairMQMessage.h"
|
#include "FairMQMessage.h"
|
||||||
|
|
||||||
class FairMQTransportFactoryNN;
|
|
||||||
|
|
||||||
class FairMQSocketNN final : public FairMQSocket
|
class FairMQSocketNN final : public FairMQSocket
|
||||||
{
|
{
|
||||||
friend class FairMQTransportFactoryNN;
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
FairMQSocketNN(const std::string& type, const std::string& name, const std::string& id = "");
|
FairMQSocketNN(const std::string& type, const std::string& name, const std::string& id = "");
|
||||||
FairMQSocketNN(const FairMQSocketNN&) = delete;
|
FairMQSocketNN(const FairMQSocketNN&) = delete;
|
||||||
|
@ -52,6 +48,17 @@ class FairMQSocketNN final : public FairMQSocket
|
||||||
void SetOption(const std::string& option, const void* value, size_t valueSize) override;
|
void SetOption(const std::string& option, const void* value, size_t valueSize) override;
|
||||||
void GetOption(const std::string& option, void* value, size_t* valueSize) override;
|
void GetOption(const std::string& option, void* value, size_t* valueSize) override;
|
||||||
|
|
||||||
|
void SetLinger(const int value) override;
|
||||||
|
int GetLinger() const override;
|
||||||
|
void SetSndBufSize(const int value) override;
|
||||||
|
int GetSndBufSize() const override;
|
||||||
|
void SetRcvBufSize(const int value) override;
|
||||||
|
int GetRcvBufSize() const override;
|
||||||
|
void SetSndKernelSize(const int value) override;
|
||||||
|
int GetSndKernelSize() const override;
|
||||||
|
void SetRcvKernelSize(const int value) override;
|
||||||
|
int GetRcvKernelSize() const override;
|
||||||
|
|
||||||
unsigned long GetBytesTx() const override;
|
unsigned long GetBytesTx() const override;
|
||||||
unsigned long GetBytesRx() const override;
|
unsigned long GetBytesRx() const override;
|
||||||
unsigned long GetMessagesTx() const override;
|
unsigned long GetMessagesTx() const override;
|
||||||
|
|
|
@ -82,11 +82,11 @@ fair::mq::Transport FairMQTransportFactoryNN::GetType() const
|
||||||
|
|
||||||
void FairMQTransportFactoryNN::Reset()
|
void FairMQTransportFactoryNN::Reset()
|
||||||
{
|
{
|
||||||
auto result = max_element(fSockets.begin(), fSockets.end(), [](FairMQSocket* s1, FairMQSocket* s2) {
|
auto it = max_element(fSockets.begin(), fSockets.end(), [](FairMQSocket* s1, FairMQSocket* s2) {
|
||||||
return static_cast<FairMQSocketNN*>(s1)->fLinger < static_cast<FairMQSocketNN*>(s2)->fLinger;
|
return static_cast<FairMQSocketNN*>(s1)->GetLinger() < static_cast<FairMQSocketNN*>(s2)->GetLinger();
|
||||||
});
|
});
|
||||||
if (result != fSockets.end()) {
|
if (it != fSockets.end()) {
|
||||||
this_thread::sleep_for(chrono::milliseconds(static_cast<FairMQSocketNN*>(*result)->fLinger));
|
this_thread::sleep_for(chrono::milliseconds(static_cast<FairMQSocketNN*>(*it)->GetLinger()));
|
||||||
}
|
}
|
||||||
fSockets.clear();
|
fSockets.clear();
|
||||||
}
|
}
|
||||||
|
|
|
@ -619,6 +619,84 @@ auto Socket::GetOption(const string& option, void* value, size_t* valueSize) ->
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int Socket::GetLinger() const
|
||||||
|
{
|
||||||
|
int value = 0;
|
||||||
|
size_t valueSize;
|
||||||
|
if (zmq_getsockopt(fControlSocket, ZMQ_LINGER, &value, &valueSize) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting ZMQ_LINGER, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Socket::SetSndBufSize(const int value)
|
||||||
|
{
|
||||||
|
if (zmq_setsockopt(fControlSocket, ZMQ_SNDHWM, &value, sizeof(value)) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed setting ZMQ_SNDHWM, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int Socket::GetSndBufSize() const
|
||||||
|
{
|
||||||
|
int value = 0;
|
||||||
|
size_t valueSize;
|
||||||
|
if (zmq_getsockopt(fControlSocket, ZMQ_SNDHWM, &value, &valueSize) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting ZMQ_SNDHWM, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Socket::SetRcvBufSize(const int value)
|
||||||
|
{
|
||||||
|
if (zmq_setsockopt(fControlSocket, ZMQ_RCVHWM, &value, sizeof(value)) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed setting ZMQ_RCVHWM, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int Socket::GetRcvBufSize() const
|
||||||
|
{
|
||||||
|
int value = 0;
|
||||||
|
size_t valueSize;
|
||||||
|
if (zmq_getsockopt(fControlSocket, ZMQ_RCVHWM, &value, &valueSize) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting ZMQ_RCVHWM, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Socket::SetSndKernelSize(const int value)
|
||||||
|
{
|
||||||
|
if (zmq_setsockopt(fControlSocket, ZMQ_SNDBUF, &value, sizeof(value)) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int Socket::GetSndKernelSize() const
|
||||||
|
{
|
||||||
|
int value = 0;
|
||||||
|
size_t valueSize;
|
||||||
|
if (zmq_getsockopt(fControlSocket, ZMQ_SNDBUF, &value, &valueSize) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Socket::SetRcvKernelSize(const int value)
|
||||||
|
{
|
||||||
|
if (zmq_setsockopt(fControlSocket, ZMQ_RCVBUF, &value, sizeof(value)) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int Socket::GetRcvKernelSize() const
|
||||||
|
{
|
||||||
|
int value = 0;
|
||||||
|
size_t valueSize;
|
||||||
|
if (zmq_getsockopt(fControlSocket, ZMQ_RCVBUF, &value, &valueSize) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
auto Socket::GetConstant(const string& constant) -> int
|
auto Socket::GetConstant(const string& constant) -> int
|
||||||
{
|
{
|
||||||
if (constant == "")
|
if (constant == "")
|
||||||
|
|
|
@ -57,6 +57,17 @@ class Socket final : public fair::mq::Socket
|
||||||
auto GetSocket() const -> void* override { return fControlSocket; }
|
auto GetSocket() const -> void* override { return fControlSocket; }
|
||||||
auto GetSocket(int nothing) const -> int override { return -1; }
|
auto GetSocket(int nothing) const -> int override { return -1; }
|
||||||
|
|
||||||
|
void SetLinger(const int value) override;
|
||||||
|
int GetLinger() const override;
|
||||||
|
void SetSndBufSize(const int value) override;
|
||||||
|
int GetSndBufSize() const override;
|
||||||
|
void SetRcvBufSize(const int value) override;
|
||||||
|
int GetRcvBufSize() const override;
|
||||||
|
void SetSndKernelSize(const int value) override;
|
||||||
|
int GetSndKernelSize() const override;
|
||||||
|
void SetRcvKernelSize(const int value) override;
|
||||||
|
int GetRcvKernelSize() const override;
|
||||||
|
|
||||||
auto Close() -> void override;
|
auto Close() -> void override;
|
||||||
|
|
||||||
auto SetOption(const std::string& option, const void* value, size_t valueSize) -> void override;
|
auto SetOption(const std::string& option, const void* value, size_t valueSize) -> void override;
|
||||||
|
|
|
@ -11,6 +11,7 @@
|
||||||
#include "FairMQMessageSHM.h"
|
#include "FairMQMessageSHM.h"
|
||||||
#include "FairMQUnmanagedRegionSHM.h"
|
#include "FairMQUnmanagedRegionSHM.h"
|
||||||
#include "FairMQLogger.h"
|
#include "FairMQLogger.h"
|
||||||
|
#include <fairmq/Tools.h>
|
||||||
|
|
||||||
#include <zmq.h>
|
#include <zmq.h>
|
||||||
|
|
||||||
|
@ -18,6 +19,7 @@
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
using namespace fair::mq::shmem;
|
using namespace fair::mq::shmem;
|
||||||
|
using namespace fair::mq;
|
||||||
|
|
||||||
atomic<bool> FairMQSocketSHM::fInterrupted(false);
|
atomic<bool> FairMQSocketSHM::fInterrupted(false);
|
||||||
|
|
||||||
|
@ -477,6 +479,91 @@ void FairMQSocketSHM::GetOption(const string& option, void* value, size_t* value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void FairMQSocketSHM::SetLinger(const int value)
|
||||||
|
{
|
||||||
|
if (zmq_setsockopt(fSocket, ZMQ_LINGER, &value, sizeof(value)) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed setting ZMQ_LINGER, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int FairMQSocketSHM::GetLinger() const
|
||||||
|
{
|
||||||
|
int value = 0;
|
||||||
|
size_t valueSize;
|
||||||
|
if (zmq_getsockopt(fSocket, ZMQ_LINGER, &value, &valueSize) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting ZMQ_LINGER, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQSocketSHM::SetSndBufSize(const int value)
|
||||||
|
{
|
||||||
|
if (zmq_setsockopt(fSocket, ZMQ_SNDHWM, &value, sizeof(value)) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed setting ZMQ_SNDHWM, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int FairMQSocketSHM::GetSndBufSize() const
|
||||||
|
{
|
||||||
|
int value = 0;
|
||||||
|
size_t valueSize;
|
||||||
|
if (zmq_getsockopt(fSocket, ZMQ_SNDHWM, &value, &valueSize) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting ZMQ_SNDHWM, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQSocketSHM::SetRcvBufSize(const int value)
|
||||||
|
{
|
||||||
|
if (zmq_setsockopt(fSocket, ZMQ_RCVHWM, &value, sizeof(value)) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed setting ZMQ_RCVHWM, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int FairMQSocketSHM::GetRcvBufSize() const
|
||||||
|
{
|
||||||
|
int value = 0;
|
||||||
|
size_t valueSize;
|
||||||
|
if (zmq_getsockopt(fSocket, ZMQ_RCVHWM, &value, &valueSize) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting ZMQ_RCVHWM, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQSocketSHM::SetSndKernelSize(const int value)
|
||||||
|
{
|
||||||
|
if (zmq_setsockopt(fSocket, ZMQ_SNDBUF, &value, sizeof(value)) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int FairMQSocketSHM::GetSndKernelSize() const
|
||||||
|
{
|
||||||
|
int value = 0;
|
||||||
|
size_t valueSize;
|
||||||
|
if (zmq_getsockopt(fSocket, ZMQ_SNDBUF, &value, &valueSize) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQSocketSHM::SetRcvKernelSize(const int value)
|
||||||
|
{
|
||||||
|
if (zmq_setsockopt(fSocket, ZMQ_RCVBUF, &value, sizeof(value)) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int FairMQSocketSHM::GetRcvKernelSize() const
|
||||||
|
{
|
||||||
|
int value = 0;
|
||||||
|
size_t valueSize;
|
||||||
|
if (zmq_getsockopt(fSocket, ZMQ_RCVBUF, &value, &valueSize) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
unsigned long FairMQSocketSHM::GetBytesTx() const
|
unsigned long FairMQSocketSHM::GetBytesTx() const
|
||||||
{
|
{
|
||||||
return fBytesTx;
|
return fBytesTx;
|
||||||
|
|
|
@ -49,6 +49,17 @@ class FairMQSocketSHM final : public FairMQSocket
|
||||||
void SetOption(const std::string& option, const void* value, size_t valueSize) override;
|
void SetOption(const std::string& option, const void* value, size_t valueSize) override;
|
||||||
void GetOption(const std::string& option, void* value, size_t* valueSize) override;
|
void GetOption(const std::string& option, void* value, size_t* valueSize) override;
|
||||||
|
|
||||||
|
void SetLinger(const int value) override;
|
||||||
|
int GetLinger() const override;
|
||||||
|
void SetSndBufSize(const int value) override;
|
||||||
|
int GetSndBufSize() const override;
|
||||||
|
void SetRcvBufSize(const int value) override;
|
||||||
|
int GetRcvBufSize() const override;
|
||||||
|
void SetSndKernelSize(const int value) override;
|
||||||
|
int GetSndKernelSize() const override;
|
||||||
|
void SetRcvKernelSize(const int value) override;
|
||||||
|
int GetRcvKernelSize() const override;
|
||||||
|
|
||||||
unsigned long GetBytesTx() const override;
|
unsigned long GetBytesTx() const override;
|
||||||
unsigned long GetBytesRx() const override;
|
unsigned long GetBytesRx() const override;
|
||||||
unsigned long GetMessagesTx() const override;
|
unsigned long GetMessagesTx() const override;
|
||||||
|
|
|
@ -9,12 +9,14 @@
|
||||||
#include "FairMQSocketZMQ.h"
|
#include "FairMQSocketZMQ.h"
|
||||||
#include "FairMQMessageZMQ.h"
|
#include "FairMQMessageZMQ.h"
|
||||||
#include "FairMQLogger.h"
|
#include "FairMQLogger.h"
|
||||||
|
#include <fairmq/Tools.h>
|
||||||
|
|
||||||
#include <zmq.h>
|
#include <zmq.h>
|
||||||
|
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
using namespace fair::mq;
|
||||||
|
|
||||||
atomic<bool> FairMQSocketZMQ::fInterrupted(false);
|
atomic<bool> FairMQSocketZMQ::fInterrupted(false);
|
||||||
|
|
||||||
|
@ -402,6 +404,91 @@ void FairMQSocketZMQ::GetOption(const string& option, void* value, size_t* value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void FairMQSocketZMQ::SetLinger(const int value)
|
||||||
|
{
|
||||||
|
if (zmq_setsockopt(fSocket, ZMQ_LINGER, &value, sizeof(value)) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed setting ZMQ_LINGER, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int FairMQSocketZMQ::GetLinger() const
|
||||||
|
{
|
||||||
|
int value = 0;
|
||||||
|
size_t valueSize;
|
||||||
|
if (zmq_getsockopt(fSocket, ZMQ_LINGER, &value, &valueSize) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting ZMQ_LINGER, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQSocketZMQ::SetSndBufSize(const int value)
|
||||||
|
{
|
||||||
|
if (zmq_setsockopt(fSocket, ZMQ_SNDHWM, &value, sizeof(value)) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed setting ZMQ_SNDHWM, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int FairMQSocketZMQ::GetSndBufSize() const
|
||||||
|
{
|
||||||
|
int value = 0;
|
||||||
|
size_t valueSize;
|
||||||
|
if (zmq_getsockopt(fSocket, ZMQ_SNDHWM, &value, &valueSize) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting ZMQ_SNDHWM, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQSocketZMQ::SetRcvBufSize(const int value)
|
||||||
|
{
|
||||||
|
if (zmq_setsockopt(fSocket, ZMQ_RCVHWM, &value, sizeof(value)) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed setting ZMQ_RCVHWM, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int FairMQSocketZMQ::GetRcvBufSize() const
|
||||||
|
{
|
||||||
|
int value = 0;
|
||||||
|
size_t valueSize;
|
||||||
|
if (zmq_getsockopt(fSocket, ZMQ_RCVHWM, &value, &valueSize) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting ZMQ_RCVHWM, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQSocketZMQ::SetSndKernelSize(const int value)
|
||||||
|
{
|
||||||
|
if (zmq_setsockopt(fSocket, ZMQ_SNDBUF, &value, sizeof(value)) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int FairMQSocketZMQ::GetSndKernelSize() const
|
||||||
|
{
|
||||||
|
int value = 0;
|
||||||
|
size_t valueSize;
|
||||||
|
if (zmq_getsockopt(fSocket, ZMQ_SNDBUF, &value, &valueSize) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQSocketZMQ::SetRcvKernelSize(const int value)
|
||||||
|
{
|
||||||
|
if (zmq_setsockopt(fSocket, ZMQ_RCVBUF, &value, sizeof(value)) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int FairMQSocketZMQ::GetRcvKernelSize() const
|
||||||
|
{
|
||||||
|
int value = 0;
|
||||||
|
size_t valueSize;
|
||||||
|
if (zmq_getsockopt(fSocket, ZMQ_RCVBUF, &value, &valueSize) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
unsigned long FairMQSocketZMQ::GetBytesTx() const
|
unsigned long FairMQSocketZMQ::GetBytesTx() const
|
||||||
{
|
{
|
||||||
return fBytesTx;
|
return fBytesTx;
|
||||||
|
|
|
@ -49,6 +49,17 @@ class FairMQSocketZMQ final : public FairMQSocket
|
||||||
void SetOption(const std::string& option, const void* value, size_t valueSize) override;
|
void SetOption(const std::string& option, const void* value, size_t valueSize) override;
|
||||||
void GetOption(const std::string& option, void* value, size_t* valueSize) override;
|
void GetOption(const std::string& option, void* value, size_t* valueSize) override;
|
||||||
|
|
||||||
|
void SetLinger(const int value) override;
|
||||||
|
int GetLinger() const override;
|
||||||
|
void SetSndBufSize(const int value) override;
|
||||||
|
int GetSndBufSize() const override;
|
||||||
|
void SetRcvBufSize(const int value) override;
|
||||||
|
int GetRcvBufSize() const override;
|
||||||
|
void SetSndKernelSize(const int value) override;
|
||||||
|
int GetSndKernelSize() const override;
|
||||||
|
void SetRcvKernelSize(const int value) override;
|
||||||
|
int GetRcvKernelSize() const override;
|
||||||
|
|
||||||
unsigned long GetBytesTx() const override;
|
unsigned long GetBytesTx() const override;
|
||||||
unsigned long GetBytesRx() const override;
|
unsigned long GetBytesRx() const override;
|
||||||
unsigned long GetMessagesTx() const override;
|
unsigned long GetMessagesTx() const override;
|
||||||
|
|
Loading…
Reference in New Issue
Block a user