mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
move New*Message apis to TransportFactory
* add facades to Device and Channel * add Transport() getter to Device * inline Transport() getter from Channel
This commit is contained in:
parent
97ca52aa0e
commit
87252edbe0
|
@ -841,12 +841,6 @@ unsigned long FairMQChannel::GetMessagesRx() const
|
||||||
return fSocket->GetMessagesRx();
|
return fSocket->GetMessagesRx();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
FairMQTransportFactory* FairMQChannel::Transport()
|
|
||||||
{
|
|
||||||
return fTransportFactory.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
bool FairMQChannel::CheckCompatibility(unique_ptr<FairMQMessage>& msg) const
|
bool FairMQChannel::CheckCompatibility(unique_ptr<FairMQMessage>& msg) const
|
||||||
{
|
{
|
||||||
if (fTransportType == msg->GetType())
|
if (fTransportType == msg->GetType())
|
||||||
|
|
|
@ -258,7 +258,28 @@ class FairMQChannel
|
||||||
unsigned long GetMessagesTx() const;
|
unsigned long GetMessagesTx() const;
|
||||||
unsigned long GetMessagesRx() const;
|
unsigned long GetMessagesRx() const;
|
||||||
|
|
||||||
FairMQTransportFactory* Transport();
|
auto Transport() const -> const FairMQTransportFactory*
|
||||||
|
{
|
||||||
|
return fTransportFactory.get();
|
||||||
|
};
|
||||||
|
|
||||||
|
template<typename... Args>
|
||||||
|
inline FairMQMessagePtr NewMessage(Args&&... args) const
|
||||||
|
{
|
||||||
|
return Transport()->CreateMessage(std::forward<Args>(args)...);
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename T>
|
||||||
|
inline FairMQMessagePtr NewSimpleMessage(const T& data) const
|
||||||
|
{
|
||||||
|
return Transport()->NewSimpleMessage(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename T>
|
||||||
|
inline FairMQMessagePtr NewStaticMessage(const T& data) const
|
||||||
|
{
|
||||||
|
return Transport()->NewStaticMessage(data);
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::unique_ptr<FairMQSocket> fSocket;
|
std::unique_ptr<FairMQSocket> fSocket;
|
||||||
|
|
|
@ -195,90 +195,46 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
|
||||||
return fChannels.at(chan).at(i).ReceiveAsync(parts.fParts);
|
return fChannels.at(chan).at(i).ReceiveAsync(parts.fParts);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @brief Create empty FairMQMessage
|
/// @brief Getter for default transport factory
|
||||||
/// @return pointer to FairMQMessage
|
auto Transport() const -> const FairMQTransportFactory*
|
||||||
inline FairMQMessagePtr NewMessage() const
|
|
||||||
{
|
{
|
||||||
return fTransportFactory->CreateMessage();
|
return fTransports.cbegin()->second.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @brief Create new FairMQMessage of specified size
|
template<typename... Args>
|
||||||
/// @param size message size
|
inline FairMQMessagePtr NewMessage(Args&&... args) const
|
||||||
/// @return pointer to FairMQMessage
|
|
||||||
inline FairMQMessagePtr NewMessage(int size) const
|
|
||||||
{
|
{
|
||||||
return fTransportFactory->CreateMessage(size);
|
return Transport()->CreateMessage(std::forward<Args>(args)...);
|
||||||
}
|
|
||||||
|
|
||||||
/// @brief Create new FairMQMessage with user provided buffer and size
|
|
||||||
/// @param data pointer to user provided buffer
|
|
||||||
/// @param size size of the user provided buffer
|
|
||||||
/// @param ffn callback, called when the message is transfered (and can be deleted)
|
|
||||||
/// @param obj optional helper pointer that can be used in the callback
|
|
||||||
/// @return pointer to FairMQMessage
|
|
||||||
inline FairMQMessagePtr NewMessage(void* data, int size, fairmq_free_fn* ffn, void* obj = nullptr) const
|
|
||||||
{
|
|
||||||
return fTransportFactory->CreateMessage(data, size, ffn, obj);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename... Args>
|
template<typename... Args>
|
||||||
inline FairMQMessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args) const
|
inline FairMQMessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args) const
|
||||||
{
|
{
|
||||||
return fChannels.at(channel).at(index).fTransportFactory->CreateMessage(std::forward<Args>(args)...);
|
return fChannels.at(channel).at(index).Transport()->CreateMessage(std::forward<Args>(args)...);
|
||||||
}
|
|
||||||
|
|
||||||
static void FairMQNoCleanup(void* /*data*/, void* /*obj*/)
|
|
||||||
{
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename T>
|
template<typename T>
|
||||||
inline FairMQMessagePtr NewStaticMessage(const T& data) const
|
inline FairMQMessagePtr NewStaticMessage(const T& data) const
|
||||||
{
|
{
|
||||||
return fTransportFactory->CreateMessage(data, sizeof(T), FairMQNoCleanup, nullptr);
|
return Transport()->NewStaticMessage(data);
|
||||||
}
|
|
||||||
|
|
||||||
inline FairMQMessagePtr NewStaticMessage(const std::string& str) const
|
|
||||||
{
|
|
||||||
return fTransportFactory->CreateMessage(const_cast<char*>(str.c_str()), str.length(), FairMQNoCleanup, nullptr);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename T>
|
template<typename T>
|
||||||
inline FairMQMessagePtr NewStaticMessageFor(const std::string& channel, int index, const T& data) const
|
inline FairMQMessagePtr NewStaticMessageFor(const std::string& channel, int index, const T& data) const
|
||||||
{
|
{
|
||||||
return fChannels.at(channel).at(index).fTransportFactory->CreateMessage(data, sizeof(T), FairMQNoCleanup, nullptr);
|
return fChannels.at(channel).at(index).NewStaticMessage(data);
|
||||||
}
|
|
||||||
|
|
||||||
inline FairMQMessagePtr NewStaticMessageFor(const std::string& channel, int index, const std::string& str) const
|
|
||||||
{
|
|
||||||
return fChannels.at(channel).at(index).fTransportFactory->CreateMessage(const_cast<char*>(str.c_str()), str.length(), FairMQNoCleanup, nullptr);
|
|
||||||
}
|
|
||||||
|
|
||||||
template<typename T>
|
|
||||||
static void FairMQSimpleMsgCleanup(void* /*data*/, void* obj)
|
|
||||||
{
|
|
||||||
delete static_cast<T*>(obj);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename T>
|
template<typename T>
|
||||||
inline FairMQMessagePtr NewSimpleMessage(const T& data) const
|
inline FairMQMessagePtr NewSimpleMessage(const T& data) const
|
||||||
{
|
{
|
||||||
// todo: is_trivially_copyable not available on gcc < 5, workaround?
|
return Transport()->NewSimpleMessage(data);
|
||||||
// static_assert(std::is_trivially_copyable<T>::value, "The argument type for NewSimpleMessage has to be trivially copyable!");
|
|
||||||
T* dataCopy = new T(data);
|
|
||||||
return fTransportFactory->CreateMessage(dataCopy, sizeof(T), FairMQSimpleMsgCleanup<T>, dataCopy);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template<std::size_t N>
|
template<typename T>
|
||||||
inline FairMQMessagePtr NewSimpleMessage(const char(&data)[N]) const
|
inline FairMQMessagePtr NewSimpleMessageFor(const std::string& channel, int index, const T& data) const
|
||||||
{
|
{
|
||||||
std::string* msgStr = new std::string(data);
|
return fChannels.at(channel).at(index).NewSimpleMessage(data);
|
||||||
return fTransportFactory->CreateMessage(const_cast<char*>(msgStr->c_str()), msgStr->length(), FairMQSimpleMsgCleanup<std::string>, msgStr);
|
|
||||||
}
|
|
||||||
|
|
||||||
inline FairMQMessagePtr NewSimpleMessage(const std::string& str) const
|
|
||||||
{
|
|
||||||
std::string* msgStr = new std::string(str);
|
|
||||||
return fTransportFactory->CreateMessage(const_cast<char*>(msgStr->c_str()), msgStr->length(), FairMQSimpleMsgCleanup<std::string>, msgStr);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Waits for the first initialization run to finish
|
/// Waits for the first initialization run to finish
|
||||||
|
|
|
@ -35,11 +35,19 @@ class FairMQTransportFactory
|
||||||
/// Initialize transport
|
/// Initialize transport
|
||||||
virtual void Initialize(const FairMQProgOptions* config) = 0;
|
virtual void Initialize(const FairMQProgOptions* config) = 0;
|
||||||
|
|
||||||
/// Create an empty message (e.g. for receiving)
|
/// @brief Create empty FairMQMessage
|
||||||
|
/// @return pointer to FairMQMessage
|
||||||
virtual FairMQMessagePtr CreateMessage() const = 0;
|
virtual FairMQMessagePtr CreateMessage() const = 0;
|
||||||
/// Create an empty of a specified size
|
/// @brief Create new FairMQMessage of specified size
|
||||||
|
/// @param size message size
|
||||||
|
/// @return pointer to FairMQMessage
|
||||||
virtual FairMQMessagePtr CreateMessage(const size_t size) const = 0;
|
virtual FairMQMessagePtr CreateMessage(const size_t size) const = 0;
|
||||||
/// Create a message from a supplied buffer, size and a deallocation callback
|
/// @brief Create new FairMQMessage with user provided buffer and size
|
||||||
|
/// @param data pointer to user provided buffer
|
||||||
|
/// @param size size of the user provided buffer
|
||||||
|
/// @param ffn callback, called when the message is transfered (and can be deleted)
|
||||||
|
/// @param obj optional helper pointer that can be used in the callback
|
||||||
|
/// @return pointer to FairMQMessage
|
||||||
virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const = 0;
|
virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const = 0;
|
||||||
|
|
||||||
/// Create a socket
|
/// Create a socket
|
||||||
|
@ -63,6 +71,50 @@ class FairMQTransportFactory
|
||||||
virtual ~FairMQTransportFactory() {};
|
virtual ~FairMQTransportFactory() {};
|
||||||
|
|
||||||
static auto CreateTransportFactory(const std::string& type) -> std::shared_ptr<FairMQTransportFactory>;
|
static auto CreateTransportFactory(const std::string& type) -> std::shared_ptr<FairMQTransportFactory>;
|
||||||
|
|
||||||
|
static void FairMQNoCleanup(void* /*data*/, void* /*obj*/)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename T>
|
||||||
|
static void FairMQSimpleMsgCleanup(void* /*data*/, void* obj)
|
||||||
|
{
|
||||||
|
delete static_cast<T*>(obj);
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename T>
|
||||||
|
inline FairMQMessagePtr NewSimpleMessage(const T& data) const
|
||||||
|
{
|
||||||
|
// todo: is_trivially_copyable not available on gcc < 5, workaround?
|
||||||
|
// static_assert(std::is_trivially_copyable<T>::value, "The argument type for NewSimpleMessage has to be trivially copyable!");
|
||||||
|
T* dataCopy = new T(data);
|
||||||
|
return CreateMessage(dataCopy, sizeof(T), FairMQSimpleMsgCleanup<T>, dataCopy);
|
||||||
|
}
|
||||||
|
|
||||||
|
template<std::size_t N>
|
||||||
|
inline FairMQMessagePtr NewSimpleMessage(const char(&data)[N]) const
|
||||||
|
{
|
||||||
|
std::string* msgStr = new std::string(data);
|
||||||
|
return CreateMessage(const_cast<char*>(msgStr->c_str()), msgStr->length(), FairMQSimpleMsgCleanup<std::string>, msgStr);
|
||||||
|
}
|
||||||
|
|
||||||
|
inline FairMQMessagePtr NewSimpleMessage(const std::string& str) const
|
||||||
|
{
|
||||||
|
|
||||||
|
std::string* msgStr = new std::string(str);
|
||||||
|
return CreateMessage(const_cast<char*>(msgStr->c_str()), msgStr->length(), FairMQSimpleMsgCleanup<std::string>, msgStr);
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename T>
|
||||||
|
inline FairMQMessagePtr NewStaticMessage(const T& data) const
|
||||||
|
{
|
||||||
|
return CreateMessage(data, sizeof(T), FairMQNoCleanup, nullptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
inline FairMQMessagePtr NewStaticMessage(const std::string& str) const
|
||||||
|
{
|
||||||
|
return CreateMessage(const_cast<char*>(str.c_str()), str.length(), FairMQNoCleanup, nullptr);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif /* FAIRMQTRANSPORTFACTORY_H_ */
|
#endif /* FAIRMQTRANSPORTFACTORY_H_ */
|
||||||
|
|
|
@ -8,8 +8,7 @@
|
||||||
|
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
#include <FairMQParts.h>
|
#include <FairMQParts.h>
|
||||||
#include <FairMQDevice.h>
|
#include <FairMQTransportFactory.h>
|
||||||
#include <zeromq/FairMQTransportFactoryZMQ.h>
|
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
@ -28,7 +27,7 @@ class RandomAccessIterator : public ::testing::Test {
|
||||||
|
|
||||||
RandomAccessIterator()
|
RandomAccessIterator()
|
||||||
: mParts(FairMQParts{}),
|
: mParts(FairMQParts{}),
|
||||||
mFactory(FairMQTransportFactoryZMQ{}),
|
mFactory(FairMQTransportFactory::CreateTransportFactory("zeromq")),
|
||||||
mS1("1"),
|
mS1("1"),
|
||||||
mS2("2"),
|
mS2("2"),
|
||||||
mS3("3")
|
mS3("3")
|
||||||
|
|
Loading…
Reference in New Issue
Block a user