diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 7e600385..c9d0535a 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -91,6 +91,7 @@ EndIf(NANOMSG_FOUND) # to copy src that are header-only files (e.g. c++ template) for FairRoot external installation # manual install (globbing add not recommended) Set(FAIRMQHEADERS + FairMQParts.h devices/GenericSampler.h devices/GenericSampler.tpl devices/GenericProcessor.h diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index e6b69d13..2b30c62b 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -451,55 +451,79 @@ int FairMQChannel::Send(const unique_ptr& msg) const return -2; } -int FairMQChannel::SendAsync(const unique_ptr& msg) const +uint64_t FairMQChannel::Send(const std::vector>& msgVec) const { - return fSocket->Send(msg.get(), fNoBlockFlag); + // Sending vector typicaly handles more then one part + if (msgVec.size() > 1) + { + uint64_t totalSize = 0; + + for (unsigned int i = 0; i < msgVec.size() - 1; ++i) + { + int nbytes = SendPart(msgVec[i]); + if (nbytes >= 0) + { + totalSize += nbytes; + } + else + { + return nbytes; + } + } + + int n = Send(msgVec.back()); + if (n >= 0) + { + totalSize += n; + } + else + { + return n; + } + + return totalSize; + } // If there's only one part, send it as a regular message + else if (msgVec.size() == 1) + { + return Send(msgVec.back()); + } + else // if the vector is empty, something might be wrong + { + LOG(WARN) << "Will not send empty vector"; + return -1; + } } -int FairMQChannel::SendPart(const unique_ptr& msg) const +uint64_t FairMQChannel::Receive(std::vector>& msgVec) const { - return fSocket->Send(msg.get(), fSndMoreFlag); -} + // Warn if the vector is filled before Receive() and empty it. + if (msgVec.size() > 0) + { + LOG(WARN) << "Message vector contains elements before Receive(), they will be deleted!"; + msgVec.clear(); + } -int FairMQChannel::SendPartAsync(const unique_ptr& msg) const -{ - return fSocket->Send(msg.get(), fSndMoreFlag|fNoBlockFlag); -} + uint64_t totalSize = 0; -// int FairMQChannel::SendParts(initializer_list> partsList) const -// { -// int totalSize = 0; -// initializer_list>::iterator it = partsList.end(); -// auto &last = --it; -// for (auto &p : partsList) -// { -// if (&p != last) -// { -// int nbytes = SendPart(p); -// if (nbytes >= 0) -// { -// totalSize += nbytes; -// } -// else -// { -// return nbytes; -// } -// } -// else -// { -// int nbytes = Send(p); -// if (nbytes >= 0) -// { -// totalSize += nbytes; -// } -// else -// { -// return nbytes; -// } -// } -// } -// return totalSize; -// } + do + { + std::unique_ptr part(fTransportFactory->CreateMessage()); + + int nbytes = Receive(part); + if (nbytes >= 0) + { + msgVec.push_back(std::move(part)); + totalSize += nbytes; + } + else + { + return nbytes; + } + } + while (ExpectsAnotherPart()); + + return totalSize; +} int FairMQChannel::Receive(const unique_ptr& msg) const { @@ -519,11 +543,6 @@ int FairMQChannel::Receive(const unique_ptr& msg) const return -2; } -int FairMQChannel::ReceiveAsync(const unique_ptr& msg) const -{ - return fSocket->Receive(msg.get(), fNoBlockFlag); -} - int FairMQChannel::Send(FairMQMessage* msg, const string& flag) const { if (flag == "") @@ -624,64 +643,6 @@ int FairMQChannel::Receive(FairMQMessage* msg, const int flags) const } } -void FairMQChannel::SetSendTimeout(const int timeout) -{ - // if (fSocket) - // { - // if (fSocket->SetSendTimeout(timeout, fAddress, fMethod)) - // { - fSndTimeoutInMs = timeout; - // return true; - // } - // } - - // LOG(ERROR) << "SetSendTimeout() failed - socket is not initialized!"; - // return false; -} - -int FairMQChannel::GetSendTimeout() const -{ - return fSndTimeoutInMs; - // if (fSocket) - // { - // return fSocket->GetSendTimeout(); - // } - // else - // { - // LOG(ERROR) << "GetSendTimeout() failed - socket is not initialized!"; - // return -1; - // } -} - -void FairMQChannel::SetReceiveTimeout(const int timeout) -{ - // if (fSocket) - // { - // if (fSocket->SetReceiveTimeout(timeout, fAddress, fMethod)) - // { - fRcvTimeoutInMs = timeout; - // return true; - // } - // } - - // LOG(ERROR) << "SetReceiveTimeout() failed - socket is not initialized!"; - // return false; -} - -int FairMQChannel::GetReceiveTimeout() const -{ - return fRcvTimeoutInMs; - // if (fSocket) - // { - // return fSocket->GetReceiveTimeout(); - // } - // else - // { - // LOG(ERROR) << "GetReceiveTimeout() failed - socket is not initialized!"; - // return -1; - // } -} - bool FairMQChannel::ExpectsAnotherPart() const { int64_t more = 0; diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index e8490b50..bf167343 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -116,38 +116,41 @@ class FairMQChannel /// for some other reason (e.g. no peers connected for a binding socket), the method returns 0. /// /// @param msg Constant reference of unique_ptr to a FairMQMessage - /// @return Returns the number of bytes that have been queued. If queueing failed due to + /// @return Number of bytes that have been queued. If queueing failed due to /// full queue or no connected peers (when binding), returns -2. In case of errors, returns -1. - int SendAsync(const std::unique_ptr& msg) const; + inline int SendAsync(const std::unique_ptr& msg) const + { + return fSocket->Send(msg.get(), fNoBlockFlag); + } /// Queues the current message as a part of a multi-part message /// @details SendPart method queues the provided message as a part of a multi-part message. /// The actual transfer over the network is initiated once final part has been queued with the Send() or SendAsync() methods. /// /// @param msg Constant reference of unique_ptr to a FairMQMessage - /// @return Returns the number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1. - int SendPart(const std::unique_ptr& msg) const; + /// @return Number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1. + inline int SendPart(const std::unique_ptr& msg) const + { + return fSocket->Send(msg.get(), fSndMoreFlag); + } /// Queues the current message as a part of a multi-part message without blocking /// @details SendPart method queues the provided message as a part of a multi-part message without blocking. /// The actual transfer over the network is initiated once final part has been queued with the Send() or SendAsync() methods. /// /// @param msg Constant reference of unique_ptr to a FairMQMessage - /// @return Returns the number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1. - int SendPartAsync(const std::unique_ptr& msg) const; - - // /// Sends the messages provided as arguments as a multi-part message. - // /// - // /// @param partsList Initializer list of FairMQMessages - // /// @return Returns the number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1. - // int SendParts(std::initializer_list> partsList) const; + /// @return Number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1. + inline int SendPartAsync(const std::unique_ptr& msg) const + { + return fSocket->Send(msg.get(), fSndMoreFlag|fNoBlockFlag); + } /// Receives a message from the socket queue. /// @details Receive method attempts to receive a message from the input queue. /// If the queue is empty the method blocks. /// /// @param msg Constant reference of unique_ptr to a FairMQMessage - /// @return Returns the number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1. + /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1. int Receive(const std::unique_ptr& msg) const; /// Receives a message in non-blocking mode. @@ -155,9 +158,26 @@ class FairMQChannel /// If the queue is empty the method returns 0. /// /// @param msg Constant reference of unique_ptr to a FairMQMessage - /// @return Returns the number of bytes that have been received. If queue is empty, returns -2. + /// @return Number of bytes that have been received. If queue is empty, returns -2. /// In case of errors, returns -1. - int ReceiveAsync(const std::unique_ptr& msg) const; + inline int ReceiveAsync(const std::unique_ptr& msg) const + { + return fSocket->Receive(msg.get(), fNoBlockFlag); + } + + /// Shorthand method to send a vector of messages on `chan` at index `i` + /// @param msgVec message vector reference + /// @param chan channel name + /// @param i channel index + /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. In case of errors, returns -1. + uint64_t Send(const std::vector>& msgVec) const; + + /// Shorthand method to receive a vector of messages on `chan` at index `i` + /// @param msgVec message vector reference + /// @param chan channel name + /// @param i channel index + /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1. + uint64_t Receive(std::vector>& msgVec) const; // DEPRECATED socket method wrappers with raw pointers and flag checks int Send(FairMQMessage* msg, const std::string& flag = "") const; @@ -167,19 +187,31 @@ class FairMQChannel /// Sets a timeout on the (blocking) Send method /// @param timeout timeout value in milliseconds - void SetSendTimeout(const int timeout); + inline void SetSendTimeout(const int timeout) + { + fSndTimeoutInMs = timeout; + } /// Gets the current value of the timeout on the (blocking) Send method /// @return Timeout value in milliseconds. -1 for no timeout. - int GetSendTimeout() const; + inline int GetSendTimeout() const + { + return fSndTimeoutInMs; + } /// Sets a timeout on the (blocking) Receive method /// @param timeout timeout value in milliseconds - void SetReceiveTimeout(const int timeout); + inline void SetReceiveTimeout(const int timeout) + { + fRcvTimeoutInMs = timeout; + } /// Gets the current value of the timeout on the (blocking) Receive method /// @return Timeout value in milliseconds. -1 for no timeout. - int GetReceiveTimeout() const; + inline int GetReceiveTimeout() const + { + return fRcvTimeoutInMs; + } /// Checks if the socket is expecting to receive another part of a multipart message. /// @return Return true if the socket expects another part of a multipart message and false otherwise. diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index bfc86cf5..37a0beda 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -666,12 +666,6 @@ void FairMQDevice::InteractiveStateLoop() tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings } -inline void FairMQDevice::PrintInteractiveStateLoopHelp() -{ - LOG(INFO) << "Use keys to control the state machine:"; - LOG(INFO) << "[h] help, [p] pause, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device"; -} - void FairMQDevice::Unblock() { FairMQMessage* cmd = fTransportFactory->CreateMessage(); diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 37c5bd3a..a930aac3 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -26,6 +26,8 @@ #include "FairMQTransportFactory.h" #include "FairMQSocket.h" #include "FairMQChannel.h" +#include "FairMQMessage.h" +#include "FairMQParts.h" class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable { @@ -67,6 +69,92 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// @param name Name of the channel void PrintChannel(const std::string& name); + /// Shorthand method to send `msg` on `chan` at index `i` + /// @param msg message reference + /// @param chan channel name + /// @param i channel index + /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. In case of errors, returns -1. + inline int Send(const std::unique_ptr& msg, const std::string& chan, const int i = 0) const + { + return fChannels.at(chan).at(i).Send(msg); + } + + /// Shorthand method to receive `msg` on `chan` at index `i` + /// @param msg message reference + /// @param chan channel name + /// @param i channel index + /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1. + inline int Receive(const std::unique_ptr& msg, const std::string& chan, const int i = 0) const + { + return fChannels.at(chan).at(i).Receive(msg); + } + + /// Shorthand method to send a vector of messages on `chan` at index `i` + /// @param msgVec message vector reference + /// @param chan channel name + /// @param i channel index + /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. In case of errors, returns -1. + inline uint64_t Send(const std::vector>& msgVec, const std::string& chan, const int i = 0) const + { + return fChannels.at(chan).at(i).Send(msgVec); + } + + /// Shorthand method to receive a vector of messages on `chan` at index `i` + /// @param msgVec message vector reference + /// @param chan channel name + /// @param i channel index + /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1. + inline uint64_t Receive(std::vector>& msgVec, const std::string& chan, const int i = 0) const + { + return fChannels.at(chan).at(i).Receive(msgVec); + } + + /// Shorthand method to send FairMQParts on `chan` at index `i` + /// @param parts parts reference + /// @param chan channel name + /// @param i channel index + /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. In case of errors, returns -1. + inline uint64_t Send(const FairMQParts& parts, const std::string& chan, const int i = 0) const + { + return fChannels.at(chan).at(i).Send(parts.fParts); + } + + /// Shorthand method to receive FairMQParts on `chan` at index `i` + /// @param parts parts reference + /// @param chan channel name + /// @param i channel index + /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1. + inline uint64_t Receive(FairMQParts& parts, const std::string& chan, const int i = 0) const + { + return fChannels.at(chan).at(i).Receive(parts.fParts); + } + + /// @brief Create empty FairMQMessage + /// @return pointer to FairMQMessage + inline FairMQMessage* NewMessage() const + { + return fTransportFactory->CreateMessage(); + } + + /// @brief Create new FairMQMessage of specified size + /// @param size message size + /// @return pointer to FairMQMessage + inline FairMQMessage* NewMessage(int size) const + { + return fTransportFactory->CreateMessage(size); + } + + /// @brief Create new FairMQMessage with user provided buffer and size + /// @param data pointer to user provided buffer + /// @param size size of the user provided buffer + /// @param ffn optional callback, called when the message is transfered (and can be deleted) + /// @param hint optional helper pointer that can be used in the callback + /// @return pointer to FairMQMessage + inline FairMQMessage* NewMessage(void* data, int size, fairmq_free_fn* ffn = NULL, void* hint = NULL) const + { + return fTransportFactory->CreateMessage(data, size, ffn, hint); + } + /// Waits for the first initialization run to finish void WaitForInitialValidation(); @@ -74,7 +162,11 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// Works only when running in a terminal. Running in background would exit, because no interactive input (std::cin) is possible. void InteractiveStateLoop(); /// Prints the available commands of the InteractiveStateLoop() - void PrintInteractiveStateLoopHelp(); + inline void PrintInteractiveStateLoopHelp() + { + LOG(INFO) << "Use keys to control the state machine:"; + LOG(INFO) << "[h] help, [p] pause, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device"; + } /// Set Device properties stored as strings /// @param key Property key diff --git a/fairmq/FairMQParts.h b/fairmq/FairMQParts.h new file mode 100644 index 00000000..dd15bfc7 --- /dev/null +++ b/fairmq/FairMQParts.h @@ -0,0 +1,52 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIRMQPARTS_H_ +#define FAIRMQPARTS_H_ + +#include +#include // unique_ptr + +#include "FairMQTransportFactory.h" +#include "FairMQMessage.h" + +class FairMQParts +{ + public: + /// Default constructor + FairMQParts() {}; + /// Copy Constructor + FairMQParts(const FairMQParts&) = delete; + /// Assignment operator + FairMQParts& operator=(const FairMQParts&) = delete; + /// Default destructor + ~FairMQParts() {}; + + /// Adds part (FairMQMessage) to the container + /// @param msg message pointer (for example created with NewMessage() method of FairMQDevice) + inline void AddPart(FairMQMessage* msg) + { + fParts.push_back(std::unique_ptr(msg)); + } + + /// Get reference to part in the container at index (without bounds check) + /// @param index container index + inline FairMQMessage& operator[](const int index) { return *(fParts[index]); } + + /// Get reference to part in the container at index (with bounds check) + /// @param index container index + inline FairMQMessage& At(const int index) { return *(fParts.at(index)); } + + /// Get number of parts in the container + /// @return number of parts in the container + inline int Size() const { return fParts.size(); } + + std::vector> fParts; +}; + +#endif /* FAIRMQPARTS_H_ */ diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index 39373819..f584a9a6 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -15,7 +15,7 @@ #ifndef FAIRMQSTATEMACHINE_H_ #define FAIRMQSTATEMACHINE_H_ -#define FAIRMQ_INTERFACE_VERSION 2 +#define FAIRMQ_INTERFACE_VERSION 3 #include #include diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 21a14e8d..6951d889 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -31,14 +31,14 @@ class FairMQTransportFactory { public: virtual FairMQMessage* CreateMessage() = 0; - virtual FairMQMessage* CreateMessage(size_t size) = 0; - virtual FairMQMessage* CreateMessage(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL) = 0; + virtual FairMQMessage* CreateMessage(const size_t size) = 0; + virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn = NULL, void* hint = NULL) = 0; - virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, int numIoThreads) = 0; + virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads) = 0; virtual FairMQPoller* CreatePoller(const std::vector& channels) = 0; - virtual FairMQPoller* CreatePoller(std::unordered_map>& channelsMap, std::initializer_list channelList) = 0; - virtual FairMQPoller* CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket) = 0; + virtual FairMQPoller* CreatePoller(const std::unordered_map>& channelsMap, const std::initializer_list channelList) = 0; + virtual FairMQPoller* CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) = 0; virtual ~FairMQTransportFactory() {}; }; diff --git a/fairmq/nanomsg/FairMQMessageNN.cxx b/fairmq/nanomsg/FairMQMessageNN.cxx index 34f6f30c..4b3f12e3 100644 --- a/fairmq/nanomsg/FairMQMessageNN.cxx +++ b/fairmq/nanomsg/FairMQMessageNN.cxx @@ -34,7 +34,7 @@ FairMQMessageNN::FairMQMessageNN() } } -FairMQMessageNN::FairMQMessageNN(size_t size) +FairMQMessageNN::FairMQMessageNN(const size_t size) : fMessage(NULL) , fSize(0) , fReceiving(false) @@ -53,7 +53,7 @@ FairMQMessageNN::FairMQMessageNN(size_t size) * create FairMQMessage object only with size parameter and fill it with data. * possible TODO: make this zero copy (will should then be as efficient as ZeroMQ). */ -FairMQMessageNN::FairMQMessageNN(void* data, size_t size, fairmq_free_fn *ffn, void* hint) +FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn *ffn, void* hint) : fMessage(NULL) , fSize(0) , fReceiving(false) @@ -63,16 +63,19 @@ FairMQMessageNN::FairMQMessageNN(void* data, size_t size, fairmq_free_fn *ffn, v { LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno); } - memcpy(fMessage, data, size); - fSize = size; - - if (ffn) - { - ffn(data, hint); - } else { - if(data) free(data); + memcpy(fMessage, data, size); + fSize = size; + + if (ffn) + { + ffn(data, hint); + } + else + { + if(data) free(data); + } } } @@ -82,7 +85,7 @@ void FairMQMessageNN::Rebuild() fReceiving = false; } -void FairMQMessageNN::Rebuild(size_t size) +void FairMQMessageNN::Rebuild(const size_t size) { Clear(); fMessage = nn_allocmsg(size, 0); @@ -94,7 +97,7 @@ void FairMQMessageNN::Rebuild(size_t size) fReceiving = false; } -void FairMQMessageNN::Rebuild(void* data, size_t size, fairmq_free_fn *ffn, void* hint) +void FairMQMessageNN::Rebuild(void* data, const size_t size, fairmq_free_fn *ffn, void* hint) { Clear(); fMessage = nn_allocmsg(size, 0); @@ -102,17 +105,20 @@ void FairMQMessageNN::Rebuild(void* data, size_t size, fairmq_free_fn *ffn, void { LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno); } - memcpy(fMessage, data, size); - fSize = size; - fReceiving = false; - - if(ffn) - { - ffn(data, hint); - } else { - if(data) free(data); + memcpy(fMessage, data, size); + fSize = size; + fReceiving = false; + + if(ffn) + { + ffn(data, hint); + } + else + { + if(data) free(data); + } } } @@ -131,7 +137,7 @@ size_t FairMQMessageNN::GetSize() return fSize; } -void FairMQMessageNN::SetMessage(void* data, size_t size) +void FairMQMessageNN::SetMessage(void* data, const size_t size) { fMessage = data; fSize = size; @@ -156,8 +162,11 @@ void FairMQMessageNN::Copy(FairMQMessage* msg) { LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno); } - memcpy(fMessage, msg->GetMessage(), size); - fSize = size; + else + { + memcpy(fMessage, msg->GetMessage(), size); + fSize = size; + } } void FairMQMessageNN::Copy(const unique_ptr& msg) @@ -177,8 +186,11 @@ void FairMQMessageNN::Copy(const unique_ptr& msg) { LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno); } - memcpy(fMessage, msg->GetMessage(), size); - fSize = size; + else + { + memcpy(fMessage, msg->GetMessage(), size); + fSize = size; + } } inline void FairMQMessageNN::Clear() diff --git a/fairmq/nanomsg/FairMQMessageNN.h b/fairmq/nanomsg/FairMQMessageNN.h index c0a19c0b..594ee5f9 100644 --- a/fairmq/nanomsg/FairMQMessageNN.h +++ b/fairmq/nanomsg/FairMQMessageNN.h @@ -23,20 +23,20 @@ class FairMQMessageNN : public FairMQMessage { public: FairMQMessageNN(); - FairMQMessageNN(size_t size); - FairMQMessageNN(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL); + FairMQMessageNN(const size_t size); + FairMQMessageNN(void* data, const size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL); FairMQMessageNN(const FairMQMessageNN&) = delete; FairMQMessageNN operator=(const FairMQMessageNN&) = delete; virtual void Rebuild(); - virtual void Rebuild(size_t size); - virtual void Rebuild(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL); + virtual void Rebuild(const size_t size); + virtual void Rebuild(void* data, const size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL); virtual void* GetMessage(); virtual void* GetData(); virtual size_t GetSize(); - virtual void SetMessage(void* data, size_t size); + virtual void SetMessage(void* data, const size_t size); virtual void CloseMessage() {}; virtual void Copy(FairMQMessage* msg); diff --git a/fairmq/nanomsg/FairMQPollerNN.cxx b/fairmq/nanomsg/FairMQPollerNN.cxx index 933f8afd..b9305190 100644 --- a/fairmq/nanomsg/FairMQPollerNN.cxx +++ b/fairmq/nanomsg/FairMQPollerNN.cxx @@ -59,7 +59,7 @@ FairMQPollerNN::FairMQPollerNN(const vector& channels) } } -FairMQPollerNN::FairMQPollerNN(unordered_map>& channelsMap, initializer_list channelList) +FairMQPollerNN::FairMQPollerNN(const unordered_map>& channelsMap, const initializer_list channelList) : items() , fNumItems(0) , fOffsetMap() @@ -118,7 +118,7 @@ FairMQPollerNN::FairMQPollerNN(unordered_map>& cha } } -FairMQPollerNN::FairMQPollerNN(FairMQSocket& cmdSocket, FairMQSocket& dataSocket) +FairMQPollerNN::FairMQPollerNN(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) : items() , fNumItems(2) , fOffsetMap() diff --git a/fairmq/nanomsg/FairMQPollerNN.h b/fairmq/nanomsg/FairMQPollerNN.h index ebba554f..d4f52140 100644 --- a/fairmq/nanomsg/FairMQPollerNN.h +++ b/fairmq/nanomsg/FairMQPollerNN.h @@ -32,7 +32,7 @@ class FairMQPollerNN : public FairMQPoller public: FairMQPollerNN(const std::vector& channels); - FairMQPollerNN(std::unordered_map>& channelsMap, std::initializer_list channelList); + FairMQPollerNN(const std::unordered_map>& channelsMap, const std::initializer_list channelList); FairMQPollerNN(const FairMQPollerNN&) = delete; FairMQPollerNN operator=(const FairMQPollerNN&) = delete; @@ -45,7 +45,7 @@ class FairMQPollerNN : public FairMQPoller virtual ~FairMQPollerNN(); private: - FairMQPollerNN(FairMQSocket& cmdSocket, FairMQSocket& dataSocket); + FairMQPollerNN(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket); nn_pollfd* items; int fNumItems; diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 582a3bbb..eab99585 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -20,7 +20,7 @@ using namespace std; -FairMQSocketNN::FairMQSocketNN(const string& type, const std::string& name, int numIoThreads) +FairMQSocketNN::FairMQSocketNN(const string& type, const std::string& name, const int numIoThreads) : FairMQSocket(0, 0, NN_DONTWAIT) , fSocket(-1) , fId() diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index ac69326e..09ae2763 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -26,7 +26,7 @@ class FairMQSocketNN : public FairMQSocket { public: - FairMQSocketNN(const std::string& type, const std::string& name, int numIoThreads); // numIoThreads is not used in nanomsg. + FairMQSocketNN(const std::string& type, const std::string& name, const int numIoThreads); // numIoThreads is not used in nanomsg. FairMQSocketNN(const FairMQSocketNN&) = delete; FairMQSocketNN operator=(const FairMQSocketNN&) = delete; diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index 8391f33c..6d09d414 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -26,17 +26,17 @@ FairMQMessage* FairMQTransportFactoryNN::CreateMessage() return new FairMQMessageNN(); } -FairMQMessage* FairMQTransportFactoryNN::CreateMessage(size_t size) +FairMQMessage* FairMQTransportFactoryNN::CreateMessage(const size_t size) { return new FairMQMessageNN(size); } -FairMQMessage* FairMQTransportFactoryNN::CreateMessage(void* data, size_t size, fairmq_free_fn *ffn, void* hint) +FairMQMessage* FairMQTransportFactoryNN::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) { return new FairMQMessageNN(data, size, ffn, hint); } -FairMQSocket* FairMQTransportFactoryNN::CreateSocket(const string& type, const std::string& name, int numIoThreads) +FairMQSocket* FairMQTransportFactoryNN::CreateSocket(const string& type, const std::string& name, const int numIoThreads) { return new FairMQSocketNN(type, name, numIoThreads); } @@ -46,12 +46,12 @@ FairMQPoller* FairMQTransportFactoryNN::CreatePoller(const vector return new FairMQPollerNN(channels); } -FairMQPoller* FairMQTransportFactoryNN::CreatePoller(std::unordered_map>& channelsMap, std::initializer_list channelList) +FairMQPoller* FairMQTransportFactoryNN::CreatePoller(const std::unordered_map>& channelsMap, const std::initializer_list channelList) { return new FairMQPollerNN(channelsMap, channelList); } -FairMQPoller* FairMQTransportFactoryNN::CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket) +FairMQPoller* FairMQTransportFactoryNN::CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) { return new FairMQPollerNN(cmdSocket, dataSocket); } diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index ee27845f..76638267 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -28,14 +28,14 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory FairMQTransportFactoryNN(); virtual FairMQMessage* CreateMessage(); - virtual FairMQMessage* CreateMessage(size_t size); - virtual FairMQMessage* CreateMessage(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL); + virtual FairMQMessage* CreateMessage(const size_t size); + virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn = NULL, void* hint = NULL); - virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, int numIoThreads); + virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads); virtual FairMQPoller* CreatePoller(const std::vector& channels); - virtual FairMQPoller* CreatePoller(std::unordered_map>& channelsMap, std::initializer_list channelList); - virtual FairMQPoller* CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket); + virtual FairMQPoller* CreatePoller(const std::unordered_map>& channelsMap, const std::initializer_list channelList); + virtual FairMQPoller* CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket); virtual ~FairMQTransportFactoryNN() {}; }; diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index 76c621d8..2bcddfe4 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -101,7 +101,7 @@ int FairMQProgOptions::ParseAll(const int argc, char** argv, bool allowUnregiste { LOG(WARN) << p; } - LOG(WARN) << "No channels will be created (You can still fill these manually)."; + LOG(WARN) << "No channels will be created (You can create them manually)."; // return 1; } diff --git a/fairmq/run/startBenchmark.sh.in b/fairmq/run/startBenchmark.sh.in index 11c519a6..4e7a3201 100755 --- a/fairmq/run/startBenchmark.sh.in +++ b/fairmq/run/startBenchmark.sh.in @@ -24,6 +24,8 @@ echo "Usage: startBenchmark [message size=1000000] [number of messages=0]" SAMPLER="bsampler" SAMPLER+=" --id bsampler1" +#SAMPLER+=" --io-threads 2" +#SAMPLER+=" --transport nanomsg" SAMPLER+=" --msg-size $msgSize" SAMPLER+=" --num-msgs $numMsgs" SAMPLER+=" --config-json-file @CMAKE_BINARY_DIR@/bin/config/benchmark.json" @@ -31,6 +33,8 @@ xterm -geometry 80x23+0+0 -hold -e taskset 0x1 @CMAKE_BINARY_DIR@/bin/$SAMPLER & SINK="sink" SINK+=" --id sink1" +#SINK+=" --io-threads 2" +#SINK+=" --transport nanomsg" SINK+=" --num-msgs $numMsgs" SINK+=" --config-json-file @CMAKE_BINARY_DIR@/bin/config/benchmark.json" xterm -geometry 80x23+500+0 -hold -e taskset 0x2 @CMAKE_BINARY_DIR@/bin/$SINK & diff --git a/fairmq/test/pub-sub/FairMQTestPub.cxx b/fairmq/test/pub-sub/FairMQTestPub.cxx index 194259af..96190b7f 100644 --- a/fairmq/test/pub-sub/FairMQTestPub.cxx +++ b/fairmq/test/pub-sub/FairMQTestPub.cxx @@ -23,21 +23,21 @@ FairMQTestPub::FairMQTestPub() void FairMQTestPub::Run() { - std::unique_ptr ready1Msg(fTransportFactory->CreateMessage()); - int r1 = fChannels.at("control").at(0).Receive(ready1Msg); - std::unique_ptr ready2Msg(fTransportFactory->CreateMessage()); - int r2 = fChannels.at("control").at(0).Receive(ready2Msg); + std::unique_ptr ready1Msg(NewMessage()); + int r1 = Receive(ready1Msg, "control"); + std::unique_ptr ready2Msg(NewMessage()); + int r2 = Receive(ready2Msg, "control"); if (r1 >= 0 && r2 >= 0) { - std::unique_ptr msg(fTransportFactory->CreateMessage()); - fChannels.at("data").at(0).Send(msg); + std::unique_ptr msg(NewMessage()); + Send(msg, "data"); - std::unique_ptr ack1Msg(fTransportFactory->CreateMessage()); - std::unique_ptr ack2Msg(fTransportFactory->CreateMessage()); - if (fChannels.at("control").at(0).Receive(ack1Msg) >= 0) + std::unique_ptr ack1Msg(NewMessage()); + std::unique_ptr ack2Msg(NewMessage()); + if (Receive(ack1Msg, "control") >= 0) { - if (fChannels.at("control").at(0).Receive(ack2Msg) >= 0) + if (Receive(ack2Msg, "control") >= 0) { LOG(INFO) << "PUB-SUB test successfull"; } diff --git a/fairmq/test/pub-sub/FairMQTestSub.cxx b/fairmq/test/pub-sub/FairMQTestSub.cxx index 65ee3a96..62ca406a 100644 --- a/fairmq/test/pub-sub/FairMQTestSub.cxx +++ b/fairmq/test/pub-sub/FairMQTestSub.cxx @@ -23,14 +23,14 @@ FairMQTestSub::FairMQTestSub() void FairMQTestSub::Run() { - std::unique_ptr readyMsg(fTransportFactory->CreateMessage()); - fChannels.at("control").at(0).Send(readyMsg); + std::unique_ptr readyMsg(NewMessage()); + Send(readyMsg, "control"); - std::unique_ptr msg(fTransportFactory->CreateMessage()); - if (fChannels.at("data").at(0).Receive(msg) >= 0) + std::unique_ptr msg(NewMessage()); + if (Receive(msg, "data") >= 0) { - std::unique_ptr ackMsg(fTransportFactory->CreateMessage()); - fChannels.at("control").at(0).Send(ackMsg); + std::unique_ptr ackMsg(NewMessage()); + Send(ackMsg, "control"); } else { diff --git a/fairmq/test/push-pull/FairMQTestPull.cxx b/fairmq/test/push-pull/FairMQTestPull.cxx index 4d7cb3bd..b6e0077c 100644 --- a/fairmq/test/push-pull/FairMQTestPull.cxx +++ b/fairmq/test/push-pull/FairMQTestPull.cxx @@ -23,9 +23,9 @@ FairMQTestPull::FairMQTestPull() void FairMQTestPull::Run() { - std::unique_ptr msg(fTransportFactory->CreateMessage()); + std::unique_ptr msg(NewMessage()); - if (fChannels.at("data").at(0).Receive(msg) >= 0) + if (Receive(msg, "data") >= 0) { LOG(INFO) << "PUSH-PULL test successfull"; } diff --git a/fairmq/test/push-pull/FairMQTestPush.cxx b/fairmq/test/push-pull/FairMQTestPush.cxx index 79174e14..3c70d9b9 100644 --- a/fairmq/test/push-pull/FairMQTestPush.cxx +++ b/fairmq/test/push-pull/FairMQTestPush.cxx @@ -23,8 +23,8 @@ FairMQTestPush::FairMQTestPush() void FairMQTestPush::Run() { - std::unique_ptr msg(fTransportFactory->CreateMessage()); - fChannels.at("data").at(0).Send(msg); + std::unique_ptr msg(NewMessage()); + Send(msg, "data"); } FairMQTestPush::~FairMQTestPush() diff --git a/fairmq/test/req-rep/FairMQTestRep.cxx b/fairmq/test/req-rep/FairMQTestRep.cxx index a89d660b..188eb470 100644 --- a/fairmq/test/req-rep/FairMQTestRep.cxx +++ b/fairmq/test/req-rep/FairMQTestRep.cxx @@ -23,11 +23,11 @@ FairMQTestRep::FairMQTestRep() void FairMQTestRep::Run() { - std::unique_ptr request(fTransportFactory->CreateMessage()); - if (fChannels.at("data").at(0).Receive(request) >= 0) + std::unique_ptr request(NewMessage()); + if (Receive(request, "data") >= 0) { - std::unique_ptr reply(fTransportFactory->CreateMessage()); - fChannels.at("data").at(0).Send(reply); + std::unique_ptr reply(NewMessage()); + Send(reply, "data"); } } diff --git a/fairmq/test/req-rep/FairMQTestReq.cxx b/fairmq/test/req-rep/FairMQTestReq.cxx index a9f01660..a68abe02 100644 --- a/fairmq/test/req-rep/FairMQTestReq.cxx +++ b/fairmq/test/req-rep/FairMQTestReq.cxx @@ -23,11 +23,11 @@ FairMQTestReq::FairMQTestReq() void FairMQTestReq::Run() { - std::unique_ptr request(fTransportFactory->CreateMessage()); - fChannels.at("data").at(0).Send(request); + std::unique_ptr request(NewMessage()); + Send(request, "data"); - std::unique_ptr reply(fTransportFactory->CreateMessage()); - if (fChannels.at("data").at(0).Receive(reply) >= 0) + std::unique_ptr reply(NewMessage()); + if (Receive(reply, "data") >= 0) { LOG(INFO) << "REQ-REP test successfull"; } diff --git a/fairmq/test/runTransferTimeoutTest.cxx b/fairmq/test/runTransferTimeoutTest.cxx index ea545090..75e878dd 100644 --- a/fairmq/test/runTransferTimeoutTest.cxx +++ b/fairmq/test/runTransferTimeoutTest.cxx @@ -56,11 +56,10 @@ class TransferTimeoutTester : public FairMQDevice if (getSndOK && getRcvOK) { - void* buffer = malloc(1000); - std::unique_ptr msg1(fTransportFactory->CreateMessage(buffer, 1000)); - std::unique_ptr msg2(fTransportFactory->CreateMessage()); + std::unique_ptr msg1(NewMessage()); + std::unique_ptr msg2(NewMessage()); - if (fChannels.at("data-out").at(0).Send(msg1) == -2) + if (Send(msg1, "data-out") == -2) { LOG(INFO) << "send canceled"; sendCanceling = true; @@ -70,7 +69,7 @@ class TransferTimeoutTester : public FairMQDevice LOG(ERROR) << "send did not cancel"; } - if (fChannels.at("data-in").at(0).Receive(msg2) == -2) + if (Receive(msg2, "data-in") == -2) { LOG(INFO) << "receive canceled"; receiveCanceling = true; diff --git a/fairmq/zeromq/FairMQMessageZMQ.cxx b/fairmq/zeromq/FairMQMessageZMQ.cxx index 28d7cd0d..6f464e6a 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.cxx +++ b/fairmq/zeromq/FairMQMessageZMQ.cxx @@ -29,7 +29,7 @@ FairMQMessageZMQ::FairMQMessageZMQ() } } -FairMQMessageZMQ::FairMQMessageZMQ(size_t size) +FairMQMessageZMQ::FairMQMessageZMQ(const size_t size) : fMessage() { if (zmq_msg_init_size(&fMessage, size) != 0) @@ -38,7 +38,7 @@ FairMQMessageZMQ::FairMQMessageZMQ(size_t size) } } -FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size, fairmq_free_fn *ffn, void* hint) +FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn *ffn, void* hint) : fMessage() { if (zmq_msg_init_data(&fMessage, data, size, ffn ? ffn : &CleanUp, hint) != 0) @@ -56,7 +56,7 @@ void FairMQMessageZMQ::Rebuild() } } -void FairMQMessageZMQ::Rebuild(size_t size) +void FairMQMessageZMQ::Rebuild(const size_t size) { CloseMessage(); if (zmq_msg_init_size(&fMessage, size) != 0) @@ -65,7 +65,7 @@ void FairMQMessageZMQ::Rebuild(size_t size) } } -void FairMQMessageZMQ::Rebuild(void* data, size_t size, fairmq_free_fn *ffn, void* hint) +void FairMQMessageZMQ::Rebuild(void* data, const size_t size, fairmq_free_fn *ffn, void* hint) { CloseMessage(); if (zmq_msg_init_data(&fMessage, data, size, ffn ? ffn : &CleanUp, hint) != 0) @@ -89,7 +89,7 @@ size_t FairMQMessageZMQ::GetSize() return zmq_msg_size(&fMessage); } -void FairMQMessageZMQ::SetMessage(void*, size_t) +void FairMQMessageZMQ::SetMessage(void*, const size_t) { // dummy method to comply with the interface. functionality not allowed in zeromq. } diff --git a/fairmq/zeromq/FairMQMessageZMQ.h b/fairmq/zeromq/FairMQMessageZMQ.h index fbcf7764..5b5a7734 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.h +++ b/fairmq/zeromq/FairMQMessageZMQ.h @@ -25,18 +25,18 @@ class FairMQMessageZMQ : public FairMQMessage { public: FairMQMessageZMQ(); - FairMQMessageZMQ(size_t size); - FairMQMessageZMQ(void* data, size_t size, fairmq_free_fn *ffn = &CleanUp, void* hint = NULL); + FairMQMessageZMQ(const size_t size); + FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn *ffn = &CleanUp, void* hint = NULL); virtual void Rebuild(); - virtual void Rebuild(size_t size); - virtual void Rebuild(void* data, size_t size, fairmq_free_fn *ffn = &CleanUp, void* hint = NULL); + virtual void Rebuild(const size_t size); + virtual void Rebuild(void* data, const size_t size, fairmq_free_fn *ffn = &CleanUp, void* hint = NULL); virtual void* GetMessage(); virtual void* GetData(); virtual size_t GetSize(); - virtual void SetMessage(void* data, size_t size); + virtual void SetMessage(void* data, const size_t size); virtual void CloseMessage(); virtual void Copy(FairMQMessage* msg); diff --git a/fairmq/zeromq/FairMQPollerZMQ.cxx b/fairmq/zeromq/FairMQPollerZMQ.cxx index fde97a3f..4d3895a5 100644 --- a/fairmq/zeromq/FairMQPollerZMQ.cxx +++ b/fairmq/zeromq/FairMQPollerZMQ.cxx @@ -57,7 +57,7 @@ FairMQPollerZMQ::FairMQPollerZMQ(const vector& channels) } } -FairMQPollerZMQ::FairMQPollerZMQ(unordered_map>& channelsMap, initializer_list channelList) +FairMQPollerZMQ::FairMQPollerZMQ(const unordered_map>& channelsMap, const initializer_list channelList) : items() , fNumItems(0) , fOffsetMap() @@ -119,7 +119,7 @@ FairMQPollerZMQ::FairMQPollerZMQ(unordered_map>& c } } -FairMQPollerZMQ::FairMQPollerZMQ(FairMQSocket& cmdSocket, FairMQSocket& dataSocket) +FairMQPollerZMQ::FairMQPollerZMQ(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) : items() , fNumItems(2) , fOffsetMap() @@ -137,7 +137,7 @@ FairMQPollerZMQ::FairMQPollerZMQ(FairMQSocket& cmdSocket, FairMQSocket& dataSock int type = 0; size_t size = sizeof(type); - zmq_getsockopt (dataSocket.GetSocket(), ZMQ_TYPE, &type, &size); + zmq_getsockopt(dataSocket.GetSocket(), ZMQ_TYPE, &type, &size); if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) { diff --git a/fairmq/zeromq/FairMQPollerZMQ.h b/fairmq/zeromq/FairMQPollerZMQ.h index f3a7d6ff..74eacc44 100644 --- a/fairmq/zeromq/FairMQPollerZMQ.h +++ b/fairmq/zeromq/FairMQPollerZMQ.h @@ -32,7 +32,7 @@ class FairMQPollerZMQ : public FairMQPoller public: FairMQPollerZMQ(const std::vector& channels); - FairMQPollerZMQ(std::unordered_map>& channelsMap, std::initializer_list channelList); + FairMQPollerZMQ(const std::unordered_map>& channelsMap, const std::initializer_list channelList); FairMQPollerZMQ(const FairMQPollerZMQ&) = delete; FairMQPollerZMQ operator=(const FairMQPollerZMQ&) = delete; @@ -45,7 +45,7 @@ class FairMQPollerZMQ : public FairMQPoller virtual ~FairMQPollerZMQ(); private: - FairMQPollerZMQ(FairMQSocket& cmdSocket, FairMQSocket& dataSocket); + FairMQPollerZMQ(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket); zmq_pollitem_t* items; int fNumItems; diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index 8b499b51..02fa9157 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -24,7 +24,7 @@ using namespace std; // Context to hold the ZeroMQ sockets boost::shared_ptr FairMQSocketZMQ::fContext = boost::shared_ptr(new FairMQContextZMQ(1)); -FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, int numIoThreads) +FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const int numIoThreads) : FairMQSocket(ZMQ_SNDMORE, ZMQ_RCVMORE, ZMQ_DONTWAIT) , fSocket(NULL) , fId() diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h index 60d686a3..f7ccbb58 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/FairMQSocketZMQ.h @@ -23,7 +23,7 @@ class FairMQSocketZMQ : public FairMQSocket { public: - FairMQSocketZMQ(const std::string& type, const std::string& name, int numIoThreads); + FairMQSocketZMQ(const std::string& type, const std::string& name, const int numIoThreads); FairMQSocketZMQ(const FairMQSocketZMQ&) = delete; FairMQSocketZMQ operator=(const FairMQSocketZMQ&) = delete; diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index 34c7514d..40326a51 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -30,17 +30,17 @@ FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage() return new FairMQMessageZMQ(); } -FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(size_t size) +FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(const size_t size) { return new FairMQMessageZMQ(size); } -FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, size_t size, fairmq_free_fn *ffn, void* hint) +FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) { return new FairMQMessageZMQ(data, size, ffn, hint); } -FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(const string& type, const std::string& name, int numIoThreads) +FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(const string& type, const std::string& name, const int numIoThreads) { return new FairMQSocketZMQ(type, name, numIoThreads); } @@ -50,12 +50,12 @@ FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(const vector>& channelsMap, std::initializer_list channelList) +FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(const std::unordered_map>& channelsMap, const std::initializer_list channelList) { return new FairMQPollerZMQ(channelsMap, channelList); } -FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket) +FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) { return new FairMQPollerZMQ(cmdSocket, dataSocket); } diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index 220a6ff6..5f932f84 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -29,14 +29,14 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory FairMQTransportFactoryZMQ(); virtual FairMQMessage* CreateMessage(); - virtual FairMQMessage* CreateMessage(size_t size); - virtual FairMQMessage* CreateMessage(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL); + virtual FairMQMessage* CreateMessage(const size_t size); + virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn = NULL, void* hint = NULL); - virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, int numIoThreads); + virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads); virtual FairMQPoller* CreatePoller(const std::vector& channels); - virtual FairMQPoller* CreatePoller(std::unordered_map>& channelsMap, std::initializer_list channelList); - virtual FairMQPoller* CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket); + virtual FairMQPoller* CreatePoller(const std::unordered_map>& channelsMap, const std::initializer_list channelList); + virtual FairMQPoller* CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket); virtual ~FairMQTransportFactoryZMQ() {}; };