Handle Receive differently when switching transports

No need for buffer+size message on Receive.
This commit is contained in:
Alexey Rybalchenko 2018-06-11 15:51:34 +02:00 committed by Mohammad Al-Turany
parent 653e82cab4
commit 59b04a1a64
2 changed files with 64 additions and 38 deletions

View File

@ -653,13 +653,13 @@ void FairMQChannel::ResetChannel()
int FairMQChannel::Send(unique_ptr<FairMQMessage>& msg) const int FairMQChannel::Send(unique_ptr<FairMQMessage>& msg) const
{ {
CheckCompatibility(msg); CheckSendCompatibility(msg);
return fSocket->Send(msg); return fSocket->Send(msg);
} }
int FairMQChannel::Receive(unique_ptr<FairMQMessage>& msg) const int FairMQChannel::Receive(unique_ptr<FairMQMessage>& msg) const
{ {
CheckCompatibility(msg); CheckReceiveCompatibility(msg);
return fSocket->Receive(msg); return fSocket->Receive(msg);
} }
@ -675,25 +675,25 @@ int FairMQChannel::Receive(unique_ptr<FairMQMessage>& msg, int rcvTimeoutInMs) c
int FairMQChannel::SendAsync(unique_ptr<FairMQMessage>& msg) const int FairMQChannel::SendAsync(unique_ptr<FairMQMessage>& msg) const
{ {
CheckCompatibility(msg); CheckSendCompatibility(msg);
return fSocket->TrySend(msg); return fSocket->TrySend(msg);
} }
int FairMQChannel::ReceiveAsync(unique_ptr<FairMQMessage>& msg) const int FairMQChannel::ReceiveAsync(unique_ptr<FairMQMessage>& msg) const
{ {
CheckCompatibility(msg); CheckReceiveCompatibility(msg);
return fSocket->TryReceive(msg); return fSocket->TryReceive(msg);
} }
int64_t FairMQChannel::Send(vector<unique_ptr<FairMQMessage>>& msgVec) const int64_t FairMQChannel::Send(vector<unique_ptr<FairMQMessage>>& msgVec) const
{ {
CheckCompatibility(msgVec); CheckSendCompatibility(msgVec);
return fSocket->Send(msgVec); return fSocket->Send(msgVec);
} }
int64_t FairMQChannel::Receive(vector<unique_ptr<FairMQMessage>>& msgVec) const int64_t FairMQChannel::Receive(vector<unique_ptr<FairMQMessage>>& msgVec) const
{ {
CheckCompatibility(msgVec); CheckReceiveCompatibility(msgVec);
return fSocket->Receive(msgVec); return fSocket->Receive(msgVec);
} }
@ -709,7 +709,7 @@ int64_t FairMQChannel::Receive(vector<unique_ptr<FairMQMessage>>& msgVec, int rc
int64_t FairMQChannel::SendAsync(vector<unique_ptr<FairMQMessage>>& msgVec) const int64_t FairMQChannel::SendAsync(vector<unique_ptr<FairMQMessage>>& msgVec) const
{ {
CheckCompatibility(msgVec); CheckSendCompatibility(msgVec);
return fSocket->TrySend(msgVec); return fSocket->TrySend(msgVec);
} }
@ -720,7 +720,7 @@ int64_t FairMQChannel::SendAsync(vector<unique_ptr<FairMQMessage>>& msgVec) cons
/// In case of errors, returns -1. /// In case of errors, returns -1.
int64_t FairMQChannel::ReceiveAsync(vector<unique_ptr<FairMQMessage>>& msgVec) const int64_t FairMQChannel::ReceiveAsync(vector<unique_ptr<FairMQMessage>>& msgVec) const
{ {
CheckCompatibility(msgVec); CheckReceiveCompatibility(msgVec);
return fSocket->TryReceive(msgVec); return fSocket->TryReceive(msgVec);
} }
@ -748,12 +748,12 @@ unsigned long FairMQChannel::GetMessagesRx() const
return fSocket->GetMessagesRx(); return fSocket->GetMessagesRx();
} }
void FairMQChannel::CheckCompatibility(unique_ptr<FairMQMessage>& msg) const void FairMQChannel::CheckSendCompatibility(FairMQMessagePtr& msg) const
{ {
if (fTransportType != msg->GetType()) if (fTransportType != msg->GetType())
{ {
// LOG(debug) << "Channel type does not match message type. Creating wrapper"; // LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr msgWrapper(fTransportFactory->CreateMessage(msg->GetData(), FairMQMessagePtr msgWrapper(NewMessage(msg->GetData(),
msg->GetSize(), msg->GetSize(),
[](void* /*data*/, void* msg) { delete static_cast<FairMQMessage*>(msg); }, [](void* /*data*/, void* msg) { delete static_cast<FairMQMessage*>(msg); },
msg.get() msg.get()
@ -763,20 +763,43 @@ void FairMQChannel::CheckCompatibility(unique_ptr<FairMQMessage>& msg) const
} }
} }
void FairMQChannel::CheckCompatibility(vector<FairMQMessagePtr>& msgVec) const void FairMQChannel::CheckSendCompatibility(vector<FairMQMessagePtr>& msgVec) const
{ {
for (auto& part : msgVec) for (auto& msg : msgVec)
{ {
if (fTransportType != part->GetType()) if (fTransportType != msg->GetType())
{ {
// LOG(debug) << "Channel type does not match message type. Creating wrapper"; // LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr partWrapper(fTransportFactory->CreateMessage(part->GetData(), FairMQMessagePtr msgWrapper(NewMessage(msg->GetData(),
part->GetSize(), msg->GetSize(),
[](void* /*data*/, void* part) { delete static_cast<FairMQMessage*>(part); }, [](void* /*data*/, void* msg) { delete static_cast<FairMQMessage*>(msg); },
part.get() msg.get()
)); ));
part.release(); msg.release();
part = move(partWrapper); msg = move(msgWrapper);
}
}
}
void FairMQChannel::CheckReceiveCompatibility(FairMQMessagePtr& msg) const
{
if (fTransportType != msg->GetType())
{
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr newMsg(NewMessage());
msg = move(newMsg);
}
}
void FairMQChannel::CheckReceiveCompatibility(vector<FairMQMessagePtr>& msgVec) const
{
for (auto& msg : msgVec)
{
if (fTransportType != msg->GetType())
{
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr newMsg(NewMessage());
msg = move(newMsg);
} }
} }
} }

