Adding multiple transports support & other fixes:

- Avoid polling when only one input channel is used.
 - Send only handles for shared memory transport.
 - Avoid waiting in the rate logger thread when nothing to log.
 - Hide warnings from generated files
 - Fix #483
This commit is contained in:
Alexey Rybalchenko
2017-01-13 15:53:25 +01:00
committed by Mohammad Al-Turany
parent e53ad151a7
commit c66fd6fe91
39 changed files with 1840 additions and 1189 deletions

View File

@@ -13,11 +13,11 @@
*/
#include <set>
#include <utility> // std::move
#include <boost/algorithm/string.hpp> // join/split
#include "FairMQChannel.h"
#include "FairMQLogger.h"
using namespace std;
@@ -30,18 +30,21 @@ FairMQChannel::FairMQChannel()
, fType("unspecified")
, fMethod("unspecified")
, fAddress("unspecified")
, fTransport("default")
, fSndBufSize(1000)
, fRcvBufSize(1000)
, fSndKernelSize(0)
, fRcvKernelSize(0)
, fRateLogging(1)
, fChannelName("")
, fName("")
, fIsValid(false)
, fPoller(nullptr)
, fCmdSocket(nullptr)
, fChannelCmdSocket(nullptr)
, fTransportType(FairMQ::Transport::DEFAULT)
, fTransportFactory(nullptr)
, fNoBlockFlag(0)
, fSndMoreFlag(0)
, fMultipart(false)
{
}
@@ -50,18 +53,21 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str
, fType(type)
, fMethod(method)
, fAddress(address)
, fTransport("default")
, fSndBufSize(1000)
, fRcvBufSize(1000)
, fSndKernelSize(0)
, fRcvKernelSize(0)
, fRateLogging(1)
, fChannelName("")
, fName("")
, fIsValid(false)
, fPoller(nullptr)
, fCmdSocket(nullptr)
, fChannelCmdSocket(nullptr)
, fTransportType(FairMQ::Transport::DEFAULT)
, fTransportFactory(nullptr)
, fNoBlockFlag(0)
, fSndMoreFlag(0)
, fMultipart(false)
{
}
@@ -70,18 +76,21 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan)
, fType(chan.fType)
, fMethod(chan.fMethod)
, fAddress(chan.fAddress)
, fTransport(chan.fTransport)
, fSndBufSize(chan.fSndBufSize)
, fRcvBufSize(chan.fRcvBufSize)
, fSndKernelSize(chan.fSndKernelSize)
, fRcvKernelSize(chan.fRcvKernelSize)
, fRateLogging(chan.fRateLogging)
, fChannelName(chan.fChannelName)
, fName(chan.fName)
, fIsValid(false)
, fPoller(nullptr)
, fCmdSocket(nullptr)
, fChannelCmdSocket(nullptr)
, fTransportType(FairMQ::Transport::DEFAULT)
, fTransportFactory(nullptr)
, fNoBlockFlag(chan.fNoBlockFlag)
, fSndMoreFlag(chan.fSndMoreFlag)
, fMultipart(chan.fMultipart)
{}
FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
@@ -89,16 +98,18 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
fType = chan.fType;
fMethod = chan.fMethod;
fAddress = chan.fAddress;
fTransport = chan.fTransport;
fSndBufSize = chan.fSndBufSize;
fRcvBufSize = chan.fRcvBufSize;
fSndKernelSize = chan.fSndKernelSize;
fRcvKernelSize = chan.fRcvKernelSize;
fRateLogging = chan.fRateLogging;
fSocket = nullptr;
fChannelName = chan.fChannelName;
fName = chan.fName;
fIsValid = false;
fPoller = nullptr;
fCmdSocket = nullptr;
fChannelCmdSocket = nullptr;
fTransportType = FairMQ::Transport::DEFAULT;
fTransportFactory = nullptr;
fNoBlockFlag = chan.fNoBlockFlag;
fSndMoreFlag = chan.fSndMoreFlag;
@@ -108,13 +119,13 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
string FairMQChannel::GetChannelName() const
{
return fChannelName;
return fName;
}
string FairMQChannel::GetChannelPrefix() const
{
string prefix = fChannelName;
return prefix.erase(fChannelName.rfind("["));
string prefix = fName;
return prefix.erase(fName.rfind("["));
}
string FairMQChannel::GetType() const
@@ -159,6 +170,20 @@ string FairMQChannel::GetAddress() const
}
}
string FairMQChannel::GetTransport() const
{
try
{
unique_lock<mutex> lock(fChannelMutex);
return fTransport;
}
catch (exception& e)
{
LOG(ERROR) << "Exception caught in FairMQChannel::GetTransport: " << e.what();
exit(EXIT_FAILURE);
}
}
int FairMQChannel::GetSndBufSize() const
{
try
@@ -274,6 +299,21 @@ void FairMQChannel::UpdateAddress(const string& address)
}
}
void FairMQChannel::UpdateTransport(const string& transport)
{
try
{
unique_lock<mutex> lock(fChannelMutex);
fIsValid = false;
fTransport = transport;
}
catch (exception& e)
{
LOG(ERROR) << "Exception caught in FairMQChannel::UpdateTransport: " << e.what();
exit(EXIT_FAILURE);
}
}
void FairMQChannel::UpdateSndBufSize(const int sndBufSize)
{
try
@@ -370,7 +410,7 @@ bool FairMQChannel::ValidateChannel()
unique_lock<mutex> lock(fChannelMutex);
stringstream ss;
ss << "Validating channel \"" << fChannelName << "\"... ";
ss << "Validating channel \"" << fName << "\"... ";
if (fIsValid)
{
@@ -461,6 +501,17 @@ bool FairMQChannel::ValidateChannel()
}
}
// validate channel transport
// const string channelTransportNames[] = { "default", "zeromq", "nanomsg", "shmem" };
// const set<string> channelTransports(channelTransportNames, channelTransportNames + sizeof(channelTransportNames) / sizeof(string));
if (FairMQ::TransportTypes.find(fTransport) == FairMQ::TransportTypes.end())
{
ss << "INVALID";
LOG(DEBUG) << ss.str();
LOG(ERROR) << "Invalid channel transport: \"" << fTransport << "\"";
exit(EXIT_FAILURE);
}
// validate socket buffer size for sending
if (fSndBufSize < 0)
{
@@ -518,19 +569,23 @@ bool FairMQChannel::ValidateChannel()
}
}
bool FairMQChannel::InitCommandInterface(shared_ptr<FairMQTransportFactory> factory, int numIoThreads)
void FairMQChannel::InitTransport(shared_ptr<FairMQTransportFactory> factory)
{
fTransportFactory = factory;
fTransportType = factory->GetType();
}
fCmdSocket = fTransportFactory->CreateSocket("sub", "device-commands", numIoThreads, "internal");
if (fCmdSocket)
bool FairMQChannel::InitCommandInterface(int numIoThreads)
{
fChannelCmdSocket = fTransportFactory->CreateSocket("sub", "device-commands", numIoThreads, "internal");
if (fChannelCmdSocket)
{
fCmdSocket->Connect("inproc://commands");
fChannelCmdSocket->Connect("inproc://commands");
fNoBlockFlag = fCmdSocket->NOBLOCK;
fSndMoreFlag = fCmdSocket->SNDMORE;
fNoBlockFlag = fChannelCmdSocket->NOBLOCK;
fSndMoreFlag = fChannelCmdSocket->SNDMORE;
fPoller = fTransportFactory->CreatePoller(*fCmdSocket, *fSocket);
fPoller = fTransportFactory->CreatePoller(*fChannelCmdSocket, *fSocket);
return true;
}
@@ -547,7 +602,19 @@ void FairMQChannel::ResetChannel()
// TODO: implement channel resetting
}
int FairMQChannel::Send(const unique_ptr<FairMQMessage>& msg, int sndTimeoutInMs) const
int FairMQChannel::Send(unique_ptr<FairMQMessage>& msg) const
{
CheckCompatibility(msg);
return fSocket->Send(msg);
}
int FairMQChannel::Receive(unique_ptr<FairMQMessage>& msg) const
{
CheckCompatibility(msg);
return fSocket->Receive(msg);
}
int FairMQChannel::Send(unique_ptr<FairMQMessage>& msg, int sndTimeoutInMs) const
{
fPoller->Poll(sndTimeoutInMs);
@@ -562,13 +629,13 @@ int FairMQChannel::Send(const unique_ptr<FairMQMessage>& msg, int sndTimeoutInMs
if (fPoller->CheckOutput(1))
{
return fSocket->Send(msg.get(), 0);
return Send(msg);
}
return -2;
}
int FairMQChannel::Receive(const unique_ptr<FairMQMessage>& msg, int rcvTimeoutInMs) const
int FairMQChannel::Receive(unique_ptr<FairMQMessage>& msg, int rcvTimeoutInMs) const
{
fPoller->Poll(rcvTimeoutInMs);
@@ -583,13 +650,37 @@ int FairMQChannel::Receive(const unique_ptr<FairMQMessage>& msg, int rcvTimeoutI
if (fPoller->CheckInput(1))
{
return fSocket->Receive(msg.get(), 0);
return Receive(msg);
}
return -2;
}
int64_t FairMQChannel::Send(const vector<unique_ptr<FairMQMessage>>& msgVec, int sndTimeoutInMs) const
int FairMQChannel::SendAsync(unique_ptr<FairMQMessage>& msg) const
{
CheckCompatibility(msg);
return fSocket->Send(msg, fNoBlockFlag);
}
int FairMQChannel::ReceiveAsync(unique_ptr<FairMQMessage>& msg) const
{
CheckCompatibility(msg);
return fSocket->Receive(msg, fNoBlockFlag);
}
int64_t FairMQChannel::Send(vector<unique_ptr<FairMQMessage>>& msgVec) const
{
CheckCompatibility(msgVec);
return fSocket->Send(msgVec);
}
int64_t FairMQChannel::Receive(vector<unique_ptr<FairMQMessage>>& msgVec) const
{
CheckCompatibility(msgVec);
return fSocket->Receive(msgVec);
}
int64_t FairMQChannel::Send(vector<unique_ptr<FairMQMessage>>& msgVec, int sndTimeoutInMs) const
{
fPoller->Poll(sndTimeoutInMs);
@@ -604,7 +695,7 @@ int64_t FairMQChannel::Send(const vector<unique_ptr<FairMQMessage>>& msgVec, int
if (fPoller->CheckOutput(1))
{
return fSocket->Send(msgVec);
return Send(msgVec);
}
return -2;
@@ -625,155 +716,33 @@ int64_t FairMQChannel::Receive(vector<unique_ptr<FairMQMessage>>& msgVec, int rc
if (fPoller->CheckInput(1))
{
return fSocket->Receive(msgVec);
return Receive(msgVec);
}
return -2;
}
int FairMQChannel::Send(FairMQMessage* msg, const string& flag, int sndTimeoutInMs) const
int64_t FairMQChannel::SendAsync(vector<unique_ptr<FairMQMessage>>& msgVec) const
{
if (flag == "")
{
fPoller->Poll(sndTimeoutInMs);
if (fPoller->CheckInput(0))
{
HandleUnblock();
if (fInterrupted)
{
return -2;
}
}
if (fPoller->CheckOutput(1))
{
return fSocket->Send(msg, flag);
}
return -2;
}
else
{
return fSocket->Send(msg, flag);
}
CheckCompatibility(msgVec);
return fSocket->Send(msgVec, fNoBlockFlag);
}
int FairMQChannel::Send(FairMQMessage* msg, const int flags, int sndTimeoutInMs) const
/// Receives a vector of messages in non-blocking mode.
///
/// @param msgVec message vector reference
/// @return Number of bytes that have been received. If queue is empty, returns -2.
/// In case of errors, returns -1.
int64_t FairMQChannel::ReceiveAsync(vector<unique_ptr<FairMQMessage>>& msgVec) const
{
if (flags == 0)
{
fPoller->Poll(sndTimeoutInMs);
if (fPoller->CheckInput(0))
{
HandleUnblock();
if (fInterrupted)
{
return -2;
}
}
if (fPoller->CheckOutput(1))
{
return fSocket->Send(msg, flags);
}
return -2;
}
else
{
return fSocket->Send(msg, flags);
}
}
int FairMQChannel::Receive(FairMQMessage* msg, const string& flag, int rcvTimeoutInMs) const
{
if (flag == "")
{
fPoller->Poll(rcvTimeoutInMs);
if (fPoller->CheckInput(0))
{
HandleUnblock();
if (fInterrupted)
{
return -2;
}
}
if (fPoller->CheckInput(1))
{
return fSocket->Receive(msg, flag);
}
return -2;
}
else
{
return fSocket->Receive(msg, flag);
}
}
int FairMQChannel::Receive(FairMQMessage* msg, const int flags, int rcvTimeoutInMs) const
{
if (flags == 0)
{
fPoller->Poll(rcvTimeoutInMs);
if (fPoller->CheckInput(0))
{
HandleUnblock();
if (fInterrupted)
{
return -2;
}
}
if (fPoller->CheckInput(1))
{
return fSocket->Receive(msg, flags);
}
return -2;
}
else
{
return fSocket->Receive(msg, flags);
}
}
bool FairMQChannel::ExpectsAnotherPart() const
{
int64_t more = 0;
size_t more_size = sizeof more;
if (fSocket)
{
fSocket->GetOption("rcv-more", &more, &more_size);
if (more)
{
return true;
}
else
{
return false;
}
}
else
{
return false;
}
CheckCompatibility(msgVec);
return fSocket->Receive(msgVec, fNoBlockFlag);
}
inline bool FairMQChannel::HandleUnblock() const
{
FairMQMessagePtr cmd(fTransportFactory->CreateMessage());
if (fCmdSocket->Receive(cmd.get(), 0) >= 0)
if (fChannelCmdSocket->Receive(cmd) >= 0)
{
// LOG(DEBUG) << "unblocked";
}
@@ -788,3 +757,51 @@ void FairMQChannel::Tokenize(vector<string>& output, const string& input, const
{
boost::algorithm::split(output, input, boost::algorithm::is_any_of(delimiters));
}
FairMQTransportFactory* FairMQChannel::Transport()
{
return fTransportFactory.get();
}
bool FairMQChannel::CheckCompatibility(unique_ptr<FairMQMessage>& msg) const
{
if (fTransportType == msg->GetType())
{
return true;
}
else
{
// LOG(WARN) << "Channel type does not match message type. Copying...";
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage(msg->GetSize()));
memcpy(msgCopy->GetData(), msg->GetData(), msg->GetSize());
msg = move(msgCopy);
return false;
}
}
bool FairMQChannel::CheckCompatibility(vector<unique_ptr<FairMQMessage>>& msgVec) const
{
if (msgVec.size() > 0)
{
if (fTransportType == msgVec.at(0)->GetType())
{
return true;
}
else
{
// LOG(WARN) << "Channel type does not match message type. Copying...";
vector<unique_ptr<FairMQMessage>> tempVec;
for (unsigned int i = 0; i < msgVec.size(); ++i)
{
tempVec.push_back(fTransportFactory->CreateMessage(msgVec[i]->GetSize()));
memcpy(tempVec[i]->GetData(), msgVec[i]->GetData(), msgVec[i]->GetSize());
}
msgVec = move(tempVec);
return false;
}
}
else
{
return true;
}
}