Update FairMQParts with doxygen comments and non-blocking send

This commit is contained in:
Alexey Rybalchenko 2016-04-04 09:43:48 +02:00
parent 82090c356c
commit a892a5a744
8 changed files with 147 additions and 76 deletions

View File

@ -119,17 +119,18 @@ class FairMQChannel
/// for some other reason (e.g. no peers connected for a binding socket), the method blocks.
///
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. In case of errors, returns -1.
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out.
/// In case of errors, returns -1.
int Send(const std::unique_ptr<FairMQMessage>& msg) const;
/// Sends a message in non-blocking mode.
/// @details SendAsync method attempts to send a message without blocking by
/// putting it in the queue. If the queue is full or queueing is not possible
/// for some other reason (e.g. no peers connected for a binding socket), the method returns 0.
/// putting it in the queue.
///
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @return Number of bytes that have been queued. If queueing failed due to
/// full queue or no connected peers (when binding), returns -2. In case of errors, returns -1.
/// full queue or no connected peers (when binding), returns -2.
/// In case of errors, returns -1.
inline int SendAsync(const std::unique_ptr<FairMQMessage>& msg) const
{
return fSocket->Send(msg.get(), fNoBlockFlag);
@ -140,7 +141,8 @@ class FairMQChannel
/// The actual transfer over the network is initiated once final part has been queued with the Send() or SendAsync() methods.
///
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @return Number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1.
/// @return Number of bytes that have been queued. -2 If queueing was not possible.
/// In case of errors, returns -1.
inline int SendPart(const std::unique_ptr<FairMQMessage>& msg) const
{
return fSocket->Send(msg.get(), fSndMoreFlag);
@ -151,23 +153,42 @@ class FairMQChannel
/// The actual transfer over the network is initiated once final part has been queued with the Send() or SendAsync() methods.
///
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @return Number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1.
/// @return Number of bytes that have been queued. -2 If queueing was not possible.
/// In case of errors, returns -1.
inline int SendPartAsync(const std::unique_ptr<FairMQMessage>& msg) const
{
return fSocket->Send(msg.get(), fSndMoreFlag|fNoBlockFlag);
}
/// Send a vector of messages
///
/// @param msgVec message vector reference
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out.
/// In case of errors, returns -1.
int64_t Send(const std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const;
/// Sends a vector of message in non-blocking mode.
/// @details SendAsync method attempts to send a vector of messages without blocking by
/// putting it them the queue.
///
/// @param msgVec message vector reference
/// @return Number of bytes that have been queued. If queueing failed due to
/// full queue or no connected peers (when binding), returns -2. In case of errors, returns -1.
inline int64_t SendAsync(const std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const
{
return fSocket->Send(msgVec, fNoBlockFlag);
}
/// Receives a message from the socket queue.
/// @details Receive method attempts to receive a message from the input queue.
/// If the queue is empty the method blocks.
///
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1.
/// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out.
/// In case of errors, returns -1.
int Receive(const std::unique_ptr<FairMQMessage>& msg) const;
/// Receives a message in non-blocking mode.
/// @details ReceiveAsync method attempts to receive a message without blocking from the input queue.
/// If the queue is empty the method returns 0.
///
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @return Number of bytes that have been received. If queue is empty, returns -2.
@ -177,20 +198,23 @@ class FairMQChannel
return fSocket->Receive(msg.get(), fNoBlockFlag);
}
/// Shorthand method to send a vector of messages on `chan` at index `i`
/// Receive a vector of messages
///
/// @param msgVec message vector reference
/// @param chan channel name
/// @param i channel index
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. In case of errors, returns -1.
int64_t Send(const std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const;
/// Shorthand method to receive a vector of messages on `chan` at index `i`
/// @param msgVec message vector reference
/// @param chan channel name
/// @param i channel index
/// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1.
/// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out.
/// In case of errors, returns -1.
int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const;
/// Receives a vector of messages in non-blocking mode.
///
/// @param msgVec message vector reference
/// @return Number of bytes that have been received. If queue is empty, returns -2.
/// In case of errors, returns -1.
inline int64_t ReceiveAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const
{
return fSocket->Receive(msgVec, fNoBlockFlag);
}
// DEPRECATED socket method wrappers with raw pointers and flag checks
int Send(FairMQMessage* msg, const std::string& flag = "") const;
int Send(FairMQMessage* msg, const int flags) const;

View File

@ -69,78 +69,106 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
/// @param name Name of the channel
void PrintChannel(const std::string& name);
/// Shorthand method to send `msg` on `chan` at index `i`
/// @param msg message reference
/// @param chan channel name
/// @param i channel index
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. In case of errors, returns -1.
inline int Send(const std::unique_ptr<FairMQMessage>& msg, const std::string& chan, const int i = 0) const
{
return fChannels.at(chan).at(i).Send(msg);
}
template<typename Serializer, typename DataType, typename... Args>
void Serialize(FairMQMessage& msg, DataType&& data, Args&&... args) const
{
Serializer().Serialize(msg,std::forward<DataType>(data),std::forward<Args>(args)...);
Serializer().Serialize(msg, std::forward<DataType>(data), std::forward<Args>(args)...);
}
template<typename Deserializer, typename DataType, typename... Args>
void Deserialize(FairMQMessage& msg, DataType&& data, Args&&... args) const
{
Deserializer().Deserialize(msg,std::forward<DataType>(data),std::forward<Args>(args)...);
Deserializer().Deserialize(msg, std::forward<DataType>(data), std::forward<Args>(args)...);
}
/// Shorthand method to receive `msg` on `chan` at index `i`
/// Shorthand method to send `msg` on `chan` at index `i`
/// @param msg message reference
/// @param chan channel name
/// @param i channel index
/// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1.
inline int Receive(const std::unique_ptr<FairMQMessage>& msg, const std::string& chan, const int i = 0) const
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out.
/// In case of errors, returns -1.
inline int Send(const std::unique_ptr<FairMQMessage>& msg, const std::string& chan, const int i = 0) const
{
return fChannels.at(chan).at(i).Receive(msg);
return fChannels.at(chan).at(i).Send(msg);
}
/// Shorthand method to send a vector of messages on `chan` at index `i`
/// @param msgVec message vector reference
/// Shorthand method to send `msg` on `chan` at index `i` without blocking
/// @param msg message reference
/// @param chan channel name
/// @param i channel index
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. In case of errors, returns -1.
inline int64_t Send(const std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const std::string& chan, const int i = 0) const
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out.
/// In case of errors, returns -1.
inline int SendAsync(const std::unique_ptr<FairMQMessage>& msg, const std::string& chan, const int i = 0) const
{
return fChannels.at(chan).at(i).Send(msgVec);
}
/// Shorthand method to receive a vector of messages on `chan` at index `i`
/// @param msgVec message vector reference
/// @param chan channel name
/// @param i channel index
/// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1.
inline int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const std::string& chan, const int i = 0) const
{
return fChannels.at(chan).at(i).Receive(msgVec);
return fChannels.at(chan).at(i).SendAsync(msg);
}
/// Shorthand method to send FairMQParts on `chan` at index `i`
/// @param parts parts reference
/// @param chan channel name
/// @param i channel index
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. In case of errors, returns -1.
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out.
/// In case of errors, returns -1.
inline int64_t Send(const FairMQParts& parts, const std::string& chan, const int i = 0) const
{
return fChannels.at(chan).at(i).Send(parts.fParts);
}
/// Shorthand method to send FairMQParts on `chan` at index `i` without blocking
/// @param parts parts reference
/// @param chan channel name
/// @param i channel index
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out.
/// In case of errors, returns -1.
inline int64_t SendAsync(const FairMQParts& parts, const std::string& chan, const int i = 0) const
{
return fChannels.at(chan).at(i).SendAsync(parts.fParts);
}
/// Shorthand method to receive `msg` on `chan` at index `i`
/// @param msg message reference
/// @param chan channel name
/// @param i channel index
/// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out.
/// In case of errors, returns -1.
inline int Receive(const std::unique_ptr<FairMQMessage>& msg, const std::string& chan, const int i = 0) const
{
return fChannels.at(chan).at(i).Receive(msg);
}
/// Shorthand method to receive `msg` on `chan` at index `i` without blocking
/// @param msg message reference
/// @param chan channel name
/// @param i channel index
/// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out.
/// In case of errors, returns -1.
inline int ReceiveAsync(const std::unique_ptr<FairMQMessage>& msg, const std::string& chan, const int i = 0) const
{
return fChannels.at(chan).at(i).ReceiveAsync(msg);
}
/// Shorthand method to receive FairMQParts on `chan` at index `i`
/// @param parts parts reference
/// @param chan channel name
/// @param i channel index
/// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1.
/// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out.
/// In case of errors, returns -1.
inline int64_t Receive(FairMQParts& parts, const std::string& chan, const int i = 0) const
{
return fChannels.at(chan).at(i).Receive(parts.fParts);
}
/// Shorthand method to receive FairMQParts on `chan` at index `i` without blocking
/// @param parts parts reference
/// @param chan channel name
/// @param i channel index
/// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out.
/// In case of errors, returns -1.
inline int64_t ReceiveAsync(FairMQParts& parts, const std::string& chan, const int i = 0) const
{
return fChannels.at(chan).at(i).ReceiveAsync(parts.fParts);
}
/// @brief Create empty FairMQMessage
/// @return pointer to FairMQMessage
inline FairMQMessage* NewMessage() const

View File

@ -38,7 +38,6 @@ class FairMQParts
fParts.push_back(std::unique_ptr<FairMQMessage>(msg));
}
/// Adds part (std::unique_ptr<FairMQMessage>&) to the container (move)
/// @param msg unique pointer to FairMQMessage
/// lvalue ref (move not required when passing argument)

View File

@ -40,11 +40,11 @@ class FairMQSocket
virtual int Send(FairMQMessage* msg, const std::string& flag = "") = 0;
virtual int Send(FairMQMessage* msg, const int flags = 0) = 0;
virtual int64_t Send(const std::vector<std::unique_ptr<FairMQMessage>>& msgVec) = 0;
virtual int64_t Send(const std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags = 0) = 0;
virtual int Receive(FairMQMessage* msg, const std::string& flag = "") = 0;
virtual int Receive(FairMQMessage* msg, const int flags = 0) = 0;
virtual int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) = 0;
virtual int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags = 0) = 0;
virtual void* GetSocket() const = 0;
virtual int GetSocket(int nothing) const = 0;

