diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index 54040f8a..d4d966b9 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -841,12 +841,6 @@ unsigned long FairMQChannel::GetMessagesRx() const return fSocket->GetMessagesRx(); } - -FairMQTransportFactory* FairMQChannel::Transport() -{ - return fTransportFactory.get(); -} - bool FairMQChannel::CheckCompatibility(unique_ptr& msg) const { if (fTransportType == msg->GetType()) diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 386ce186..60971135 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -258,7 +258,28 @@ class FairMQChannel unsigned long GetMessagesTx() const; unsigned long GetMessagesRx() const; - FairMQTransportFactory* Transport(); + auto Transport() const -> const FairMQTransportFactory* + { + return fTransportFactory.get(); + }; + + template + inline FairMQMessagePtr NewMessage(Args&&... args) const + { + return Transport()->CreateMessage(std::forward(args)...); + } + + template + inline FairMQMessagePtr NewSimpleMessage(const T& data) const + { + return Transport()->NewSimpleMessage(data); + } + + template + inline FairMQMessagePtr NewStaticMessage(const T& data) const + { + return Transport()->NewStaticMessage(data); + } private: std::unique_ptr fSocket; diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index ca9443d3..17842eb5 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -195,90 +195,46 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable return fChannels.at(chan).at(i).ReceiveAsync(parts.fParts); } - /// @brief Create empty FairMQMessage - /// @return pointer to FairMQMessage - inline FairMQMessagePtr NewMessage() const + /// @brief Getter for default transport factory + auto Transport() const -> const FairMQTransportFactory* { - return fTransportFactory->CreateMessage(); + return fTransports.cbegin()->second.get(); } - /// @brief Create new FairMQMessage of specified size - /// @param size message size - /// @return pointer to FairMQMessage - inline FairMQMessagePtr NewMessage(int size) const + template + inline FairMQMessagePtr NewMessage(Args&&... args) const { - return fTransportFactory->CreateMessage(size); - } - - /// @brief Create new FairMQMessage with user provided buffer and size - /// @param data pointer to user provided buffer - /// @param size size of the user provided buffer - /// @param ffn callback, called when the message is transfered (and can be deleted) - /// @param obj optional helper pointer that can be used in the callback - /// @return pointer to FairMQMessage - inline FairMQMessagePtr NewMessage(void* data, int size, fairmq_free_fn* ffn, void* obj = nullptr) const - { - return fTransportFactory->CreateMessage(data, size, ffn, obj); + return Transport()->CreateMessage(std::forward(args)...); } template inline FairMQMessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args) const { - return fChannels.at(channel).at(index).fTransportFactory->CreateMessage(std::forward(args)...); - } - - static void FairMQNoCleanup(void* /*data*/, void* /*obj*/) - { + return fChannels.at(channel).at(index).Transport()->CreateMessage(std::forward(args)...); } template inline FairMQMessagePtr NewStaticMessage(const T& data) const { - return fTransportFactory->CreateMessage(data, sizeof(T), FairMQNoCleanup, nullptr); - } - - inline FairMQMessagePtr NewStaticMessage(const std::string& str) const - { - return fTransportFactory->CreateMessage(const_cast(str.c_str()), str.length(), FairMQNoCleanup, nullptr); + return Transport()->NewStaticMessage(data); } template inline FairMQMessagePtr NewStaticMessageFor(const std::string& channel, int index, const T& data) const { - return fChannels.at(channel).at(index).fTransportFactory->CreateMessage(data, sizeof(T), FairMQNoCleanup, nullptr); - } - - inline FairMQMessagePtr NewStaticMessageFor(const std::string& channel, int index, const std::string& str) const - { - return fChannels.at(channel).at(index).fTransportFactory->CreateMessage(const_cast(str.c_str()), str.length(), FairMQNoCleanup, nullptr); - } - - template - static void FairMQSimpleMsgCleanup(void* /*data*/, void* obj) - { - delete static_cast(obj); + return fChannels.at(channel).at(index).NewStaticMessage(data); } template inline FairMQMessagePtr NewSimpleMessage(const T& data) const { - // todo: is_trivially_copyable not available on gcc < 5, workaround? - // static_assert(std::is_trivially_copyable::value, "The argument type for NewSimpleMessage has to be trivially copyable!"); - T* dataCopy = new T(data); - return fTransportFactory->CreateMessage(dataCopy, sizeof(T), FairMQSimpleMsgCleanup, dataCopy); + return Transport()->NewSimpleMessage(data); } - template - inline FairMQMessagePtr NewSimpleMessage(const char(&data)[N]) const + template + inline FairMQMessagePtr NewSimpleMessageFor(const std::string& channel, int index, const T& data) const { - std::string* msgStr = new std::string(data); - return fTransportFactory->CreateMessage(const_cast(msgStr->c_str()), msgStr->length(), FairMQSimpleMsgCleanup, msgStr); - } - - inline FairMQMessagePtr NewSimpleMessage(const std::string& str) const - { - std::string* msgStr = new std::string(str); - return fTransportFactory->CreateMessage(const_cast(msgStr->c_str()), msgStr->length(), FairMQSimpleMsgCleanup, msgStr); + return fChannels.at(channel).at(index).NewSimpleMessage(data); } /// Waits for the first initialization run to finish diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index bf8a9ad0..f7d97360 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -35,11 +35,19 @@ class FairMQTransportFactory /// Initialize transport virtual void Initialize(const FairMQProgOptions* config) = 0; - /// Create an empty message (e.g. for receiving) + /// @brief Create empty FairMQMessage + /// @return pointer to FairMQMessage virtual FairMQMessagePtr CreateMessage() const = 0; - /// Create an empty of a specified size + /// @brief Create new FairMQMessage of specified size + /// @param size message size + /// @return pointer to FairMQMessage virtual FairMQMessagePtr CreateMessage(const size_t size) const = 0; - /// Create a message from a supplied buffer, size and a deallocation callback + /// @brief Create new FairMQMessage with user provided buffer and size + /// @param data pointer to user provided buffer + /// @param size size of the user provided buffer + /// @param ffn callback, called when the message is transfered (and can be deleted) + /// @param obj optional helper pointer that can be used in the callback + /// @return pointer to FairMQMessage virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const = 0; /// Create a socket @@ -63,6 +71,50 @@ class FairMQTransportFactory virtual ~FairMQTransportFactory() {}; static auto CreateTransportFactory(const std::string& type) -> std::shared_ptr; + + static void FairMQNoCleanup(void* /*data*/, void* /*obj*/) + { + } + + template + static void FairMQSimpleMsgCleanup(void* /*data*/, void* obj) + { + delete static_cast(obj); + } + + template + inline FairMQMessagePtr NewSimpleMessage(const T& data) const + { + // todo: is_trivially_copyable not available on gcc < 5, workaround? + // static_assert(std::is_trivially_copyable::value, "The argument type for NewSimpleMessage has to be trivially copyable!"); + T* dataCopy = new T(data); + return CreateMessage(dataCopy, sizeof(T), FairMQSimpleMsgCleanup, dataCopy); + } + + template + inline FairMQMessagePtr NewSimpleMessage(const char(&data)[N]) const + { + std::string* msgStr = new std::string(data); + return CreateMessage(const_cast(msgStr->c_str()), msgStr->length(), FairMQSimpleMsgCleanup, msgStr); + } + + inline FairMQMessagePtr NewSimpleMessage(const std::string& str) const + { + + std::string* msgStr = new std::string(str); + return CreateMessage(const_cast(msgStr->c_str()), msgStr->length(), FairMQSimpleMsgCleanup, msgStr); + } + + template + inline FairMQMessagePtr NewStaticMessage(const T& data) const + { + return CreateMessage(data, sizeof(T), FairMQNoCleanup, nullptr); + } + + inline FairMQMessagePtr NewStaticMessage(const std::string& str) const + { + return CreateMessage(const_cast(str.c_str()), str.length(), FairMQNoCleanup, nullptr); + } }; #endif /* FAIRMQTRANSPORTFACTORY_H_ */ diff --git a/fairmq/test/parts/_iterator_interface.cxx b/fairmq/test/parts/_iterator_interface.cxx index 387ccf28..300c55a7 100644 --- a/fairmq/test/parts/_iterator_interface.cxx +++ b/fairmq/test/parts/_iterator_interface.cxx @@ -8,8 +8,7 @@ #include #include -#include -#include +#include #include #include #include @@ -28,7 +27,7 @@ class RandomAccessIterator : public ::testing::Test { RandomAccessIterator() : mParts(FairMQParts{}), - mFactory(FairMQTransportFactoryZMQ{}), + mFactory(FairMQTransportFactory::CreateTransportFactory("zeromq")), mS1("1"), mS2("2"), mS3("3")