mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
remove alternative serialization API
This commit is contained in:
parent
835c88c6d2
commit
e215049db9
|
@ -79,25 +79,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
|
||||||
return fChannels.at(chan).at(i).Send(msg);
|
return fChannels.at(chan).at(i).Send(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename Serializer, typename DataType>
|
|
||||||
inline int Send(std::unique_ptr<FairMQMessage>& msg, DataType&& data, const std::string& chan, const int i = 0) const
|
|
||||||
{
|
|
||||||
|
|
||||||
Serializer().serialize_impl(msg,std::forward<DataType>(data));
|
|
||||||
auto nbytes = fChannels.at(chan).at(i).Send(msg);
|
|
||||||
return nbytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
template<typename Serializer, typename DataType>
|
|
||||||
inline int Send(DataType&& data, const std::string& chan, const int i = 0) const
|
|
||||||
{
|
|
||||||
std::unique_ptr<FairMQMessage> msg(NewMessage());
|
|
||||||
Serializer().serialize_impl(msg,std::forward<DataType>(data));
|
|
||||||
auto nbytes = fChannels.at(chan).at(i).Send(msg);
|
|
||||||
return nbytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
//*
|
|
||||||
template<typename Serializer, typename DataType, typename... Args>
|
template<typename Serializer, typename DataType, typename... Args>
|
||||||
void Serialize(FairMQMessage& msg, DataType&& data, Args&&... args) const
|
void Serialize(FairMQMessage& msg, DataType&& data, Args&&... args) const
|
||||||
{
|
{
|
||||||
|
@ -110,24 +91,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
|
||||||
Deserializer().Deserialize(msg,std::forward<DataType>(data),std::forward<Args>(args)...);
|
Deserializer().Deserialize(msg,std::forward<DataType>(data),std::forward<Args>(args)...);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
// temporary overload to handle the case of a return ref to FairMQMessage
|
|
||||||
template<typename Serializer, typename MessageType, typename DataType>
|
|
||||||
void Serialize(MessageType&& msg, DataType&& data) const
|
|
||||||
{
|
|
||||||
Serializer().Serialize(std::forward<MessageType>(msg),std::forward<DataType>(data));
|
|
||||||
}
|
|
||||||
|
|
||||||
template<typename Deserializer, typename MessageType, typename DataType>
|
|
||||||
void Deserialize(MessageType&& msg, DataType&& data) const
|
|
||||||
{
|
|
||||||
Deserializer().Deserialize(std::forward<MessageType>(msg), std::forward<DataType>(data));
|
|
||||||
}
|
|
||||||
|
|
||||||
// */
|
|
||||||
|
|
||||||
|
|
||||||
/// Shorthand method to receive `msg` on `chan` at index `i`
|
/// Shorthand method to receive `msg` on `chan` at index `i`
|
||||||
/// @param msg message reference
|
/// @param msg message reference
|
||||||
/// @param chan channel name
|
/// @param chan channel name
|
||||||
|
@ -138,25 +101,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
|
||||||
return fChannels.at(chan).at(i).Receive(msg);
|
return fChannels.at(chan).at(i).Receive(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
template<typename Deserializer, typename DataType>
|
|
||||||
inline int Receive(const std::unique_ptr<FairMQMessage>& msg, DataType&& data, const std::string& chan, const int i = 0) const
|
|
||||||
{
|
|
||||||
auto nbytes = fChannels.at(chan).at(i).Receive(msg);
|
|
||||||
Deserializer().deserialize_impl(msg,std::forward<DataType>(data));
|
|
||||||
return nbytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
// using rvalue ref as universal reference
|
|
||||||
template<typename Deserializer, typename DataType>
|
|
||||||
inline int Receive(DataType&& data, const std::string& chan, const int i = 0) const
|
|
||||||
{
|
|
||||||
std::unique_ptr<FairMQMessage> msg(NewMessage());
|
|
||||||
auto nbytes = fChannels.at(chan).at(i).Receive(msg);
|
|
||||||
Deserializer().deserialize_impl(msg,std::forward<DataType>(data));
|
|
||||||
return nbytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Shorthand method to send a vector of messages on `chan` at index `i`
|
/// Shorthand method to send a vector of messages on `chan` at index `i`
|
||||||
/// @param msgVec message vector reference
|
/// @param msgVec message vector reference
|
||||||
/// @param chan channel name
|
/// @param chan channel name
|
||||||
|
|
|
@ -14,18 +14,6 @@
|
||||||
|
|
||||||
#include "FairMQTransportFactory.h"
|
#include "FairMQTransportFactory.h"
|
||||||
#include "FairMQMessage.h"
|
#include "FairMQMessage.h"
|
||||||
#include "FairMQLogger.h"
|
|
||||||
#include "zeromq/FairMQMessageZMQ.h"
|
|
||||||
//class FairMQMessageZMQ;
|
|
||||||
class FairMQMessageNN;
|
|
||||||
namespace fairmq
|
|
||||||
{
|
|
||||||
namespace transport
|
|
||||||
{
|
|
||||||
struct ZMQ{};
|
|
||||||
struct NN{};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage, used for sending multi-part messages
|
/// FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage, used for sending multi-part messages
|
||||||
|
|
||||||
|
@ -51,24 +39,18 @@ class FairMQParts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Adds part (std::unique_ptr<FairMQMessage>&) to the container (move)
|
||||||
|
/// @param msg unique pointer to FairMQMessage
|
||||||
|
/// lvalue ref (move not required when passing argument)
|
||||||
inline void AddPart(std::unique_ptr<FairMQMessage>& msg)
|
inline void AddPart(std::unique_ptr<FairMQMessage>& msg)
|
||||||
{
|
{
|
||||||
fParts.push_back(std::move(msg));
|
fParts.push_back(std::move(msg));
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename Serializer, typename DataType>
|
/// Adds part (std::unique_ptr<FairMQMessage>&) to the container (move)
|
||||||
inline void AddPart(DataType&& data)
|
|
||||||
{
|
|
||||||
std::unique_ptr<FairMQMessage> msg(new FairMQMessageZMQ());
|
|
||||||
Serializer().Serialize(msg, std::forward<DataType>(data));
|
|
||||||
fParts.push_back(std::move(msg));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/// Adds part (std::unique_ptr<FairMQMessage>) to the container (move)
|
|
||||||
/// @param msg unique pointer to FairMQMessage
|
/// @param msg unique pointer to FairMQMessage
|
||||||
inline void AddPart(std::unique_ptr<FairMQMessage> msg)
|
/// rvalue ref (move required when passing argument)
|
||||||
|
inline void AddPart(std::unique_ptr<FairMQMessage>&& msg)
|
||||||
{
|
{
|
||||||
fParts.push_back(std::move(msg));
|
fParts.push_back(std::move(msg));
|
||||||
}
|
}
|
||||||
|
@ -81,16 +63,9 @@ class FairMQParts
|
||||||
/// @param index container index
|
/// @param index container index
|
||||||
inline std::unique_ptr<FairMQMessage>& At(const int index) { return fParts.at(index); }
|
inline std::unique_ptr<FairMQMessage>& At(const int index) { return fParts.at(index); }
|
||||||
|
|
||||||
|
// ref version
|
||||||
inline FairMQMessage& At_ref(const int index) { return *(fParts.at(index)); }
|
inline FairMQMessage& At_ref(const int index) { return *(fParts.at(index)); }
|
||||||
|
|
||||||
template<typename Deserializer, typename DataType>
|
|
||||||
inline void At(DataType&& data, const int index)
|
|
||||||
{
|
|
||||||
Deserializer().Deserialize(fParts.at(index), std::forward<DataType>(data));
|
|
||||||
}
|
|
||||||
|
|
||||||
inline std::unique_ptr<FairMQMessage>& At_ptr(const int index) { return fParts.at(index); }
|
|
||||||
/// Get number of parts in the container
|
/// Get number of parts in the container
|
||||||
/// @return number of parts in the container
|
/// @return number of parts in the container
|
||||||
inline int Size() const { return fParts.size(); }
|
inline int Size() const { return fParts.size(); }
|
||||||
|
|
|
@ -75,8 +75,10 @@ class GenericFileSink : public FairMQDevice, public T, public U
|
||||||
int receivedMsg = 0;
|
int receivedMsg = 0;
|
||||||
while (CheckCurrentState(RUNNING))
|
while (CheckCurrentState(RUNNING))
|
||||||
{
|
{
|
||||||
if (Receive<deserializer_type>(fInput, "data-in") > 0)
|
std::unique_ptr<FairMQMessage> msg(NewMessage());
|
||||||
|
if (Receive(msg,"data-in") > 0)
|
||||||
{
|
{
|
||||||
|
Deserialize<deserializer_type>(*msg,fInput);
|
||||||
U::Serialize(fInput);// add fInput to file
|
U::Serialize(fInput);// add fInput to file
|
||||||
receivedMsg++;
|
receivedMsg++;
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,11 +75,15 @@ class GenericProcessor : public FairMQDevice, public T, public U,
|
||||||
|
|
||||||
while (CheckCurrentState(RUNNING))
|
while (CheckCurrentState(RUNNING))
|
||||||
{
|
{
|
||||||
if (Receive<T>(fInput, "data-in") > 0)
|
std::unique_ptr<FairMQMessage> msg(NewMessage());
|
||||||
|
if (Receive(fInput, "data-in") > 0)
|
||||||
{
|
{
|
||||||
|
Deserialize<T>(*msg,fInput);
|
||||||
receivedMsgs++;
|
receivedMsgs++;
|
||||||
task_type::Exec(fInput,fOutput);
|
task_type::Exec(fInput,fOutput);
|
||||||
Send<U>(fOutput, "data-out");
|
|
||||||
|
Serialize<U>(*msg,fOutput);
|
||||||
|
Send(fOutput, "data-out");
|
||||||
sentMsgs++;
|
sentMsgs++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user