Remove GetSocket interface that exposes transport details

This commit is contained in:
Alexey Rybalchenko 2018-10-16 13:27:30 +02:00 committed by Dennis Klein
parent f8824335a5
commit ce4062f3a0
27 changed files with 59 additions and 189 deletions

View File

@ -129,7 +129,7 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
return *this; return *this;
} }
FairMQSocket const & FairMQChannel::GetSocket() const FairMQSocket & FairMQChannel::GetSocket() const
{ {
assert(fSocket); assert(fSocket);
return *fSocket; return *fSocket;

View File

@ -51,7 +51,7 @@ class FairMQChannel
/// Default destructor /// Default destructor
virtual ~FairMQChannel(); virtual ~FairMQChannel();
FairMQSocket const & GetSocket() const; FairMQSocket& GetSocket() const;
auto Bind(const std::string& address) -> bool auto Bind(const std::string& address) -> bool
{ {

View File

@ -35,9 +35,6 @@ class FairMQSocket
virtual int64_t TrySend(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) = 0; virtual int64_t TrySend(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) = 0;
virtual int64_t TryReceive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) = 0; virtual int64_t TryReceive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) = 0;
virtual void* GetSocket() const = 0;
virtual int GetSocket(int nothing) const = 0;
virtual void Close() = 0; virtual void Close() = 0;
virtual void SetOption(const std::string& option, const void* value, size_t valueSize) = 0; virtual void SetOption(const std::string& option, const void* value, size_t valueSize) = 0;

View File

@ -63,8 +63,6 @@ class FairMQTransportFactory
virtual FairMQPollerPtr CreatePoller(const std::vector<const FairMQChannel*>& channels) const = 0; virtual FairMQPollerPtr CreatePoller(const std::vector<const FairMQChannel*>& channels) const = 0;
/// Create a poller for specific channels (all subchannels) /// Create a poller for specific channels (all subchannels)
virtual FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const = 0; virtual FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const = 0;
/// Create a poller for two sockets
virtual FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) const = 0; virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) const = 0;

View File

