FairMQ: Extend Multipart and messaging API

- Extend the multipart API to allow sending vectors of messages or helper
   thin wrapper FairMQParts. See example in examples/MQ/8-multipart.
 - NewMessage() can be used in devices instead of
   fTransportFactory->CreateMessage().
   Possible arguments remain unchanged (no args, size or data+size).
 - Send()/Receive() methods can be used in devices instead of
   fChannels.at("chan").at(i).Send()/Receive():
   Send(msg, "chan", i = 0), Receive(msg, "chan", i = 0).
 - Use the new methods in MQ examples and tests.
 - No breaking changes, but FAIRMQ_INTERFACE_VERSION is incremented to 3
   to allow to check for new methods.
This commit is contained in:
Alexey Rybalchenko 2016-02-23 18:00:35 +01:00
parent 82ab7670a9
commit e1fef82657
33 changed files with 393 additions and 246 deletions

View File

@ -91,6 +91,7 @@ EndIf(NANOMSG_FOUND)
# to copy src that are header-only files (e.g. c++ template) for FairRoot external installation
# manual install (globbing add not recommended)
Set(FAIRMQHEADERS
FairMQParts.h
devices/GenericSampler.h
devices/GenericSampler.tpl
devices/GenericProcessor.h

View File

@ -451,55 +451,79 @@ int FairMQChannel::Send(const unique_ptr<FairMQMessage>& msg) const
return -2;
}
int FairMQChannel::SendAsync(const unique_ptr<FairMQMessage>& msg) const
uint64_t FairMQChannel::Send(const std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const
{
return fSocket->Send(msg.get(), fNoBlockFlag);
// Sending vector typicaly handles more then one part
if (msgVec.size() > 1)
{
uint64_t totalSize = 0;
for (unsigned int i = 0; i < msgVec.size() - 1; ++i)
{
int nbytes = SendPart(msgVec[i]);
if (nbytes >= 0)
{
totalSize += nbytes;
}
else
{
return nbytes;
}
}
int FairMQChannel::SendPart(const unique_ptr<FairMQMessage>& msg) const
int n = Send(msgVec.back());
if (n >= 0)
{
return fSocket->Send(msg.get(), fSndMoreFlag);
totalSize += n;
}
else
{
return n;
}
int FairMQChannel::SendPartAsync(const unique_ptr<FairMQMessage>& msg) const
return totalSize;
} // If there's only one part, send it as a regular message
else if (msgVec.size() == 1)
{
return fSocket->Send(msg.get(), fSndMoreFlag|fNoBlockFlag);
return Send(msgVec.back());
}
else // if the vector is empty, something might be wrong
{
LOG(WARN) << "Will not send empty vector";
return -1;
}
}
// int FairMQChannel::SendParts(initializer_list<unique_ptr<FairMQMessage>> partsList) const
// {
// int totalSize = 0;
// initializer_list<unique_ptr<FairMQMessage>>::iterator it = partsList.end();
// auto &last = --it;
// for (auto &p : partsList)
// {
// if (&p != last)
// {
// int nbytes = SendPart(p);
// if (nbytes >= 0)
// {
// totalSize += nbytes;
// }
// else
// {
// return nbytes;
// }
// }
// else
// {
// int nbytes = Send(p);
// if (nbytes >= 0)
// {
// totalSize += nbytes;
// }
// else
// {
// return nbytes;
// }
// }
// }
// return totalSize;
// }
uint64_t FairMQChannel::Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const
{
// Warn if the vector is filled before Receive() and empty it.
if (msgVec.size() > 0)
{
LOG(WARN) << "Message vector contains elements before Receive(), they will be deleted!";
msgVec.clear();
}
uint64_t totalSize = 0;
do
{
std::unique_ptr<FairMQMessage> part(fTransportFactory->CreateMessage());
int nbytes = Receive(part);
if (nbytes >= 0)
{
msgVec.push_back(std::move(part));
totalSize += nbytes;
}
else
{
return nbytes;
}
}
while (ExpectsAnotherPart());
return totalSize;
}
int FairMQChannel::Receive(const unique_ptr<FairMQMessage>& msg) const
{
@ -519,11 +543,6 @@ int FairMQChannel::Receive(const unique_ptr<FairMQMessage>& msg) const
return -2;
}
int FairMQChannel::ReceiveAsync(const unique_ptr<FairMQMessage>& msg) const
{
return fSocket->Receive(msg.get(), fNoBlockFlag);
}
int FairMQChannel::Send(FairMQMessage* msg, const string& flag) const
{
if (flag == "")
@ -624,64 +643,6 @@ int FairMQChannel::Receive(FairMQMessage* msg, const int flags) const
}
}
void FairMQChannel::SetSendTimeout(const int timeout)
{
// if (fSocket)
// {
// if (fSocket->SetSendTimeout(timeout, fAddress, fMethod))
// {
fSndTimeoutInMs = timeout;
// return true;
// }
// }
// LOG(ERROR) << "SetSendTimeout() failed - socket is not initialized!";
// return false;
}
int FairMQChannel::GetSendTimeout() const
{
return fSndTimeoutInMs;
// if (fSocket)
// {
// return fSocket->GetSendTimeout();
// }
// else
// {
// LOG(ERROR) << "GetSendTimeout() failed - socket is not initialized!";
// return -1;
// }
}
void FairMQChannel::SetReceiveTimeout(const int timeout)
{
// if (fSocket)
// {
// if (fSocket->SetReceiveTimeout(timeout, fAddress, fMethod))
// {
fRcvTimeoutInMs = timeout;
// return true;
// }
// }
// LOG(ERROR) << "SetReceiveTimeout() failed - socket is not initialized!";
// return false;
}
int FairMQChannel::GetReceiveTimeout() const
{
return fRcvTimeoutInMs;
// if (fSocket)
// {
// return fSocket->GetReceiveTimeout();
// }
// else
// {
// LOG(ERROR) << "GetReceiveTimeout() failed - socket is not initialized!";
// return -1;
// }
}
bool FairMQChannel::ExpectsAnotherPart() const
{
int64_t more = 0;

View File

@ -116,38 +116,41 @@ class FairMQChannel
/// for some other reason (e.g. no peers connected for a binding socket), the method returns 0.
///
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @return Returns the number of bytes that have been queued. If queueing failed due to
/// @return Number of bytes that have been queued. If queueing failed due to
/// full queue or no connected peers (when binding), returns -2. In case of errors, returns -1.
int SendAsync(const std::unique_ptr<FairMQMessage>& msg) const;
inline int SendAsync(const std::unique_ptr<FairMQMessage>& msg) const
{
return fSocket->Send(msg.get(), fNoBlockFlag);
}
/// Queues the current message as a part of a multi-part message
/// @details SendPart method queues the provided message as a part of a multi-part message.
/// The actual transfer over the network is initiated once final part has been queued with the Send() or SendAsync() methods.
///
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @return Returns the number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1.
int SendPart(const std::unique_ptr<FairMQMessage>& msg) const;
/// @return Number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1.
inline int SendPart(const std::unique_ptr<FairMQMessage>& msg) const
{
return fSocket->Send(msg.get(), fSndMoreFlag);
}
/// Queues the current message as a part of a multi-part message without blocking
/// @details SendPart method queues the provided message as a part of a multi-part message without blocking.
/// The actual transfer over the network is initiated once final part has been queued with the Send() or SendAsync() methods.
///
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @return Returns the number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1.
int SendPartAsync(const std::unique_ptr<FairMQMessage>& msg) const;
// /// Sends the messages provided as arguments as a multi-part message.
// ///
// /// @param partsList Initializer list of FairMQMessages
// /// @return Returns the number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1.
// int SendParts(std::initializer_list<std::unique_ptr<FairMQMessage>> partsList) const;
/// @return Number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1.
inline int SendPartAsync(const std::unique_ptr<FairMQMessage>& msg) const
{
return fSocket->Send(msg.get(), fSndMoreFlag|fNoBlockFlag);
}
/// Receives a message from the socket queue.
/// @details Receive method attempts to receive a message from the input queue.
/// If the queue is empty the method blocks.
///
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @return Returns the number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1.
/// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1.
int Receive(const std::unique_ptr<FairMQMessage>& msg) const;
/// Receives a message in non-blocking mode.
@ -155,9 +158,26 @@ class FairMQChannel
/// If the queue is empty the method returns 0.
///
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @return Returns the number of bytes that have been received. If queue is empty, returns -2.
/// @return Number of bytes that have been received. If queue is empty, returns -2.
/// In case of errors, returns -1.
int ReceiveAsync(const std::unique_ptr<FairMQMessage>& msg) const;
inline int ReceiveAsync(const std::unique_ptr<FairMQMessage>& msg) const
{
return fSocket->Receive(msg.get(), fNoBlockFlag);
}
/// Shorthand method to send a vector of messages on `chan` at index `i`
/// @param msgVec message vector reference
/// @param chan channel name
/// @param i channel index
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. In case of errors, returns -1.
uint64_t Send(const std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const;
/// Shorthand method to receive a vector of messages on `chan` at index `i`
/// @param msgVec message vector reference
/// @param chan channel name
/// @param i channel index
/// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1.
uint64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const;
// DEPRECATED socket method wrappers with raw pointers and flag checks
int Send(FairMQMessage* msg, const std::string& flag = "") const;
@ -167,19 +187,31 @@ class FairMQChannel
/// Sets a timeout on the (blocking) Send method
/// @param timeout timeout value in milliseconds
void SetSendTimeout(const int timeout);
inline void SetSendTimeout(const int timeout)
{
fSndTimeoutInMs = timeout;
}
/// Gets the current value of the timeout on the (blocking) Send method
/// @return Timeout value in milliseconds. -1 for no timeout.
int GetSendTimeout() const;
inline int GetSendTimeout() const
{
return fSndTimeoutInMs;
}
/// Sets a timeout on the (blocking) Receive method
/// @param timeout timeout value in milliseconds
void SetReceiveTimeout(const int timeout);
inline void SetReceiveTimeout(const int timeout)
{
fRcvTimeoutInMs = timeout;
}
/// Gets the current value of the timeout on the (blocking) Receive method
/// @return Timeout value in milliseconds. -1 for no timeout.
int GetReceiveTimeout() const;
inline int GetReceiveTimeout() const
{
return fRcvTimeoutInMs;
}
/// Checks if the socket is expecting to receive another part of a multipart message.
/// @return Return true if the socket expects another part of a multipart message and false otherwise.

View File

@ -666,12 +666,6 @@ void FairMQDevice::InteractiveStateLoop()
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
}
inline void FairMQDevice::PrintInteractiveStateLoopHelp()
{
LOG(INFO) << "Use keys to control the state machine:";
LOG(INFO) << "[h] help, [p] pause, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device";
}
void FairMQDevice::Unblock()
{
FairMQMessage* cmd = fTransportFactory->CreateMessage();

View File

@ -26,6 +26,8 @@
#include "FairMQTransportFactory.h"
#include "FairMQSocket.h"
#include "FairMQChannel.h"
#include "FairMQMessage.h"
#include "FairMQParts.h"
class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
{
@ -67,6 +69,92 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
/// @param name Name of the channel
void PrintChannel(const std::string& name);
/// Shorthand method to send `msg` on `chan` at index `i`
/// @param msg message reference
/// @param chan channel name
/// @param i channel index
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. In case of errors, returns -1.
inline int Send(const std::unique_ptr<FairMQMessage>& msg, const std::string& chan, const int i = 0) const
{
return fChannels.at(chan).at(i).Send(msg);
}
/// Shorthand method to receive `msg` on `chan` at index `i`
/// @param msg message reference
/// @param chan channel name
/// @param i channel index
/// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1.
inline int Receive(const std::unique_ptr<FairMQMessage>& msg, const std::string& chan, const int i = 0) const
{
return fChannels.at(chan).at(i).Receive(msg);
}
/// Shorthand method to send a vector of messages on `chan` at index `i`
/// @param msgVec message vector reference
/// @param chan channel name
/// @param i channel index
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. In case of errors, returns -1.
inline uint64_t Send(const std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const std::string& chan, const int i = 0) const
{
return fChannels.at(chan).at(i).Send(msgVec);
}
/// Shorthand method to receive a vector of messages on `chan` at index `i`
/// @param msgVec message vector reference
/// @param chan channel name
/// @param i channel index
/// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1.
inline uint64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const std::string& chan, const int i = 0) const
{
return fChannels.at(chan).at(i).Receive(msgVec);
}
/// Shorthand method to send FairMQParts on `chan` at index `i`
/// @param parts parts reference
/// @param chan channel name
/// @param i channel index
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. In case of errors, returns -1.
inline uint64_t Send(const FairMQParts& parts, const std::string& chan, const int i = 0) const
{
return fChannels.at(chan).at(i).Send(parts.fParts);
}
/// Shorthand method to receive FairMQParts on `chan` at index `i`
/// @param parts parts reference
/// @param chan channel name
/// @param i channel index
/// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1.
inline uint64_t Receive(FairMQParts& parts, const std::string& chan, const int i = 0) const
{
return fChannels.at(chan).at(i).Receive(parts.fParts);
}
/// @brief Create empty FairMQMessage
/// @return pointer to FairMQMessage
inline FairMQMessage* NewMessage() const
{
return fTransportFactory->CreateMessage();
}
/// @brief Create new FairMQMessage of specified size
/// @param size message size
/// @return pointer to FairMQMessage
inline FairMQMessage* NewMessage(int size) const
{
return fTransportFactory->CreateMessage(size);
}
/// @brief Create new FairMQMessage with user provided buffer and size
/// @param data pointer to user provided buffer
/// @param size size of the user provided buffer
/// @param ffn optional callback, called when the message is transfered (and can be deleted)
/// @param hint optional helper pointer that can be used in the callback
/// @return pointer to FairMQMessage
inline FairMQMessage* NewMessage(void* data, int size, fairmq_free_fn* ffn = NULL, void* hint = NULL) const
{
return fTransportFactory->CreateMessage(data, size, ffn, hint);
}
/// Waits for the first initialization run to finish
void WaitForInitialValidation();
@ -74,7 +162,11 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
/// Works only when running in a terminal. Running in background would exit, because no interactive input (std::cin) is possible.
void InteractiveStateLoop();
/// Prints the available commands of the InteractiveStateLoop()
void PrintInteractiveStateLoopHelp();
inline void PrintInteractiveStateLoopHelp()
{
LOG(INFO) << "Use keys to control the state machine:";
LOG(INFO) << "[h] help, [p] pause, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device";
}
/// Set Device properties stored as strings
/// @param key Property key

52
fairmq/FairMQParts.h Normal file
View File

@ -0,0 +1,52 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQPARTS_H_
#define FAIRMQPARTS_H_
#include <vector>
#include <memory> // unique_ptr
#include "FairMQTransportFactory.h"
#include "FairMQMessage.h"
class FairMQParts
{
public:
/// Default constructor
FairMQParts() {};
/// Copy Constructor
FairMQParts(const FairMQParts&) = delete;
/// Assignment operator
FairMQParts& operator=(const FairMQParts&) = delete;
/// Default destructor
~FairMQParts() {};
/// Adds part (FairMQMessage) to the container
/// @param msg message pointer (for example created with NewMessage() method of FairMQDevice)
inline void AddPart(FairMQMessage* msg)
{
fParts.push_back(std::unique_ptr<FairMQMessage>(msg));
}
/// Get reference to part in the container at index (without bounds check)
/// @param index container index
inline FairMQMessage& operator[](const int index) { return *(fParts[index]); }
/// Get reference to part in the container at index (with bounds check)
/// @param index container index
inline FairMQMessage& At(const int index) { return *(fParts.at(index)); }
/// Get number of parts in the container
/// @return number of parts in the container
inline int Size() const { return fParts.size(); }
std::vector<std::unique_ptr<FairMQMessage>> fParts;
};
#endif /* FAIRMQPARTS_H_ */

View File

@ -15,7 +15,7 @@
#ifndef FAIRMQSTATEMACHINE_H_
#define FAIRMQSTATEMACHINE_H_
#define FAIRMQ_INTERFACE_VERSION 2
#define FAIRMQ_INTERFACE_VERSION 3
#include <string>
#include <atomic>

View File

@ -31,14 +31,14 @@ class FairMQTransportFactory
{
public:
virtual FairMQMessage* CreateMessage() = 0;
virtual FairMQMessage* CreateMessage(size_t size) = 0;
virtual FairMQMessage* CreateMessage(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL) = 0;
virtual FairMQMessage* CreateMessage(const size_t size) = 0;
virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn = NULL, void* hint = NULL) = 0;
virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, int numIoThreads) = 0;
virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads) = 0;
virtual FairMQPoller* CreatePoller(const std::vector<FairMQChannel>& channels) = 0;
virtual FairMQPoller* CreatePoller(std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList) = 0;
virtual FairMQPoller* CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket) = 0;
virtual FairMQPoller* CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::initializer_list<std::string> channelList) = 0;
virtual FairMQPoller* CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) = 0;
virtual ~FairMQTransportFactory() {};
};