View File

@ -145,7 +145,7 @@ int FairMQSocketNN::Send(FairMQMessage* msg, const int flags)
return nbytes;
}
int64_t FairMQSocketNN::Send(const vector<unique_ptr<FairMQMessage>>& msgVec)
int64_t FairMQSocketNN::Send(const vector<unique_ptr<FairMQMessage>>& msgVec, const int flags)
{
#ifdef MSGPACK_FOUND
// create msgpack simple buffer
@ -161,7 +161,7 @@ int64_t FairMQSocketNN::Send(const vector<unique_ptr<FairMQMessage>>& msgVec)
packer.pack_bin_body(static_cast<char*>(msgVec[i]->GetData()), msgVec[i]->GetSize());
}
int64_t nbytes = nn_send(fSocket, sbuf.data(), sbuf.size(), 0);
int64_t nbytes = nn_send(fSocket, sbuf.data(), sbuf.size(), flags);
if (nbytes >= 0)
{
fBytesTx += nbytes;
@ -236,7 +236,7 @@ int FairMQSocketNN::Receive(FairMQMessage* msg, const int flags)
return nbytes;
}
int64_t FairMQSocketNN::Receive(vector<unique_ptr<FairMQMessage>>& msgVec)
int64_t FairMQSocketNN::Receive(vector<unique_ptr<FairMQMessage>>& msgVec, const int flags)
{
#ifdef MSGPACK_FOUND
// Warn if the vector is filled before Receive() and empty it.
@ -249,7 +249,7 @@ int64_t FairMQSocketNN::Receive(vector<unique_ptr<FairMQMessage>>& msgVec)
// pointer to point to received message buffer
char* ptr = NULL;
// receive the message into a buffer allocated by nanomsg and let ptr point to it
int nbytes = nn_recv(fSocket, &ptr, NN_MSG, 0);
int nbytes = nn_recv(fSocket, &ptr, NN_MSG, flags);
if (nbytes >= 0) // if no errors or non-blocking timeouts
{
// store statistics on how many bytes received

View File

@ -39,11 +39,11 @@ class FairMQSocketNN : public FairMQSocket
virtual int Send(FairMQMessage* msg, const std::string& flag = "");
virtual int Send(FairMQMessage* msg, const int flags = 0);
virtual int64_t Send(const std::vector<std::unique_ptr<FairMQMessage>>& msgVec);
virtual int64_t Send(const std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags = 0);
virtual int Receive(FairMQMessage* msg, const std::string& flag = "");
virtual int Receive(FairMQMessage* msg, const int flags = 0);
virtual int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec);
virtual int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags = 0);
virtual void* GetSocket() const;
virtual int GetSocket(int nothing) const;

