From 59b04a1a64a8fc6357f5a018dd76a851046591af Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Mon, 11 Jun 2018 15:51:34 +0200 Subject: [PATCH] Handle Receive differently when switching transports No need for buffer+size message on Receive. --- fairmq/FairMQChannel.cxx | 71 ++++++++++++++++++++++++++-------------- fairmq/FairMQChannel.h | 31 ++++++++++-------- 2 files changed, 64 insertions(+), 38 deletions(-) diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index 84466001..d4078465 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -653,13 +653,13 @@ void FairMQChannel::ResetChannel() int FairMQChannel::Send(unique_ptr& msg) const { - CheckCompatibility(msg); + CheckSendCompatibility(msg); return fSocket->Send(msg); } int FairMQChannel::Receive(unique_ptr& msg) const { - CheckCompatibility(msg); + CheckReceiveCompatibility(msg); return fSocket->Receive(msg); } @@ -675,25 +675,25 @@ int FairMQChannel::Receive(unique_ptr& msg, int rcvTimeoutInMs) c int FairMQChannel::SendAsync(unique_ptr& msg) const { - CheckCompatibility(msg); + CheckSendCompatibility(msg); return fSocket->TrySend(msg); } int FairMQChannel::ReceiveAsync(unique_ptr& msg) const { - CheckCompatibility(msg); + CheckReceiveCompatibility(msg); return fSocket->TryReceive(msg); } int64_t FairMQChannel::Send(vector>& msgVec) const { - CheckCompatibility(msgVec); + CheckSendCompatibility(msgVec); return fSocket->Send(msgVec); } int64_t FairMQChannel::Receive(vector>& msgVec) const { - CheckCompatibility(msgVec); + CheckReceiveCompatibility(msgVec); return fSocket->Receive(msgVec); } @@ -709,7 +709,7 @@ int64_t FairMQChannel::Receive(vector>& msgVec, int rc int64_t FairMQChannel::SendAsync(vector>& msgVec) const { - CheckCompatibility(msgVec); + CheckSendCompatibility(msgVec); return fSocket->TrySend(msgVec); } @@ -720,7 +720,7 @@ int64_t FairMQChannel::SendAsync(vector>& msgVec) cons /// In case of errors, returns -1. int64_t FairMQChannel::ReceiveAsync(vector>& msgVec) const { - CheckCompatibility(msgVec); + CheckReceiveCompatibility(msgVec); return fSocket->TryReceive(msgVec); } @@ -748,35 +748,58 @@ unsigned long FairMQChannel::GetMessagesRx() const return fSocket->GetMessagesRx(); } -void FairMQChannel::CheckCompatibility(unique_ptr& msg) const +void FairMQChannel::CheckSendCompatibility(FairMQMessagePtr& msg) const { if (fTransportType != msg->GetType()) { // LOG(debug) << "Channel type does not match message type. Creating wrapper"; - FairMQMessagePtr msgWrapper(fTransportFactory->CreateMessage(msg->GetData(), - msg->GetSize(), - [](void* /*data*/, void* msg) { delete static_cast(msg); }, - msg.get() - )); + FairMQMessagePtr msgWrapper(NewMessage(msg->GetData(), + msg->GetSize(), + [](void* /*data*/, void* msg) { delete static_cast(msg); }, + msg.get() + )); msg.release(); msg = move(msgWrapper); } } -void FairMQChannel::CheckCompatibility(vector& msgVec) const +void FairMQChannel::CheckSendCompatibility(vector& msgVec) const { - for (auto& part : msgVec) + for (auto& msg : msgVec) { - if (fTransportType != part->GetType()) + if (fTransportType != msg->GetType()) { // LOG(debug) << "Channel type does not match message type. Creating wrapper"; - FairMQMessagePtr partWrapper(fTransportFactory->CreateMessage(part->GetData(), - part->GetSize(), - [](void* /*data*/, void* part) { delete static_cast(part); }, - part.get() - )); - part.release(); - part = move(partWrapper); + FairMQMessagePtr msgWrapper(NewMessage(msg->GetData(), + msg->GetSize(), + [](void* /*data*/, void* msg) { delete static_cast(msg); }, + msg.get() + )); + msg.release(); + msg = move(msgWrapper); + } + } +} + +void FairMQChannel::CheckReceiveCompatibility(FairMQMessagePtr& msg) const +{ + if (fTransportType != msg->GetType()) + { + // LOG(debug) << "Channel type does not match message type. Creating wrapper"; + FairMQMessagePtr newMsg(NewMessage()); + msg = move(newMsg); + } +} + +void FairMQChannel::CheckReceiveCompatibility(vector& msgVec) const +{ + for (auto& msg : msgVec) + { + if (fTransportType != msg->GetType()) + { + // LOG(debug) << "Channel type does not match message type. Creating wrapper"; + FairMQMessagePtr newMsg(NewMessage()); + msg = move(newMsg); } } } diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index f0d9aa24..fd1267df 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -20,6 +20,7 @@ #include #include #include +#include class FairMQChannel { @@ -165,8 +166,8 @@ class FairMQChannel /// Resets the channel (requires validation to be used again). void ResetChannel(); - int Send(std::unique_ptr& msg) const; - int Receive(std::unique_ptr& msg) const; + int Send(FairMQMessagePtr& msg) const; + int Receive(FairMQMessagePtr& msg) const; /// Sends a message to the socket queue. /// @details Send method attempts to send a message by @@ -176,7 +177,7 @@ class FairMQChannel /// @param msg Constant reference of unique_ptr to a FairMQMessage /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. /// In case of errors, returns -1. - int Send(std::unique_ptr& msg, int sndTimeoutInMs) const; + int Send(FairMQMessagePtr& msg, int sndTimeoutInMs) const; /// Receives a message from the socket queue. /// @details Receive method attempts to receive a message from the input queue. @@ -185,7 +186,7 @@ class FairMQChannel /// @param msg Constant reference of unique_ptr to a FairMQMessage /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. /// In case of errors, returns -1. - int Receive(std::unique_ptr& msg, int rcvTimeoutInMs) const; + int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs) const; /// Sends a message in non-blocking mode. /// @details SendAsync method attempts to send a message without blocking by @@ -195,31 +196,31 @@ class FairMQChannel /// @return Number of bytes that have been queued. If queueing failed due to /// full queue or no connected peers (when binding), returns -2. /// In case of errors, returns -1. - int SendAsync(std::unique_ptr& msg) const; + int SendAsync(FairMQMessagePtr& msg) const; /// Receives a message in non-blocking mode. /// /// @param msg Constant reference of unique_ptr to a FairMQMessage /// @return Number of bytes that have been received. If queue is empty, returns -2. /// In case of errors, returns -1. - int ReceiveAsync(std::unique_ptr& msg) const; + int ReceiveAsync(FairMQMessagePtr& msg) const; - int64_t Send(std::vector>& msgVec) const; - int64_t Receive(std::vector>& msgVec) const; + int64_t Send(std::vector& msgVec) const; + int64_t Receive(std::vector& msgVec) const; /// Send a vector of messages /// /// @param msgVec message vector reference /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. /// In case of errors, returns -1. - int64_t Send(std::vector>& msgVec, int sndTimeoutInMs) const; + int64_t Send(std::vector& msgVec, int sndTimeoutInMs) const; /// Receive a vector of messages /// /// @param msgVec message vector reference /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. /// In case of errors, returns -1. - int64_t Receive(std::vector>& msgVec, int rcvTimeoutInMs) const; + int64_t Receive(std::vector& msgVec, int rcvTimeoutInMs) const; /// Sends a vector of message in non-blocking mode. /// @details SendAsync method attempts to send a vector of messages without blocking by @@ -228,14 +229,14 @@ class FairMQChannel /// @param msgVec message vector reference /// @return Number of bytes that have been queued. If queueing failed due to /// full queue or no connected peers (when binding), returns -2. In case of errors, returns -1. - int64_t SendAsync(std::vector>& msgVec) const; + int64_t SendAsync(std::vector& msgVec) const; /// Receives a vector of messages in non-blocking mode. /// /// @param msgVec message vector reference /// @return Number of bytes that have been received. If queue is empty, returns -2. /// In case of errors, returns -1. - int64_t ReceiveAsync(std::vector>& msgVec) const; + int64_t ReceiveAsync(std::vector& msgVec) const; int64_t Send(FairMQParts& parts) const { @@ -313,8 +314,10 @@ class FairMQChannel std::shared_ptr fTransportFactory; - void CheckCompatibility(std::unique_ptr& msg) const; - void CheckCompatibility(std::vector>& msgVec) const; + void CheckSendCompatibility(FairMQMessagePtr& msg) const; + void CheckSendCompatibility(std::vector& msgVec) const; + void CheckReceiveCompatibility(FairMQMessagePtr& msg) const; + void CheckReceiveCompatibility(std::vector& msgVec) const; void InitTransport(std::shared_ptr factory);