View File

@ -34,7 +34,7 @@ FairMQMessageNN::FairMQMessageNN()
}
}
FairMQMessageNN::FairMQMessageNN(size_t size)
FairMQMessageNN::FairMQMessageNN(const size_t size)
: fMessage(NULL)
, fSize(0)
, fReceiving(false)
@ -53,7 +53,7 @@ FairMQMessageNN::FairMQMessageNN(size_t size)
* create FairMQMessage object only with size parameter and fill it with data.
* possible TODO: make this zero copy (will should then be as efficient as ZeroMQ).
*/
FairMQMessageNN::FairMQMessageNN(void* data, size_t size, fairmq_free_fn *ffn, void* hint)
FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn *ffn, void* hint)
: fMessage(NULL)
, fSize(0)
, fReceiving(false)
@ -63,6 +63,8 @@ FairMQMessageNN::FairMQMessageNN(void* data, size_t size, fairmq_free_fn *ffn, v
{
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
}
else
{
memcpy(fMessage, data, size);
fSize = size;
@ -75,6 +77,7 @@ FairMQMessageNN::FairMQMessageNN(void* data, size_t size, fairmq_free_fn *ffn, v
if(data) free(data);
}
}
}
void FairMQMessageNN::Rebuild()
{
@ -82,7 +85,7 @@ void FairMQMessageNN::Rebuild()
fReceiving = false;
}
void FairMQMessageNN::Rebuild(size_t size)
void FairMQMessageNN::Rebuild(const size_t size)
{
Clear();
fMessage = nn_allocmsg(size, 0);
@ -94,7 +97,7 @@ void FairMQMessageNN::Rebuild(size_t size)
fReceiving = false;
}
void FairMQMessageNN::Rebuild(void* data, size_t size, fairmq_free_fn *ffn, void* hint)
void FairMQMessageNN::Rebuild(void* data, const size_t size, fairmq_free_fn *ffn, void* hint)
{
Clear();
fMessage = nn_allocmsg(size, 0);
@ -102,6 +105,8 @@ void FairMQMessageNN::Rebuild(void* data, size_t size, fairmq_free_fn *ffn, void
{
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
}
else
{
memcpy(fMessage, data, size);
fSize = size;
fReceiving = false;
@ -115,6 +120,7 @@ void FairMQMessageNN::Rebuild(void* data, size_t size, fairmq_free_fn *ffn, void
if(data) free(data);
}
}
}
void* FairMQMessageNN::GetMessage()
{
@ -131,7 +137,7 @@ size_t FairMQMessageNN::GetSize()
return fSize;
}
void FairMQMessageNN::SetMessage(void* data, size_t size)
void FairMQMessageNN::SetMessage(void* data, const size_t size)
{
fMessage = data;
fSize = size;
@ -156,9 +162,12 @@ void FairMQMessageNN::Copy(FairMQMessage* msg)
{
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
}
else
{
memcpy(fMessage, msg->GetMessage(), size);
fSize = size;
}
}
void FairMQMessageNN::Copy(const unique_ptr<FairMQMessage>& msg)
{
@ -177,9 +186,12 @@ void FairMQMessageNN::Copy(const unique_ptr<FairMQMessage>& msg)
{
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
}
else
{
memcpy(fMessage, msg->GetMessage(), size);
fSize = size;
}
}
inline void FairMQMessageNN::Clear()
{

View File

@ -23,20 +23,20 @@ class FairMQMessageNN : public FairMQMessage
{
public:
FairMQMessageNN();
FairMQMessageNN(size_t size);
FairMQMessageNN(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL);
FairMQMessageNN(const size_t size);
FairMQMessageNN(void* data, const size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL);
FairMQMessageNN(const FairMQMessageNN&) = delete;
FairMQMessageNN operator=(const FairMQMessageNN&) = delete;
virtual void Rebuild();
virtual void Rebuild(size_t size);
virtual void Rebuild(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL);
virtual void Rebuild(const size_t size);
virtual void Rebuild(void* data, const size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL);
virtual void* GetMessage();
virtual void* GetData();
virtual size_t GetSize();
virtual void SetMessage(void* data, size_t size);
virtual void SetMessage(void* data, const size_t size);
virtual void CloseMessage() {};
virtual void Copy(FairMQMessage* msg);

View File

@ -59,7 +59,7 @@ FairMQPollerNN::FairMQPollerNN(const vector<FairMQChannel>& channels)
}
}
FairMQPollerNN::FairMQPollerNN(unordered_map<string, vector<FairMQChannel>>& channelsMap, initializer_list<string> channelList)
FairMQPollerNN::FairMQPollerNN(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const initializer_list<string> channelList)
: items()
, fNumItems(0)
, fOffsetMap()
@ -118,7 +118,7 @@ FairMQPollerNN::FairMQPollerNN(unordered_map<string, vector<FairMQChannel>>& cha
}
}
FairMQPollerNN::FairMQPollerNN(FairMQSocket& cmdSocket, FairMQSocket& dataSocket)
FairMQPollerNN::FairMQPollerNN(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket)
: items()
, fNumItems(2)
, fOffsetMap()

