diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 34a06687..73068c76 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -19,6 +19,7 @@ add_subdirectory(examples/4-copypush) add_subdirectory(examples/5-req-rep) add_subdirectory(examples/6-multiple-channels) add_subdirectory(examples/7-parameters) +add_subdirectory(examples/8-multipart) add_subdirectory(test) diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index d44af423..719971fe 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -63,7 +63,7 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str { } -std::string FairMQChannel::GetType() const +string FairMQChannel::GetType() const { try { @@ -77,7 +77,7 @@ std::string FairMQChannel::GetType() const } } -std::string FairMQChannel::GetMethod() const +string FairMQChannel::GetMethod() const { try { @@ -91,7 +91,7 @@ std::string FairMQChannel::GetMethod() const } } -std::string FairMQChannel::GetAddress() const +string FairMQChannel::GetAddress() const { try { @@ -147,7 +147,7 @@ int FairMQChannel::GetRateLogging() const } } -void FairMQChannel::UpdateType(const std::string& type) +void FairMQChannel::UpdateType(const string& type) { try { @@ -162,7 +162,7 @@ void FairMQChannel::UpdateType(const std::string& type) } } -void FairMQChannel::UpdateMethod(const std::string& method) +void FairMQChannel::UpdateMethod(const string& method) { try { @@ -177,7 +177,7 @@ void FairMQChannel::UpdateMethod(const std::string& method) } } -void FairMQChannel::UpdateAddress(const std::string& address) +void FairMQChannel::UpdateAddress(const string& address) { try { @@ -420,6 +420,41 @@ int FairMQChannel::SendPart(const unique_ptr& msg) const return fSocket->Send(msg.get(), fSndMoreFlag); } +// int FairMQChannel::SendParts(initializer_list> partsList) const +// { +// int totalSize = 0; +// initializer_list>::iterator it = partsList.end(); +// auto &last = --it; +// for (auto &p : partsList) +// { +// if (&p != last) +// { +// int nbytes = SendPart(p); +// if (nbytes >= 0) +// { +// totalSize += nbytes; +// } +// else +// { +// return nbytes; +// } +// } +// else +// { +// int nbytes = Send(p); +// if (nbytes >= 0) +// { +// totalSize += nbytes; +// } +// else +// { +// return nbytes; +// } +// } +// } +// return totalSize; +// } + int FairMQChannel::Receive(const unique_ptr& msg) const { fPoller->Poll(fRcvTimeoutInMs); diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index cb19224a..46ba00b2 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -122,6 +122,12 @@ class FairMQChannel /// @return Returns the number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1. int SendPart(const std::unique_ptr& msg) const; + // /// Sends the messages provided as arguments as a multi-part message. + // /// + // /// @param partsList Initializer list of FairMQMessages + // /// @return Returns the number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1. + // int SendParts(std::initializer_list> partsList) const; + /// Receives a message from the socket queue. /// @details Receive method attempts to receive a message from the input queue. /// If the queue is empty the method blocks. diff --git a/fairmq/examples/1-sampler-sink/FairMQExample1Sink.cxx b/fairmq/examples/1-sampler-sink/FairMQExample1Sink.cxx index dd923eeb..551af399 100644 --- a/fairmq/examples/1-sampler-sink/FairMQExample1Sink.cxx +++ b/fairmq/examples/1-sampler-sink/FairMQExample1Sink.cxx @@ -27,7 +27,7 @@ void FairMQExample1Sink::Run() { unique_ptr msg(fTransportFactory->CreateMessage()); - if (fChannels.at("data-in").at(0).Receive(msg) > 0) + if (fChannels.at("data-in").at(0).Receive(msg) >= 0) { LOG(INFO) << "Received message: \"" << string(static_cast(msg->GetData()), msg->GetSize()) diff --git a/fairmq/examples/2-sampler-processor-sink/FairMQExample2Processor.cxx b/fairmq/examples/2-sampler-processor-sink/FairMQExample2Processor.cxx index d35f68e2..b539d514 100644 --- a/fairmq/examples/2-sampler-processor-sink/FairMQExample2Processor.cxx +++ b/fairmq/examples/2-sampler-processor-sink/FairMQExample2Processor.cxx @@ -40,7 +40,7 @@ void FairMQExample2Processor::Run() // Receive the message (blocks until received or interrupted (e.g. by state change)). // Returns size of the received message or -1 if interrupted. - if (fChannels.at("data-in").at(0).Receive(input) > 0) + if (fChannels.at("data-in").at(0).Receive(input) >= 0) { LOG(INFO) << "Received data, processing..."; diff --git a/fairmq/examples/2-sampler-processor-sink/FairMQExample2Sink.cxx b/fairmq/examples/2-sampler-processor-sink/FairMQExample2Sink.cxx index ec4728d4..cb4dbf52 100644 --- a/fairmq/examples/2-sampler-processor-sink/FairMQExample2Sink.cxx +++ b/fairmq/examples/2-sampler-processor-sink/FairMQExample2Sink.cxx @@ -30,7 +30,7 @@ void FairMQExample2Sink::Run() { unique_ptr msg(fTransportFactory->CreateMessage()); - if (fChannels.at("data-in").at(0).Receive(msg) > 0) + if (fChannels.at("data-in").at(0).Receive(msg) >= 0) { LOG(INFO) << "Received message: \"" << string(static_cast(msg->GetData()), msg->GetSize()) diff --git a/fairmq/examples/3-dds/FairMQExample3Processor.cxx b/fairmq/examples/3-dds/FairMQExample3Processor.cxx index 6fb6ed25..e1db6d48 100644 --- a/fairmq/examples/3-dds/FairMQExample3Processor.cxx +++ b/fairmq/examples/3-dds/FairMQExample3Processor.cxx @@ -40,7 +40,7 @@ void FairMQExample3Processor::Run() // Receive the message (blocks until received or interrupted (e.g. by state change)). // Returns size of the received message or -1 if interrupted. - if (fChannels.at("data-in").at(0).Receive(input) > 0) + if (fChannels.at("data-in").at(0).Receive(input) >= 0) { LOG(INFO) << "Received data, processing..."; diff --git a/fairmq/examples/3-dds/FairMQExample3Sink.cxx b/fairmq/examples/3-dds/FairMQExample3Sink.cxx index c2c66c7c..c51def8a 100644 --- a/fairmq/examples/3-dds/FairMQExample3Sink.cxx +++ b/fairmq/examples/3-dds/FairMQExample3Sink.cxx @@ -30,7 +30,7 @@ void FairMQExample3Sink::Run() { unique_ptr msg(fTransportFactory->CreateMessage()); - if (fChannels.at("data-in").at(0).Receive(msg) > 0) + if (fChannels.at("data-in").at(0).Receive(msg) >= 0) { LOG(INFO) << "Received message: \"" << string(static_cast(msg->GetData()), msg->GetSize()) diff --git a/fairmq/examples/5-req-rep/FairMQExample5Server.cxx b/fairmq/examples/5-req-rep/FairMQExample5Server.cxx index e1c600f4..72c33c1f 100644 --- a/fairmq/examples/5-req-rep/FairMQExample5Server.cxx +++ b/fairmq/examples/5-req-rep/FairMQExample5Server.cxx @@ -35,7 +35,7 @@ void FairMQExample5Server::Run() { unique_ptr request(fTransportFactory->CreateMessage()); - if (fChannels.at("data").at(0).Receive(request) > 0) + if (fChannels.at("data").at(0).Receive(request) >= 0) { LOG(INFO) << "Received request from client: \"" << string(static_cast(request->GetData()), request->GetSize()) << "\""; diff --git a/fairmq/examples/8-multipart/CMakeLists.txt b/fairmq/examples/8-multipart/CMakeLists.txt new file mode 100644 index 00000000..7bf6be7f --- /dev/null +++ b/fairmq/examples/8-multipart/CMakeLists.txt @@ -0,0 +1,87 @@ + ################################################################################ + # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # + # # + # This software is distributed under the terms of the # + # GNU Lesser General Public Licence version 3 (LGPL) version 3, # + # copied verbatim in the file "LICENSE" # + ################################################################################ + +configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/8-multipart/ex8-multipart.json ${CMAKE_BINARY_DIR}/bin/config/ex8-multipart.json) + +Set(INCLUDE_DIRECTORIES + ${CMAKE_SOURCE_DIR}/fairmq + ${CMAKE_SOURCE_DIR}/fairmq/devices + ${CMAKE_SOURCE_DIR}/fairmq/tools + ${CMAKE_SOURCE_DIR}/fairmq/options + ${CMAKE_SOURCE_DIR}/fairmq/examples/8-multipart + ${CMAKE_CURRENT_BINARY_DIR} +) + +Set(SYSTEM_INCLUDE_DIRECTORIES + ${Boost_INCLUDE_DIR} +) + +If(NANOMSG_FOUND) + Set(INCLUDE_DIRECTORIES + ${INCLUDE_DIRECTORIES} + ${CMAKE_SOURCE_DIR}/fairmq/nanomsg + ) + Set(SYSTEM_INCLUDE_DIRECTORIES + ${SYSTEM_INCLUDE_DIRECTORIES} + ${ZMQ_INCLUDE_DIR} + ) +Else(NANOMSG_FOUND) + Set(INCLUDE_DIRECTORIES + ${INCLUDE_DIRECTORIES} + ${CMAKE_SOURCE_DIR}/fairmq/zeromq + ) + Set(SYSTEM_INCLUDE_DIRECTORIES + ${SYSTEM_INCLUDE_DIRECTORIES} + ${ZMQ_INCLUDE_DIR} + ) +EndIf(NANOMSG_FOUND) + +Include_Directories(${INCLUDE_DIRECTORIES}) +Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) + +Set(LINK_DIRECTORIES + ${Boost_LIBRARY_DIRS} +) + +Link_Directories(${LINK_DIRECTORIES}) + +Set(SRCS + "FairMQExample8Sampler.cxx" + "FairMQExample8Sink.cxx" +) + +Set(DEPENDENCIES + ${DEPENDENCIES} + FairMQ +) + +Set(LIBRARY_NAME FairMQExample8) + +GENERATE_LIBRARY() + +Set(Exe_Names + ex8-sampler + ex8-sink +) + +Set(Exe_Source + runExample8Sampler.cxx + runExample8Sink.cxx +) + +list(LENGTH Exe_Names _length) +math(EXPR _length ${_length}-1) + +ForEach(_file RANGE 0 ${_length}) + list(GET Exe_Names ${_file} _name) + list(GET Exe_Source ${_file} _src) + Set(EXE_NAME ${_name}) + Set(SRCS ${_src}) + Set(DEPENDENCIES FairMQExample8) + GENERATE_EXECUTABLE() +EndForEach(_file RANGE 0 ${_length}) diff --git a/fairmq/examples/8-multipart/FairMQExample8Sampler.cxx b/fairmq/examples/8-multipart/FairMQExample8Sampler.cxx new file mode 100644 index 00000000..5cc47996 --- /dev/null +++ b/fairmq/examples/8-multipart/FairMQExample8Sampler.cxx @@ -0,0 +1,70 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQExample8Sampler.cpp + * + * @since 2014-10-10 + * @author A. Rybalchenko + */ + +#include +#include + +#include "FairMQExample8Sampler.h" +#include "FairMQLogger.h" + +using namespace std; + +struct Ex8Header { + int32_t stopFlag; +}; + +FairMQExample8Sampler::FairMQExample8Sampler() +{ +} + +void FairMQExample8Sampler::Run() +{ + int counter = 0; + + // Check if we are still in the RUNNING state. + while (CheckCurrentState(RUNNING)) + { + Ex8Header* header = new Ex8Header; + // Set stopFlag to 1 for the first 4 messages, and to 0 for the 5th. + counter < 5 ? header->stopFlag = 0 : header->stopFlag = 1; + + // Create message part with the header. + unique_ptr headerPart(fTransportFactory->CreateMessage(header, sizeof(Ex8Header))); + // Create message part with the body of 1000 bytes size. + unique_ptr dataPart(fTransportFactory->CreateMessage(1000)); + + LOG(INFO) << "Sending header with stopFlag: " << header->stopFlag; + + // Schedule the header part for sending. + fChannels.at("data-out").at(0).SendPart(headerPart); + // Add body part (final part). `Send()` will send/queue all parts. + fChannels.at("data-out").at(0).Send(dataPart); + + // Go out of the sending loop if the stopFlag was sent. + if (counter == 5) + { + break; + } + + counter++; + // Wait a second to keep the output readable. + boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + } + + LOG(INFO) << "Going out of RUNNING state."; +} + +FairMQExample8Sampler::~FairMQExample8Sampler() +{ +} diff --git a/fairmq/examples/8-multipart/FairMQExample8Sampler.h b/fairmq/examples/8-multipart/FairMQExample8Sampler.h new file mode 100644 index 00000000..479a057e --- /dev/null +++ b/fairmq/examples/8-multipart/FairMQExample8Sampler.h @@ -0,0 +1,32 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQExample8Sampler.h + * + * @since 2014-10-10 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQEXAMPLE8SAMPLER_H_ +#define FAIRMQEXAMPLE8SAMPLER_H_ + +#include + +#include "FairMQDevice.h" + +class FairMQExample8Sampler : public FairMQDevice +{ + public: + FairMQExample8Sampler(); + virtual ~FairMQExample8Sampler(); + + protected: + virtual void Run(); +}; + +#endif /* FAIRMQEXAMPLE8SAMPLER_H_ */ diff --git a/fairmq/examples/8-multipart/FairMQExample8Sink.cxx b/fairmq/examples/8-multipart/FairMQExample8Sink.cxx new file mode 100644 index 00000000..0dc83784 --- /dev/null +++ b/fairmq/examples/8-multipart/FairMQExample8Sink.cxx @@ -0,0 +1,57 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQExample8Sink.cxx + * + * @since 2014-10-10 + * @author A. Rybalchenko + */ + +#include +#include + +#include "FairMQExample8Sink.h" +#include "FairMQLogger.h" + +using namespace std; + +struct Ex8Header { + int32_t stopFlag; +}; + +FairMQExample8Sink::FairMQExample8Sink() +{ +} + +void FairMQExample8Sink::Run() +{ + while (CheckCurrentState(RUNNING)) + { + unique_ptr headerPart(fTransportFactory->CreateMessage()); + unique_ptr bodyPart(fTransportFactory->CreateMessage()); + + if (fChannels.at("data-in").at(0).Receive(headerPart) >= 0) + { + if (fChannels.at("data-in").at(0).Receive(bodyPart) >= 0) + { + Ex8Header header; + header.stopFlag = (static_cast(headerPart->GetData()))->stopFlag; + LOG(INFO) << "Received header with stopFlag: " << header.stopFlag; + if (header.stopFlag == 1) + { + LOG(INFO) << "Flag is 0, exiting Run()"; + break; + } + } + } + } +} + +FairMQExample8Sink::~FairMQExample8Sink() +{ +} diff --git a/fairmq/examples/8-multipart/FairMQExample8Sink.h b/fairmq/examples/8-multipart/FairMQExample8Sink.h new file mode 100644 index 00000000..e71851b2 --- /dev/null +++ b/fairmq/examples/8-multipart/FairMQExample8Sink.h @@ -0,0 +1,30 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQExample8Sink.h + * + * @since 2014-10-10 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQEXAMPLE8SINK_H_ +#define FAIRMQEXAMPLE8SINK_H_ + +#include "FairMQDevice.h" + +class FairMQExample8Sink : public FairMQDevice +{ + public: + FairMQExample8Sink(); + virtual ~FairMQExample8Sink(); + + protected: + virtual void Run(); +}; + +#endif /* FAIRMQEXAMPLE8SINK_H_ */ diff --git a/fairmq/examples/8-multipart/README.md b/fairmq/examples/8-multipart/README.md new file mode 100644 index 00000000..841cfcd9 --- /dev/null +++ b/fairmq/examples/8-multipart/README.md @@ -0,0 +1,13 @@ +Example 8: Sending Multipart messages +=============== + +A topology of two devices - Sampler and Sink, communicating with PUSH-PULL pattern. + +The Sampler sends a multipart message to the Sink, consisting of two message parts - header and body. + +Each message part is a regular FairMQMessage. To combine them into a multi-part message, simply send all but the last part with `SendPart()` and the last part with `Send()` as shown in the example. + +The ZeroMQ transport guarantees delivery of both parts together. Meaning that when the Receive call of the Sink receives the first part, following parts have arrived too. + +The header contains a simple data structure with one integer. The integer in this structure is used as a stop flag for the sink. As long as its value is 0, the Sink will keep processing the data. Once its value is 1, the Sink will exit its `Run()` method. + diff --git a/fairmq/examples/8-multipart/ex8-multipart.json b/fairmq/examples/8-multipart/ex8-multipart.json new file mode 100644 index 00000000..075aae96 --- /dev/null +++ b/fairmq/examples/8-multipart/ex8-multipart.json @@ -0,0 +1,40 @@ +{ + "fairMQOptions": + { + "device": + { + "id": "sampler1", + "channel": + { + "name": "data-out", + "socket": + { + "type": "push", + "method": "bind", + "address": "tcp://*:5555", + "sndBufSize": "1000", + "rcvBufSize": "1000", + "rateLogging": "0" + } + } + }, + + "device": + { + "id": "sink1", + "channel": + { + "name": "data-in", + "socket": + { + "type": "pull", + "method": "connect", + "address": "tcp://localhost:5555", + "sndBufSize": "1000", + "rcvBufSize": "1000", + "rateLogging": "0" + } + } + } + } +} diff --git a/fairmq/examples/8-multipart/runExample8Sampler.cxx b/fairmq/examples/8-multipart/runExample8Sampler.cxx new file mode 100644 index 00000000..3dbeca4a --- /dev/null +++ b/fairmq/examples/8-multipart/runExample8Sampler.cxx @@ -0,0 +1,83 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * runExample8Sampler.cxx + * + * @since 2013-04-23 + * @author D. Klein, A. Rybalchenko + */ + +#include + +#include "boost/program_options.hpp" + +#include "FairMQLogger.h" +#include "FairMQParser.h" +#include "FairMQProgOptions.h" +#include "FairMQExample8Sampler.h" + +#ifdef NANOMSG +#include "FairMQTransportFactoryNN.h" +#else +#include "FairMQTransportFactoryZMQ.h" +#endif + +using namespace boost::program_options; + +int main(int argc, char** argv) +{ + FairMQExample8Sampler sampler; + sampler.CatchSignals(); + + FairMQProgOptions config; + + try + { + if (config.ParseAll(argc, argv)) + { + return 0; + } + + std::string filename = config.GetValue("config-json-file"); + std::string id = config.GetValue("id"); + + config.UserParser(filename, id); + + sampler.fChannels = config.GetFairMQMap(); + + LOG(INFO) << "PID: " << getpid(); + +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + + sampler.SetTransport(transportFactory); + + sampler.SetProperty(FairMQExample8Sampler::Id, id); + + sampler.ChangeState("INIT_DEVICE"); + sampler.WaitForEndOfState("INIT_DEVICE"); + + sampler.ChangeState("INIT_TASK"); + sampler.WaitForEndOfState("INIT_TASK"); + + sampler.ChangeState("RUN"); + sampler.InteractiveStateLoop(); + } + catch (std::exception& e) + { + LOG(ERROR) << e.what(); + LOG(INFO) << "Command line options are the following: "; + config.PrintHelp(); + return 1; + } + + return 0; +} diff --git a/fairmq/examples/8-multipart/runExample8Sink.cxx b/fairmq/examples/8-multipart/runExample8Sink.cxx new file mode 100644 index 00000000..70e1f253 --- /dev/null +++ b/fairmq/examples/8-multipart/runExample8Sink.cxx @@ -0,0 +1,79 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * runExample8Sink.cxx + * + * @since 2013-04-23 + * @author D. Klein, A. Rybalchenko + */ + +#include + +#include "FairMQLogger.h" +#include "FairMQParser.h" +#include "FairMQProgOptions.h" +#include "FairMQExample8Sink.h" + +#ifdef NANOMSG +#include "FairMQTransportFactoryNN.h" +#else +#include "FairMQTransportFactoryZMQ.h" +#endif + +int main(int argc, char** argv) +{ + FairMQExample8Sink sink; + sink.CatchSignals(); + + FairMQProgOptions config; + + try + { + if (config.ParseAll(argc, argv)) + { + return 0; + } + + std::string filename = config.GetValue("config-json-file"); + std::string id = config.GetValue("id"); + + config.UserParser(filename, id); + + sink.fChannels = config.GetFairMQMap(); + + LOG(INFO) << "PID: " << getpid(); + +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + + sink.SetTransport(transportFactory); + + sink.SetProperty(FairMQExample8Sink::Id, id); + + sink.ChangeState("INIT_DEVICE"); + sink.WaitForEndOfState("INIT_DEVICE"); + + sink.ChangeState("INIT_TASK"); + sink.WaitForEndOfState("INIT_TASK"); + + sink.ChangeState("RUN"); + sink.InteractiveStateLoop(); + } + catch (std::exception& e) + { + LOG(ERROR) << e.what(); + LOG(INFO) << "Command line options are the following: "; + config.PrintHelp(); + return 1; + } + + return 0; +}