Add example for sending multipart messages

This commit is contained in:
Alexey Rybalchenko 2015-11-06 13:20:11 +01:00
parent 2e789e4439
commit 307d698736
18 changed files with 545 additions and 12 deletions

View File

@ -19,6 +19,7 @@ add_subdirectory(examples/4-copypush)
add_subdirectory(examples/5-req-rep)
add_subdirectory(examples/6-multiple-channels)
add_subdirectory(examples/7-parameters)
add_subdirectory(examples/8-multipart)
add_subdirectory(test)

View File

@ -63,7 +63,7 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str
{
}
std::string FairMQChannel::GetType() const
string FairMQChannel::GetType() const
{
try
{
@ -77,7 +77,7 @@ std::string FairMQChannel::GetType() const
}
}
std::string FairMQChannel::GetMethod() const
string FairMQChannel::GetMethod() const
{
try
{
@ -91,7 +91,7 @@ std::string FairMQChannel::GetMethod() const
}
}
std::string FairMQChannel::GetAddress() const
string FairMQChannel::GetAddress() const
{
try
{
@ -147,7 +147,7 @@ int FairMQChannel::GetRateLogging() const
}
}
void FairMQChannel::UpdateType(const std::string& type)
void FairMQChannel::UpdateType(const string& type)
{
try
{
@ -162,7 +162,7 @@ void FairMQChannel::UpdateType(const std::string& type)
}
}
void FairMQChannel::UpdateMethod(const std::string& method)
void FairMQChannel::UpdateMethod(const string& method)
{
try
{
@ -177,7 +177,7 @@ void FairMQChannel::UpdateMethod(const std::string& method)
}
}
void FairMQChannel::UpdateAddress(const std::string& address)
void FairMQChannel::UpdateAddress(const string& address)
{
try
{
@ -420,6 +420,41 @@ int FairMQChannel::SendPart(const unique_ptr<FairMQMessage>& msg) const
return fSocket->Send(msg.get(), fSndMoreFlag);
}
// int FairMQChannel::SendParts(initializer_list<unique_ptr<FairMQMessage>> partsList) const
// {
// int totalSize = 0;
// initializer_list<unique_ptr<FairMQMessage>>::iterator it = partsList.end();
// auto &last = --it;
// for (auto &p : partsList)
// {
// if (&p != last)
// {
// int nbytes = SendPart(p);
// if (nbytes >= 0)
// {
// totalSize += nbytes;
// }
// else
// {
// return nbytes;
// }
// }
// else
// {
// int nbytes = Send(p);
// if (nbytes >= 0)
// {
// totalSize += nbytes;
// }
// else
// {
// return nbytes;
// }
// }
// }
// return totalSize;
// }
int FairMQChannel::Receive(const unique_ptr<FairMQMessage>& msg) const
{
fPoller->Poll(fRcvTimeoutInMs);

View File

@ -122,6 +122,12 @@ class FairMQChannel
/// @return Returns the number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1.
int SendPart(const std::unique_ptr<FairMQMessage>& msg) const;
// /// Sends the messages provided as arguments as a multi-part message.
// ///
// /// @param partsList Initializer list of FairMQMessages
// /// @return Returns the number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1.
// int SendParts(std::initializer_list<std::unique_ptr<FairMQMessage>> partsList) const;
/// Receives a message from the socket queue.
/// @details Receive method attempts to receive a message from the input queue.
/// If the queue is empty the method blocks.

View File

@ -27,7 +27,7 @@ void FairMQExample1Sink::Run()
{
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
if (fChannels.at("data-in").at(0).Receive(msg) > 0)
if (fChannels.at("data-in").at(0).Receive(msg) >= 0)
{
LOG(INFO) << "Received message: \""
<< string(static_cast<char*>(msg->GetData()), msg->GetSize())

View File

@ -40,7 +40,7 @@ void FairMQExample2Processor::Run()
// Receive the message (blocks until received or interrupted (e.g. by state change)).
// Returns size of the received message or -1 if interrupted.
if (fChannels.at("data-in").at(0).Receive(input) > 0)
if (fChannels.at("data-in").at(0).Receive(input) >= 0)
{
LOG(INFO) << "Received data, processing...";

View File

@ -30,7 +30,7 @@ void FairMQExample2Sink::Run()
{
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
if (fChannels.at("data-in").at(0).Receive(msg) > 0)
if (fChannels.at("data-in").at(0).Receive(msg) >= 0)
{
LOG(INFO) << "Received message: \""
<< string(static_cast<char*>(msg->GetData()), msg->GetSize())

View File

@ -40,7 +40,7 @@ void FairMQExample3Processor::Run()
// Receive the message (blocks until received or interrupted (e.g. by state change)).
// Returns size of the received message or -1 if interrupted.
if (fChannels.at("data-in").at(0).Receive(input) > 0)
if (fChannels.at("data-in").at(0).Receive(input) >= 0)
{
LOG(INFO) << "Received data, processing...";

View File

@ -30,7 +30,7 @@ void FairMQExample3Sink::Run()
{
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
if (fChannels.at("data-in").at(0).Receive(msg) > 0)
if (fChannels.at("data-in").at(0).Receive(msg) >= 0)
{
LOG(INFO) << "Received message: \""
<< string(static_cast<char*>(msg->GetData()), msg->GetSize())

View File

@ -35,7 +35,7 @@ void FairMQExample5Server::Run()
{
unique_ptr<FairMQMessage> request(fTransportFactory->CreateMessage());
if (fChannels.at("data").at(0).Receive(request) > 0)
if (fChannels.at("data").at(0).Receive(request) >= 0)
{
LOG(INFO) << "Received request from client: \"" << string(static_cast<char*>(request->GetData()), request->GetSize()) << "\"";

View File

@ -0,0 +1,87 @@
################################################################################
# Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence version 3 (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/8-multipart/ex8-multipart.json ${CMAKE_BINARY_DIR}/bin/config/ex8-multipart.json)
Set(INCLUDE_DIRECTORIES
${CMAKE_SOURCE_DIR}/fairmq
${CMAKE_SOURCE_DIR}/fairmq/devices
${CMAKE_SOURCE_DIR}/fairmq/tools
${CMAKE_SOURCE_DIR}/fairmq/options
${CMAKE_SOURCE_DIR}/fairmq/examples/8-multipart
${CMAKE_CURRENT_BINARY_DIR}
)
Set(SYSTEM_INCLUDE_DIRECTORIES
${Boost_INCLUDE_DIR}
)
If(NANOMSG_FOUND)
Set(INCLUDE_DIRECTORIES
${INCLUDE_DIRECTORIES}
${CMAKE_SOURCE_DIR}/fairmq/nanomsg
)
Set(SYSTEM_INCLUDE_DIRECTORIES
${SYSTEM_INCLUDE_DIRECTORIES}
${ZMQ_INCLUDE_DIR}
)
Else(NANOMSG_FOUND)
Set(INCLUDE_DIRECTORIES
${INCLUDE_DIRECTORIES}
${CMAKE_SOURCE_DIR}/fairmq/zeromq
)
Set(SYSTEM_INCLUDE_DIRECTORIES
${SYSTEM_INCLUDE_DIRECTORIES}
${ZMQ_INCLUDE_DIR}
)
EndIf(NANOMSG_FOUND)
Include_Directories(${INCLUDE_DIRECTORIES})
Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES})
Set(LINK_DIRECTORIES
${Boost_LIBRARY_DIRS}
)
Link_Directories(${LINK_DIRECTORIES})
Set(SRCS
"FairMQExample8Sampler.cxx"
"FairMQExample8Sink.cxx"
)
Set(DEPENDENCIES
${DEPENDENCIES}
FairMQ
)
Set(LIBRARY_NAME FairMQExample8)
GENERATE_LIBRARY()
Set(Exe_Names
ex8-sampler
ex8-sink
)
Set(Exe_Source
runExample8Sampler.cxx
runExample8Sink.cxx
)
list(LENGTH Exe_Names _length)
math(EXPR _length ${_length}-1)
ForEach(_file RANGE 0 ${_length})
list(GET Exe_Names ${_file} _name)
list(GET Exe_Source ${_file} _src)
Set(EXE_NAME ${_name})
Set(SRCS ${_src})
Set(DEPENDENCIES FairMQExample8)
GENERATE_EXECUTABLE()
EndForEach(_file RANGE 0 ${_length})

View File

@ -0,0 +1,70 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample8Sampler.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include "FairMQExample8Sampler.h"
#include "FairMQLogger.h"
using namespace std;
struct Ex8Header {
int32_t stopFlag;
};
FairMQExample8Sampler::FairMQExample8Sampler()
{
}
void FairMQExample8Sampler::Run()
{
int counter = 0;
// Check if we are still in the RUNNING state.
while (CheckCurrentState(RUNNING))
{
Ex8Header* header = new Ex8Header;
// Set stopFlag to 1 for the first 4 messages, and to 0 for the 5th.
counter < 5 ? header->stopFlag = 0 : header->stopFlag = 1;
// Create message part with the header.
unique_ptr<FairMQMessage> headerPart(fTransportFactory->CreateMessage(header, sizeof(Ex8Header)));
// Create message part with the body of 1000 bytes size.
unique_ptr<FairMQMessage> dataPart(fTransportFactory->CreateMessage(1000));
LOG(INFO) << "Sending header with stopFlag: " << header->stopFlag;
// Schedule the header part for sending.
fChannels.at("data-out").at(0).SendPart(headerPart);
// Add body part (final part). `Send()` will send/queue all parts.
fChannels.at("data-out").at(0).Send(dataPart);
// Go out of the sending loop if the stopFlag was sent.
if (counter == 5)
{
break;
}
counter++;
// Wait a second to keep the output readable.
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
}
LOG(INFO) << "Going out of RUNNING state.";
}
FairMQExample8Sampler::~FairMQExample8Sampler()
{
}

View File

@ -0,0 +1,32 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample8Sampler.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE8SAMPLER_H_
#define FAIRMQEXAMPLE8SAMPLER_H_
#include <string>
#include "FairMQDevice.h"
class FairMQExample8Sampler : public FairMQDevice
{
public:
FairMQExample8Sampler();
virtual ~FairMQExample8Sampler();
protected:
virtual void Run();
};
#endif /* FAIRMQEXAMPLE8SAMPLER_H_ */

View File

@ -0,0 +1,57 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample8Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include "FairMQExample8Sink.h"
#include "FairMQLogger.h"
using namespace std;
struct Ex8Header {
int32_t stopFlag;
};
FairMQExample8Sink::FairMQExample8Sink()
{
}
void FairMQExample8Sink::Run()
{
while (CheckCurrentState(RUNNING))
{
unique_ptr<FairMQMessage> headerPart(fTransportFactory->CreateMessage());
unique_ptr<FairMQMessage> bodyPart(fTransportFactory->CreateMessage());
if (fChannels.at("data-in").at(0).Receive(headerPart) >= 0)
{
if (fChannels.at("data-in").at(0).Receive(bodyPart) >= 0)
{
Ex8Header header;
header.stopFlag = (static_cast<Ex8Header*>(headerPart->GetData()))->stopFlag;
LOG(INFO) << "Received header with stopFlag: " << header.stopFlag;
if (header.stopFlag == 1)
{
LOG(INFO) << "Flag is 0, exiting Run()";
break;
}
}
}
}
}
FairMQExample8Sink::~FairMQExample8Sink()
{
}

View File

@ -0,0 +1,30 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample8Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE8SINK_H_
#define FAIRMQEXAMPLE8SINK_H_
#include "FairMQDevice.h"
class FairMQExample8Sink : public FairMQDevice
{
public:
FairMQExample8Sink();
virtual ~FairMQExample8Sink();
protected:
virtual void Run();
};
#endif /* FAIRMQEXAMPLE8SINK_H_ */

View File

@ -0,0 +1,13 @@
Example 8: Sending Multipart messages
===============
A topology of two devices - Sampler and Sink, communicating with PUSH-PULL pattern.
The Sampler sends a multipart message to the Sink, consisting of two message parts - header and body.
Each message part is a regular FairMQMessage. To combine them into a multi-part message, simply send all but the last part with `SendPart()` and the last part with `Send()` as shown in the example.
The ZeroMQ transport guarantees delivery of both parts together. Meaning that when the Receive call of the Sink receives the first part, following parts have arrived too.
The header contains a simple data structure with one integer. The integer in this structure is used as a stop flag for the sink. As long as its value is 0, the Sink will keep processing the data. Once its value is 1, the Sink will exit its `Run()` method.

View File

@ -0,0 +1,40 @@
{
"fairMQOptions":
{
"device":
{
"id": "sampler1",
"channel":
{
"name": "data-out",
"socket":
{
"type": "push",
"method": "bind",
"address": "tcp://*:5555",
"sndBufSize": "1000",
"rcvBufSize": "1000",
"rateLogging": "0"
}
}
},
"device":
{
"id": "sink1",
"channel":
{
"name": "data-in",
"socket":
{
"type": "pull",
"method": "connect",
"address": "tcp://localhost:5555",
"sndBufSize": "1000",
"rcvBufSize": "1000",
"rateLogging": "0"
}
}
}
}
}

View File

@ -0,0 +1,83 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runExample8Sampler.cxx
*
* @since 2013-04-23
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include "boost/program_options.hpp"
#include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h"
#include "FairMQExample8Sampler.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
using namespace boost::program_options;
int main(int argc, char** argv)
{
FairMQExample8Sampler sampler;
sampler.CatchSignals();
FairMQProgOptions config;
try
{
if (config.ParseAll(argc, argv))
{
return 0;
}
std::string filename = config.GetValue<std::string>("config-json-file");
std::string id = config.GetValue<std::string>("id");
config.UserParser<FairMQParser::JSON>(filename, id);
sampler.fChannels = config.GetFairMQMap();
LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
sampler.SetTransport(transportFactory);
sampler.SetProperty(FairMQExample8Sampler::Id, id);
sampler.ChangeState("INIT_DEVICE");
sampler.WaitForEndOfState("INIT_DEVICE");
sampler.ChangeState("INIT_TASK");
sampler.WaitForEndOfState("INIT_TASK");
sampler.ChangeState("RUN");
sampler.InteractiveStateLoop();
}
catch (std::exception& e)
{
LOG(ERROR) << e.what();
LOG(INFO) << "Command line options are the following: ";
config.PrintHelp();
return 1;
}
return 0;
}

View File

@ -0,0 +1,79 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runExample8Sink.cxx
*
* @since 2013-04-23
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h"
#include "FairMQExample8Sink.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
int main(int argc, char** argv)
{
FairMQExample8Sink sink;
sink.CatchSignals();
FairMQProgOptions config;
try
{
if (config.ParseAll(argc, argv))
{
return 0;
}
std::string filename = config.GetValue<std::string>("config-json-file");
std::string id = config.GetValue<std::string>("id");
config.UserParser<FairMQParser::JSON>(filename, id);
sink.fChannels = config.GetFairMQMap();
LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
sink.SetTransport(transportFactory);
sink.SetProperty(FairMQExample8Sink::Id, id);
sink.ChangeState("INIT_DEVICE");
sink.WaitForEndOfState("INIT_DEVICE");
sink.ChangeState("INIT_TASK");
sink.WaitForEndOfState("INIT_TASK");
sink.ChangeState("RUN");
sink.InteractiveStateLoop();
}
catch (std::exception& e)
{
LOG(ERROR) << e.what();
LOG(INFO) << "Command line options are the following: ";
config.PrintHelp();
return 1;
}
return 0;
}