View File

@ -32,7 +32,7 @@ class FairMQPollerNN : public FairMQPoller
public:
FairMQPollerNN(const std::vector<FairMQChannel>& channels);
FairMQPollerNN(std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList);
FairMQPollerNN(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::initializer_list<std::string> channelList);
FairMQPollerNN(const FairMQPollerNN&) = delete;
FairMQPollerNN operator=(const FairMQPollerNN&) = delete;
@ -45,7 +45,7 @@ class FairMQPollerNN : public FairMQPoller
virtual ~FairMQPollerNN();
private:
FairMQPollerNN(FairMQSocket& cmdSocket, FairMQSocket& dataSocket);
FairMQPollerNN(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket);
nn_pollfd* items;
int fNumItems;

View File

@ -20,7 +20,7 @@
using namespace std;
FairMQSocketNN::FairMQSocketNN(const string& type, const std::string& name, int numIoThreads)
FairMQSocketNN::FairMQSocketNN(const string& type, const std::string& name, const int numIoThreads)
: FairMQSocket(0, 0, NN_DONTWAIT)
, fSocket(-1)
, fId()

View File

@ -26,7 +26,7 @@
class FairMQSocketNN : public FairMQSocket
{
public:
FairMQSocketNN(const std::string& type, const std::string& name, int numIoThreads); // numIoThreads is not used in nanomsg.
FairMQSocketNN(const std::string& type, const std::string& name, const int numIoThreads); // numIoThreads is not used in nanomsg.
FairMQSocketNN(const FairMQSocketNN&) = delete;
FairMQSocketNN operator=(const FairMQSocketNN&) = delete;

View File

@ -26,17 +26,17 @@ FairMQMessage* FairMQTransportFactoryNN::CreateMessage()
return new FairMQMessageNN();
}
FairMQMessage* FairMQTransportFactoryNN::CreateMessage(size_t size)
FairMQMessage* FairMQTransportFactoryNN::CreateMessage(const size_t size)
{
return new FairMQMessageNN(size);
}
FairMQMessage* FairMQTransportFactoryNN::CreateMessage(void* data, size_t size, fairmq_free_fn *ffn, void* hint)
FairMQMessage* FairMQTransportFactoryNN::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint)
{
return new FairMQMessageNN(data, size, ffn, hint);
}
FairMQSocket* FairMQTransportFactoryNN::CreateSocket(const string& type, const std::string& name, int numIoThreads)
FairMQSocket* FairMQTransportFactoryNN::CreateSocket(const string& type, const std::string& name, const int numIoThreads)
{
return new FairMQSocketNN(type, name, numIoThreads);
}
@ -46,12 +46,12 @@ FairMQPoller* FairMQTransportFactoryNN::CreatePoller(const vector<FairMQChannel>
return new FairMQPollerNN(channels);
}
FairMQPoller* FairMQTransportFactoryNN::CreatePoller(std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList)
FairMQPoller* FairMQTransportFactoryNN::CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::initializer_list<std::string> channelList)
{
return new FairMQPollerNN(channelsMap, channelList);
}
FairMQPoller* FairMQTransportFactoryNN::CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket)
FairMQPoller* FairMQTransportFactoryNN::CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket)
{
return new FairMQPollerNN(cmdSocket, dataSocket);
}