View File

@ -150,7 +150,7 @@ int FairMQSocketZMQ::Send(FairMQMessage* msg, const int flags)
return nbytes;
}
int64_t FairMQSocketZMQ::Send(const vector<unique_ptr<FairMQMessage>>& msgVec)
int64_t FairMQSocketZMQ::Send(const vector<unique_ptr<FairMQMessage>>& msgVec, const int flags)
{
// Sending vector typicaly handles more then one part
if (msgVec.size() > 1)
@ -159,7 +159,7 @@ int64_t FairMQSocketZMQ::Send(const vector<unique_ptr<FairMQMessage>>& msgVec)
for (unsigned int i = 0; i < msgVec.size() - 1; ++i)
{
int nbytes = zmq_msg_send(static_cast<zmq_msg_t*>(msgVec[i]->GetMessage()), fSocket, ZMQ_SNDMORE);
int nbytes = zmq_msg_send(static_cast<zmq_msg_t*>(msgVec[i]->GetMessage()), fSocket, ZMQ_SNDMORE|flags);
if (nbytes >= 0)
{
totalSize += nbytes;
@ -167,18 +167,38 @@ int64_t FairMQSocketZMQ::Send(const vector<unique_ptr<FairMQMessage>>& msgVec)
}
else
{
if (zmq_errno() == EAGAIN)
{
return -2;
}
if (zmq_errno() == ETERM)
{
LOG(INFO) << "terminating socket " << fId;
return -1;
}
LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes;
}
}
int n = zmq_msg_send(static_cast<zmq_msg_t*>(msgVec.back()->GetMessage()), fSocket, 0);
if (n >= 0)
int nbytes = zmq_msg_send(static_cast<zmq_msg_t*>(msgVec.back()->GetMessage()), fSocket, flags);
if (nbytes >= 0)
{
totalSize += n;
totalSize += nbytes;
}
else
{
return n;
if (zmq_errno() == EAGAIN)
{
return -2;
}
if (zmq_errno() == ETERM)
{
LOG(INFO) << "terminating socket " << fId;
return -1;
}
LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes;
}
// store statistics on how many messages have been sent (handle all parts as a single message)
@ -187,7 +207,7 @@ int64_t FairMQSocketZMQ::Send(const vector<unique_ptr<FairMQMessage>>& msgVec)
} // If there's only one part, send it as a regular message
else if (msgVec.size() == 1)
{
return zmq_msg_send(static_cast<zmq_msg_t*>(msgVec.back()->GetMessage()), fSocket, 0);
return Send(msgVec.back().get(), flags);
}
else // if the vector is empty, something might be wrong
{
@ -240,7 +260,7 @@ int FairMQSocketZMQ::Receive(FairMQMessage* msg, const int flags)
return nbytes;
}
int64_t FairMQSocketZMQ::Receive(vector<unique_ptr<FairMQMessage>>& msgVec)
int64_t FairMQSocketZMQ::Receive(vector<unique_ptr<FairMQMessage>>& msgVec, const int flags)
{
// Warn if the vector is filled before Receive() and empty it.
if (msgVec.size() > 0)
@ -256,7 +276,7 @@ int64_t FairMQSocketZMQ::Receive(vector<unique_ptr<FairMQMessage>>& msgVec)
{
unique_ptr<FairMQMessage> part(new FairMQMessageZMQ());
int nbytes = zmq_msg_recv(static_cast<zmq_msg_t*>(part->GetMessage()), fSocket, 0);
int nbytes = zmq_msg_recv(static_cast<zmq_msg_t*>(part->GetMessage()), fSocket, flags);
if (nbytes >= 0)
{
msgVec.push_back(move(part));

View File

@ -34,11 +34,11 @@ class FairMQSocketZMQ : public FairMQSocket
virtual int Send(FairMQMessage* msg, const std::string& flag = "");
virtual int Send(FairMQMessage* msg, const int flags = 0);
virtual int64_t Send(const std::vector<std::unique_ptr<FairMQMessage>>& msgVec);
virtual int64_t Send(const std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags = 0);
virtual int Receive(FairMQMessage* msg, const std::string& flag = "");
virtual int Receive(FairMQMessage* msg, const int flags = 0);
virtual int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec);
virtual int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags = 0);
virtual void* GetSocket() const;
virtual int GetSocket(int nothing) const;