@ -19,6 +19,7 @@
#include <nanomsg/pair.h> #include <nanomsg/pair.h>
#include "FairMQPollerNN.h" #include "FairMQPollerNN.h"
#include "FairMQSocketNN.h"
#include "FairMQLogger.h" #include "FairMQLogger.h"
using namespace std; using namespace std;
@ -33,11 +34,11 @@ FairMQPollerNN::FairMQPollerNN(const vector<FairMQChannel>& channels)
for (int i = 0; i < fNumItems; ++i) for (int i = 0; i < fNumItems; ++i)
{ {
fItems[i].fd = channels.at(i).GetSocket().GetSocket(1); fItems[i].fd = static_cast<const FairMQSocketNN*>(&(channels.at(i).GetSocket()))->GetSocket();
int type = 0; int type = 0;
size_t sz = sizeof(type); size_t sz = sizeof(type);
nn_getsockopt(channels.at(i).GetSocket().GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz); nn_getsockopt(static_cast<const FairMQSocketNN*>(&(channels.at(i).GetSocket()))->GetSocket(), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz);
SetItemEvents(fItems[i], type); SetItemEvents(fItems[i], type);
} }
@ -53,11 +54,11 @@ FairMQPollerNN::FairMQPollerNN(const vector<const FairMQChannel*>& channels)
for (int i = 0; i < fNumItems; ++i) for (int i = 0; i < fNumItems; ++i)
{ {
fItems[i].fd = channels.at(i)->GetSocket().GetSocket(1); fItems[i].fd = static_cast<const FairMQSocketNN*>(&(channels.at(i)->GetSocket()))->GetSocket();
int type = 0; int type = 0;
size_t sz = sizeof(type); size_t sz = sizeof(type);
nn_getsockopt(channels.at(i)->GetSocket().GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz); nn_getsockopt(static_cast<const FairMQSocketNN*>(&(channels.at(i)->GetSocket()))->GetSocket(), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz);
SetItemEvents(fItems[i], type); SetItemEvents(fItems[i], type);
} }
@ -87,11 +88,11 @@ FairMQPollerNN::FairMQPollerNN(const unordered_map<string, vector<FairMQChannel>
for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i) for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i)
{ {
index = fOffsetMap[channel] + i; index = fOffsetMap[channel] + i;
fItems[index].fd = channelsMap.at(channel).at(i).GetSocket().GetSocket(1); fItems[index].fd = static_cast<const FairMQSocketNN*>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket();
int type = 0; int type = 0;
size_t sz = sizeof(type); size_t sz = sizeof(type);
nn_getsockopt(channelsMap.at(channel).at(i).GetSocket().GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz); nn_getsockopt(static_cast<const FairMQSocketNN*>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz);
SetItemEvents(fItems[index], type); SetItemEvents(fItems[index], type);
} }
@ -105,27 +106,6 @@ FairMQPollerNN::FairMQPollerNN(const unordered_map<string, vector<FairMQChannel>
} }
} }
FairMQPollerNN::FairMQPollerNN(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket)
: fItems()
, fNumItems(2)
, fOffsetMap()
{
fItems = new nn_pollfd[fNumItems];
fItems[0].fd = cmdSocket.GetSocket(1);
fItems[0].events = NN_POLLIN;
fItems[0].revents = 0;
fItems[1].fd = dataSocket.GetSocket(1);
fItems[1].revents = 0;
int type = 0;
size_t sz = sizeof(type);
nn_getsockopt(dataSocket.GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz);
SetItemEvents(fItems[1], type);
}
void FairMQPollerNN::SetItemEvents(nn_pollfd& item, const int type) void FairMQPollerNN::SetItemEvents(nn_pollfd& item, const int type)
{ {
if (type == NN_REQ || type == NN_REP || type == NN_PAIR) if (type == NN_REQ || type == NN_REP || type == NN_PAIR)

View File

@ -50,8 +50,6 @@ class FairMQPollerNN final : public FairMQPoller
~FairMQPollerNN() override; ~FairMQPollerNN() override;
private: private:
FairMQPollerNN(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket);
nn_pollfd* fItems; nn_pollfd* fItems;
int fNumItems; int fNumItems;

View File

@ -416,12 +416,7 @@ void FairMQSocketNN::Resume()
fInterrupted = false; fInterrupted = false;
} }
void* FairMQSocketNN::GetSocket() const int FairMQSocketNN::GetSocket() const
{
return nullptr; // dummy method to comply with the interface. functionality not possible in zeromq.
}
int FairMQSocketNN::GetSocket(int /*nothing*/) const
{ {
return fSocket; return fSocket;
} }
@ -445,8 +440,7 @@ void FairMQSocketNN::SetOption(const string& option, const void* value, size_t v
if (option == "linger") if (option == "linger")
{ {
int val = *(static_cast<int*>(const_cast<void*>(value))); fLinger = *static_cast<int*>(const_cast<void*>(value));
fLinger = val;
return; return;
} }
@ -459,6 +453,19 @@ void FairMQSocketNN::SetOption(const string& option, const void* value, size_t v
void FairMQSocketNN::GetOption(const string& option, void* value, size_t* valueSize) void FairMQSocketNN::GetOption(const string& option, void* value, size_t* valueSize)
{ {
if (option == "linger")
{
*static_cast<int*>(value) = fLinger;
return;
}
if (option == "snd-hwm" || option == "rcv-hwm")
{
*static_cast<int*>(value) = -1;
return;
}
int rc = nn_getsockopt(fSocket, NN_SOL_SOCKET, GetConstant(option), value, valueSize); int rc = nn_getsockopt(fSocket, NN_SOL_SOCKET, GetConstant(option), value, valueSize);
if (rc < 0) if (rc < 0)
{ {

View File

@ -37,8 +37,7 @@ class FairMQSocketNN final : public FairMQSocket
int64_t TrySend(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override; int64_t TrySend(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override;
int64_t TryReceive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override; int64_t TryReceive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override;
void* GetSocket() const override; int GetSocket() const;
int GetSocket(int nothing) const override;
void Close() override; void Close() override;

View File

@ -65,11 +65,6 @@ FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const unordered_map<strin
return unique_ptr<FairMQPoller>(new FairMQPollerNN(channelsMap, channelList)); return unique_ptr<FairMQPoller>(new FairMQPollerNN(channelsMap, channelList));
} }
FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const
{
return unique_ptr<FairMQPoller>(new FairMQPollerNN(cmdSocket, dataSocket));
}
FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const
{ {
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionNN(size, callback)); return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionNN(size, callback));

View File

@ -35,7 +35,6 @@ class FairMQTransportFactoryNN final : public FairMQTransportFactory
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override; FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override;
FairMQPollerPtr CreatePoller(const std::vector<const FairMQChannel*>& channels) const override; FairMQPollerPtr CreatePoller(const std::vector<const FairMQChannel*>& channels) const override;
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override; FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const override; FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const override;

View File

@ -7,6 +7,7 @@
********************************************************************************/ ********************************************************************************/
#include <fairmq/ofi/Poller.h> #include <fairmq/ofi/Poller.h>
#include <fairmq/ofi/Socket.h>
#include <fairmq/Tools.h> #include <fairmq/Tools.h>
#include <FairMQLogger.h> #include <FairMQLogger.h>
@ -27,13 +28,13 @@ Poller::Poller(const vector<FairMQChannel>& channels)
fItems = new zmq_pollitem_t[fNumItems]; fItems = new zmq_pollitem_t[fNumItems];
for (int i = 0; i < fNumItems; ++i) { for (int i = 0; i < fNumItems; ++i) {
fItems[i].socket = channels.at(i).GetSocket().GetSocket(); fItems[i].socket = static_cast<const Socket*>(&(channels.at(i).GetSocket()))->GetSocket();
fItems[i].fd = 0; fItems[i].fd = 0;
fItems[i].revents = 0; fItems[i].revents = 0;
int type = 0; int type = 0;
size_t size = sizeof(type); size_t size = sizeof(type);
zmq_getsockopt(channels.at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size); zmq_getsockopt(static_cast<const Socket*>(&(channels.at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
SetItemEvents(fItems[i], type); SetItemEvents(fItems[i], type);
} }
@ -45,13 +46,13 @@ Poller::Poller(const vector<const FairMQChannel*>& channels)
fItems = new zmq_pollitem_t[fNumItems]; fItems = new zmq_pollitem_t[fNumItems];
for (int i = 0; i < fNumItems; ++i) { for (int i = 0; i < fNumItems; ++i) {
fItems[i].socket = channels.at(i)->GetSocket().GetSocket(); fItems[i].socket = static_cast<const Socket*>(&(channels.at(i)->GetSocket()))->GetSocket();
fItems[i].fd = 0; fItems[i].fd = 0;
fItems[i].revents = 0; fItems[i].revents = 0;
int type = 0; int type = 0;
size_t size = sizeof(type); size_t size = sizeof(type);
zmq_getsockopt(channels.at(i)->GetSocket().GetSocket(), ZMQ_TYPE, &type, &size); zmq_getsockopt(static_cast<const Socket*>(&(channels.at(i)->GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
SetItemEvents(fItems[i], type); SetItemEvents(fItems[i], type);
} }
@ -75,13 +76,13 @@ Poller::Poller(const unordered_map<string, vector<FairMQChannel>>& channelsMap,
for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i) { for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i) {
index = fOffsetMap[channel] + i; index = fOffsetMap[channel] + i;
fItems[index].socket = channelsMap.at(channel).at(i).GetSocket().GetSocket(); fItems[index].socket = static_cast<const Socket*>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket();
fItems[index].fd = 0; fItems[index].fd = 0;
fItems[index].revents = 0; fItems[index].revents = 0;
int type = 0; int type = 0;
size_t size = sizeof(type); size_t size = sizeof(type);
zmq_getsockopt(channelsMap.at(channel).at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size); zmq_getsockopt(static_cast<const Socket*>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
SetItemEvents(fItems[index], type); SetItemEvents(fItems[index], type);
} }
@ -93,27 +94,6 @@ Poller::Poller(const unordered_map<string, vector<FairMQChannel>>& channelsMap,
} }
} }
Poller::Poller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket)
: fNumItems{2}
{
fItems = new zmq_pollitem_t[fNumItems];
fItems[0].socket = cmdSocket.GetSocket();
fItems[0].fd = 0;
fItems[0].events = ZMQ_POLLIN;
fItems[0].revents = 0;
fItems[1].socket = dataSocket.GetSocket();
fItems[1].fd = 0;
fItems[1].revents = 0;
int type = 0;
size_t size = sizeof(type);
zmq_getsockopt(dataSocket.GetSocket(), ZMQ_TYPE, &type, &size);
SetItemEvents(fItems[1], type);
}
auto Poller::SetItemEvents(zmq_pollitem_t& item, const int type) -> void auto Poller::SetItemEvents(zmq_pollitem_t& item, const int type) -> void
{ {
if (type == ZMQ_PAIR) { if (type == ZMQ_PAIR) {

View File

@ -57,8 +57,6 @@ class Poller final : public FairMQPoller
~Poller() override; ~Poller() override;
private: private:
Poller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket);
zmq_pollitem_t* fItems; zmq_pollitem_t* fItems;
int fNumItems; int fNumItems;

View File

@ -54,8 +54,7 @@ class Socket final : public fair::mq::Socket
auto TrySend(std::vector<MessagePtr>& msgVec) -> int64_t override; auto TrySend(std::vector<MessagePtr>& msgVec) -> int64_t override;
auto TryReceive(std::vector<MessagePtr>& msgVec) -> int64_t override; auto TryReceive(std::vector<MessagePtr>& msgVec) -> int64_t override;
auto GetSocket() const -> void* override { return fControlSocket; } auto GetSocket() const -> void* { return fControlSocket; }
auto GetSocket(int nothing) const -> int override { return -1; }
void SetLinger(const int value) override; void SetLinger(const int value) override;
int GetLinger() const override; int GetLinger() const override;

View File

@ -76,11 +76,6 @@ auto TransportFactory::CreatePoller(const unordered_map<string, vector<FairMQCha
return PollerPtr{new Poller(channelsMap, channelList)}; return PollerPtr{new Poller(channelsMap, channelList)};
} }
auto TransportFactory::CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const -> PollerPtr
{
return PollerPtr{new Poller(cmdSocket, dataSocket)};
}
auto TransportFactory::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const -> UnmanagedRegionPtr auto TransportFactory::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const -> UnmanagedRegionPtr
{ {
throw runtime_error{"Not yet implemented UMR."}; throw runtime_error{"Not yet implemented UMR."};

View File

@ -43,7 +43,6 @@ class TransportFactory final : public FairMQTransportFactory
auto CreatePoller(const std::vector<FairMQChannel>& channels) const -> PollerPtr override; auto CreatePoller(const std::vector<FairMQChannel>& channels) const -> PollerPtr override;
auto CreatePoller(const std::vector<const FairMQChannel*>& channels) const -> PollerPtr override; auto CreatePoller(const std::vector<const FairMQChannel*>& channels) const -> PollerPtr override;
auto CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const -> PollerPtr override; auto CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const -> PollerPtr override;
auto CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const -> PollerPtr override;
auto CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) const -> UnmanagedRegionPtr override; auto CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) const -> UnmanagedRegionPtr override;

View File

@ -13,6 +13,7 @@
*/ */
#include "FairMQPollerSHM.h" #include "FairMQPollerSHM.h"
#include "FairMQSocketSHM.h"
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include <zmq.h> #include <zmq.h>
@ -29,13 +30,13 @@ FairMQPollerSHM::FairMQPollerSHM(const vector<FairMQChannel>& channels)
for (int i = 0; i < fNumItems; ++i) for (int i = 0; i < fNumItems; ++i)
{ {
fItems[i].socket = channels.at(i).GetSocket().GetSocket(); fItems[i].socket = static_cast<const FairMQSocketSHM*>(&(channels.at(i).GetSocket()))->GetSocket();
fItems[i].fd = 0; fItems[i].fd = 0;
fItems[i].revents = 0; fItems[i].revents = 0;
int type = 0; int type = 0;
size_t size = sizeof(type); size_t size = sizeof(type);
zmq_getsockopt(channels.at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size); zmq_getsockopt(static_cast<const FairMQSocketSHM*>(&(channels.at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
SetItemEvents(fItems[i], type); SetItemEvents(fItems[i], type);
} }
@ -51,13 +52,13 @@ FairMQPollerSHM::FairMQPollerSHM(const vector<const FairMQChannel*>& channels)
for (int i = 0; i < fNumItems; ++i) for (int i = 0; i < fNumItems; ++i)
{ {
fItems[i].socket = channels.at(i)->GetSocket().GetSocket(); fItems[i].socket = static_cast<const FairMQSocketSHM*>(&(channels.at(i)->GetSocket()))->GetSocket();
fItems[i].fd = 0; fItems[i].fd = 0;
fItems[i].revents = 0; fItems[i].revents = 0;
int type = 0; int type = 0;
size_t size = sizeof(type); size_t size = sizeof(type);
zmq_getsockopt(channels.at(i)->GetSocket().GetSocket(), ZMQ_TYPE, &type, &size); zmq_getsockopt(static_cast<const FairMQSocketSHM*>(&(channels.at(i)->GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
SetItemEvents(fItems[i], type); SetItemEvents(fItems[i], type);
} }
@ -88,13 +89,13 @@ FairMQPollerSHM::FairMQPollerSHM(const unordered_map<string, vector<FairMQChanne
{ {
index = fOffsetMap[channel] + i; index = fOffsetMap[channel] + i;
fItems[index].socket = channelsMap.at(channel).at(i).GetSocket().GetSocket(); fItems[index].socket = static_cast<const FairMQSocketSHM*>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket();
fItems[index].fd = 0; fItems[index].fd = 0;
fItems[index].revents = 0; fItems[index].revents = 0;
int type = 0; int type = 0;
size_t size = sizeof(type); size_t size = sizeof(type);
zmq_getsockopt(channelsMap.at(channel).at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size); zmq_getsockopt(static_cast<const FairMQSocketSHM*>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
SetItemEvents(fItems[index], type); SetItemEvents(fItems[index], type);
} }
@ -108,29 +109,6 @@ FairMQPollerSHM::FairMQPollerSHM(const unordered_map<string, vector<FairMQChanne
} }
} }
FairMQPollerSHM::FairMQPollerSHM(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket)
: fItems()
, fNumItems(2)
, fOffsetMap()
{
fItems = new zmq_pollitem_t[fNumItems];
fItems[0].socket = cmdSocket.GetSocket();
fItems[0].fd = 0;
fItems[0].events = ZMQ_POLLIN;
fItems[0].revents = 0;
fItems[1].socket = dataSocket.GetSocket();
fItems[1].fd = 0;
fItems[1].revents = 0;
int type = 0;
size_t size = sizeof(type);
zmq_getsockopt(dataSocket.GetSocket(), ZMQ_TYPE, &type, &size);
SetItemEvents(fItems[1], type);
}
void FairMQPollerSHM::SetItemEvents(zmq_pollitem_t& item, const int type) void FairMQPollerSHM::SetItemEvents(zmq_pollitem_t& item, const int type)
{ {
if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER)

View File

@ -43,8 +43,6 @@ class FairMQPollerSHM final : public FairMQPoller
~FairMQPollerSHM() override; ~FairMQPollerSHM() override;
private: private:
FairMQPollerSHM(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket);
zmq_pollitem_t* fItems; zmq_pollitem_t* fItems;
int fNumItems; int fNumItems;

View File

@ -457,12 +457,6 @@ void* FairMQSocketSHM::GetSocket() const
return fSocket; return fSocket;
} }
int FairMQSocketSHM::GetSocket(int) const
{
// dummy method to comply with the interface. functionality not possible in zeromq.
return -1;
}
void FairMQSocketSHM::SetOption(const string& option, const void* value, size_t valueSize) void FairMQSocketSHM::SetOption(const string& option, const void* value, size_t valueSize)
{ {
if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0)
@ -489,7 +483,7 @@ void FairMQSocketSHM::SetLinger(const int value)
int FairMQSocketSHM::GetLinger() const int FairMQSocketSHM::GetLinger() const
{ {
int value = 0; int value = 0;
size_t valueSize; size_t valueSize = sizeof(value);
if (zmq_getsockopt(fSocket, ZMQ_LINGER, &value, &valueSize) < 0) { if (zmq_getsockopt(fSocket, ZMQ_LINGER, &value, &valueSize) < 0) {
throw SocketError(tools::ToString("failed getting ZMQ_LINGER, reason: ", zmq_strerror(errno))); throw SocketError(tools::ToString("failed getting ZMQ_LINGER, reason: ", zmq_strerror(errno)));
} }
@ -506,7 +500,7 @@ void FairMQSocketSHM::SetSndBufSize(const int value)
int FairMQSocketSHM::GetSndBufSize() const int FairMQSocketSHM::GetSndBufSize() const
{ {
int value = 0; int value = 0;
size_t valueSize; size_t valueSize = sizeof(value);
if (zmq_getsockopt(fSocket, ZMQ_SNDHWM, &value, &valueSize) < 0) { if (zmq_getsockopt(fSocket, ZMQ_SNDHWM, &value, &valueSize) < 0) {
throw SocketError(tools::ToString("failed getting ZMQ_SNDHWM, reason: ", zmq_strerror(errno))); throw SocketError(tools::ToString("failed getting ZMQ_SNDHWM, reason: ", zmq_strerror(errno)));
} }
@ -523,7 +517,7 @@ void FairMQSocketSHM::SetRcvBufSize(const int value)
int FairMQSocketSHM::GetRcvBufSize() const int FairMQSocketSHM::GetRcvBufSize() const
{ {
int value = 0; int value = 0;
size_t valueSize; size_t valueSize = sizeof(value);
if (zmq_getsockopt(fSocket, ZMQ_RCVHWM, &value, &valueSize) < 0) { if (zmq_getsockopt(fSocket, ZMQ_RCVHWM, &value, &valueSize) < 0) {
throw SocketError(tools::ToString("failed getting ZMQ_RCVHWM, reason: ", zmq_strerror(errno))); throw SocketError(tools::ToString("failed getting ZMQ_RCVHWM, reason: ", zmq_strerror(errno)));
} }
@ -540,7 +534,7 @@ void FairMQSocketSHM::SetSndKernelSize(const int value)
int FairMQSocketSHM::GetSndKernelSize() const int FairMQSocketSHM::GetSndKernelSize() const
{ {
int value = 0; int value = 0;
size_t valueSize; size_t valueSize = sizeof(value);
if (zmq_getsockopt(fSocket, ZMQ_SNDBUF, &value, &valueSize) < 0) { if (zmq_getsockopt(fSocket, ZMQ_SNDBUF, &value, &valueSize) < 0) {
throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno))); throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno)));
} }
@ -557,7 +551,7 @@ void FairMQSocketSHM::SetRcvKernelSize(const int value)
int FairMQSocketSHM::GetRcvKernelSize() const int FairMQSocketSHM::GetRcvKernelSize() const
{ {
int value = 0; int value = 0;
size_t valueSize; size_t valueSize = sizeof(value);
if (zmq_getsockopt(fSocket, ZMQ_RCVBUF, &value, &valueSize) < 0) { if (zmq_getsockopt(fSocket, ZMQ_RCVBUF, &value, &valueSize) < 0) {
throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno))); throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno)));
} }

View File

@ -38,8 +38,7 @@ class FairMQSocketSHM final : public FairMQSocket
int64_t TrySend(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override; int64_t TrySend(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override;
int64_t TryReceive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override; int64_t TryReceive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override;
void* GetSocket() const override; void* GetSocket() const;
int GetSocket(int nothing) const override;
void Close() override; void Close() override;

View File

@ -254,11 +254,6 @@ FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const unordered_map<stri
return unique_ptr<FairMQPoller>(new FairMQPollerSHM(channelsMap, channelList)); return unique_ptr<FairMQPoller>(new FairMQPollerSHM(channelsMap, channelList));
} }
FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const
{
return unique_ptr<FairMQPoller>(new FairMQPollerSHM(cmdSocket, dataSocket));
}
FairMQUnmanagedRegionPtr FairMQTransportFactorySHM::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const FairMQUnmanagedRegionPtr FairMQTransportFactorySHM::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const
{ {
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionSHM(*fManager, size, callback)); return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionSHM(*fManager, size, callback));

View File

@ -43,7 +43,6 @@ class FairMQTransportFactorySHM final : public FairMQTransportFactory
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override; FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override;
FairMQPollerPtr CreatePoller(const std::vector<const FairMQChannel*>& channels) const override; FairMQPollerPtr CreatePoller(const std::vector<const FairMQChannel*>& channels) const override;
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override; FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) const override; FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) const override;

View File

@ -15,6 +15,7 @@
#include <zmq.h> #include <zmq.h>
#include "FairMQPollerZMQ.h" #include "FairMQPollerZMQ.h"
#include "FairMQSocketZMQ.h"
#include "FairMQLogger.h" #include "FairMQLogger.h"
using namespace std; using namespace std;
@ -29,13 +30,13 @@ FairMQPollerZMQ::FairMQPollerZMQ(const vector<FairMQChannel>& channels)
for (int i = 0; i < fNumItems; ++i) for (int i = 0; i < fNumItems; ++i)
{ {
fItems[i].socket = channels.at(i).GetSocket().GetSocket(); fItems[i].socket = static_cast<const FairMQSocketZMQ*>(&(channels.at(i).GetSocket()))->GetSocket();
fItems[i].fd = 0; fItems[i].fd = 0;
fItems[i].revents = 0; fItems[i].revents = 0;
int type = 0; int type = 0;
size_t size = sizeof(type); size_t size = sizeof(type);
zmq_getsockopt(channels.at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size); zmq_getsockopt(static_cast<const FairMQSocketZMQ*>(&(channels.at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
SetItemEvents(fItems[i], type); SetItemEvents(fItems[i], type);
} }
@ -52,13 +53,13 @@ FairMQPollerZMQ::FairMQPollerZMQ(const std::vector<const FairMQChannel*>& channe
for (int i = 0; i < fNumItems; ++i) for (int i = 0; i < fNumItems; ++i)
{ {
fItems[i].socket = channels.at(i)->GetSocket().GetSocket(); fItems[i].socket = static_cast<const FairMQSocketZMQ*>(&(channels.at(i)->GetSocket()))->GetSocket();
fItems[i].fd = 0; fItems[i].fd = 0;
fItems[i].revents = 0; fItems[i].revents = 0;
int type = 0; int type = 0;
size_t size = sizeof(type); size_t size = sizeof(type);
zmq_getsockopt(channels.at(i)->GetSocket().GetSocket(), ZMQ_TYPE, &type, &size); zmq_getsockopt(static_cast<const FairMQSocketZMQ*>(&(channels.at(i)->GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
SetItemEvents(fItems[i], type); SetItemEvents(fItems[i], type);
} }
@ -89,13 +90,13 @@ FairMQPollerZMQ::FairMQPollerZMQ(const unordered_map<string, vector<FairMQChanne
{ {
index = fOffsetMap[channel] + i; index = fOffsetMap[channel] + i;
fItems[index].socket = channelsMap.at(channel).at(i).GetSocket().GetSocket(); fItems[index].socket = static_cast<const FairMQSocketZMQ*>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket();
fItems[index].fd = 0; fItems[index].fd = 0;
fItems[index].revents = 0; fItems[index].revents = 0;
int type = 0; int type = 0;
size_t size = sizeof(type); size_t size = sizeof(type);
zmq_getsockopt(channelsMap.at(channel).at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size); zmq_getsockopt(static_cast<const FairMQSocketZMQ*>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
SetItemEvents(fItems[index], type); SetItemEvents(fItems[index], type);
} }
@ -109,29 +110,6 @@ FairMQPollerZMQ::FairMQPollerZMQ(const unordered_map<string, vector<FairMQChanne
} }
} }
FairMQPollerZMQ::FairMQPollerZMQ(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket)
: fItems()
, fNumItems(2)
, fOffsetMap()
{
fItems = new zmq_pollitem_t[fNumItems];
fItems[0].socket = cmdSocket.GetSocket();
fItems[0].fd = 0;
fItems[0].events = ZMQ_POLLIN;
fItems[0].revents = 0;
fItems[1].socket = dataSocket.GetSocket();
fItems[1].fd = 0;
fItems[1].revents = 0;
int type = 0;
size_t size = sizeof(type);
zmq_getsockopt(dataSocket.GetSocket(), ZMQ_TYPE, &type, &size);
SetItemEvents(fItems[1], type);
}
void FairMQPollerZMQ::SetItemEvents(zmq_pollitem_t& item, const int type) void FairMQPollerZMQ::SetItemEvents(zmq_pollitem_t& item, const int type)
{ {
if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER)

View File

@ -51,8 +51,6 @@ class FairMQPollerZMQ final : public FairMQPoller
~FairMQPollerZMQ() override; ~FairMQPollerZMQ() override;
private: private:
FairMQPollerZMQ(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket);
zmq_pollitem_t* fItems; zmq_pollitem_t* fItems;
int fNumItems; int fNumItems;

View File

@ -382,12 +382,6 @@ void* FairMQSocketZMQ::GetSocket() const
return fSocket; return fSocket;
} }
int FairMQSocketZMQ::GetSocket(int) const
{
// dummy method to comply with the interface. functionality not possible in zeromq.
return -1;
}
void FairMQSocketZMQ::SetOption(const string& option, const void* value, size_t valueSize) void FairMQSocketZMQ::SetOption(const string& option, const void* value, size_t valueSize)
{ {
if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0)
@ -414,7 +408,7 @@ void FairMQSocketZMQ::SetLinger(const int value)
int FairMQSocketZMQ::GetLinger() const int FairMQSocketZMQ::GetLinger() const
{ {
int value = 0; int value = 0;
size_t valueSize; size_t valueSize = sizeof(value);
if (zmq_getsockopt(fSocket, ZMQ_LINGER, &value, &valueSize) < 0) { if (zmq_getsockopt(fSocket, ZMQ_LINGER, &value, &valueSize) < 0) {
throw SocketError(tools::ToString("failed getting ZMQ_LINGER, reason: ", zmq_strerror(errno))); throw SocketError(tools::ToString("failed getting ZMQ_LINGER, reason: ", zmq_strerror(errno)));
} }
@ -431,7 +425,7 @@ void FairMQSocketZMQ::SetSndBufSize(const int value)
int FairMQSocketZMQ::GetSndBufSize() const int FairMQSocketZMQ::GetSndBufSize() const
{ {
int value = 0; int value = 0;
size_t valueSize; size_t valueSize = sizeof(value);
if (zmq_getsockopt(fSocket, ZMQ_SNDHWM, &value, &valueSize) < 0) { if (zmq_getsockopt(fSocket, ZMQ_SNDHWM, &value, &valueSize) < 0) {
throw SocketError(tools::ToString("failed getting ZMQ_SNDHWM, reason: ", zmq_strerror(errno))); throw SocketError(tools::ToString("failed getting ZMQ_SNDHWM, reason: ", zmq_strerror(errno)));
} }
@ -448,7 +442,7 @@ void FairMQSocketZMQ::SetRcvBufSize(const int value)
int FairMQSocketZMQ::GetRcvBufSize() const int FairMQSocketZMQ::GetRcvBufSize() const
{ {
int value = 0; int value = 0;
size_t valueSize; size_t valueSize = sizeof(value);
if (zmq_getsockopt(fSocket, ZMQ_RCVHWM, &value, &valueSize) < 0) { if (zmq_getsockopt(fSocket, ZMQ_RCVHWM, &value, &valueSize) < 0) {
throw SocketError(tools::ToString("failed getting ZMQ_RCVHWM, reason: ", zmq_strerror(errno))); throw SocketError(tools::ToString("failed getting ZMQ_RCVHWM, reason: ", zmq_strerror(errno)));
} }
@ -465,7 +459,7 @@ void FairMQSocketZMQ::SetSndKernelSize(const int value)
int FairMQSocketZMQ::GetSndKernelSize() const int FairMQSocketZMQ::GetSndKernelSize() const
{ {
int value = 0; int value = 0;
size_t valueSize; size_t valueSize = sizeof(value);
if (zmq_getsockopt(fSocket, ZMQ_SNDBUF, &value, &valueSize) < 0) { if (zmq_getsockopt(fSocket, ZMQ_SNDBUF, &value, &valueSize) < 0) {
throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno))); throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno)));
} }
@ -482,7 +476,7 @@ void FairMQSocketZMQ::SetRcvKernelSize(const int value)
int FairMQSocketZMQ::GetRcvKernelSize() const int FairMQSocketZMQ::GetRcvKernelSize() const
{ {
int value = 0; int value = 0;
size_t valueSize; size_t valueSize = sizeof(value);
if (zmq_getsockopt(fSocket, ZMQ_RCVBUF, &value, &valueSize) < 0) { if (zmq_getsockopt(fSocket, ZMQ_RCVBUF, &value, &valueSize) < 0) {
throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno))); throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno)));
} }

View File

@ -38,8 +38,7 @@ class FairMQSocketZMQ final : public FairMQSocket
int64_t TrySend(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override; int64_t TrySend(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override;
int64_t TryReceive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override; int64_t TryReceive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override;
void* GetSocket() const override; void* GetSocket() const;
int GetSocket(int nothing) const override;
void Close() override; void Close() override;

View File

@ -90,11 +90,6 @@ FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const unordered_map<stri
return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(channelsMap, channelList)); return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(channelsMap, channelList));
} }
FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const
{
return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(cmdSocket, dataSocket));
}
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const
{ {
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionZMQ(size, callback)); return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionZMQ(size, callback));

View File

@ -44,7 +44,6 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override; FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override;
FairMQPollerPtr CreatePoller(const std::vector<const FairMQChannel*>& channels) const override; FairMQPollerPtr CreatePoller(const std::vector<const FairMQChannel*>& channels) const override;
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override; FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const override; FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const override;