View File

@ -28,14 +28,14 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory
FairMQTransportFactoryNN();
virtual FairMQMessage* CreateMessage();
virtual FairMQMessage* CreateMessage(size_t size);
virtual FairMQMessage* CreateMessage(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL);
virtual FairMQMessage* CreateMessage(const size_t size);
virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn = NULL, void* hint = NULL);
virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, int numIoThreads);
virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads);
virtual FairMQPoller* CreatePoller(const std::vector<FairMQChannel>& channels);
virtual FairMQPoller* CreatePoller(std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList);
virtual FairMQPoller* CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket);
virtual FairMQPoller* CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::initializer_list<std::string> channelList);
virtual FairMQPoller* CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket);
virtual ~FairMQTransportFactoryNN() {};
};

View File

@ -101,7 +101,7 @@ int FairMQProgOptions::ParseAll(const int argc, char** argv, bool allowUnregiste
{
LOG(WARN) << p;
}
LOG(WARN) << "No channels will be created (You can still fill these manually).";
LOG(WARN) << "No channels will be created (You can create them manually).";
// return 1;
}

View File

@ -24,6 +24,8 @@ echo "Usage: startBenchmark [message size=1000000] [number of messages=0]"
SAMPLER="bsampler"
SAMPLER+=" --id bsampler1"
#SAMPLER+=" --io-threads 2"
#SAMPLER+=" --transport nanomsg"
SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --num-msgs $numMsgs"
SAMPLER+=" --config-json-file @CMAKE_BINARY_DIR@/bin/config/benchmark.json"
@ -31,6 +33,8 @@ xterm -geometry 80x23+0+0 -hold -e taskset 0x1 @CMAKE_BINARY_DIR@/bin/$SAMPLER &
SINK="sink"
SINK+=" --id sink1"
#SINK+=" --io-threads 2"
#SINK+=" --transport nanomsg"
SINK+=" --num-msgs $numMsgs"
SINK+=" --config-json-file @CMAKE_BINARY_DIR@/bin/config/benchmark.json"
xterm -geometry 80x23+500+0 -hold -e taskset 0x2 @CMAKE_BINARY_DIR@/bin/$SINK &

