From 12e6a874dbf0593da4b8c3abecae0d9e8bcef757 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 21 Aug 2020 13:43:47 +0200 Subject: [PATCH] Remove built-in devices from the main lib --- fairmq/CMakeLists.txt | 10 --- fairmq/devices/FairMQMerger.cxx | 122 ---------------------------- fairmq/devices/FairMQMerger.h | 87 ++++++++++++++++++-- fairmq/devices/FairMQMultiplier.cxx | 110 ------------------------- fairmq/devices/FairMQMultiplier.h | 86 ++++++++++++++++++-- fairmq/devices/FairMQProxy.cxx | 81 ------------------ fairmq/devices/FairMQProxy.h | 47 ++++++++++- fairmq/devices/FairMQSink.h | 6 +- fairmq/devices/FairMQSplitter.cxx | 74 ----------------- fairmq/devices/FairMQSplitter.h | 38 +++++++-- 10 files changed, 242 insertions(+), 419 deletions(-) delete mode 100644 fairmq/devices/FairMQMerger.cxx delete mode 100644 fairmq/devices/FairMQMultiplier.cxx delete mode 100644 fairmq/devices/FairMQProxy.cxx delete mode 100644 fairmq/devices/FairMQSplitter.cxx diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index a3cfb810..cec9f314 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -174,12 +174,6 @@ if(BUILD_FAIRMQ) ) set(FAIRMQ_PRIVATE_HEADER_FILES - devices/FairMQBenchmarkSampler.h - devices/FairMQMerger.h - devices/FairMQMultiplier.h - devices/FairMQProxy.h - devices/FairMQSink.h - devices/FairMQSplitter.h plugins/Builtin.h plugins/config/Config.h plugins/Control.h @@ -221,10 +215,6 @@ if(BUILD_FAIRMQ) FairMQPoller.cxx FairMQSocket.cxx FairMQTransportFactory.cxx - devices/FairMQMerger.cxx - devices/FairMQMultiplier.cxx - devices/FairMQProxy.cxx - devices/FairMQSplitter.cxx Plugin.cxx PluginManager.cxx PluginServices.cxx diff --git a/fairmq/devices/FairMQMerger.cxx b/fairmq/devices/FairMQMerger.cxx deleted file mode 100644 index b36ebc28..00000000 --- a/fairmq/devices/FairMQMerger.cxx +++ /dev/null @@ -1,122 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQMerger.cxx - * - * @since 2012-12-06 - * @author D. Klein, A. Rybalchenko - */ - -#include "FairMQMerger.h" -#include "../FairMQLogger.h" -#include "../FairMQPoller.h" - -using namespace std; - -FairMQMerger::FairMQMerger() - : fMultipart(true) - , fInChannelName("data-in") - , fOutChannelName("data-out") -{ -} - -void FairMQMerger::RegisterChannelEndpoints() -{ - RegisterChannelEndpoint(fInChannelName, 1, 10000); - RegisterChannelEndpoint(fOutChannelName, 1, 1); - - PrintRegisteredChannels(); -} - -FairMQMerger::~FairMQMerger() -{ -} - -void FairMQMerger::InitTask() -{ - fMultipart = fConfig->GetProperty("multipart"); - fInChannelName = fConfig->GetProperty("in-channel"); - fOutChannelName = fConfig->GetProperty("out-channel"); -} - -void FairMQMerger::Run() -{ - int numInputs = fChannels.at(fInChannelName).size(); - - vector chans; - - for (auto& chan : fChannels.at(fInChannelName)) - { - chans.push_back(&chan); - } - - FairMQPollerPtr poller(NewPoller(chans)); - - if (fMultipart) - { - while (!NewStatePending()) - { - poller->Poll(100); - - // Loop over the data input channels. - for (int i = 0; i < numInputs; ++i) - { - // Check if the channel has data ready to be received. - if (poller->CheckInput(i)) - { - FairMQParts payload; - - if (Receive(payload, fInChannelName, i) >= 0) - { - if (Send(payload, fOutChannelName) < 0) - { - LOG(debug) << "Transfer interrupted"; - break; - } - } - else - { - LOG(debug) << "Transfer interrupted"; - break; - } - } - } - } - } - else - { - while (!NewStatePending()) - { - poller->Poll(100); - - // Loop over the data input channels. - for (int i = 0; i < numInputs; ++i) - { - // Check if the channel has data ready to be received. - if (poller->CheckInput(i)) - { - FairMQMessagePtr payload(fTransportFactory->CreateMessage()); - - if (Receive(payload, fInChannelName, i) >= 0) - { - if (Send(payload, fOutChannelName) < 0) - { - LOG(debug) << "Transfer interrupted"; - break; - } - } - else - { - LOG(debug) << "Transfer interrupted"; - break; - } - } - } - } - } -} diff --git a/fairmq/devices/FairMQMerger.h b/fairmq/devices/FairMQMerger.h index 5f4e0d14..c3e097ba 100644 --- a/fairmq/devices/FairMQMerger.h +++ b/fairmq/devices/FairMQMerger.h @@ -16,23 +16,100 @@ #define FAIRMQMERGER_H_ #include "FairMQDevice.h" +#include "../FairMQPoller.h" +#include "../FairMQLogger.h" #include +#include class FairMQMerger : public FairMQDevice { public: - FairMQMerger(); - virtual ~FairMQMerger(); + FairMQMerger() + : fMultipart(true) + , fInChannelName("data-in") + , fOutChannelName("data-out") + {} + ~FairMQMerger() {} protected: bool fMultipart; std::string fInChannelName; std::string fOutChannelName; - virtual void RegisterChannelEndpoints() override; - virtual void Run() override; - virtual void InitTask() override; + void InitTask() override + { + fMultipart = fConfig->GetProperty("multipart"); + fInChannelName = fConfig->GetProperty("in-channel"); + fOutChannelName = fConfig->GetProperty("out-channel"); + } + + void RegisterChannelEndpoints() override + { + RegisterChannelEndpoint(fInChannelName, 1, 10000); + RegisterChannelEndpoint(fOutChannelName, 1, 1); + + PrintRegisteredChannels(); + } + + void Run() override + { + int numInputs = fChannels.at(fInChannelName).size(); + + std::vector chans; + + for (auto& chan : fChannels.at(fInChannelName)) { + chans.push_back(&chan); + } + + FairMQPollerPtr poller(NewPoller(chans)); + + if (fMultipart) { + while (!NewStatePending()) { + poller->Poll(100); + + // Loop over the data input channels. + for (int i = 0; i < numInputs; ++i) { + // Check if the channel has data ready to be received. + if (poller->CheckInput(i)) { + FairMQParts payload; + + if (Receive(payload, fInChannelName, i) >= 0) { + if (Send(payload, fOutChannelName) < 0) { + LOG(debug) << "Transfer interrupted"; + break; + } + } else { + LOG(debug) << "Transfer interrupted"; + break; + } + } + } + } + } else { + while (!NewStatePending()) { + poller->Poll(100); + + // Loop over the data input channels. + for (int i = 0; i < numInputs; ++i) { + // Check if the channel has data ready to be received. + if (poller->CheckInput(i)) { + FairMQMessagePtr payload(fTransportFactory->CreateMessage()); + + if (Receive(payload, fInChannelName, i) >= 0) { + if (Send(payload, fOutChannelName) < 0) { + LOG(debug) << "Transfer interrupted"; + break; + } + } else { + LOG(debug) << "Transfer interrupted"; + break; + } + } + } + } + } + } }; #endif /* FAIRMQMERGER_H_ */ diff --git a/fairmq/devices/FairMQMultiplier.cxx b/fairmq/devices/FairMQMultiplier.cxx deleted file mode 100644 index 440cac30..00000000 --- a/fairmq/devices/FairMQMultiplier.cxx +++ /dev/null @@ -1,110 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#include "FairMQMultiplier.h" - -#include "../FairMQLogger.h" - -using namespace std; - -FairMQMultiplier::FairMQMultiplier() - : fMultipart(true) - , fNumOutputs(0) - , fInChannelName() - , fOutChannelNames() -{ -} - -FairMQMultiplier::~FairMQMultiplier() -{ -} - -void FairMQMultiplier::InitTask() -{ - fMultipart = fConfig->GetProperty("multipart"); - fInChannelName = fConfig->GetProperty("in-channel"); - fOutChannelNames = fConfig->GetProperty>("out-channel"); - fNumOutputs = fChannels.at(fOutChannelNames.at(0)).size(); - - if (fMultipart) - { - OnData(fInChannelName, &FairMQMultiplier::HandleMultipartData); - } - else - { - OnData(fInChannelName, &FairMQMultiplier::HandleSingleData); - } -} - -bool FairMQMultiplier::HandleSingleData(std::unique_ptr& payload, int /*index*/) -{ - for (unsigned int i = 0; i < fOutChannelNames.size() - 1; ++i) // all except last channel - { - for (unsigned int j = 0; j < fChannels.at(fOutChannelNames.at(i)).size(); ++j) // all subChannels in a channel - { - FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage()); - msgCopy->Copy(*payload); - - Send(msgCopy, fOutChannelNames.at(i), j); - } - } - - unsigned int lastChannelSize = fChannels.at(fOutChannelNames.back()).size(); - - for (unsigned int i = 0; i < lastChannelSize - 1; ++i) // iterate over all except last subChannels of the last channel - { - FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage()); - msgCopy->Copy(*payload); - - Send(msgCopy, fOutChannelNames.back(), i); - } - - Send(payload, fOutChannelNames.back(), lastChannelSize - 1); // send final message to last subChannel of last channel - - return true; -} - -bool FairMQMultiplier::HandleMultipartData(FairMQParts& payload, int /*index*/) -{ - for (unsigned int i = 0; i < fOutChannelNames.size() - 1; ++i) // all except last channel - { - for (unsigned int j = 0; j < fChannels.at(fOutChannelNames.at(i)).size(); ++j) // all subChannels in a channel - { - FairMQParts parts; - - for (int k = 0; k < payload.Size(); ++k) - { - FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage()); - msgCopy->Copy(payload.AtRef(k)); - parts.AddPart(std::move(msgCopy)); - } - - Send(parts, fOutChannelNames.at(i), j); - } - } - - unsigned int lastChannelSize = fChannels.at(fOutChannelNames.back()).size(); - - for (unsigned int i = 0; i < lastChannelSize - 1; ++i) // iterate over all except last subChannels of the last channel - { - FairMQParts parts; - - for (int k = 0; k < payload.Size(); ++k) - { - FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage()); - msgCopy->Copy(payload.AtRef(k)); - parts.AddPart(std::move(msgCopy)); - } - - Send(parts, fOutChannelNames.back(), i); - } - - Send(payload, fOutChannelNames.back(), lastChannelSize - 1); // send final message to last subChannel of last channel - - return true; -} diff --git a/fairmq/devices/FairMQMultiplier.h b/fairmq/devices/FairMQMultiplier.h index 06da096f..32656805 100644 --- a/fairmq/devices/FairMQMultiplier.h +++ b/fairmq/devices/FairMQMultiplier.h @@ -12,12 +12,18 @@ #include "FairMQDevice.h" #include +#include class FairMQMultiplier : public FairMQDevice { public: - FairMQMultiplier(); - virtual ~FairMQMultiplier(); + FairMQMultiplier() + : fMultipart(true) + , fNumOutputs(0) + , fInChannelName() + , fOutChannelNames() + {} + ~FairMQMultiplier() {} protected: bool fMultipart; @@ -25,10 +31,80 @@ class FairMQMultiplier : public FairMQDevice std::string fInChannelName; std::vector fOutChannelNames; - virtual void InitTask(); + void InitTask() override + { + fMultipart = fConfig->GetProperty("multipart"); + fInChannelName = fConfig->GetProperty("in-channel"); + fOutChannelNames = fConfig->GetProperty>("out-channel"); + fNumOutputs = fChannels.at(fOutChannelNames.at(0)).size(); - bool HandleSingleData(std::unique_ptr&, int); - bool HandleMultipartData(FairMQParts&, int); + if (fMultipart) { + OnData(fInChannelName, &FairMQMultiplier::HandleMultipartData); + } else { + OnData(fInChannelName, &FairMQMultiplier::HandleSingleData); + } + } + + + bool HandleSingleData(std::unique_ptr& payload, int) + { + for (unsigned int i = 0; i < fOutChannelNames.size() - 1; ++i) { // all except last channel + for (unsigned int j = 0; j < fChannels.at(fOutChannelNames.at(i)).size(); ++j) { // all subChannels in a channel + FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage()); + msgCopy->Copy(*payload); + + Send(msgCopy, fOutChannelNames.at(i), j); + } + } + + unsigned int lastChannelSize = fChannels.at(fOutChannelNames.back()).size(); + + for (unsigned int i = 0; i < lastChannelSize - 1; ++i) { // iterate over all except last subChannels of the last channel + FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage()); + msgCopy->Copy(*payload); + + Send(msgCopy, fOutChannelNames.back(), i); + } + + Send(payload, fOutChannelNames.back(), lastChannelSize - 1); // send final message to last subChannel of last channel + + return true; + } + + bool HandleMultipartData(FairMQParts& payload, int) + { + for (unsigned int i = 0; i < fOutChannelNames.size() - 1; ++i) { // all except last channel + for (unsigned int j = 0; j < fChannels.at(fOutChannelNames.at(i)).size(); ++j) { // all subChannels in a channel + FairMQParts parts; + + for (int k = 0; k < payload.Size(); ++k) { + FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage()); + msgCopy->Copy(payload.AtRef(k)); + parts.AddPart(std::move(msgCopy)); + } + + Send(parts, fOutChannelNames.at(i), j); + } + } + + unsigned int lastChannelSize = fChannels.at(fOutChannelNames.back()).size(); + + for (unsigned int i = 0; i < lastChannelSize - 1; ++i) { // iterate over all except last subChannels of the last channel + FairMQParts parts; + + for (int k = 0; k < payload.Size(); ++k) { + FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage()); + msgCopy->Copy(payload.AtRef(k)); + parts.AddPart(std::move(msgCopy)); + } + + Send(parts, fOutChannelNames.back(), i); + } + + Send(payload, fOutChannelNames.back(), lastChannelSize - 1); // send final message to last subChannel of last channel + + return true; + } }; #endif /* FAIRMQMULTIPLIER_H_ */ diff --git a/fairmq/devices/FairMQProxy.cxx b/fairmq/devices/FairMQProxy.cxx deleted file mode 100644 index 6bf5a61a..00000000 --- a/fairmq/devices/FairMQProxy.cxx +++ /dev/null @@ -1,81 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQProxy.cxx - * - * @since 2013-10-02 - * @author A. Rybalchenko - */ - -#include "FairMQProxy.h" - -#include "../FairMQLogger.h" - -using namespace std; - -FairMQProxy::FairMQProxy() - : fMultipart(true) - , fInChannelName() - , fOutChannelName() -{ -} - -FairMQProxy::~FairMQProxy() -{ -} - -void FairMQProxy::InitTask() -{ - fMultipart = fConfig->GetProperty("multipart"); - fInChannelName = fConfig->GetProperty("in-channel"); - fOutChannelName = fConfig->GetProperty("out-channel"); -} - -void FairMQProxy::Run() -{ - if (fMultipart) - { - while (!NewStatePending()) - { - FairMQParts payload; - if (Receive(payload, fInChannelName) >= 0) - { - if (Send(payload, fOutChannelName) < 0) - { - LOG(debug) << "Transfer interrupted"; - break; - } - } - else - { - LOG(debug) << "Transfer interrupted"; - break; - } - } - } - else - { - while (!NewStatePending()) - { - unique_ptr payload(fTransportFactory->CreateMessage()); - if (Receive(payload, fInChannelName) >= 0) - { - if (Send(payload, fOutChannelName) < 0) - { - LOG(debug) << "Transfer interrupted"; - break; - } - } - else - { - LOG(debug) << "Transfer interrupted"; - break; - } - } - } -} diff --git a/fairmq/devices/FairMQProxy.h b/fairmq/devices/FairMQProxy.h index d2e344c5..dd10e1f4 100644 --- a/fairmq/devices/FairMQProxy.h +++ b/fairmq/devices/FairMQProxy.h @@ -22,16 +22,55 @@ class FairMQProxy : public FairMQDevice { public: - FairMQProxy(); - virtual ~FairMQProxy(); + FairMQProxy() + : fMultipart(true) + , fInChannelName() + , fOutChannelName() + {} + ~FairMQProxy() {} protected: bool fMultipart; std::string fInChannelName; std::string fOutChannelName; - virtual void Run(); - virtual void InitTask(); + void InitTask() override + { + fMultipart = fConfig->GetProperty("multipart"); + fInChannelName = fConfig->GetProperty("in-channel"); + fOutChannelName = fConfig->GetProperty("out-channel"); + } + + void Run() override + { + if (fMultipart) { + while (!NewStatePending()) { + FairMQParts payload; + if (Receive(payload, fInChannelName) >= 0) { + if (Send(payload, fOutChannelName) < 0) { + LOG(debug) << "Transfer interrupted"; + break; + } + } else { + LOG(debug) << "Transfer interrupted"; + break; + } + } + } else { + while (!NewStatePending()) { + FairMQMessagePtr payload(fTransportFactory->CreateMessage()); + if (Receive(payload, fInChannelName) >= 0) { + if (Send(payload, fOutChannelName) < 0) { + LOG(debug) << "Transfer interrupted"; + break; + } + } else { + LOG(debug) << "Transfer interrupted"; + break; + } + } + } + } }; #endif /* FAIRMQPROXY_H_ */ diff --git a/fairmq/devices/FairMQSink.h b/fairmq/devices/FairMQSink.h index 86dc2141..9d193c27 100644 --- a/fairmq/devices/FairMQSink.h +++ b/fairmq/devices/FairMQSink.h @@ -32,7 +32,7 @@ class FairMQSink : public FairMQDevice //, public OutputPolicy , fInChannelName() {} - virtual ~FairMQSink() {} + ~FairMQSink() {} protected: bool fMultipart; @@ -40,14 +40,14 @@ class FairMQSink : public FairMQDevice //, public OutputPolicy uint64_t fNumIterations; std::string fInChannelName; - virtual void InitTask() + void InitTask() override { fMultipart = fConfig->GetProperty("multipart"); fMaxIterations = fConfig->GetProperty("max-iterations"); fInChannelName = fConfig->GetProperty("in-channel"); } - virtual void Run() + void Run() override { // store the channel reference to avoid traversing the map on every loop iteration FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0); diff --git a/fairmq/devices/FairMQSplitter.cxx b/fairmq/devices/FairMQSplitter.cxx deleted file mode 100644 index a1a82cf0..00000000 --- a/fairmq/devices/FairMQSplitter.cxx +++ /dev/null @@ -1,74 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQSplitter.cxx - * - * @since 2012-12-06 - * @author D. Klein, A. Rybalchenko - */ - -#include "FairMQSplitter.h" - -#include "../FairMQLogger.h" - -using namespace std; - -FairMQSplitter::FairMQSplitter() - : fMultipart(true) - , fNumOutputs(0) - , fDirection(0) - , fInChannelName() - , fOutChannelName() -{ -} - -FairMQSplitter::~FairMQSplitter() -{ -} - -void FairMQSplitter::InitTask() -{ - fMultipart = fConfig->GetProperty("multipart"); - fInChannelName = fConfig->GetProperty("in-channel"); - fOutChannelName = fConfig->GetProperty("out-channel"); - fNumOutputs = fChannels.at(fOutChannelName).size(); - fDirection = 0; - - if (fMultipart) - { - OnData(fInChannelName, &FairMQSplitter::HandleMultipartData); - } - else - { - OnData(fInChannelName, &FairMQSplitter::HandleSingleData); - } -} - -bool FairMQSplitter::HandleSingleData(FairMQMessagePtr& payload, int /*index*/) -{ - Send(payload, fOutChannelName, fDirection); - - if (++fDirection >= fNumOutputs) - { - fDirection = 0; - } - - return true; -} - -bool FairMQSplitter::HandleMultipartData(FairMQParts& payload, int /*index*/) -{ - Send(payload, fOutChannelName, fDirection); - - if (++fDirection >= fNumOutputs) - { - fDirection = 0; - } - - return true; -} diff --git a/fairmq/devices/FairMQSplitter.h b/fairmq/devices/FairMQSplitter.h index 5a163228..3d7bef02 100644 --- a/fairmq/devices/FairMQSplitter.h +++ b/fairmq/devices/FairMQSplitter.h @@ -22,8 +22,14 @@ class FairMQSplitter : public FairMQDevice { public: - FairMQSplitter(); - virtual ~FairMQSplitter(); + FairMQSplitter() + : fMultipart(true) + , fNumOutputs(0) + , fDirection(0) + , fInChannelName() + , fOutChannelName() + {} + ~FairMQSplitter() {} protected: bool fMultipart; @@ -32,10 +38,32 @@ class FairMQSplitter : public FairMQDevice std::string fInChannelName; std::string fOutChannelName; - virtual void InitTask(); + void InitTask() override + { + fMultipart = fConfig->GetProperty("multipart"); + fInChannelName = fConfig->GetProperty("in-channel"); + fOutChannelName = fConfig->GetProperty("out-channel"); + fNumOutputs = fChannels.at(fOutChannelName).size(); + fDirection = 0; - bool HandleSingleData(std::unique_ptr&, int); - bool HandleMultipartData(FairMQParts&, int); + if (fMultipart) { + OnData(fInChannelName, &FairMQSplitter::HandleData); + } else { + OnData(fInChannelName, &FairMQSplitter::HandleData); + } + } + + template + bool HandleData(T& payload, int) + { + Send(payload, fOutChannelName, fDirection); + + if (++fDirection >= fNumOutputs) { + fDirection = 0; + } + + return true; + } }; #endif /* FAIRMQSPLITTER_H_ */