diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 456886ae..7a85e358 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -79,6 +79,54 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable return fChannels.at(chan).at(i).Send(msg); } + template + inline int Send(std::unique_ptr& msg, DataType&& data, const std::string& chan, const int i = 0) const + { + + Serializer().serialize_impl(msg,std::forward(data)); + auto nbytes = fChannels.at(chan).at(i).Send(msg); + return nbytes; + } + + template + inline int Send(DataType&& data, const std::string& chan, const int i = 0) const + { + std::unique_ptr msg(NewMessage()); + Serializer().serialize_impl(msg,std::forward(data)); + auto nbytes = fChannels.at(chan).at(i).Send(msg); + return nbytes; + } + +//* + template + void Serialize(std::unique_ptr& msg, DataType&& data, Args&&... args) const + { + Serializer().Serialize(msg,std::forward(data),std::forward(args)...); + } + + template + void Deserialize(std::unique_ptr& msg, DataType&& data, Args&&... args) const + { + Deserializer().Deserialize(msg,std::forward(data),std::forward(args)...); + } +// */ + +// temporary overload to handle the case of a return ref to FairMQMessage + template + void Serialize(MessageType&& msg, DataType&& data) const + { + Serializer().Serialize(std::forward(msg),std::forward(data)); + } + + template + void Deserialize(MessageType&& msg, DataType&& data) const + { + Deserializer().Deserialize(std::forward(msg), std::forward(data)); + } + + + + /// Shorthand method to receive `msg` on `chan` at index `i` /// @param msg message reference /// @param chan channel name @@ -89,6 +137,25 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable return fChannels.at(chan).at(i).Receive(msg); } + + template + inline int Receive(const std::unique_ptr& msg, DataType&& data, const std::string& chan, const int i = 0) const + { + auto nbytes = fChannels.at(chan).at(i).Receive(msg); + Deserializer().deserialize_impl(msg,std::forward(data)); + return nbytes; + } + + // using rvalue ref as universal reference + template + inline int Receive(DataType&& data, const std::string& chan, const int i = 0) const + { + std::unique_ptr msg(NewMessage()); + auto nbytes = fChannels.at(chan).at(i).Receive(msg); + Deserializer().deserialize_impl(msg,std::forward(data)); + return nbytes; + } + /// Shorthand method to send a vector of messages on `chan` at index `i` /// @param msgVec message vector reference /// @param chan channel name diff --git a/fairmq/FairMQParts.h b/fairmq/FairMQParts.h index 4a4a555b..dc2ad793 100644 --- a/fairmq/FairMQParts.h +++ b/fairmq/FairMQParts.h @@ -14,6 +14,18 @@ #include "FairMQTransportFactory.h" #include "FairMQMessage.h" +#include "FairMQLogger.h" +#include "zeromq/FairMQMessageZMQ.h" +//class FairMQMessageZMQ; +class FairMQMessageNN; +namespace fairmq +{ + namespace transport + { + struct ZMQ{}; + struct NN{}; + } +} /// FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage, used for sending multi-part messages @@ -37,6 +49,22 @@ class FairMQParts { fParts.push_back(std::unique_ptr(msg)); } + + + inline void AddPart(std::unique_ptr& msg) + { + fParts.push_back(std::move(msg)); + } + + template + inline void AddPart(DataType&& data) + { + std::unique_ptr msg(new FairMQMessageZMQ()); + Serializer().Serialize(msg, std::forward(data)); + fParts.push_back(std::move(msg)); + } + + /// Adds part (std::unique_ptr) to the container (move) /// @param msg unique pointer to FairMQMessage @@ -53,6 +81,13 @@ class FairMQParts /// @param index container index inline std::unique_ptr& At(const int index) { return fParts.at(index); } + template + inline void At(DataType&& data, const int index) + { + Deserializer().Deserialize(fParts.at(index), std::forward(data)); + } + + inline std::unique_ptr& At_ptr(const int index) { return fParts.at(index); } /// Get number of parts in the container /// @return number of parts in the container inline int Size() const { return fParts.size(); } diff --git a/fairmq/devices/GenericFileSink.h b/fairmq/devices/GenericFileSink.h index 125c6a9d..d9be195c 100644 --- a/fairmq/devices/GenericFileSink.h +++ b/fairmq/devices/GenericFileSink.h @@ -41,55 +41,48 @@ template class GenericFileSink : public FairMQDevice, public T, public U { - protected: - typedef T deserialization_type; - typedef U sink_type; public: + typedef T input_policy; + typedef U sink_type; GenericFileSink() - : deserialization_type() - , sink_type() + : FairMQDevice(), input_policy(), sink_type() {} virtual ~GenericFileSink() {} - template - void SetTransport(Args... args) - { - FairMQDevice::SetTransport(std::forward(args)...); - } - template - void InitInputContainer(Args... args) + template + void InitInputData(Args&&... args) { - deserialization_type::InitContainer(std::forward(args)...); + input_policy::Create(std::forward(args)...); } protected: + + + using input_policy::fInput; + + virtual void InitTask() { sink_type::InitOutputFile(); } + typedef typename input_policy::deserialization_type deserializer_type; virtual void Run() { int receivedMsg = 0; - - // store the channel reference to avoid traversing the map on every loop iteration - const FairMQChannel& inputChannel = fChannels.at("data-in").at(0); - while (CheckCurrentState(RUNNING)) { - std::unique_ptr msg(fTransportFactory->CreateMessage()); - - if (inputChannel.Receive(msg) > 0) + if (Receive(fInput, "data-in") > 0) { - sink_type::AddToFile(deserialization_type::DeserializeMsg(msg.get())); + U::Serialize(fInput);// add fInput to file receivedMsg++; } } - MQLOG(INFO) << "Received " << receivedMsg << " messages!"; + LOG(INFO) << "Received " << receivedMsg << " messages!"; } }; diff --git a/fairmq/devices/GenericProcessor.h b/fairmq/devices/GenericProcessor.h index 3510bab9..8909e3ac 100644 --- a/fairmq/devices/GenericProcessor.h +++ b/fairmq/devices/GenericProcessor.h @@ -17,107 +17,55 @@ #include "FairMQDevice.h" -/********************************************************************* - * -------------- NOTES ----------------------- - * All policies must have a default constructor - * Function to define in (parent) policy classes : - * - * -------- INPUT POLICY -------- - * deserialization_type::InitContainer(...) - * CONTAINER_TYPE deserialization_type::DeserializeMsg(FairMQMessage* msg) - * deserialization_type::InitContainer(...) // if GenericProcessor::InitInputContainer(...) is used - * - * - * -------- OUTPUT POLICY -------- - * serialization_type::SerializeMsg(CONTAINER_TYPE) - * serialization_type::SetMessage(FairMQMessage* msg) - * serialization_type::InitContainer(...) // if GenericProcessor::InitOutputContainer(...) is used - * - * -------- TASK POLICY -------- - * CONTAINER_TYPE proc_task_type::GetOutputData() - * proc_task_type::ExecuteTask(CONTAINER_TYPE container) - * proc_task_type::InitTask(...) // if GenericProcessor::InitTask(...) is used - * - **********************************************************************/ - -template -class GenericProcessor : public FairMQDevice, public T, public U, public V +template < typename T/*=deserialization type*/, + typename U/*=serialization type*/, + typename V/*=task type*///, + //typename W/*=input creator type*/, + //typename X/*=output creator type*/ + > +class GenericProcessor : public FairMQDevice, public T, public U, + public V//, + //public W, + //public X { - protected: - typedef T deserialization_type; - typedef U serialization_type; - typedef V proc_task_type; - public: + public: + typedef T input_policy; + typedef U output_policy; + typedef V task_type; + + //typedef W input_creator_type; + //typedef X output_creator_type; + + + GenericProcessor() - : deserialization_type() - , serialization_type() - , proc_task_type() + : FairMQDevice(), T(), U() + , task_type() + //, input_creator_type() + //, output_creator_type() {} virtual ~GenericProcessor() {} - // the four following methods ensure - // that the correct policy method is called - - template - void SetTransport(Args... args) + template + void InitInputData(Args&&... args) { - FairMQDevice::SetTransport(std::forward(args)...); + input_policy::Create(std::forward(args)...); } - template - void InitTask(Args... args) + template + void InitOutputData(Args&&... args) { - proc_task_type::InitTask(std::forward(args)...); + output_policy::Create(std::forward(args)...); } - template - void InitInputContainer(Args... args) - { - deserialization_type::InitContainer(std::forward(args)...); - } - - template - void InitOutputContainer(Args... args) - { - serialization_type::InitContainer(std::forward(args)...); - } - - /* - * - // *********************** TODO: implement multipart features - void SendPart() - { - fChannels["data-out"].at(0).Send(serialization_type::SerializeMsg(proc_task_type::GetData()), "snd-more"); - serialization_type::CloseMessage(); - } - - // void SendPart(); - // bool ReceivePart(); - bool ReceivePart() - { - if (fChannels["data-in"].at(0).ExpectsAnotherPart()) - { - deserialization_type::CloseMessage(); - // fProcessorTask->GetPayload()->CloseMessage(); - fProcessorTask->SetPayload(fTransportFactory->CreateMessage()); - return fChannels["data-in"].at(0).Receive(fProcessorTask->GetPayload()); - } - else - { - return false; - } - } - */ - protected: + using input_policy::fInput; + using output_policy::fOutput; + virtual void InitTask() { - // TODO: implement multipart features - // fProcessorTask->InitTask(); - // fProcessorTask->SetSendPart(boost::bind(&FairMQProcessor::SendPart, this)); - // fProcessorTask->SetReceivePart(boost::bind(&FairMQProcessor::ReceivePart, this)); } virtual void Run() @@ -125,34 +73,22 @@ class GenericProcessor : public FairMQDevice, public T, public U, public V int receivedMsgs = 0; int sentMsgs = 0; - const FairMQChannel& inputChannel = fChannels["data-in"].at(0); - const FairMQChannel& outputChannel = fChannels["data-out"].at(0); - while (CheckCurrentState(RUNNING)) { - std::unique_ptr msg(fTransportFactory->CreateMessage()); - - if (inputChannel.Receive(msg) > 0) + if (Receive(fInput, "data-in") > 0) { receivedMsgs++; - // deserialization_type::DeserializeMsg(msg) --> deserialize data of msg and fill output container - // proc_task_type::ExecuteTask( ... ) --> process output container - proc_task_type::ExecuteTask(deserialization_type::DeserializeMsg(msg.get())); - - // serialization_type::fMessage point to msg - serialization_type::SetMessage(msg.get()); - - // proc_task_type::GetOutputData() --> Get processed output container - // serialization_type::message(...) --> Serialize output container and fill fMessage - outputChannel.Send(serialization_type::SerializeMsg(proc_task_type::GetOutputData())); + task_type::Exec(fInput,fOutput); + Send(fOutput, "data-out"); sentMsgs++; } } - - MQLOG(INFO) << "Received " << receivedMsgs << " and sent " << sentMsgs << " messages!"; + LOG(INFO) << "Received " << receivedMsgs << " and sent " << sentMsgs << " messages!"; } - }; + + + #endif /* GENERICPROCESSOR_H */ diff --git a/fairmq/devices/GenericSampler.h b/fairmq/devices/GenericSampler.h index fe84b043..adb1f817 100644 --- a/fairmq/devices/GenericSampler.h +++ b/fairmq/devices/GenericSampler.h @@ -29,177 +29,83 @@ #include "FairMQTools.h" /* GENERIC SAMPLER (data source) MQ-DEVICE */ -/********************************************************************* - * -------------- NOTES ----------------------- - * All policies must have a default constructor - * Function to define in (parent) policy classes : - * - * -------- INPUT POLICY (SAMPLER POLICY) -------- - * source_type::InitSource() // must be there to compile - * int64_t source_type::GetNumberOfEvent() // must be there to compile - * source_type::SetIndex(int64_t eventIdx) // must be there to compile - * CONTAINER_TYPE source_type::GetOutData() // must be there to compile - * source_type::SetFileProperties(Args&... args) // must be there to compile - * - * void BindSendHeader(std::function callback) // enabled if exists - * void BindGetSocketNumber(std::function callback) // enabled if exists - * void GetHeader(std::function callback) // enabled if exists - * - * -------- OUTPUT POLICY -------- - * serialization_type::SerializeMsg(CONTAINER_TYPE) // must be there to compile - * serialization_type::SetMessage(FairMQMessage* msg) // must be there to compile - * - **********************************************************************/ +/********************************************************************* */ + +template +using enable_if_match = typename std::enable_if::value,int>::type; -//template -//class base_GenericSampler : public FairMQDevice, public source_type, public serialization_type -template +struct DefaultSamplerRun {}; + +template < typename T, + typename U, + typename R=DefaultSamplerRun + > class base_GenericSampler : public FairMQDevice, public T, public U { - protected: - typedef T source_type; - typedef U serialization_type; - typedef K key_type; - typedef L task_type; - typedef base_GenericSampler self_type; - public: - enum - { - EventRate = FairMQDevice::Last, - OutChannelName - }; + typedef T input_policy;// sampler source + typedef U output_policy;// deserialization + typedef R run_type; - base_GenericSampler(); - virtual ~base_GenericSampler(); - /* - struct trait : source_type::trait, serialization_type::trait - { - //static const SerializationTag serialization = serialization_type::trait::serialization; - //static const FileTag - static const DeviceTag device = kSampler; - typedef base_GenericSampler self_type; - typedef source_type source_type; - typedef serialization_type serialization_type; - }; - */ + base_GenericSampler() : FairMQDevice(), fOutChanName("data-out"), T(), U() + {} + + virtual ~base_GenericSampler() + {} template - void SetTransport(Args... args) + void SetFileProperties(Args&&... args) { - FairMQDevice::SetTransport(std::forward(args)...); - } - - void ResetEventCounter(); - - template - void SetFileProperties(Args&... args) - { - source_type::SetFileProperties(args...); - } - - virtual void SetProperty(const int key, const int value); - virtual int GetProperty(const int key, const int default_ = 0); - virtual void SetProperty(const int key, const std::string& value); - virtual std::string GetProperty(const int key, const std::string& default_ = ""); - - void SendHeader(int socketIdx=0); - int GetSocketNumber() const; - int GetCurrentIndex() const; - void SetContinuous(bool flag); - - /* - register the tasks you want to process and, which will be - called by ExecuteTasks() member function. The registration is done by filling - a std::map where key_type is int and task_type - is std::function by default (when using GenericSampler alias template). - The template parameter must take a pointer to this class or derived class as first argument, - and a reference to a std::map as second argument. - It is convenient to use a lambda expression in the place of the template argument. - For example if we want to register the simple function, - //< - void myfunction() {std::cout << "hello World" << std::endl;} - //> - ,and the MultiPartTask() template function member of this class , then - we can do in the main function as follow : - - sampler.RegisterTask( - [&](TSampler* s, std::map>& task_list) - { - task_list[0]=std::bind(myfunction); - task_list[1]=std::bind(&U::template MultiPartTask<5>, s); - }); - - To communicate with the Host derived class via callback, three methods from the host class are callable (only - after binding these methods in the GenericSampler::InitTask() ) - */ - template - void RegisterTask(RegistrationManager manage) - { - manage(this, fTaskList); - LOG(DEBUG) << "Current Number of registered tasks = " << fTaskList.size(); - } - - void ExecuteTasks() - { - for(const auto& p : fTaskList) - { - LOG(DEBUG) << "Execute Task " << p.first; - p.second(); - } + input_policy::SetFileProperties(std::forward(args)...); } protected: - virtual void InitTask(); - virtual void Run(); + + + using input_policy::fInput; + virtual void Init() + { + input_policy::InitSource(); + + } + + template = 0> + inline void Run_impl() + { + int64_t sentMsgs(0); + int64_t numEvents = input_policy::GetNumberOfEvent(); + LOG(INFO) << "Number of events to process: " << numEvents; + boost::timer::auto_cpu_timer timer; + for (int64_t idx(0); idx < numEvents; idx++) + { + std::unique_ptr msg(NewMessage()); + T::Deserialize(idx); + Send(fInput, fOutChanName); + sentMsgs++; + if (!CheckCurrentState(RUNNING)) + break; + + } + + boost::timer::cpu_times const elapsed_time(timer.elapsed()); + LOG(INFO) << "Sent everything in:\n" << boost::timer::format(elapsed_time, 2); + LOG(INFO) << "Sent " << sentMsgs << " messages!"; + } + + virtual void Run() + { + Run_impl(); + } private: std::string fOutChanName; - int64_t fNumEvents; - int64_t fCurrentIdx; - int fEventRate; - int fEventCounter; - bool fContinuous; - std::map fTaskList; // to handle Task list - // automatically enable or disable the call of policy function members for binding of host functions. - // this template functions use SFINAE to detect the existence of the policy function signature. - template = 0> - void BindingSendPart() {} - template = 0> - void BindingSendPart() - { - source_type::BindSendHeader(std::bind(&base_GenericSampler::SendPart,this,std::placeholders::_1) ); - } - - template = 0> - void BindingGetSocketNumber() {} - template = 0> - void BindingGetSocketNumber() - { - source_type::BindGetSocketNumber(std::bind(&base_GenericSampler::GetSocketNumber,this) ); - } - - template = 0> - void SendHeader(int /*socketIdx*/) {} - template = 0> - void SendHeader(int socketIdx) - { - std::unique_ptr msg(fTransportFactory->CreateMessage()); - serialization_type::SetMessage(msg.get()); - // remark : serialization_type must have an overload of the SerializeMsg to serialize the Header structure - fChannels.at(fOutChanName).at(socketIdx).Send(serialization_type::SerializeMsg(source_type::GetHeader()), "snd-more"); - } - - template = 0> - void BindingGetCurrentIndex() {} - template = 0> - void BindingGetCurrentIndex() - { - source_type::BindGetCurrentIndex(std::bind(&base_GenericSampler::GetCurrentIndex,this) ); - } }; -#include "GenericSampler.tpl" + + void SendHeader(int /*socketIdx*/) {} + + + #endif /* GENERICSAMPLER_H */