View File

@ -23,21 +23,21 @@ FairMQTestPub::FairMQTestPub()
void FairMQTestPub::Run()
{
std::unique_ptr<FairMQMessage> ready1Msg(fTransportFactory->CreateMessage());
int r1 = fChannels.at("control").at(0).Receive(ready1Msg);
std::unique_ptr<FairMQMessage> ready2Msg(fTransportFactory->CreateMessage());
int r2 = fChannels.at("control").at(0).Receive(ready2Msg);
std::unique_ptr<FairMQMessage> ready1Msg(NewMessage());
int r1 = Receive(ready1Msg, "control");
std::unique_ptr<FairMQMessage> ready2Msg(NewMessage());
int r2 = Receive(ready2Msg, "control");
if (r1 >= 0 && r2 >= 0)
{
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
fChannels.at("data").at(0).Send(msg);
std::unique_ptr<FairMQMessage> msg(NewMessage());
Send(msg, "data");
std::unique_ptr<FairMQMessage> ack1Msg(fTransportFactory->CreateMessage());
std::unique_ptr<FairMQMessage> ack2Msg(fTransportFactory->CreateMessage());
if (fChannels.at("control").at(0).Receive(ack1Msg) >= 0)
std::unique_ptr<FairMQMessage> ack1Msg(NewMessage());
std::unique_ptr<FairMQMessage> ack2Msg(NewMessage());
if (Receive(ack1Msg, "control") >= 0)
{
if (fChannels.at("control").at(0).Receive(ack2Msg) >= 0)
if (Receive(ack2Msg, "control") >= 0)
{
LOG(INFO) << "PUB-SUB test successfull";
}

View File

@ -23,14 +23,14 @@ FairMQTestSub::FairMQTestSub()
void FairMQTestSub::Run()
{
std::unique_ptr<FairMQMessage> readyMsg(fTransportFactory->CreateMessage());
fChannels.at("control").at(0).Send(readyMsg);
std::unique_ptr<FairMQMessage> readyMsg(NewMessage());
Send(readyMsg, "control");
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
if (fChannels.at("data").at(0).Receive(msg) >= 0)
std::unique_ptr<FairMQMessage> msg(NewMessage());
if (Receive(msg, "data") >= 0)
{
std::unique_ptr<FairMQMessage> ackMsg(fTransportFactory->CreateMessage());
fChannels.at("control").at(0).Send(ackMsg);
std::unique_ptr<FairMQMessage> ackMsg(NewMessage());
Send(ackMsg, "control");
}
else
{

View File

@ -23,9 +23,9 @@ FairMQTestPull::FairMQTestPull()
void FairMQTestPull::Run()
{
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
std::unique_ptr<FairMQMessage> msg(NewMessage());
if (fChannels.at("data").at(0).Receive(msg) >= 0)
if (Receive(msg, "data") >= 0)
{
LOG(INFO) << "PUSH-PULL test successfull";
}

View File

@ -23,8 +23,8 @@ FairMQTestPush::FairMQTestPush()
void FairMQTestPush::Run()
{
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
fChannels.at("data").at(0).Send(msg);
std::unique_ptr<FairMQMessage> msg(NewMessage());
Send(msg, "data");
}
FairMQTestPush::~FairMQTestPush()

View File

@ -23,11 +23,11 @@ FairMQTestRep::FairMQTestRep()
void FairMQTestRep::Run()
{
std::unique_ptr<FairMQMessage> request(fTransportFactory->CreateMessage());
if (fChannels.at("data").at(0).Receive(request) >= 0)
std::unique_ptr<FairMQMessage> request(NewMessage());
if (Receive(request, "data") >= 0)
{
std::unique_ptr<FairMQMessage> reply(fTransportFactory->CreateMessage());
fChannels.at("data").at(0).Send(reply);
std::unique_ptr<FairMQMessage> reply(NewMessage());
Send(reply, "data");
}
}

View File

@ -23,11 +23,11 @@ FairMQTestReq::FairMQTestReq()
void FairMQTestReq::Run()
{
std::unique_ptr<FairMQMessage> request(fTransportFactory->CreateMessage());
fChannels.at("data").at(0).Send(request);
std::unique_ptr<FairMQMessage> request(NewMessage());
Send(request, "data");
std::unique_ptr<FairMQMessage> reply(fTransportFactory->CreateMessage());
if (fChannels.at("data").at(0).Receive(reply) >= 0)
std::unique_ptr<FairMQMessage> reply(NewMessage());
if (Receive(reply, "data") >= 0)
{
LOG(INFO) << "REQ-REP test successfull";
}

View File

@ -56,11 +56,10 @@ class TransferTimeoutTester : public FairMQDevice
if (getSndOK && getRcvOK)
{
void* buffer = malloc(1000);
std::unique_ptr<FairMQMessage> msg1(fTransportFactory->CreateMessage(buffer, 1000));
std::unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage());
std::unique_ptr<FairMQMessage> msg1(NewMessage());
std::unique_ptr<FairMQMessage> msg2(NewMessage());
if (fChannels.at("data-out").at(0).Send(msg1) == -2)
if (Send(msg1, "data-out") == -2)
{
LOG(INFO) << "send canceled";
sendCanceling = true;
@ -70,7 +69,7 @@ class TransferTimeoutTester : public FairMQDevice
LOG(ERROR) << "send did not cancel";
}
if (fChannels.at("data-in").at(0).Receive(msg2) == -2)
if (Receive(msg2, "data-in") == -2)
{
LOG(INFO) << "receive canceled";
receiveCanceling = true;

View File

@ -29,7 +29,7 @@ FairMQMessageZMQ::FairMQMessageZMQ()
}
}
FairMQMessageZMQ::FairMQMessageZMQ(size_t size)
FairMQMessageZMQ::FairMQMessageZMQ(const size_t size)
: fMessage()
{
if (zmq_msg_init_size(&fMessage, size) != 0)
@ -38,7 +38,7 @@ FairMQMessageZMQ::FairMQMessageZMQ(size_t size)
}
}
FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size, fairmq_free_fn *ffn, void* hint)
FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn *ffn, void* hint)
: fMessage()
{
if (zmq_msg_init_data(&fMessage, data, size, ffn ? ffn : &CleanUp, hint) != 0)
@ -56,7 +56,7 @@ void FairMQMessageZMQ::Rebuild()
}
}
void FairMQMessageZMQ::Rebuild(size_t size)
void FairMQMessageZMQ::Rebuild(const size_t size)
{
CloseMessage();
if (zmq_msg_init_size(&fMessage, size) != 0)
@ -65,7 +65,7 @@ void FairMQMessageZMQ::Rebuild(size_t size)
}
}
void FairMQMessageZMQ::Rebuild(void* data, size_t size, fairmq_free_fn *ffn, void* hint)
void FairMQMessageZMQ::Rebuild(void* data, const size_t size, fairmq_free_fn *ffn, void* hint)
{
CloseMessage();
if (zmq_msg_init_data(&fMessage, data, size, ffn ? ffn : &CleanUp, hint) != 0)
@ -89,7 +89,7 @@ size_t FairMQMessageZMQ::GetSize()
return zmq_msg_size(&fMessage);
}
void FairMQMessageZMQ::SetMessage(void*, size_t)
void FairMQMessageZMQ::SetMessage(void*, const size_t)
{
// dummy method to comply with the interface. functionality not allowed in zeromq.
}

View File

@ -25,18 +25,18 @@ class FairMQMessageZMQ : public FairMQMessage
{
public:
FairMQMessageZMQ();
FairMQMessageZMQ(size_t size);
FairMQMessageZMQ(void* data, size_t size, fairmq_free_fn *ffn = &CleanUp, void* hint = NULL);
FairMQMessageZMQ(const size_t size);
FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn *ffn = &CleanUp, void* hint = NULL);
virtual void Rebuild();
virtual void Rebuild(size_t size);
virtual void Rebuild(void* data, size_t size, fairmq_free_fn *ffn = &CleanUp, void* hint = NULL);
virtual void Rebuild(const size_t size);
virtual void Rebuild(void* data, const size_t size, fairmq_free_fn *ffn = &CleanUp, void* hint = NULL);
virtual void* GetMessage();
virtual void* GetData();
virtual size_t GetSize();
virtual void SetMessage(void* data, size_t size);
virtual void SetMessage(void* data, const size_t size);
virtual void CloseMessage();
virtual void Copy(FairMQMessage* msg);

View File

@ -57,7 +57,7 @@ FairMQPollerZMQ::FairMQPollerZMQ(const vector<FairMQChannel>& channels)
}
}
FairMQPollerZMQ::FairMQPollerZMQ(unordered_map<string, vector<FairMQChannel>>& channelsMap, initializer_list<string> channelList)
FairMQPollerZMQ::FairMQPollerZMQ(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const initializer_list<string> channelList)
: items()
, fNumItems(0)
, fOffsetMap()
@ -119,7 +119,7 @@ FairMQPollerZMQ::FairMQPollerZMQ(unordered_map<string, vector<FairMQChannel>>& c
}
}
FairMQPollerZMQ::FairMQPollerZMQ(FairMQSocket& cmdSocket, FairMQSocket& dataSocket)
FairMQPollerZMQ::FairMQPollerZMQ(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket)
: items()
, fNumItems(2)
, fOffsetMap()

