From a7ab33a10e60be1b1b69613f81f0646ddadccef5 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Mon, 17 Aug 2015 14:45:31 +0200 Subject: [PATCH] Add new Send/Receive methods with smart pointers and no flag checks. --- fairmq/FairMQChannel.cxx | 72 ++++++++++++++++++++++- fairmq/FairMQChannel.h | 12 ++++ fairmq/FairMQConfigurable.h | 3 +- fairmq/FairMQDevice.cxx | 11 ++-- fairmq/FairMQDevice.h | 2 + fairmq/FairMQMessage.h | 2 + fairmq/devices/FairMQBenchmarkSampler.cxx | 9 +-- fairmq/devices/FairMQBenchmarkSampler.h | 3 +- fairmq/devices/FairMQBuffer.cxx | 4 +- fairmq/devices/FairMQMerger.cxx | 16 ++--- fairmq/devices/FairMQProxy.cxx | 4 +- fairmq/devices/FairMQSink.cxx | 4 +- fairmq/devices/FairMQSplitter.cxx | 4 +- fairmq/devices/GenericFileSink.h | 15 ++--- fairmq/devices/GenericFileSink.tpl | 11 ++-- fairmq/devices/GenericMerger.h | 17 +++--- fairmq/devices/GenericProcessor.h | 18 +++--- fairmq/devices/GenericSampler.h | 53 ++++++++--------- fairmq/devices/GenericSampler.tpl | 41 ++++++------- fairmq/run/runBenchmarkSampler.cxx | 5 +- fairmq/zeromq/FairMQMessageZMQ.cxx | 18 ++++++ fairmq/zeromq/FairMQMessageZMQ.h | 1 + 22 files changed, 204 insertions(+), 121 deletions(-) diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index ede4b98b..bfd4fac1 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -36,6 +36,8 @@ FairMQChannel::FairMQChannel() , fPoller(nullptr) , fCmdSocket(nullptr) , fTransportFactory(nullptr) + , fNoBlockFlag(0) + , fSndMoreFlag(0) { } @@ -52,6 +54,8 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str , fPoller(nullptr) , fCmdSocket(nullptr) , fTransportFactory(nullptr) + , fNoBlockFlag(0) + , fSndMoreFlag(0) { } @@ -347,10 +351,21 @@ bool FairMQChannel::InitCommandInterface(FairMQTransportFactory* factory) fTransportFactory = factory; fCmdSocket = fTransportFactory->CreateSocket("sub", "device-commands", 1); - fCmdSocket->Connect("inproc://commands"); + if (fCmdSocket) + { + fCmdSocket->Connect("inproc://commands"); - fPoller = fTransportFactory->CreatePoller(*fSocket, *fCmdSocket); - return true; + fNoBlockFlag = fCmdSocket->NOBLOCK; + fSndMoreFlag = fCmdSocket->SNDMORE; + + fPoller = fTransportFactory->CreatePoller(*fSocket, *fCmdSocket); + + return true; + } + else + { + return false; + } } void FairMQChannel::ResetChannel() @@ -359,6 +374,57 @@ void FairMQChannel::ResetChannel() // TODO: implement channel resetting } +int FairMQChannel::Send(const unique_ptr& msg) const +{ + fPoller->Poll(-1); + + if (fPoller->CheckInput(0)) + { + HandleCommand(); + return -1; + } + + if (fPoller->CheckOutput(1)) + { + return fSocket->Send(msg.get(), 0); + } + + return -1; +} + +int FairMQChannel::SendAsync(const unique_ptr& msg) const +{ + return fSocket->Send(msg.get(), fNoBlockFlag); +} + +int FairMQChannel::SendPart(const unique_ptr& msg) const +{ + return fSocket->Send(msg.get(), fSndMoreFlag); +} + +int FairMQChannel::Receive(const unique_ptr& msg) const +{ + fPoller->Poll(-1); + + if (fPoller->CheckInput(0)) + { + HandleCommand(); + return -1; + } + + if (fPoller->CheckInput(1)) + { + return fSocket->Receive(msg.get(), 0); + } + + return -1; +} + +int FairMQChannel::ReceiveAsync(const unique_ptr& msg) const +{ + return fSocket->Receive(msg.get(), fNoBlockFlag); +} + int FairMQChannel::Send(FairMQMessage* msg, const string& flag) const { if (flag == "") diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index b9a7f21c..0669addd 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -16,6 +16,7 @@ #define FAIRMQCHANNEL_H_ #include +#include // unique_ptr #include @@ -59,6 +60,14 @@ class FairMQChannel FairMQSocket* fSocket; // Wrappers for the socket methods to simplify the usage of channels + int Send(const std::unique_ptr& msg) const; + int SendAsync(const std::unique_ptr& msg) const; + int SendPart(const std::unique_ptr& msg) const; + + int Receive(const std::unique_ptr& msg) const; + int ReceiveAsync(const std::unique_ptr& msg) const; + + // DEPRECATED socket method wrappers with raw pointers and flag checks int Send(FairMQMessage* msg, const std::string& flag = "") const; int Send(FairMQMessage* msg, const int flags) const; int Receive(FairMQMessage* msg, const std::string& flag = "") const; @@ -84,6 +93,9 @@ class FairMQChannel FairMQTransportFactory* fTransportFactory; + int fNoBlockFlag; + int fSndMoreFlag; + bool HandleCommand() const; // use static mutex to make the class easily copyable diff --git a/fairmq/FairMQConfigurable.h b/fairmq/FairMQConfigurable.h index 1c88e9cd..d35bbe99 100644 --- a/fairmq/FairMQConfigurable.h +++ b/fairmq/FairMQConfigurable.h @@ -25,11 +25,12 @@ class FairMQConfigurable Last = 1 }; FairMQConfigurable(); + virtual ~FairMQConfigurable(); + virtual void SetProperty(const int key, const std::string& value); virtual std::string GetProperty(const int key, const std::string& default_ = ""); virtual void SetProperty(const int key, const int value); virtual int GetProperty(const int key, const int default_ = 0); - virtual ~FairMQConfigurable(); }; #endif /* FAIRMQCONFIGURABLE_H_ */ diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index edce9ee0..08ed5e1d 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -418,6 +418,11 @@ int FairMQDevice::GetProperty(const int key, const int default_ /*= 0*/) } } +void FairMQDevice::SetTransport(unique_ptr& factory) +{ + fTransportFactory = factory.get(); +} + void FairMQDevice::SetTransport(FairMQTransportFactory* factory) { fTransportFactory = factory; @@ -637,8 +642,6 @@ void FairMQDevice::ResetWrapper() void FairMQDevice::Reset() { - LOG(DEBUG) << "Resetting Device..."; - // iterate over the channels map for (auto mi = fChannels.begin(); mi != fChannels.end(); ++mi) { @@ -657,8 +660,6 @@ void FairMQDevice::Reset() vi->fCmdSocket = nullptr; } } - - LOG(DEBUG) << "Device reset finished!"; } void FairMQDevice::Terminate() @@ -725,4 +726,6 @@ FairMQDevice::~FairMQDevice() delete fCmdSocket; fCmdSocket = nullptr; } + + delete fTransportFactory; } diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 25e3f0df..bdac4c88 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -16,6 +16,7 @@ #define FAIRMQDEVICE_H_ #include +#include // unique_ptr #include #include #include @@ -60,6 +61,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable virtual int GetProperty(const int key, const int default_ = 0); virtual void SetTransport(FairMQTransportFactory* factory); + virtual void SetTransport(std::unique_ptr& factory); static bool SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs); diff --git a/fairmq/FairMQMessage.h b/fairmq/FairMQMessage.h index 9811eeb6..d21b13a7 100644 --- a/fairmq/FairMQMessage.h +++ b/fairmq/FairMQMessage.h @@ -16,6 +16,7 @@ #define FAIRMQMESSAGE_H_ #include // for size_t +#include // unique_ptr typedef void (fairmq_free_fn) (void *data, void *hint); @@ -33,6 +34,7 @@ class FairMQMessage virtual void CloseMessage() = 0; virtual void Copy(FairMQMessage* msg) = 0; + virtual void Copy(const std::unique_ptr& msg) = 0; virtual ~FairMQMessage() {}; }; diff --git a/fairmq/devices/FairMQBenchmarkSampler.cxx b/fairmq/devices/FairMQBenchmarkSampler.cxx index 1d53ac07..68ee9cb8 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.cxx +++ b/fairmq/devices/FairMQBenchmarkSampler.cxx @@ -38,14 +38,15 @@ void FairMQBenchmarkSampler::Run() boost::thread resetEventCounter(boost::bind(&FairMQBenchmarkSampler::ResetEventCounter, this)); void* buffer = operator new[](fEventSize); - FairMQMessage* baseMsg = fTransportFactory->CreateMessage(buffer, fEventSize); + + unique_ptr baseMsg(fTransportFactory->CreateMessage(buffer, fEventSize)); // store the channel reference to avoid traversing the map on every loop iteration const FairMQChannel& dataChannel = fChannels.at("data-out").at(0); while (CheckCurrentState(RUNNING)) { - FairMQMessage* msg = fTransportFactory->CreateMessage(); + unique_ptr msg(fTransportFactory->CreateMessage()); msg->Copy(baseMsg); dataChannel.Send(msg); @@ -56,12 +57,8 @@ void FairMQBenchmarkSampler::Run() { boost::this_thread::sleep(boost::posix_time::milliseconds(1)); } - - delete msg; } - delete baseMsg; - try { resetEventCounter.interrupt(); resetEventCounter.join(); diff --git a/fairmq/devices/FairMQBenchmarkSampler.h b/fairmq/devices/FairMQBenchmarkSampler.h index 63a87e5b..b1953c85 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.h +++ b/fairmq/devices/FairMQBenchmarkSampler.h @@ -28,8 +28,7 @@ class FairMQBenchmarkSampler : public FairMQDevice public: enum { - InputFile = FairMQDevice::Last, - EventSize, + EventSize = FairMQDevice::Last, EventRate, Last }; diff --git a/fairmq/devices/FairMQBuffer.cxx b/fairmq/devices/FairMQBuffer.cxx index c36d49df..ed64374b 100644 --- a/fairmq/devices/FairMQBuffer.cxx +++ b/fairmq/devices/FairMQBuffer.cxx @@ -32,14 +32,12 @@ void FairMQBuffer::Run() while (CheckCurrentState(RUNNING)) { - FairMQMessage* msg = fTransportFactory->CreateMessage(); + std::unique_ptr msg(fTransportFactory->CreateMessage()); if (dataInChannel.Receive(msg) > 0) { dataOutChannel.Send(msg); } - - delete msg; } } diff --git a/fairmq/devices/FairMQMerger.cxx b/fairmq/devices/FairMQMerger.cxx index b8ff6e7c..dc6ff908 100644 --- a/fairmq/devices/FairMQMerger.cxx +++ b/fairmq/devices/FairMQMerger.cxx @@ -29,28 +29,32 @@ FairMQMerger::~FairMQMerger() void FairMQMerger::Run() { - FairMQPoller* poller = fTransportFactory->CreatePoller(fChannels.at("data-in")); + std::unique_ptr poller(fTransportFactory->CreatePoller(fChannels.at("data-in"))); // store the channel references to avoid traversing the map on every loop iteration const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0); - FairMQChannel* dataInChannels[fChannels.at("data-in").size()]; + std::vector dataInChannels(fChannels.at("data-in").size()); for (int i = 0; i < fChannels.at("data-in").size(); ++i) { - dataInChannels[i] = &(fChannels.at("data-in").at(i)); + dataInChannels.at(i) = &(fChannels.at("data-in").at(i)); } while (CheckCurrentState(RUNNING)) { - FairMQMessage* msg = fTransportFactory->CreateMessage(); + std::unique_ptr msg(fTransportFactory->CreateMessage()); poller->Poll(100); + // Loop over the data input channels. for (int i = 0; i < fChannels.at("data-in").size(); ++i) { + // Check if the channel has data ready to be received. if (poller->CheckInput(i)) { + // Try receiving the data. if (dataInChannels[i]->Receive(msg) > 0) { + // If data was received, send it to output. if (dataOutChannel.Send(msg) < 0) { LOG(DEBUG) << "Blocking send interrupted by a command"; @@ -64,9 +68,5 @@ void FairMQMerger::Run() } } } - - delete msg; } - - delete poller; } diff --git a/fairmq/devices/FairMQProxy.cxx b/fairmq/devices/FairMQProxy.cxx index 65cdfc0f..6458121d 100644 --- a/fairmq/devices/FairMQProxy.cxx +++ b/fairmq/devices/FairMQProxy.cxx @@ -28,7 +28,7 @@ FairMQProxy::~FairMQProxy() void FairMQProxy::Run() { - FairMQMessage* msg = fTransportFactory->CreateMessage(); + std::unique_ptr msg(fTransportFactory->CreateMessage()); // store the channel references to avoid traversing the map on every loop iteration const FairMQChannel& dataInChannel = fChannels.at("data-in").at(0); @@ -41,6 +41,4 @@ void FairMQProxy::Run() dataOutChannel.Send(msg); } } - - delete msg; } diff --git a/fairmq/devices/FairMQSink.cxx b/fairmq/devices/FairMQSink.cxx index f3563ead..5a254255 100644 --- a/fairmq/devices/FairMQSink.cxx +++ b/fairmq/devices/FairMQSink.cxx @@ -29,11 +29,9 @@ void FairMQSink::Run() while (CheckCurrentState(RUNNING)) { - FairMQMessage* msg = fTransportFactory->CreateMessage(); + std::unique_ptr msg(fTransportFactory->CreateMessage()); dataChannel.Receive(msg); - - delete msg; } } diff --git a/fairmq/devices/FairMQSplitter.cxx b/fairmq/devices/FairMQSplitter.cxx index 46ba8f59..4466c89e 100644 --- a/fairmq/devices/FairMQSplitter.cxx +++ b/fairmq/devices/FairMQSplitter.cxx @@ -41,7 +41,7 @@ void FairMQSplitter::Run() while (CheckCurrentState(RUNNING)) { - FairMQMessage* msg = fTransportFactory->CreateMessage(); + std::unique_ptr msg(fTransportFactory->CreateMessage()); if (dataInChannel.Receive(msg) > 0) { @@ -52,7 +52,5 @@ void FairMQSplitter::Run() direction = 0; } } - - delete msg; } } diff --git a/fairmq/devices/GenericFileSink.h b/fairmq/devices/GenericFileSink.h index 3d00a7dd..77f40904 100644 --- a/fairmq/devices/GenericFileSink.h +++ b/fairmq/devices/GenericFileSink.h @@ -71,17 +71,18 @@ class GenericFileSink : public FairMQDevice, public InputPolicy, public OutputPo { int receivedMsg = 0; - while (GetCurrentState() == RUNNING) - { - FairMQMessage* msg = fTransportFactory->CreateMessage(); + // store the channel reference to avoid traversing the map on every loop iteration + const FairMQChannel& inputChannel = fChannels["data-in"].at(0); - if (fChannels["data-in"].at(0).Receive(msg) > 0) + while (CheckCurrentState(RUNNING)) + { + std::unique_ptr msg(fTransportFactory->CreateMessage()); + + if (inputChannel.Receive(msg) > 0) { - OutputPolicy::AddToFile(InputPolicy::DeSerializeMsg(msg)); + OutputPolicy::AddToFile(InputPolicy::DeSerializeMsg(msg.get())); receivedMsg++; } - - delete msg; } MQLOG(INFO) << "Received " << receivedMsg << " messages!"; diff --git a/fairmq/devices/GenericFileSink.tpl b/fairmq/devices/GenericFileSink.tpl index e52d2e7c..091ee987 100644 --- a/fairmq/devices/GenericFileSink.tpl +++ b/fairmq/devices/GenericFileSink.tpl @@ -44,15 +44,18 @@ void GenericFileSink::Run() { int receivedMsg = 0; + // store the channel reference to avoid traversing the map on every loop iteration + const FairMQChannel& inputChannel = fChannels["data-in"].at(0); + while (CheckCurrentState(RUNNING)) { - FairMQMessage* msg = fTransportFactory->CreateMessage(); - if (fChannels.at("data-in").at(0).Receive(msg) > 0) + std::unique_ptr msg(fTransportFactory->CreateMessage()); + + if (inputChannel.Receive(msg) > 0) { - OutputPolicy::AddToFile(InputPolicy::DeSerializeMsg(msg)); + OutputPolicy::AddToFile(InputPolicy::DeSerializeMsg(msg.get())); receivedMsg++; } - delete msg; } MQLOG(INFO) << "Received " << receivedMsg << " messages!"; diff --git a/fairmq/devices/GenericMerger.h b/fairmq/devices/GenericMerger.h index 5dd2849b..bd2fc37e 100644 --- a/fairmq/devices/GenericMerger.h +++ b/fairmq/devices/GenericMerger.h @@ -38,22 +38,25 @@ class GenericMerger : public FairMQDevice, public MergerPolicy, public InputPoli virtual void Run() { - FairMQPoller* poller = fTransportFactory->CreatePoller(fChannels["data-in"]); + std::unique_ptr poller(fTransportFactory->CreatePoller(fChannels["data-in"])); int received = 0; while (GetCurrentState() == RUNNING) { - FairMQMessage* msg = fTransportFactory->CreateMessage(); + std::unique_ptr msg(fTransportFactory->CreateMessage()); // MergerPolicy:: poller->Poll(fBlockingTime); - for (int i = 0; i < fChannels["datain"].size(); i++) + for (int i = 0; i < fChannels.at("data-in").size(); i++) { if (poller->CheckInput(i)) { - received = fChannels["data-in"].at(i).Receive(msg) - MergerPolicy::Merge(InputPolicy::DeSerializeMsg(msg)); + received = fChannels.at("data-in").at(i).Receive(msg) + if (received > 0) + { + MergerPolicy::Merge(InputPolicy::DeSerializeMsg(msg)); + } } OutputPolicy::SetMessage(msg); @@ -64,11 +67,7 @@ class GenericMerger : public FairMQDevice, public MergerPolicy, public InputPoli received = 0; } } - - delete msg; } - - delete poller; } }; diff --git a/fairmq/devices/GenericProcessor.h b/fairmq/devices/GenericProcessor.h index 3c80816b..4d3f2678 100644 --- a/fairmq/devices/GenericProcessor.h +++ b/fairmq/devices/GenericProcessor.h @@ -120,31 +120,29 @@ class GenericProcessor : public FairMQDevice, public InputPolicy, public OutputP int receivedMsgs = 0; int sentMsgs = 0; + const FairMQChannel& inputChannel = fChannels["data-in"].at(0); + const FairMQChannel& outputChannel = fChannels["data-out"].at(0); + while (CheckCurrentState(RUNNING)) { - FairMQMessage* msg = fTransportFactory->CreateMessage(); + std::unique_ptr msg(fTransportFactory->CreateMessage()); ++receivedMsgs; - if (fChannels["data-in"].at(0).Receive(msg) > 0) + if (inputChannel.Receive(msg) > 0) { // InputPolicy::DeSerializeMsg(msg) --> deserialize data of msg and fill output container // TaskPolicy::ExecuteTask( ... ) --> process output container - TaskPolicy::ExecuteTask(InputPolicy::DeSerializeMsg(msg)); + TaskPolicy::ExecuteTask(InputPolicy::DeSerializeMsg(msg.get())); // OutputPolicy::fMessage point to msg - OutputPolicy::SetMessage(msg); + OutputPolicy::SetMessage(msg.get()); // TaskPolicy::GetOutputData() --> Get processed output container // OutputPolicy::message(...) --> Serialize output container and fill fMessage - fChannels["data-out"].at(0).Send(OutputPolicy::SerializeMsg(TaskPolicy::GetOutputData())); + outputChannel.Send(OutputPolicy::SerializeMsg(TaskPolicy::GetOutputData())); sentMsgs++; } - - if (msg) - { - msg->CloseMessage(); - } } MQLOG(INFO) << "Received " << receivedMsgs << " and sent " << sentMsgs << " messages!"; diff --git a/fairmq/devices/GenericSampler.h b/fairmq/devices/GenericSampler.h index 7b04ab47..d6b5afbd 100644 --- a/fairmq/devices/GenericSampler.h +++ b/fairmq/devices/GenericSampler.h @@ -13,7 +13,7 @@ */ #ifndef GENERICSAMPLER_H -#define GENERICSAMPLER_H +#define GENERICSAMPLER_H #include #include @@ -62,7 +62,7 @@ class base_GenericSampler : public FairMQDevice, public T, public U typedef K key_type; typedef L task_type; typedef base_GenericSampler self_type; - + public: enum { @@ -82,7 +82,6 @@ class base_GenericSampler : public FairMQDevice, public T, public U typedef source_type source_type; typedef serialization_type serialization_type; }; - */ virtual void SetTransport(FairMQTransportFactory* factory); @@ -98,14 +97,12 @@ class base_GenericSampler : public FairMQDevice, public T, public U virtual int GetProperty(const int key, const int default_ = 0); virtual void SetProperty(const int key, const std::string& value); virtual std::string GetProperty(const int key, const std::string& default_ = ""); - + void SendPart(int socketIdx); int GetSocketNumber() const; int GetCurrentIndex() const; void SetContinuous(bool flag); - - - /// /////////////////////////////////////////////////////////////////////////////////////// + /* register the tasks you want to process and, which will be called by ExecuteTasks() member function. The registration is done by filling @@ -130,29 +127,27 @@ class base_GenericSampler : public FairMQDevice, public T, public U To communicate with the Host derived class via callback, three methods from the host class are callable (only after binding these methods in the GenericSampler::InitTask() ) - */ template void RegisterTask(RegistrationManager manage) { - manage(this,fTaskList); - LOG(DEBUG)<<"Current Number of registered tasks = "< fTaskList; // to handle Task list - - + std::map fTaskList; // to handle Task list + // automatically enable or disable the call of policy function members for binding of host functions. // this template functions use SFINAE to detect the existence of the policy function signature. - template = 0 > - void BindingSendPart(){} - template = 0 > + template = 0> + void BindingSendPart() {} + template = 0> void BindingSendPart() { source_type::BindSendPart(std::bind(&base_GenericSampler::SendPart,this,std::placeholders::_1) ); } - - template = 0 > - void BindingGetSocketNumber(){} - template = 0 > + + template = 0> + void BindingGetSocketNumber() {} + template = 0> void BindingGetSocketNumber() { source_type::BindGetSocketNumber(std::bind(&base_GenericSampler::GetSocketNumber,this) ); } - - template = 0 > - void BindingGetCurrentIndex(){} - template = 0 > + + template = 0> + void BindingGetCurrentIndex() {} + template = 0> void BindingGetCurrentIndex() { source_type::BindGetCurrentIndex(std::bind(&base_GenericSampler::GetCurrentIndex,this) ); @@ -193,4 +187,3 @@ class base_GenericSampler : public FairMQDevice, public T, public U #include "GenericSampler.tpl" #endif /* GENERICSAMPLER_H */ - diff --git a/fairmq/devices/GenericSampler.tpl b/fairmq/devices/GenericSampler.tpl index 8ec3587b..16c70894 100644 --- a/fairmq/devices/GenericSampler.tpl +++ b/fairmq/devices/GenericSampler.tpl @@ -33,21 +33,18 @@ void base_GenericSampler::InitTask() BindingSendPart(); BindingGetSocketNumber(); BindingGetCurrentIndex(); - + source_type::InitSampler(); fNumEvents = source_type::GetNumberOfEvent(); } - - - template void base_GenericSampler::Run() { // boost::thread resetEventCounter(boost::bind(&GenericSampler::ResetEventCounter, this)); int sentMsgs = 0; - + boost::timer::auto_cpu_timer timer; LOG(INFO) << "Number of events to process: " << fNumEvents; @@ -56,32 +53,36 @@ void base_GenericSampler::Run() { for (fCurrentIdx = 0; fCurrentIdx < fNumEvents; fCurrentIdx++) { - for(auto& p : fChannels[fOutChanName]) + for (auto& p : fChannels[fOutChanName]) { - FairMQMessage* msg = fTransportFactory->CreateMessage(); - serialization_type::SetMessage(msg); + std::unique_ptr msg(fTransportFactory->CreateMessage()); + serialization_type::SetMessage(msg.get()); source_type::SetIndex(fCurrentIdx); ExecuteTasks(); p.Send(serialization_type::SerializeMsg(source_type::GetOutData())); - if (msg) - msg->CloseMessage(); sentMsgs++; - if(fChannels[fOutChanName].size()>1) + if (fChannels[fOutChanName].size() > 1) + { fCurrentIdx++; - + } + // Optional event rate limiting // --fEventCounter; // while (fEventCounter == 0) { // boost::this_thread::sleep(boost::posix_time::milliseconds(1)); // } - + if (!CheckCurrentState(RUNNING)) + { break; + } } // if more than one socket, remove the last incrementation - if(fChannels[fOutChanName].size()>1) + if (fChannels[fOutChanName].size() > 1) + { fCurrentIdx--; + } } } while (CheckCurrentState(RUNNING) && fContinuous); @@ -96,14 +97,12 @@ template void base_GenericSampler::SendPart(int socketIdx) { fCurrentIdx++; - if(fCurrentIdxCreateMessage(); - serialization_type::SetMessage(msg); + std::unique_ptr msg(fTransportFactory->CreateMessage()); + serialization_type::SetMessage(msg.get()); source_type::SetIndex(fCurrentIdx); fChannels[fOutChanName].at(socketIdx).Send(serialization_type::SerializeMsg(source_type::GetOutData()), "snd-more"); - if (msg) - msg->CloseMessage(); } } @@ -129,7 +128,7 @@ void base_GenericSampler::SetContinuous(bool flag) template void base_GenericSampler::ResetEventCounter() { - while (GetCurrentState() == RUNNING) + while (CheckCurrentState(RUNNING)) { try { @@ -197,8 +196,6 @@ std::string base_GenericSampler::GetProperty(const int key, const std:: } } - - template using GenericSampler = base_GenericSampler >; typedef std::map > SamplerTasksMap; diff --git a/fairmq/run/runBenchmarkSampler.cxx b/fairmq/run/runBenchmarkSampler.cxx index 536c01ab..55ab362a 100644 --- a/fairmq/run/runBenchmarkSampler.cxx +++ b/fairmq/run/runBenchmarkSampler.cxx @@ -67,12 +67,11 @@ int main(int argc, char** argv) LOG(INFO) << "PID: " << getpid(); #ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); + sampler.SetTransport(new FairMQTransportFactoryNN()); #else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); + sampler.SetTransport(new FairMQTransportFactoryZMQ()); #endif - sampler.SetTransport(transportFactory); sampler.SetProperty(FairMQBenchmarkSampler::Id, id); sampler.SetProperty(FairMQBenchmarkSampler::EventSize, eventSize); diff --git a/fairmq/zeromq/FairMQMessageZMQ.cxx b/fairmq/zeromq/FairMQMessageZMQ.cxx index 907f3ed0..acdcee31 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.cxx +++ b/fairmq/zeromq/FairMQMessageZMQ.cxx @@ -95,6 +95,24 @@ void FairMQMessageZMQ::SetMessage(void* data, size_t size) } void FairMQMessageZMQ::Copy(FairMQMessage* msg) +{ + // DEPRECATED: Use Copy(const unique_ptr&) + + // Shares the message buffer between msg and this fMessage. + if (zmq_msg_copy(&fMessage, (zmq_msg_t*)msg->GetMessage()) != 0) + { + LOG(ERROR) << "failed copying message, reason: " << zmq_strerror(errno); + } + + // Alternatively, following code does a hard copy of the message, which allows to modify the original after making a copy, without affecting the new msg. + + // CloseMessage(); + // size_t size = msg->GetSize(); + // zmq_msg_init_size(&fMessage, size); + // memcpy(zmq_msg_data(&fMessage), msg->GetData(), size); +} + +void FairMQMessageZMQ::Copy(const unique_ptr& msg) { // Shares the message buffer between msg and this fMessage. if (zmq_msg_copy(&fMessage, (zmq_msg_t*)msg->GetMessage()) != 0) diff --git a/fairmq/zeromq/FairMQMessageZMQ.h b/fairmq/zeromq/FairMQMessageZMQ.h index 5749349d..fbcf7764 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.h +++ b/fairmq/zeromq/FairMQMessageZMQ.h @@ -40,6 +40,7 @@ class FairMQMessageZMQ : public FairMQMessage virtual void CloseMessage(); virtual void Copy(FairMQMessage* msg); + virtual void Copy(const std::unique_ptr& msg); static void CleanUp(void* data, void* hint);