mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
Fix the type mismatch in the multi-channel poller
This commit is contained in:
parent
19afacb504
commit
0b11ad9274
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
#include <unordered_map>
|
||||||
|
|
||||||
#include "FairMQMessage.h"
|
#include "FairMQMessage.h"
|
||||||
#include "FairMQChannel.h"
|
#include "FairMQChannel.h"
|
||||||
|
@ -36,7 +37,7 @@ class FairMQTransportFactory
|
||||||
virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, int numIoThreads) = 0;
|
virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, int numIoThreads) = 0;
|
||||||
|
|
||||||
virtual FairMQPoller* CreatePoller(const std::vector<FairMQChannel>& channels) = 0;
|
virtual FairMQPoller* CreatePoller(const std::vector<FairMQChannel>& channels) = 0;
|
||||||
virtual FairMQPoller* CreatePoller(std::map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList) = 0;
|
virtual FairMQPoller* CreatePoller(std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList) = 0;
|
||||||
virtual FairMQPoller* CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket) = 0;
|
virtual FairMQPoller* CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket) = 0;
|
||||||
|
|
||||||
virtual ~FairMQTransportFactory() {};
|
virtual ~FairMQTransportFactory() {};
|
||||||
|
|
|
@ -34,11 +34,32 @@ FairMQPollerNN::FairMQPollerNN(const vector<FairMQChannel>& channels)
|
||||||
for (int i = 0; i < fNumItems; ++i)
|
for (int i = 0; i < fNumItems; ++i)
|
||||||
{
|
{
|
||||||
items[i].fd = channels.at(i).fSocket->GetSocket(1);
|
items[i].fd = channels.at(i).fSocket->GetSocket(1);
|
||||||
|
|
||||||
|
int type = 0;
|
||||||
|
size_t sz = sizeof(type);
|
||||||
|
nn_getsockopt(channels.at(i).fSocket->GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz);
|
||||||
|
|
||||||
|
if (type == NN_REQ || type == NN_REP || type == NN_PAIR)
|
||||||
|
{
|
||||||
|
items[i].events = NN_POLLIN|NN_POLLOUT;
|
||||||
|
}
|
||||||
|
else if (type == NN_PUSH || type == NN_PUB)
|
||||||
|
{
|
||||||
|
items[i].events = NN_POLLOUT;
|
||||||
|
}
|
||||||
|
else if (type == NN_PULL || type == NN_SUB)
|
||||||
|
{
|
||||||
items[i].events = NN_POLLIN;
|
items[i].events = NN_POLLIN;
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
LOG(ERROR) << "invalid poller configuration, exiting.";
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQPollerNN::FairMQPollerNN(map<string, vector<FairMQChannel>>& channelsMap, initializer_list<string> channelList)
|
FairMQPollerNN::FairMQPollerNN(unordered_map<string, vector<FairMQChannel>>& channelsMap, initializer_list<string> channelList)
|
||||||
: items()
|
: items()
|
||||||
, fNumItems(0)
|
, fNumItems(0)
|
||||||
, fOffsetMap()
|
, fOffsetMap()
|
||||||
|
@ -64,8 +85,29 @@ FairMQPollerNN::FairMQPollerNN(map<string, vector<FairMQChannel>>& channelsMap,
|
||||||
{
|
{
|
||||||
index = fOffsetMap[channel] + i;
|
index = fOffsetMap[channel] + i;
|
||||||
items[index].fd = channelsMap.at(channel).at(i).fSocket->GetSocket(1);
|
items[index].fd = channelsMap.at(channel).at(i).fSocket->GetSocket(1);
|
||||||
|
|
||||||
|
int type = 0;
|
||||||
|
size_t sz = sizeof(type);
|
||||||
|
nn_getsockopt(channelsMap.at(channel).at(i).fSocket->GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz);
|
||||||
|
|
||||||
|
if (type == NN_REQ || type == NN_REP || type == NN_PAIR)
|
||||||
|
{
|
||||||
|
items[index].events = NN_POLLIN|NN_POLLOUT;
|
||||||
|
}
|
||||||
|
else if (type == NN_PUSH || type == NN_PUB)
|
||||||
|
{
|
||||||
|
items[index].events = NN_POLLOUT;
|
||||||
|
}
|
||||||
|
else if (type == NN_PULL || type == NN_SUB)
|
||||||
|
{
|
||||||
items[index].events = NN_POLLIN;
|
items[index].events = NN_POLLIN;
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
LOG(ERROR) << "invalid poller configuration, exiting.";
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (const std::out_of_range& oor)
|
catch (const std::out_of_range& oor)
|
||||||
|
|
|
@ -32,7 +32,7 @@ class FairMQPollerNN : public FairMQPoller
|
||||||
|
|
||||||
public:
|
public:
|
||||||
FairMQPollerNN(const std::vector<FairMQChannel>& channels);
|
FairMQPollerNN(const std::vector<FairMQChannel>& channels);
|
||||||
FairMQPollerNN(std::map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList);
|
FairMQPollerNN(std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList);
|
||||||
|
|
||||||
virtual void Poll(const int timeout);
|
virtual void Poll(const int timeout);
|
||||||
virtual bool CheckInput(const int index);
|
virtual bool CheckInput(const int index);
|
||||||
|
|
|
@ -46,7 +46,7 @@ FairMQPoller* FairMQTransportFactoryNN::CreatePoller(const vector<FairMQChannel>
|
||||||
return new FairMQPollerNN(channels);
|
return new FairMQPollerNN(channels);
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQPoller* FairMQTransportFactoryNN::CreatePoller(std::map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList)
|
FairMQPoller* FairMQTransportFactoryNN::CreatePoller(std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList)
|
||||||
{
|
{
|
||||||
return new FairMQPollerNN(channelsMap, channelList);
|
return new FairMQPollerNN(channelsMap, channelList);
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,7 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory
|
||||||
virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, int numIoThreads);
|
virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, int numIoThreads);
|
||||||
|
|
||||||
virtual FairMQPoller* CreatePoller(const std::vector<FairMQChannel>& channels);
|
virtual FairMQPoller* CreatePoller(const std::vector<FairMQChannel>& channels);
|
||||||
virtual FairMQPoller* CreatePoller(std::map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList);
|
virtual FairMQPoller* CreatePoller(std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList);
|
||||||
virtual FairMQPoller* CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket);
|
virtual FairMQPoller* CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket);
|
||||||
|
|
||||||
virtual ~FairMQTransportFactoryNN() {};
|
virtual ~FairMQTransportFactoryNN() {};
|
||||||
|
|
|
@ -31,12 +31,33 @@ FairMQPollerZMQ::FairMQPollerZMQ(const vector<FairMQChannel>& channels)
|
||||||
{
|
{
|
||||||
items[i].socket = channels.at(i).fSocket->GetSocket();
|
items[i].socket = channels.at(i).fSocket->GetSocket();
|
||||||
items[i].fd = 0;
|
items[i].fd = 0;
|
||||||
items[i].events = ZMQ_POLLIN;
|
|
||||||
items[i].revents = 0;
|
items[i].revents = 0;
|
||||||
|
|
||||||
|
int type = 0;
|
||||||
|
size_t size = sizeof(type);
|
||||||
|
zmq_getsockopt (channels.at(i).fSocket->GetSocket(), ZMQ_TYPE, &type, &size);
|
||||||
|
|
||||||
|
if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER)
|
||||||
|
{
|
||||||
|
items[i].events = ZMQ_POLLIN|ZMQ_POLLOUT;
|
||||||
|
}
|
||||||
|
else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB)
|
||||||
|
{
|
||||||
|
items[i].events = ZMQ_POLLOUT;
|
||||||
|
}
|
||||||
|
else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB)
|
||||||
|
{
|
||||||
|
items[i].events = ZMQ_POLLIN;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
LOG(ERROR) << "invalid poller configuration, exiting.";
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQPollerZMQ::FairMQPollerZMQ(map<string, vector<FairMQChannel>>& channelsMap, initializer_list<string> channelList)
|
FairMQPollerZMQ::FairMQPollerZMQ(unordered_map<string, vector<FairMQChannel>>& channelsMap, initializer_list<string> channelList)
|
||||||
: items()
|
: items()
|
||||||
, fNumItems(0)
|
, fNumItems(0)
|
||||||
, fOffsetMap()
|
, fOffsetMap()
|
||||||
|
@ -61,10 +82,32 @@ FairMQPollerZMQ::FairMQPollerZMQ(map<string, vector<FairMQChannel>>& channelsMap
|
||||||
for (int i = 0; i < channelsMap.at(channel).size(); ++i)
|
for (int i = 0; i < channelsMap.at(channel).size(); ++i)
|
||||||
{
|
{
|
||||||
index = fOffsetMap[channel] + i;
|
index = fOffsetMap[channel] + i;
|
||||||
|
|
||||||
items[index].socket = channelsMap.at(channel).at(i).fSocket->GetSocket();
|
items[index].socket = channelsMap.at(channel).at(i).fSocket->GetSocket();
|
||||||
items[index].fd = 0;
|
items[index].fd = 0;
|
||||||
items[index].events = ZMQ_POLLIN;
|
|
||||||
items[index].revents = 0;
|
items[index].revents = 0;
|
||||||
|
|
||||||
|
int type = 0;
|
||||||
|
size_t size = sizeof(type);
|
||||||
|
zmq_getsockopt (channelsMap.at(channel).at(i).fSocket->GetSocket(), ZMQ_TYPE, &type, &size);
|
||||||
|
|
||||||
|
if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER)
|
||||||
|
{
|
||||||
|
items[index].events = ZMQ_POLLIN|ZMQ_POLLOUT;
|
||||||
|
}
|
||||||
|
else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB)
|
||||||
|
{
|
||||||
|
items[index].events = ZMQ_POLLOUT;
|
||||||
|
}
|
||||||
|
else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB)
|
||||||
|
{
|
||||||
|
items[index].events = ZMQ_POLLIN;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
LOG(ERROR) << "invalid poller configuration, exiting.";
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,7 @@ class FairMQPollerZMQ : public FairMQPoller
|
||||||
|
|
||||||
public:
|
public:
|
||||||
FairMQPollerZMQ(const std::vector<FairMQChannel>& channels);
|
FairMQPollerZMQ(const std::vector<FairMQChannel>& channels);
|
||||||
FairMQPollerZMQ(std::map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList);
|
FairMQPollerZMQ(std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList);
|
||||||
|
|
||||||
virtual void Poll(const int timeout);
|
virtual void Poll(const int timeout);
|
||||||
virtual bool CheckInput(const int index);
|
virtual bool CheckInput(const int index);
|
||||||
|
|
|
@ -50,7 +50,7 @@ FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(const vector<FairMQChannel
|
||||||
return new FairMQPollerZMQ(channels);
|
return new FairMQPollerZMQ(channels);
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(std::map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList)
|
FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList)
|
||||||
{
|
{
|
||||||
return new FairMQPollerZMQ(channelsMap, channelList);
|
return new FairMQPollerZMQ(channelsMap, channelList);
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,7 +35,7 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory
|
||||||
virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, int numIoThreads);
|
virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, int numIoThreads);
|
||||||
|
|
||||||
virtual FairMQPoller* CreatePoller(const std::vector<FairMQChannel>& channels);
|
virtual FairMQPoller* CreatePoller(const std::vector<FairMQChannel>& channels);
|
||||||
virtual FairMQPoller* CreatePoller(std::map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList);
|
virtual FairMQPoller* CreatePoller(std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList);
|
||||||
virtual FairMQPoller* CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket);
|
virtual FairMQPoller* CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket);
|
||||||
|
|
||||||
virtual ~FairMQTransportFactoryZMQ() {};
|
virtual ~FairMQTransportFactoryZMQ() {};
|
||||||
|
|
Loading…
Reference in New Issue
Block a user