View File

@ -32,7 +32,7 @@ class FairMQPollerZMQ : public FairMQPoller
public:
FairMQPollerZMQ(const std::vector<FairMQChannel>& channels);
FairMQPollerZMQ(std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList);
FairMQPollerZMQ(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::initializer_list<std::string> channelList);
FairMQPollerZMQ(const FairMQPollerZMQ&) = delete;
FairMQPollerZMQ operator=(const FairMQPollerZMQ&) = delete;
@ -45,7 +45,7 @@ class FairMQPollerZMQ : public FairMQPoller
virtual ~FairMQPollerZMQ();
private:
FairMQPollerZMQ(FairMQSocket& cmdSocket, FairMQSocket& dataSocket);
FairMQPollerZMQ(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket);
zmq_pollitem_t* items;
int fNumItems;

View File

@ -24,7 +24,7 @@ using namespace std;
// Context to hold the ZeroMQ sockets
boost::shared_ptr<FairMQContextZMQ> FairMQSocketZMQ::fContext = boost::shared_ptr<FairMQContextZMQ>(new FairMQContextZMQ(1));
FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, int numIoThreads)
FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const int numIoThreads)
: FairMQSocket(ZMQ_SNDMORE, ZMQ_RCVMORE, ZMQ_DONTWAIT)
, fSocket(NULL)
, fId()

