diff --git a/fairmq/devices/FairMQBenchmarkSampler.h b/fairmq/devices/FairMQBenchmarkSampler.h index 77d437b3..4e9afd40 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.h +++ b/fairmq/devices/FairMQBenchmarkSampler.h @@ -37,8 +37,6 @@ class FairMQBenchmarkSampler : public FairMQDevice FairMQBenchmarkSampler(); virtual ~FairMQBenchmarkSampler(); - void Log(int intervalInMs); - virtual void SetProperty(const int key, const std::string& value); virtual std::string GetProperty(const int key, const std::string& default_ = ""); virtual void SetProperty(const int key, const int value); diff --git a/fairmq/devices/FairMQMerger.cxx b/fairmq/devices/FairMQMerger.cxx index ac563dad..975c7b3e 100644 --- a/fairmq/devices/FairMQMerger.cxx +++ b/fairmq/devices/FairMQMerger.cxx @@ -19,7 +19,10 @@ #include "FairMQMerger.h" #include "FairMQPoller.h" +using namespace std; + FairMQMerger::FairMQMerger() + : fMultipart(1) { } @@ -33,32 +36,130 @@ void FairMQMerger::Run() std::unique_ptr poller(fTransportFactory->CreatePoller(fChannels.at("data-in"))); - while (CheckCurrentState(RUNNING)) + if (fMultipart) { - poller->Poll(100); - - // Loop over the data input channels. - for (int i = 0; i < numInputs; ++i) + while (CheckCurrentState(RUNNING)) { - // Check if the channel has data ready to be received. - if (poller->CheckInput(i)) - { - FairMQParts parts; + poller->Poll(100); - if (Receive(parts, "data-in", i) >= 0) + // Loop over the data input channels. + for (int i = 0; i < numInputs; ++i) + { + // Check if the channel has data ready to be received. + if (poller->CheckInput(i)) { - if (Send(parts, "data-out") < 0) + FairMQParts payload; + + if (Receive(payload, "data-in", i) >= 0) + { + if (Send(payload, "data-out") < 0) + { + LOG(DEBUG) << "Transfer interrupted"; + break; + } + } + else { LOG(DEBUG) << "Transfer interrupted"; break; } } - else + } + } + } + else + { + while (CheckCurrentState(RUNNING)) + { + poller->Poll(100); + + // Loop over the data input channels. + for (int i = 0; i < numInputs; ++i) + { + // Check if the channel has data ready to be received. + if (poller->CheckInput(i)) { - LOG(DEBUG) << "Transfer interrupted"; - break; + unique_ptr payload(fTransportFactory->CreateMessage()); + + if (Receive(payload, "data-in", i) >= 0) + { + if (Send(payload, "data-out") < 0) + { + LOG(DEBUG) << "Transfer interrupted"; + break; + } + } + else + { + LOG(DEBUG) << "Transfer interrupted"; + break; + } } } } } } + +void FairMQMerger::SetProperty(const int key, const string& value) +{ + switch (key) + { + default: + FairMQDevice::SetProperty(key, value); + break; + } +} + +string FairMQMerger::GetProperty(const int key, const string& default_ /*= ""*/) +{ + switch (key) + { + default: + return FairMQDevice::GetProperty(key, default_); + } +} + +void FairMQMerger::SetProperty(const int key, const int value) +{ + switch (key) + { + case Multipart: + fMultipart = value; + break; + default: + FairMQDevice::SetProperty(key, value); + break; + } +} + +int FairMQMerger::GetProperty(const int key, const int default_ /*= 0*/) +{ + switch (key) + { + case Multipart: + return fMultipart; + default: + return FairMQDevice::GetProperty(key, default_); + } +} + +string FairMQMerger::GetPropertyDescription(const int key) +{ + switch (key) + { + case Multipart: + return "Multipart: Handle payloads as multipart messages."; + default: + return FairMQDevice::GetPropertyDescription(key); + } +} + +void FairMQMerger::ListProperties() +{ + LOG(INFO) << "Properties of FairMQMerger:"; + for (int p = FairMQConfigurable::Last; p < FairMQMerger::Last; ++p) + { + LOG(INFO) << " " << GetPropertyDescription(p); + } + LOG(INFO) << "---------------------------"; +} diff --git a/fairmq/devices/FairMQMerger.h b/fairmq/devices/FairMQMerger.h index 4073bc98..d6385330 100644 --- a/fairmq/devices/FairMQMerger.h +++ b/fairmq/devices/FairMQMerger.h @@ -20,10 +20,26 @@ class FairMQMerger : public FairMQDevice { public: + enum + { + Multipart = FairMQDevice::Last, + Last + }; + FairMQMerger(); virtual ~FairMQMerger(); + virtual void SetProperty(const int key, const std::string& value); + virtual std::string GetProperty(const int key, const std::string& default_ = ""); + virtual void SetProperty(const int key, const int value); + virtual int GetProperty(const int key, const int default_ = 0); + + virtual std::string GetPropertyDescription(const int key); + virtual void ListProperties(); + protected: + int fMultipart; + virtual void Run(); }; diff --git a/fairmq/devices/FairMQProxy.cxx b/fairmq/devices/FairMQProxy.cxx index dce61526..d4feb820 100644 --- a/fairmq/devices/FairMQProxy.cxx +++ b/fairmq/devices/FairMQProxy.cxx @@ -18,7 +18,10 @@ #include "FairMQLogger.h" #include "FairMQProxy.h" +using namespace std; + FairMQProxy::FairMQProxy() + : fMultipart(1) { } @@ -28,22 +31,109 @@ FairMQProxy::~FairMQProxy() void FairMQProxy::Run() { - while (CheckCurrentState(RUNNING)) + if (fMultipart) { - FairMQParts parts; - - if (Receive(parts, "data-in") >= 0) + while (CheckCurrentState(RUNNING)) { - if (Send(parts, "data-out") < 0) + FairMQParts payload; + if (Receive(payload, "data-in") >= 0) + { + if (Send(payload, "data-out") < 0) + { + LOG(DEBUG) << "Transfer interrupted"; + break; + } + } + else { LOG(DEBUG) << "Transfer interrupted"; break; } } - else + } + else + { + while (CheckCurrentState(RUNNING)) { - LOG(DEBUG) << "Transfer interrupted"; - break; + unique_ptr payload(fTransportFactory->CreateMessage()); + if (Receive(payload, "data-in") >= 0) + { + if (Send(payload, "data-out") < 0) + { + LOG(DEBUG) << "Transfer interrupted"; + break; + } + } + else + { + LOG(DEBUG) << "Transfer interrupted"; + break; + } } } } + +void FairMQProxy::SetProperty(const int key, const string& value) +{ + switch (key) + { + default: + FairMQDevice::SetProperty(key, value); + break; + } +} + +string FairMQProxy::GetProperty(const int key, const string& default_ /*= ""*/) +{ + switch (key) + { + default: + return FairMQDevice::GetProperty(key, default_); + } +} + +void FairMQProxy::SetProperty(const int key, const int value) +{ + switch (key) + { + case Multipart: + fMultipart = value; + break; + default: + FairMQDevice::SetProperty(key, value); + break; + } +} + +int FairMQProxy::GetProperty(const int key, const int default_ /*= 0*/) +{ + switch (key) + { + case Multipart: + return fMultipart; + default: + return FairMQDevice::GetProperty(key, default_); + } +} + +string FairMQProxy::GetPropertyDescription(const int key) +{ + switch (key) + { + case Multipart: + return "Multipart: Handle payloads as multipart messages."; + default: + return FairMQDevice::GetPropertyDescription(key); + } +} + +void FairMQProxy::ListProperties() +{ + LOG(INFO) << "Properties of FairMQProxy:"; + for (int p = FairMQConfigurable::Last; p < FairMQProxy::Last; ++p) + { + LOG(INFO) << " " << GetPropertyDescription(p); + } + LOG(INFO) << "---------------------------"; +} + diff --git a/fairmq/devices/FairMQProxy.h b/fairmq/devices/FairMQProxy.h index ef72f1bf..d9c0de51 100644 --- a/fairmq/devices/FairMQProxy.h +++ b/fairmq/devices/FairMQProxy.h @@ -20,10 +20,26 @@ class FairMQProxy : public FairMQDevice { public: + enum + { + Multipart = FairMQDevice::Last, + Last + }; + FairMQProxy(); virtual ~FairMQProxy(); + virtual void SetProperty(const int key, const std::string& value); + virtual std::string GetProperty(const int key, const std::string& default_ = ""); + virtual void SetProperty(const int key, const int value); + virtual int GetProperty(const int key, const int default_ = 0); + + virtual std::string GetPropertyDescription(const int key); + virtual void ListProperties(); + protected: + int fMultipart; + virtual void Run(); }; diff --git a/fairmq/devices/FairMQSplitter.cxx b/fairmq/devices/FairMQSplitter.cxx index f67e43a1..2c1fe1ae 100644 --- a/fairmq/devices/FairMQSplitter.cxx +++ b/fairmq/devices/FairMQSplitter.cxx @@ -18,7 +18,10 @@ #include "FairMQLogger.h" #include "FairMQSplitter.h" +using namespace std; + FairMQSplitter::FairMQSplitter() + : fMultipart(1) { } @@ -32,28 +35,123 @@ void FairMQSplitter::Run() int direction = 0; - while (CheckCurrentState(RUNNING)) + if (fMultipart) { - FairMQParts parts; - - if (Receive(parts, "data-in") >= 0) + while (CheckCurrentState(RUNNING)) { - if (Send(parts, "data-out", direction) < 0) + FairMQParts payload; + + if (Receive(payload, "data-in") >= 0) + { + if (Send(payload, "data-out", direction) < 0) + { + LOG(DEBUG) << "Transfer interrupted"; + break; + } + } + else { LOG(DEBUG) << "Transfer interrupted"; break; } - } - else - { - LOG(DEBUG) << "Transfer interrupted"; - break; - } - ++direction; - if (direction >= numOutputs) + ++direction; + if (direction >= numOutputs) + { + direction = 0; + } + } + } + else + { + while (CheckCurrentState(RUNNING)) { - direction = 0; + unique_ptr payload(fTransportFactory->CreateMessage()); + + if (Receive(payload, "data-in") >= 0) + { + if (Send(payload, "data-out", direction) < 0) + { + LOG(DEBUG) << "Transfer interrupted"; + break; + } + } + else + { + LOG(DEBUG) << "Transfer interrupted"; + break; + } + + ++direction; + if (direction >= numOutputs) + { + direction = 0; + } } } } + +void FairMQSplitter::SetProperty(const int key, const string& value) +{ + switch (key) + { + default: + FairMQDevice::SetProperty(key, value); + break; + } +} + +string FairMQSplitter::GetProperty(const int key, const string& default_ /*= ""*/) +{ + switch (key) + { + default: + return FairMQDevice::GetProperty(key, default_); + } +} + +void FairMQSplitter::SetProperty(const int key, const int value) +{ + switch (key) + { + case Multipart: + fMultipart = value; + break; + default: + FairMQDevice::SetProperty(key, value); + break; + } +} + +int FairMQSplitter::GetProperty(const int key, const int default_ /*= 0*/) +{ + switch (key) + { + case Multipart: + return fMultipart; + default: + return FairMQDevice::GetProperty(key, default_); + } +} + +string FairMQSplitter::GetPropertyDescription(const int key) +{ + switch (key) + { + case Multipart: + return "Multipart: Handle payloads as multipart messages."; + default: + return FairMQDevice::GetPropertyDescription(key); + } +} + +void FairMQSplitter::ListProperties() +{ + LOG(INFO) << "Properties of FairMQSplitter:"; + for (int p = FairMQConfigurable::Last; p < FairMQSplitter::Last; ++p) + { + LOG(INFO) << " " << GetPropertyDescription(p); + } + LOG(INFO) << "---------------------------"; +} + diff --git a/fairmq/devices/FairMQSplitter.h b/fairmq/devices/FairMQSplitter.h index 1ec86bff..44515235 100644 --- a/fairmq/devices/FairMQSplitter.h +++ b/fairmq/devices/FairMQSplitter.h @@ -20,10 +20,26 @@ class FairMQSplitter : public FairMQDevice { public: + enum + { + Multipart = FairMQDevice::Last, + Last + }; + FairMQSplitter(); virtual ~FairMQSplitter(); + virtual void SetProperty(const int key, const std::string& value); + virtual std::string GetProperty(const int key, const std::string& default_ = ""); + virtual void SetProperty(const int key, const int value); + virtual int GetProperty(const int key, const int default_ = 0); + + virtual std::string GetPropertyDescription(const int key); + virtual void ListProperties(); + protected: + int fMultipart; + virtual void Run(); }; diff --git a/fairmq/run/runMerger.cxx b/fairmq/run/runMerger.cxx index aeb1923d..1d48994e 100644 --- a/fairmq/run/runMerger.cxx +++ b/fairmq/run/runMerger.cxx @@ -14,19 +14,31 @@ #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQProgOptions.h" #include "FairMQMerger.h" #include "runSimpleMQStateMachine.h" +using namespace boost::program_options; + int main(int argc, char** argv) { try { + int multipart; + + options_description mergerOptions("Proxy options"); + mergerOptions.add_options() + ("multipart", value(&multipart)->default_value(1), "Handle multipart payloads"); + FairMQProgOptions config; + config.AddToCmdLineOptions(mergerOptions); config.ParseAll(argc, argv); FairMQMerger merger; + merger.SetProperty(FairMQMerger::Multipart, multipart); runStateMachine(merger, config); } catch (std::exception& e) diff --git a/fairmq/run/runProxy.cxx b/fairmq/run/runProxy.cxx index 90f92e20..40a4eb13 100644 --- a/fairmq/run/runProxy.cxx +++ b/fairmq/run/runProxy.cxx @@ -14,19 +14,31 @@ #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQProgOptions.h" #include "FairMQProxy.h" #include "runSimpleMQStateMachine.h" +using namespace boost::program_options; + int main(int argc, char** argv) { try { + int multipart; + + options_description proxyOptions("Proxy options"); + proxyOptions.add_options() + ("multipart", value(&multipart)->default_value(1), "Handle multipart payloads"); + FairMQProgOptions config; + config.AddToCmdLineOptions(proxyOptions); config.ParseAll(argc, argv); FairMQProxy proxy; + proxy.SetProperty(FairMQProxy::Multipart, multipart); runStateMachine(proxy, config); } catch (std::exception& e) diff --git a/fairmq/run/runSplitter.cxx b/fairmq/run/runSplitter.cxx index 7a2d3a6f..cfc957ea 100644 --- a/fairmq/run/runSplitter.cxx +++ b/fairmq/run/runSplitter.cxx @@ -14,19 +14,31 @@ #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "FairMQProgOptions.h" #include "FairMQSplitter.h" #include "runSimpleMQStateMachine.h" +using namespace boost::program_options; + int main(int argc, char** argv) { try { + int multipart; + + options_description proxyOptions("Proxy options"); + proxyOptions.add_options() + ("multipart", value(&multipart)->default_value(1), "Handle multipart payloads"); + FairMQProgOptions config; + config.AddToCmdLineOptions(proxyOptions); config.ParseAll(argc, argv); FairMQSplitter splitter; + splitter.SetProperty(FairMQSplitter::Multipart, multipart); runStateMachine(splitter, config); } catch (std::exception& e)