diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index 1f11f78c..2cbcd420 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -689,49 +689,49 @@ void FairMQChannel::ResetChannel() // TODO: implement channel resetting } -int FairMQChannel::Send(unique_ptr& msg, int sndTimeoutInMs) const +int FairMQChannel::Send(unique_ptr& msg, int sndTimeoutInMs) { CheckSendCompatibility(msg); return fSocket->Send(msg, sndTimeoutInMs); } -int FairMQChannel::Receive(unique_ptr& msg, int rcvTimeoutInMs) const +int FairMQChannel::Receive(unique_ptr& msg, int rcvTimeoutInMs) { CheckReceiveCompatibility(msg); return fSocket->Receive(msg, rcvTimeoutInMs); } -int FairMQChannel::SendAsync(unique_ptr& msg) const +int FairMQChannel::SendAsync(unique_ptr& msg) { CheckSendCompatibility(msg); return fSocket->Send(msg, 0); } -int FairMQChannel::ReceiveAsync(unique_ptr& msg) const +int FairMQChannel::ReceiveAsync(unique_ptr& msg) { CheckReceiveCompatibility(msg); return fSocket->Receive(msg, 0); } -int64_t FairMQChannel::Send(vector>& msgVec, int sndTimeoutInMs) const +int64_t FairMQChannel::Send(vector>& msgVec, int sndTimeoutInMs) { CheckSendCompatibility(msgVec); return fSocket->Send(msgVec, sndTimeoutInMs); } -int64_t FairMQChannel::Receive(vector>& msgVec, int rcvTimeoutInMs) const +int64_t FairMQChannel::Receive(vector>& msgVec, int rcvTimeoutInMs) { CheckReceiveCompatibility(msgVec); return fSocket->Receive(msgVec, rcvTimeoutInMs); } -int64_t FairMQChannel::SendAsync(vector>& msgVec) const +int64_t FairMQChannel::SendAsync(vector>& msgVec) { CheckSendCompatibility(msgVec); return fSocket->Send(msgVec, 0); } -int64_t FairMQChannel::ReceiveAsync(vector>& msgVec) const +int64_t FairMQChannel::ReceiveAsync(vector>& msgVec) { CheckReceiveCompatibility(msgVec); return fSocket->Receive(msgVec, 0); @@ -761,7 +761,7 @@ unsigned long FairMQChannel::GetMessagesRx() const return fSocket->GetMessagesRx(); } -void FairMQChannel::CheckSendCompatibility(FairMQMessagePtr& msg) const +void FairMQChannel::CheckSendCompatibility(FairMQMessagePtr& msg) { if (fTransportType != msg->GetType()) { @@ -776,7 +776,7 @@ void FairMQChannel::CheckSendCompatibility(FairMQMessagePtr& msg) const } } -void FairMQChannel::CheckSendCompatibility(vector& msgVec) const +void FairMQChannel::CheckSendCompatibility(vector& msgVec) { for (auto& msg : msgVec) { @@ -794,7 +794,7 @@ void FairMQChannel::CheckSendCompatibility(vector& msgVec) con } } -void FairMQChannel::CheckReceiveCompatibility(FairMQMessagePtr& msg) const +void FairMQChannel::CheckReceiveCompatibility(FairMQMessagePtr& msg) { if (fTransportType != msg->GetType()) { @@ -804,7 +804,7 @@ void FairMQChannel::CheckReceiveCompatibility(FairMQMessagePtr& msg) const } } -void FairMQChannel::CheckReceiveCompatibility(vector& msgVec) const +void FairMQChannel::CheckReceiveCompatibility(vector& msgVec) { for (auto& msg : msgVec) { diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 9430d66d..81d509a1 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -178,37 +178,37 @@ class FairMQChannel /// @param msg Constant reference of unique_ptr to a FairMQMessage /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. - int Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1) const; + int Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1); /// Receives a message from the socket queue. /// @param msg Constant reference of unique_ptr to a FairMQMessage /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) /// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. - int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1) const; + int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1); - int SendAsync(FairMQMessagePtr& msg) const __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msg, timeout);"))); - int ReceiveAsync(FairMQMessagePtr& msg) const __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msg, timeout);"))); + int SendAsync(FairMQMessagePtr& msg) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msg, timeout);"))); + int ReceiveAsync(FairMQMessagePtr& msg) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msg, timeout);"))); /// Send a vector of messages /// @param msgVec message vector reference /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. - int64_t Send(std::vector& msgVec, int sndTimeoutInMs = -1) const; + int64_t Send(std::vector& msgVec, int sndTimeoutInMs = -1); /// Receive a vector of messages /// @param msgVec message vector reference /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) /// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. - int64_t Receive(std::vector& msgVec, int rcvTimeoutInMs = -1) const; + int64_t Receive(std::vector& msgVec, int rcvTimeoutInMs = -1); - int64_t SendAsync(std::vector& msgVec) const __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msgVec, timeout);"))); - int64_t ReceiveAsync(std::vector& msgVec) const __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msgVec, timeout);"))); + int64_t SendAsync(std::vector& msgVec) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msgVec, timeout);"))); + int64_t ReceiveAsync(std::vector& msgVec) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msgVec, timeout);"))); /// Send FairMQParts /// @param parts FairMQParts reference /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. - int64_t Send(FairMQParts& parts, int sndTimeoutInMs = -1) const + int64_t Send(FairMQParts& parts, int sndTimeoutInMs = -1) { return Send(parts.fParts, sndTimeoutInMs); } @@ -217,17 +217,17 @@ class FairMQChannel /// @param parts FairMQParts reference /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) /// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. - int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs = -1) const + int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs = -1) { return Receive(parts.fParts, rcvTimeoutInMs); } - int64_t SendAsync(FairMQParts& parts) const __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(parts, timeout);"))) + int64_t SendAsync(FairMQParts& parts) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(parts, timeout);"))) { return Send(parts.fParts, 0); } - int64_t ReceiveAsync(FairMQParts& parts) const __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(parts, timeout);"))) + int64_t ReceiveAsync(FairMQParts& parts) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(parts, timeout);"))) { return Receive(parts.fParts, 0); } @@ -237,25 +237,25 @@ class FairMQChannel unsigned long GetMessagesTx() const; unsigned long GetMessagesRx() const; - auto Transport() const -> const FairMQTransportFactory* + auto Transport() -> FairMQTransportFactory* { return fTransportFactory.get(); }; template - FairMQMessagePtr NewMessage(Args&&... args) const + FairMQMessagePtr NewMessage(Args&&... args) { return Transport()->CreateMessage(std::forward(args)...); } template - FairMQMessagePtr NewSimpleMessage(const T& data) const + FairMQMessagePtr NewSimpleMessage(const T& data) { return Transport()->NewSimpleMessage(data); } template - FairMQMessagePtr NewStaticMessage(const T& data) const + FairMQMessagePtr NewStaticMessage(const T& data) { return Transport()->NewStaticMessage(data); } @@ -279,10 +279,10 @@ class FairMQChannel std::shared_ptr fTransportFactory; - void CheckSendCompatibility(FairMQMessagePtr& msg) const; - void CheckSendCompatibility(std::vector& msgVec) const; - void CheckReceiveCompatibility(FairMQMessagePtr& msg) const; - void CheckReceiveCompatibility(std::vector& msgVec) const; + void CheckSendCompatibility(FairMQMessagePtr& msg); + void CheckSendCompatibility(std::vector& msgVec); + void CheckReceiveCompatibility(FairMQMessagePtr& msg); + void CheckReceiveCompatibility(std::vector& msgVec); void InitTransport(std::shared_ptr factory); diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 2a12a6df..5219c812 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -725,7 +725,7 @@ void FairMQDevice::PollForTransport(const FairMQTransportFactory* factory, const } } -bool FairMQDevice::HandleMsgInput(const string& chName, const InputMsgCallback& callback, int i) const +bool FairMQDevice::HandleMsgInput(const string& chName, const InputMsgCallback& callback, int i) { unique_ptr input(fChannels.at(chName).at(i).fTransportFactory->CreateMessage()); @@ -739,7 +739,7 @@ bool FairMQDevice::HandleMsgInput(const string& chName, const InputMsgCallback& } } -bool FairMQDevice::HandleMultipartInput(const string& chName, const InputMultipartCallback& callback, int i) const +bool FairMQDevice::HandleMultipartInput(const string& chName, const InputMultipartCallback& callback, int i) { FairMQParts input; diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 5ebdda2f..76c05e29 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -102,7 +102,7 @@ class FairMQDevice : public FairMQStateMachine /// @param i channel index /// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. - int Send(FairMQMessagePtr& msg, const std::string& chan, const int i = 0, int sndTimeoutInMs = -1) const + int Send(FairMQMessagePtr& msg, const std::string& chan, const int i = 0, int sndTimeoutInMs = -1) { return fChannels.at(chan).at(i).Send(msg, sndTimeoutInMs); } @@ -113,16 +113,16 @@ class FairMQDevice : public FairMQStateMachine /// @param i channel index /// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) /// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. - int Receive(FairMQMessagePtr& msg, const std::string& chan, const int i = 0, int rcvTimeoutInMs = -1) const + int Receive(FairMQMessagePtr& msg, const std::string& chan, const int i = 0, int rcvTimeoutInMs = -1) { return fChannels.at(chan).at(i).Receive(msg, rcvTimeoutInMs); } - int SendAsync(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msg, \"channelA\", subchannelIndex, timeout);"))) + int SendAsync(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msg, \"channelA\", subchannelIndex, timeout);"))) { return fChannels.at(chan).at(i).Send(msg, 0); } - int ReceiveAsync(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msg, \"channelA\", subchannelIndex, timeout);"))) + int ReceiveAsync(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msg, \"channelA\", subchannelIndex, timeout);"))) { return fChannels.at(chan).at(i).Receive(msg, 0); } @@ -133,7 +133,7 @@ class FairMQDevice : public FairMQStateMachine /// @param i channel index /// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. - int64_t Send(FairMQParts& parts, const std::string& chan, const int i = 0, int sndTimeoutInMs = -1) const + int64_t Send(FairMQParts& parts, const std::string& chan, const int i = 0, int sndTimeoutInMs = -1) { return fChannels.at(chan).at(i).Send(parts.fParts, sndTimeoutInMs); } @@ -144,58 +144,58 @@ class FairMQDevice : public FairMQStateMachine /// @param i channel index /// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) /// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. - int64_t Receive(FairMQParts& parts, const std::string& chan, const int i = 0, int rcvTimeoutInMs = -1) const + int64_t Receive(FairMQParts& parts, const std::string& chan, const int i = 0, int rcvTimeoutInMs = -1) { return fChannels.at(chan).at(i).Receive(parts.fParts, rcvTimeoutInMs); } - int64_t SendAsync(FairMQParts& parts, const std::string& chan, const int i = 0) const __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(parts, \"channelA\", subchannelIndex, timeout);"))) + int64_t SendAsync(FairMQParts& parts, const std::string& chan, const int i = 0) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(parts, \"channelA\", subchannelIndex, timeout);"))) { return fChannels.at(chan).at(i).Send(parts.fParts, 0); } - int64_t ReceiveAsync(FairMQParts& parts, const std::string& chan, const int i = 0) const __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(parts, \"channelA\", subchannelIndex, timeout);"))) + int64_t ReceiveAsync(FairMQParts& parts, const std::string& chan, const int i = 0) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(parts, \"channelA\", subchannelIndex, timeout);"))) { return fChannels.at(chan).at(i).Receive(parts.fParts, 0); } /// @brief Getter for default transport factory - auto Transport() const -> const FairMQTransportFactory* + auto Transport() const -> FairMQTransportFactory* { return fTransportFactory.get(); } template - FairMQMessagePtr NewMessage(Args&&... args) const + FairMQMessagePtr NewMessage(Args&&... args) { return Transport()->CreateMessage(std::forward(args)...); } template - FairMQMessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args) const + FairMQMessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args) { return fChannels.at(channel).at(index).NewMessage(std::forward(args)...); } template - FairMQMessagePtr NewStaticMessage(const T& data) const + FairMQMessagePtr NewStaticMessage(const T& data) { return Transport()->NewStaticMessage(data); } template - FairMQMessagePtr NewStaticMessageFor(const std::string& channel, int index, const T& data) const + FairMQMessagePtr NewStaticMessageFor(const std::string& channel, int index, const T& data) { return fChannels.at(channel).at(index).NewStaticMessage(data); } template - FairMQMessagePtr NewSimpleMessage(const T& data) const + FairMQMessagePtr NewSimpleMessage(const T& data) { return Transport()->NewSimpleMessage(data); } template - FairMQMessagePtr NewSimpleMessageFor(const std::string& channel, int index, const T& data) const + FairMQMessagePtr NewSimpleMessageFor(const std::string& channel, int index, const T& data) { return fChannels.at(channel).at(index).NewSimpleMessage(data); } @@ -233,7 +233,7 @@ class FairMQDevice : public FairMQStateMachine return fChannels.at(chans.at(0)).at(0).Transport()->CreatePoller(fChannels, chans); } - FairMQPollerPtr NewPoller(const std::vector& channels) + FairMQPollerPtr NewPoller(const std::vector& channels) { // if more than one channel provided, check compatibility if (channels.size() > 1) @@ -490,8 +490,8 @@ class FairMQDevice : public FairMQStateMachine void HandleMultipleTransportInput(); void PollForTransport(const FairMQTransportFactory* factory, const std::vector& channelKeys); - bool HandleMsgInput(const std::string& chName, const InputMsgCallback& callback, int i) const; - bool HandleMultipartInput(const std::string& chName, const InputMultipartCallback& callback, int i) const; + bool HandleMsgInput(const std::string& chName, const InputMsgCallback& callback, int i); + bool HandleMultipartInput(const std::string& chName, const InputMultipartCallback& callback, int i); void CreateOwnConfig(); diff --git a/fairmq/FairMQMessage.h b/fairmq/FairMQMessage.h index a2d48921..c29fecd2 100644 --- a/fairmq/FairMQMessage.h +++ b/fairmq/FairMQMessage.h @@ -15,10 +15,13 @@ #include using fairmq_free_fn = void(void* data, void* hint); +class FairMQTransportFactory; class FairMQMessage { public: + FairMQMessage() = default; + FairMQMessage(FairMQTransportFactory* factory):fTransport{factory} {} virtual void Rebuild() = 0; virtual void Rebuild(const size_t size) = 0; virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) = 0; @@ -29,11 +32,16 @@ class FairMQMessage virtual bool SetUsedSize(const size_t size) = 0; virtual fair::mq::Transport GetType() const = 0; + FairMQTransportFactory* GetTransport() { return fTransport; } + //void SetTransport(FairMQTransportFactory* transport) { fTransport = transport; } virtual void Copy(const std::unique_ptr& msg) __attribute__((deprecated("Use 'Copy(const FairMQMessage& msg)'"))) = 0; virtual void Copy(const FairMQMessage& msg) = 0; virtual ~FairMQMessage() {}; + + private: + FairMQTransportFactory* fTransport{nullptr}; }; using FairMQMessagePtr = std::unique_ptr; diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 32065c30..c8770804 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -46,20 +46,20 @@ class FairMQTransportFactory /// @brief Create empty FairMQMessage /// @return pointer to FairMQMessage - virtual FairMQMessagePtr CreateMessage() const = 0; + virtual FairMQMessagePtr CreateMessage() = 0; /// @brief Create new FairMQMessage of specified size /// @param size message size /// @return pointer to FairMQMessage - virtual FairMQMessagePtr CreateMessage(const size_t size) const = 0; + virtual FairMQMessagePtr CreateMessage(const size_t size) = 0; /// @brief Create new FairMQMessage with user provided buffer and size /// @param data pointer to user provided buffer /// @param size size of the user provided buffer /// @param ffn callback, called when the message is transfered (and can be deleted) /// @param obj optional helper pointer that can be used in the callback /// @return pointer to FairMQMessage - virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const = 0; + virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) = 0; - virtual FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& unmanagedRegion, void* data, const size_t size, void* hint = 0) const = 0; + virtual FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& unmanagedRegion, void* data, const size_t size, void* hint = 0) = 0; /// Create a socket virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const = 0; @@ -67,7 +67,7 @@ class FairMQTransportFactory /// Create a poller for a single channel (all subchannels) virtual FairMQPollerPtr CreatePoller(const std::vector& channels) const = 0; /// Create a poller for specific channels - virtual FairMQPollerPtr CreatePoller(const std::vector& channels) const = 0; + virtual FairMQPollerPtr CreatePoller(const std::vector& channels) const = 0; /// Create a poller for specific channels (all subchannels) virtual FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const = 0; @@ -95,7 +95,7 @@ class FairMQTransportFactory } template - FairMQMessagePtr NewSimpleMessage(const T& data) const + FairMQMessagePtr NewSimpleMessage(const T& data) { // todo: is_trivially_copyable not available on gcc < 5, workaround? // static_assert(std::is_trivially_copyable::value, "The argument type for NewSimpleMessage has to be trivially copyable!"); @@ -104,13 +104,13 @@ class FairMQTransportFactory } template - FairMQMessagePtr NewSimpleMessage(const char(&data)[N]) const + FairMQMessagePtr NewSimpleMessage(const char(&data)[N]) { std::string* msgStr = new std::string(data); return CreateMessage(const_cast(msgStr->c_str()), msgStr->length(), FairMQSimpleMsgCleanup, msgStr); } - FairMQMessagePtr NewSimpleMessage(const std::string& str) const + FairMQMessagePtr NewSimpleMessage(const std::string& str) { std::string* msgStr = new std::string(str); @@ -118,12 +118,12 @@ class FairMQTransportFactory } template - FairMQMessagePtr NewStaticMessage(const T& data) const + FairMQMessagePtr NewStaticMessage(const T& data) { return CreateMessage(data, sizeof(T), FairMQNoCleanup, nullptr); } - FairMQMessagePtr NewStaticMessage(const std::string& str) const + FairMQMessagePtr NewStaticMessage(const std::string& str) { return CreateMessage(const_cast(str.c_str()), str.length(), FairMQNoCleanup, nullptr); } diff --git a/fairmq/MemoryResourceTools.h b/fairmq/MemoryResourceTools.h index 6dfbf36f..569c4660 100644 --- a/fairmq/MemoryResourceTools.h +++ b/fairmq/MemoryResourceTools.h @@ -75,17 +75,15 @@ std::vector std::vector> - adoptVector(size_t nelem, ChannelResource *upstream, FairMQMessagePtr message) + adoptVector(size_t nelem, FairMQMessagePtr message) { return std::vector>( nelem, OwningMessageSpectatorAllocator( - MessageResource{std::move(message), upstream})); + MessageResource{std::move(message)})); }; //_________________________________________________________________________________________________ @@ -93,7 +91,7 @@ std::vector> // This returns a unique_ptr of const vector, does not allow modifications at // the cost of pointer // semantics for access. -// use auto or decltype to catch the return value (or use span) +// use auto or decltype to catch the return type. // template // auto adoptVector(size_t nelem, FairMQMessage* message) //{ diff --git a/fairmq/MemoryResources.cxx b/fairmq/MemoryResources.cxx index 62b4ab9c..11cb7213 100644 --- a/fairmq/MemoryResources.cxx +++ b/fairmq/MemoryResources.cxx @@ -17,9 +17,14 @@ void *fair::mq::ChannelResource::do_allocate(std::size_t bytes, std::size_t /*alignment*/) { - FairMQMessagePtr message; - message = factory->CreateMessage(bytes); - void *addr = message->GetData(); - messageMap[addr] = std::move(message); - return addr; + return setMessage(factory->CreateMessage(bytes)); }; + +fair::mq::MessageResource::MessageResource(FairMQMessagePtr message) +: mUpstream{message->GetTransport()->GetMemoryResource()} +, mMessageSize{message->GetSize()} +, mMessageData{mUpstream ? mUpstream->setMessage(std::move(message)) + : throw std::runtime_error( + "MessageResource::MessageResource message has no upstream resource set")} +{ +} diff --git a/fairmq/MemoryResources.h b/fairmq/MemoryResources.h index bd3f84cf..2d555411 100644 --- a/fairmq/MemoryResources.h +++ b/fairmq/MemoryResources.h @@ -47,7 +47,7 @@ class FairMQMemoryResource : public boost::container::pmr::memory_resource /// a message does not make sense! virtual FairMQMessagePtr getMessage(void *p) = 0; virtual void *setMessage(FairMQMessagePtr) = 0; - virtual const FairMQTransportFactory *getTransportFactory() const noexcept = 0; + virtual FairMQTransportFactory *getTransportFactory() noexcept = 0; virtual size_t getNumberOfMessages() const noexcept = 0; }; @@ -59,7 +59,7 @@ class FairMQMemoryResource : public boost::container::pmr::memory_resource class ChannelResource : public FairMQMemoryResource { protected: - const FairMQTransportFactory *factory{nullptr}; + FairMQTransportFactory *factory{nullptr}; // TODO: for now a map to keep track of allocations, something else would // probably be // faster, but for now this does not need to be fast. @@ -68,7 +68,7 @@ class ChannelResource : public FairMQMemoryResource public: ChannelResource() = delete; - ChannelResource(const FairMQTransportFactory *_factory) + ChannelResource(FairMQTransportFactory *_factory) : FairMQMemoryResource() , factory(_factory) , messageMap() @@ -92,7 +92,7 @@ class ChannelResource : public FairMQMemoryResource return addr; } - const FairMQTransportFactory *getTransportFactory() const noexcept override { return factory; } + FairMQTransportFactory *getTransportFactory() noexcept override { return factory; } size_t getNumberOfMessages() const noexcept override { return messageMap.size(); } @@ -125,7 +125,7 @@ class SpectatorMessageResource : public FairMQMemoryResource } FairMQMessagePtr getMessage(void * /*p*/) override { return nullptr; } - const FairMQTransportFactory *getTransportFactory() const noexcept override { return nullptr; } + FairMQTransportFactory *getTransportFactory() noexcept override { return nullptr; } size_t getNumberOfMessages() const noexcept override { return 0; } void *setMessage(FairMQMessagePtr) override { return nullptr; } @@ -180,14 +180,7 @@ class MessageResource : public FairMQMemoryResource MessageResource &operator=(const MessageResource &) = default; MessageResource &operator=(MessageResource &&) = default; - MessageResource(FairMQMessagePtr message, FairMQMemoryResource *upstream) - : mUpstream{upstream} - , mMessageSize{message->GetSize()} - , mMessageData{mUpstream ? mUpstream->setMessage(std::move(message)) - : throw std::runtime_error( - "MessageResource::MessageResource upstream is nullptr")} - { - } + MessageResource(FairMQMessagePtr message); FairMQMessagePtr getMessage(void *p) override { return mUpstream->getMessage(p); } void *setMessage(FairMQMessagePtr message) override @@ -195,7 +188,7 @@ class MessageResource : public FairMQMemoryResource return mUpstream->setMessage(std::move(message)); } - const FairMQTransportFactory *getTransportFactory() const noexcept override { return nullptr; } + FairMQTransportFactory *getTransportFactory() noexcept override { return nullptr; } size_t getNumberOfMessages() const noexcept override { return mMessageData ? 1 : 0; } protected: diff --git a/fairmq/devices/FairMQMerger.cxx b/fairmq/devices/FairMQMerger.cxx index 2f32c989..c0ea0290 100644 --- a/fairmq/devices/FairMQMerger.cxx +++ b/fairmq/devices/FairMQMerger.cxx @@ -49,7 +49,7 @@ void FairMQMerger::Run() { int numInputs = fChannels.at(fInChannelName).size(); - vector chans; + vector chans; for (auto& chan : fChannels.at(fInChannelName)) { diff --git a/fairmq/nanomsg/FairMQMessageNN.cxx b/fairmq/nanomsg/FairMQMessageNN.cxx index 349dffef..b7995c5b 100644 --- a/fairmq/nanomsg/FairMQMessageNN.cxx +++ b/fairmq/nanomsg/FairMQMessageNN.cxx @@ -24,8 +24,9 @@ using namespace std; fair::mq::Transport FairMQMessageNN::fTransportType = fair::mq::Transport::NN; -FairMQMessageNN::FairMQMessageNN() - : fMessage(nullptr) +FairMQMessageNN::FairMQMessageNN(FairMQTransportFactory* factory) + : FairMQMessage{factory} + , fMessage(nullptr) , fSize(0) , fHint(0) , fReceiving(false) @@ -38,8 +39,9 @@ FairMQMessageNN::FairMQMessageNN() } } -FairMQMessageNN::FairMQMessageNN(const size_t size) - : fMessage(nullptr) +FairMQMessageNN::FairMQMessageNN(const size_t size, FairMQTransportFactory* factory) + : FairMQMessage{factory} + , fMessage(nullptr) , fSize(0) , fHint(0) , fReceiving(false) @@ -59,8 +61,9 @@ FairMQMessageNN::FairMQMessageNN(const size_t size) * create FairMQMessage object only with size parameter and fill it with data. * possible TODO: make this zero copy (will should then be as efficient as ZeroMQ). */ -FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) - : fMessage(nullptr) +FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* ffn, void* hint, FairMQTransportFactory* factory) + : FairMQMessage{factory} + , fMessage(nullptr) , fSize(0) , fHint(0) , fReceiving(false) @@ -86,8 +89,9 @@ FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* } } -FairMQMessageNN::FairMQMessageNN(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) - : fMessage(data) +FairMQMessageNN::FairMQMessageNN(FairMQUnmanagedRegionPtr& region, void* /*data*/, const size_t size, void* hint, FairMQTransportFactory* factory) + : FairMQMessage{factory} + , fMessage(nullptr) , fSize(size) , fHint(reinterpret_cast(hint)) , fReceiving(false) diff --git a/fairmq/nanomsg/FairMQMessageNN.h b/fairmq/nanomsg/FairMQMessageNN.h index 42891735..ce506817 100644 --- a/fairmq/nanomsg/FairMQMessageNN.h +++ b/fairmq/nanomsg/FairMQMessageNN.h @@ -29,10 +29,10 @@ class FairMQMessageNN final : public FairMQMessage friend class FairMQSocketNN; public: - FairMQMessageNN(); - FairMQMessageNN(const size_t size); - FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); - FairMQMessageNN(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0); + FairMQMessageNN(FairMQTransportFactory* factory = nullptr); + FairMQMessageNN(const size_t size, FairMQTransportFactory* factory = nullptr); + FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr); + FairMQMessageNN(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr); FairMQMessageNN(const FairMQMessageNN&) = delete; FairMQMessageNN operator=(const FairMQMessageNN&) = delete; diff --git a/fairmq/nanomsg/FairMQPollerNN.cxx b/fairmq/nanomsg/FairMQPollerNN.cxx index f40112ca..2f946fe1 100644 --- a/fairmq/nanomsg/FairMQPollerNN.cxx +++ b/fairmq/nanomsg/FairMQPollerNN.cxx @@ -44,7 +44,7 @@ FairMQPollerNN::FairMQPollerNN(const vector& channels) } } -FairMQPollerNN::FairMQPollerNN(const vector& channels) +FairMQPollerNN::FairMQPollerNN(const vector& channels) : fItems() , fNumItems(0) , fOffsetMap() diff --git a/fairmq/nanomsg/FairMQPollerNN.h b/fairmq/nanomsg/FairMQPollerNN.h index 3d1f4161..feb6c830 100644 --- a/fairmq/nanomsg/FairMQPollerNN.h +++ b/fairmq/nanomsg/FairMQPollerNN.h @@ -33,7 +33,7 @@ class FairMQPollerNN final : public FairMQPoller public: FairMQPollerNN(const std::vector& channels); - FairMQPollerNN(const std::vector& channels); + FairMQPollerNN(const std::vector& channels); FairMQPollerNN(const std::unordered_map>& channelsMap, const std::vector& channelList); FairMQPollerNN(const FairMQPollerNN&) = delete; @@ -56,4 +56,4 @@ class FairMQPollerNN final : public FairMQPoller std::unordered_map fOffsetMap; }; -#endif /* FAIRMQPOLLERNN_H_ */ \ No newline at end of file +#endif /* FAIRMQPOLLERNN_H_ */ diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index 0d85a71c..ef37dd6c 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -23,24 +23,24 @@ FairMQTransportFactoryNN::FairMQTransportFactoryNN(const string& id, const FairM LOG(debug) << "Transport: Using nanomsg library"; } -FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage() const +FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage() { - return unique_ptr(new FairMQMessageNN()); + return unique_ptr(new FairMQMessageNN(this)); } -FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(const size_t size) const +FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(const size_t size) { - return unique_ptr(new FairMQMessageNN(size)); + return unique_ptr(new FairMQMessageNN(size, this)); } -FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) const +FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) { - return unique_ptr(new FairMQMessageNN(data, size, ffn, hint)); + return unique_ptr(new FairMQMessageNN(data, size, ffn, hint, this)); } -FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) const +FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) { - return unique_ptr(new FairMQMessageNN(region, data, size, hint)); + return unique_ptr(new FairMQMessageNN(region, data, size, hint, this)); } FairMQSocketPtr FairMQTransportFactoryNN::CreateSocket(const string& type, const string& name) const @@ -55,7 +55,7 @@ FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const vector(new FairMQPollerNN(channels)); } -FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const std::vector& channels) const +FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const std::vector& channels) const { return unique_ptr(new FairMQPollerNN(channels)); } diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index 9fdffa37..ece07de4 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -25,15 +25,15 @@ class FairMQTransportFactoryNN final : public FairMQTransportFactory FairMQTransportFactoryNN(const std::string& id = "", const FairMQProgOptions* config = nullptr); ~FairMQTransportFactoryNN() override; - FairMQMessagePtr CreateMessage() const override; - FairMQMessagePtr CreateMessage(const size_t size) const override; - FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const override; - FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) const override; + FairMQMessagePtr CreateMessage() override; + FairMQMessagePtr CreateMessage(const size_t size) override; + FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; + FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) override; FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override; FairMQPollerPtr CreatePoller(const std::vector& channels) const override; - FairMQPollerPtr CreatePoller(const std::vector& channels) const override; + FairMQPollerPtr CreatePoller(const std::vector& channels) const override; FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const override; diff --git a/fairmq/shmem/FairMQMessageSHM.cxx b/fairmq/shmem/FairMQMessageSHM.cxx index 34fe50c9..f6102512 100644 --- a/fairmq/shmem/FairMQMessageSHM.cxx +++ b/fairmq/shmem/FairMQMessageSHM.cxx @@ -25,8 +25,9 @@ namespace bpt = ::boost::posix_time; atomic FairMQMessageSHM::fInterrupted(false); fair::mq::Transport FairMQMessageSHM::fTransportType = fair::mq::Transport::SHM; -FairMQMessageSHM::FairMQMessageSHM(Manager& manager) - : fManager(manager) +FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQTransportFactory* factory) + : FairMQMessage{factory} + , fManager(manager) , fMessage() , fQueued(false) , fMetaCreated(false) @@ -44,8 +45,9 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager) fMetaCreated = true; } -FairMQMessageSHM::FairMQMessageSHM(Manager& manager, const size_t size) - : fManager(manager) +FairMQMessageSHM::FairMQMessageSHM(Manager& manager, const size_t size, FairMQTransportFactory* factory) + : FairMQMessage{factory} + , fManager(manager) , fMessage() , fQueued(false) , fMetaCreated(false) @@ -59,8 +61,9 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, const size_t size) InitializeChunk(size); } -FairMQMessageSHM::FairMQMessageSHM(Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint) - : fManager(manager) +FairMQMessageSHM::FairMQMessageSHM(Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint, FairMQTransportFactory* factory) + : FairMQMessage{factory} + , fManager(manager) , fMessage() , fQueued(false) , fMetaCreated(false) @@ -85,8 +88,9 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, void* data, const size_t si } } -FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) - : fManager(manager) +FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint, FairMQTransportFactory* factory) + : FairMQMessage{factory} + , fManager(manager) , fMessage() , fQueued(false) , fMetaCreated(false) diff --git a/fairmq/shmem/FairMQMessageSHM.h b/fairmq/shmem/FairMQMessageSHM.h index a9f2dabd..d7ddd69d 100644 --- a/fairmq/shmem/FairMQMessageSHM.h +++ b/fairmq/shmem/FairMQMessageSHM.h @@ -27,10 +27,10 @@ class FairMQMessageSHM final : public FairMQMessage friend class FairMQSocketSHM; public: - FairMQMessageSHM(fair::mq::shmem::Manager& manager); - FairMQMessageSHM(fair::mq::shmem::Manager& manager, const size_t size); - FairMQMessageSHM(fair::mq::shmem::Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); - FairMQMessageSHM(fair::mq::shmem::Manager& manager, FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0); + FairMQMessageSHM(fair::mq::shmem::Manager& manager, FairMQTransportFactory* factory = nullptr); + FairMQMessageSHM(fair::mq::shmem::Manager& manager, const size_t size, FairMQTransportFactory* factory = nullptr); + FairMQMessageSHM(fair::mq::shmem::Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr); + FairMQMessageSHM(fair::mq::shmem::Manager& manager, FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr); FairMQMessageSHM(const FairMQMessageSHM&) = delete; FairMQMessageSHM operator=(const FairMQMessageSHM&) = delete; diff --git a/fairmq/shmem/FairMQPollerSHM.cxx b/fairmq/shmem/FairMQPollerSHM.cxx index 5beda2e2..80a1e516 100644 --- a/fairmq/shmem/FairMQPollerSHM.cxx +++ b/fairmq/shmem/FairMQPollerSHM.cxx @@ -42,7 +42,7 @@ FairMQPollerSHM::FairMQPollerSHM(const vector& channels) } } -FairMQPollerSHM::FairMQPollerSHM(const vector& channels) +FairMQPollerSHM::FairMQPollerSHM(const vector& channels) : fItems() , fNumItems(0) , fOffsetMap() diff --git a/fairmq/shmem/FairMQPollerSHM.h b/fairmq/shmem/FairMQPollerSHM.h index 0c8b8f82..76579e3c 100644 --- a/fairmq/shmem/FairMQPollerSHM.h +++ b/fairmq/shmem/FairMQPollerSHM.h @@ -26,7 +26,7 @@ class FairMQPollerSHM final : public FairMQPoller public: FairMQPollerSHM(const std::vector& channels); - FairMQPollerSHM(const std::vector& channels); + FairMQPollerSHM(const std::vector& channels); FairMQPollerSHM(const std::unordered_map>& channelsMap, const std::vector& channelList); FairMQPollerSHM(const FairMQPollerSHM&) = delete; @@ -49,4 +49,4 @@ class FairMQPollerSHM final : public FairMQPoller std::unordered_map fOffsetMap; }; -#endif /* FAIRMQPOLLERSHM_H_ */ \ No newline at end of file +#endif /* FAIRMQPOLLERSHM_H_ */ diff --git a/fairmq/shmem/FairMQTransportFactorySHM.cxx b/fairmq/shmem/FairMQTransportFactorySHM.cxx index 97bfc83d..5afe25fa 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.cxx +++ b/fairmq/shmem/FairMQTransportFactorySHM.cxx @@ -213,24 +213,24 @@ void FairMQTransportFactorySHM::SendHeartbeats() } } -FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage() const +FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage() { - return unique_ptr(new FairMQMessageSHM(*fManager)); + return unique_ptr(new FairMQMessageSHM(*fManager, this)); } -FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(const size_t size) const +FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(const size_t size) { - return unique_ptr(new FairMQMessageSHM(*fManager, size)); + return unique_ptr(new FairMQMessageSHM(*fManager, size, this)); } -FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) const +FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) { - return unique_ptr(new FairMQMessageSHM(*fManager, data, size, ffn, hint)); + return unique_ptr(new FairMQMessageSHM(*fManager, data, size, ffn, hint, this)); } -FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) const +FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) { - return unique_ptr(new FairMQMessageSHM(*fManager, region, data, size, hint)); + return unique_ptr(new FairMQMessageSHM(*fManager, region, data, size, hint, this)); } FairMQSocketPtr FairMQTransportFactorySHM::CreateSocket(const string& type, const string& name) const @@ -244,7 +244,7 @@ FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const vector(new FairMQPollerSHM(channels)); } -FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const vector& channels) const +FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const vector& channels) const { return unique_ptr(new FairMQPollerSHM(channels)); } diff --git a/fairmq/shmem/FairMQTransportFactorySHM.h b/fairmq/shmem/FairMQTransportFactorySHM.h index 909be96f..30fc502b 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.h +++ b/fairmq/shmem/FairMQTransportFactorySHM.h @@ -33,15 +33,15 @@ class FairMQTransportFactorySHM final : public FairMQTransportFactory FairMQTransportFactorySHM(const FairMQTransportFactorySHM&) = delete; FairMQTransportFactorySHM operator=(const FairMQTransportFactorySHM&) = delete; - FairMQMessagePtr CreateMessage() const override; - FairMQMessagePtr CreateMessage(const size_t size) const override; - FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const override; - FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) const override; + FairMQMessagePtr CreateMessage() override; + FairMQMessagePtr CreateMessage(const size_t size) override; + FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; + FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) override; FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override; FairMQPollerPtr CreatePoller(const std::vector& channels) const override; - FairMQPollerPtr CreatePoller(const std::vector& channels) const override; + FairMQPollerPtr CreatePoller(const std::vector& channels) const override; FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) const override; diff --git a/fairmq/zeromq/FairMQMessageZMQ.cxx b/fairmq/zeromq/FairMQMessageZMQ.cxx index 60de00aa..e4a7e2eb 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.cxx +++ b/fairmq/zeromq/FairMQMessageZMQ.cxx @@ -17,6 +17,7 @@ #include "FairMQLogger.h" #include #include "FairMQUnmanagedRegionZMQ.h" +#include #include @@ -24,8 +25,9 @@ using namespace std; fair::mq::Transport FairMQMessageZMQ::fTransportType = fair::mq::Transport::ZMQ; -FairMQMessageZMQ::FairMQMessageZMQ() - : fUsedSizeModified(false) +FairMQMessageZMQ::FairMQMessageZMQ(FairMQTransportFactory* factory) + : FairMQMessage{factory} + , fUsedSizeModified(false) , fUsedSize() , fMsg(fair::mq::tools::make_unique()) , fViewMsg(nullptr) @@ -36,8 +38,9 @@ FairMQMessageZMQ::FairMQMessageZMQ() } } -FairMQMessageZMQ::FairMQMessageZMQ(const size_t size) - : fUsedSizeModified(false) +FairMQMessageZMQ::FairMQMessageZMQ(const size_t size, FairMQTransportFactory* factory) + : FairMQMessage{factory} + , fUsedSizeModified(false) , fUsedSize(size) , fMsg(fair::mq::tools::make_unique()) , fViewMsg(nullptr) @@ -48,8 +51,9 @@ FairMQMessageZMQ::FairMQMessageZMQ(const size_t size) } } -FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) - : fUsedSizeModified(false) +FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint, FairMQTransportFactory* factory) + : FairMQMessage{factory} + , fUsedSizeModified(false) , fUsedSize() , fMsg(fair::mq::tools::make_unique()) , fViewMsg(nullptr) @@ -60,8 +64,9 @@ FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn } } -FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) - : fUsedSizeModified(false) +FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint, FairMQTransportFactory* factory) + : FairMQMessage{factory} + , fUsedSizeModified(false) , fUsedSize() , fMsg(fair::mq::tools::make_unique()) , fViewMsg(nullptr) diff --git a/fairmq/zeromq/FairMQMessageZMQ.h b/fairmq/zeromq/FairMQMessageZMQ.h index c2f4b64e..84a44fad 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.h +++ b/fairmq/zeromq/FairMQMessageZMQ.h @@ -23,6 +23,7 @@ #include "FairMQMessage.h" #include "FairMQUnmanagedRegion.h" +class FairMQTransportFactory; class FairMQSocketZMQ; @@ -31,10 +32,10 @@ class FairMQMessageZMQ final : public FairMQMessage friend class FairMQSocketZMQ; public: - FairMQMessageZMQ(); - FairMQMessageZMQ(const size_t size); - FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); - FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0); + FairMQMessageZMQ(FairMQTransportFactory* = nullptr); + FairMQMessageZMQ(const size_t size, FairMQTransportFactory* = nullptr); + FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* = nullptr); + FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* = nullptr); void Rebuild() override; void Rebuild(const size_t size) override; diff --git a/fairmq/zeromq/FairMQPollerZMQ.cxx b/fairmq/zeromq/FairMQPollerZMQ.cxx index a593eacf..333f8e79 100644 --- a/fairmq/zeromq/FairMQPollerZMQ.cxx +++ b/fairmq/zeromq/FairMQPollerZMQ.cxx @@ -43,7 +43,7 @@ FairMQPollerZMQ::FairMQPollerZMQ(const vector& channels) } -FairMQPollerZMQ::FairMQPollerZMQ(const std::vector& channels) +FairMQPollerZMQ::FairMQPollerZMQ(const std::vector& channels) : fItems() , fNumItems(0) , fOffsetMap() diff --git a/fairmq/zeromq/FairMQPollerZMQ.h b/fairmq/zeromq/FairMQPollerZMQ.h index c72797ff..8247d36b 100644 --- a/fairmq/zeromq/FairMQPollerZMQ.h +++ b/fairmq/zeromq/FairMQPollerZMQ.h @@ -34,7 +34,7 @@ class FairMQPollerZMQ final : public FairMQPoller public: FairMQPollerZMQ(const std::vector& channels); - FairMQPollerZMQ(const std::vector& channels); + FairMQPollerZMQ(const std::vector& channels); FairMQPollerZMQ(const std::unordered_map>& channelsMap, const std::vector& channelList); FairMQPollerZMQ(const FairMQPollerZMQ&) = delete; @@ -57,4 +57,4 @@ class FairMQPollerZMQ final : public FairMQPoller std::unordered_map fOffsetMap; }; -#endif /* FAIRMQPOLLERZMQ_H_ */ \ No newline at end of file +#endif /* FAIRMQPOLLERZMQ_H_ */ diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index 93cdb093..7027df0b 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -49,24 +49,24 @@ FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id, const Fai } -FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage() const +FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage() { - return unique_ptr(new FairMQMessageZMQ()); + return unique_ptr(new FairMQMessageZMQ(this)); } -FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(const size_t size) const +FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(const size_t size) { - return unique_ptr(new FairMQMessageZMQ(size)); + return unique_ptr(new FairMQMessageZMQ(size, this)); } -FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) const +FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) { - return unique_ptr(new FairMQMessageZMQ(data, size, ffn, hint)); + return unique_ptr(new FairMQMessageZMQ(data, size, ffn, hint, this)); } -FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) const +FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) { - return unique_ptr(new FairMQMessageZMQ(region, data, size, hint)); + return unique_ptr(new FairMQMessageZMQ(region, data, size, hint, this)); } FairMQSocketPtr FairMQTransportFactoryZMQ::CreateSocket(const string& type, const string& name) const @@ -80,7 +80,7 @@ FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const vector(new FairMQPollerZMQ(channels)); } -FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const std::vector& channels) const +FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const std::vector& channels) const { return unique_ptr(new FairMQPollerZMQ(channels)); } diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index b2917b81..11abbfce 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -34,15 +34,15 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory ~FairMQTransportFactoryZMQ() override; - FairMQMessagePtr CreateMessage() const override; - FairMQMessagePtr CreateMessage(const size_t size) const override; - FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const override; - FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) const override; + FairMQMessagePtr CreateMessage() override; + FairMQMessagePtr CreateMessage(const size_t size) override; + FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; + FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) override; FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override; FairMQPollerPtr CreatePoller(const std::vector& channels) const override; - FairMQPollerPtr CreatePoller(const std::vector& channels) const override; + FairMQPollerPtr CreatePoller(const std::vector& channels) const override; FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const override; diff --git a/test/helper/devices/TestPollIn.h b/test/helper/devices/TestPollIn.h index a683d677..74399ce0 100644 --- a/test/helper/devices/TestPollIn.h +++ b/test/helper/devices/TestPollIn.h @@ -43,7 +43,7 @@ class PollIn : public FairMQDevice auto Run() -> void override { - vector chans; + vector chans; chans.push_back(&fChannels.at("data1").at(0)); chans.push_back(&fChannels.at("data2").at(0)); diff --git a/test/memory_resources/_memory_resources.cxx b/test/memory_resources/_memory_resources.cxx index b9b65c7e..84a12b63 100644 --- a/test/memory_resources/_memory_resources.cxx +++ b/test/memory_resources/_memory_resources.cxx @@ -166,7 +166,7 @@ TEST(MemoryResources, adoptVector_test) std::memcpy(message->GetData(), tmpBuf, 3 * sizeof(testData)); auto adoptedOwner = - adoptVector(3, factoryZMQ->GetMemoryResource(), std::move(message)); + adoptVector(3, std::move(message)); EXPECT_TRUE(adoptedOwner[0].i == 3); EXPECT_TRUE(adoptedOwner[1].i == 2); EXPECT_TRUE(adoptedOwner[2].i == 1);