View File

@ -23,7 +23,7 @@
class FairMQSocketZMQ : public FairMQSocket
{
public:
FairMQSocketZMQ(const std::string& type, const std::string& name, int numIoThreads);
FairMQSocketZMQ(const std::string& type, const std::string& name, const int numIoThreads);
FairMQSocketZMQ(const FairMQSocketZMQ&) = delete;
FairMQSocketZMQ operator=(const FairMQSocketZMQ&) = delete;

View File

@ -30,17 +30,17 @@ FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage()
return new FairMQMessageZMQ();
}
FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(size_t size)
FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(const size_t size)
{
return new FairMQMessageZMQ(size);
}
FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, size_t size, fairmq_free_fn *ffn, void* hint)
FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint)
{
return new FairMQMessageZMQ(data, size, ffn, hint);
}
FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(const string& type, const std::string& name, int numIoThreads)
FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(const string& type, const std::string& name, const int numIoThreads)
{
return new FairMQSocketZMQ(type, name, numIoThreads);
}
@ -50,12 +50,12 @@ FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(const vector<FairMQChannel
return new FairMQPollerZMQ(channels);
}
FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList)
FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::initializer_list<std::string> channelList)
{
return new FairMQPollerZMQ(channelsMap, channelList);
}
FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket)
FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket)
{
return new FairMQPollerZMQ(cmdSocket, dataSocket);
}

View File

@ -29,14 +29,14 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory
FairMQTransportFactoryZMQ();
virtual FairMQMessage* CreateMessage();
virtual FairMQMessage* CreateMessage(size_t size);
virtual FairMQMessage* CreateMessage(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL);
virtual FairMQMessage* CreateMessage(const size_t size);
virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn = NULL, void* hint = NULL);
virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, int numIoThreads);
virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads);
virtual FairMQPoller* CreatePoller(const std::vector<FairMQChannel>& channels);
virtual FairMQPoller* CreatePoller(std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList);
virtual FairMQPoller* CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket);
virtual FairMQPoller* CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::initializer_list<std::string> channelList);
virtual FairMQPoller* CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket);
virtual ~FairMQTransportFactoryZMQ() {};
};