View File

@ -20,6 +20,7 @@
#include <fairmq/Transports.h> #include <fairmq/Transports.h>
#include <FairMQLogger.h> #include <FairMQLogger.h>
#include <FairMQParts.h> #include <FairMQParts.h>
#include <FairMQMessage.h>
class FairMQChannel class FairMQChannel
{ {
@ -165,8 +166,8 @@ class FairMQChannel
/// Resets the channel (requires validation to be used again). /// Resets the channel (requires validation to be used again).
void ResetChannel(); void ResetChannel();
int Send(std::unique_ptr<FairMQMessage>& msg) const; int Send(FairMQMessagePtr& msg) const;
int Receive(std::unique_ptr<FairMQMessage>& msg) const; int Receive(FairMQMessagePtr& msg) const;
/// Sends a message to the socket queue. /// Sends a message to the socket queue.
/// @details Send method attempts to send a message by /// @details Send method attempts to send a message by
@ -176,7 +177,7 @@ class FairMQChannel
/// @param msg Constant reference of unique_ptr to a FairMQMessage /// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out.
/// In case of errors, returns -1. /// In case of errors, returns -1.
int Send(std::unique_ptr<FairMQMessage>& msg, int sndTimeoutInMs) const; int Send(FairMQMessagePtr& msg, int sndTimeoutInMs) const;
/// Receives a message from the socket queue. /// Receives a message from the socket queue.
/// @details Receive method attempts to receive a message from the input queue. /// @details Receive method attempts to receive a message from the input queue.
@ -185,7 +186,7 @@ class FairMQChannel
/// @param msg Constant reference of unique_ptr to a FairMQMessage /// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out.
/// In case of errors, returns -1. /// In case of errors, returns -1.
int Receive(std::unique_ptr<FairMQMessage>& msg, int rcvTimeoutInMs) const; int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs) const;
/// Sends a message in non-blocking mode. /// Sends a message in non-blocking mode.
/// @details SendAsync method attempts to send a message without blocking by /// @details SendAsync method attempts to send a message without blocking by
@ -195,31 +196,31 @@ class FairMQChannel
/// @return Number of bytes that have been queued. If queueing failed due to /// @return Number of bytes that have been queued. If queueing failed due to
/// full queue or no connected peers (when binding), returns -2. /// full queue or no connected peers (when binding), returns -2.
/// In case of errors, returns -1. /// In case of errors, returns -1.
int SendAsync(std::unique_ptr<FairMQMessage>& msg) const; int SendAsync(FairMQMessagePtr& msg) const;
/// Receives a message in non-blocking mode. /// Receives a message in non-blocking mode.
/// ///
/// @param msg Constant reference of unique_ptr to a FairMQMessage /// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @return Number of bytes that have been received. If queue is empty, returns -2. /// @return Number of bytes that have been received. If queue is empty, returns -2.
/// In case of errors, returns -1. /// In case of errors, returns -1.
int ReceiveAsync(std::unique_ptr<FairMQMessage>& msg) const; int ReceiveAsync(FairMQMessagePtr& msg) const;
int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const; int64_t Send(std::vector<FairMQMessagePtr>& msgVec) const;
int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const; int64_t Receive(std::vector<FairMQMessagePtr>& msgVec) const;
/// Send a vector of messages /// Send a vector of messages
/// ///
/// @param msgVec message vector reference /// @param msgVec message vector reference
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out.
/// In case of errors, returns -1. /// In case of errors, returns -1.
int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, int sndTimeoutInMs) const; int64_t Send(std::vector<FairMQMessagePtr>& msgVec, int sndTimeoutInMs) const;
/// Receive a vector of messages /// Receive a vector of messages
/// ///
/// @param msgVec message vector reference /// @param msgVec message vector reference
/// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out.
/// In case of errors, returns -1. /// In case of errors, returns -1.
int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, int rcvTimeoutInMs) const; int64_t Receive(std::vector<FairMQMessagePtr>& msgVec, int rcvTimeoutInMs) const;
/// Sends a vector of message in non-blocking mode. /// Sends a vector of message in non-blocking mode.
/// @details SendAsync method attempts to send a vector of messages without blocking by /// @details SendAsync method attempts to send a vector of messages without blocking by
@ -228,14 +229,14 @@ class FairMQChannel
/// @param msgVec message vector reference /// @param msgVec message vector reference
/// @return Number of bytes that have been queued. If queueing failed due to /// @return Number of bytes that have been queued. If queueing failed due to
/// full queue or no connected peers (when binding), returns -2. In case of errors, returns -1. /// full queue or no connected peers (when binding), returns -2. In case of errors, returns -1.
int64_t SendAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const; int64_t SendAsync(std::vector<FairMQMessagePtr>& msgVec) const;
/// Receives a vector of messages in non-blocking mode. /// Receives a vector of messages in non-blocking mode.
/// ///
/// @param msgVec message vector reference /// @param msgVec message vector reference
/// @return Number of bytes that have been received. If queue is empty, returns -2. /// @return Number of bytes that have been received. If queue is empty, returns -2.
/// In case of errors, returns -1. /// In case of errors, returns -1.
int64_t ReceiveAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const; int64_t ReceiveAsync(std::vector<FairMQMessagePtr>& msgVec) const;
int64_t Send(FairMQParts& parts) const int64_t Send(FairMQParts& parts) const
{ {
@ -313,8 +314,10 @@ class FairMQChannel
std::shared_ptr<FairMQTransportFactory> fTransportFactory; std::shared_ptr<FairMQTransportFactory> fTransportFactory;
void CheckCompatibility(std::unique_ptr<FairMQMessage>& msg) const; void CheckSendCompatibility(FairMQMessagePtr& msg) const;
void CheckCompatibility(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const; void CheckSendCompatibility(std::vector<FairMQMessagePtr>& msgVec) const;
void CheckReceiveCompatibility(FairMQMessagePtr& msg) const;
void CheckReceiveCompatibility(std::vector<FairMQMessagePtr>& msgVec) const;
void InitTransport(std::shared_ptr<FairMQTransportFactory> factory); void InitTransport(std::shared_ptr<FairMQTransportFactory> factory);