From 9317f06c10e338b721c6136365742c697fbd3bee Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 10 Oct 2014 18:36:44 +0200 Subject: [PATCH] Add REQ-REP zmq/nn sockets to FairMQ together with a little example program. Also includes example of using boost::program_options. --- fairmq/CMakeLists.txt | 12 +- .../examples/req-rep/FairMQExampleClient.cxx | 110 ++++++++++++++ fairmq/examples/req-rep/FairMQExampleClient.h | 48 ++++++ .../examples/req-rep/FairMQExampleServer.cxx | 69 +++++++++ fairmq/examples/req-rep/FairMQExampleServer.h | 32 ++++ fairmq/examples/req-rep/README.md | 3 + fairmq/examples/req-rep/runExampleClient.cxx | 142 ++++++++++++++++++ fairmq/examples/req-rep/runExampleServer.cxx | 96 ++++++++++++ fairmq/nanomsg/FairMQSocketNN.cxx | 4 + fairmq/nanomsg/FairMQSocketNN.h | 1 + fairmq/zeromq/FairMQSocketZMQ.cxx | 4 + 11 files changed, 519 insertions(+), 2 deletions(-) create mode 100644 fairmq/examples/req-rep/FairMQExampleClient.cxx create mode 100644 fairmq/examples/req-rep/FairMQExampleClient.h create mode 100644 fairmq/examples/req-rep/FairMQExampleServer.cxx create mode 100644 fairmq/examples/req-rep/FairMQExampleServer.h create mode 100644 fairmq/examples/req-rep/README.md create mode 100644 fairmq/examples/req-rep/runExampleClient.cxx create mode 100644 fairmq/examples/req-rep/runExampleServer.cxx diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 78291a89..f58952f6 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -15,6 +15,7 @@ if(PROTOBUF_FOUND) set(INCLUDE_DIRECTORIES ${INCLUDE_DIRECTORIES} ${PROTOBUF_INCLUDE_DIR} + ${CMAKE_SOURCE_DIR}/fairmq/examples/req-rep # # following directory is only for protobuf tests and is not essential part of FairMQ #${CMAKE_SOURCE_DIR}/fairmq/prototest ) @@ -57,6 +58,8 @@ set(SRCS "devices/FairMQSplitter.cxx" "devices/FairMQMerger.cxx" "FairMQPoller.cxx" + "examples/req-rep/FairMQExampleClient.cxx" + "examples/req-rep/FairMQExampleServer.cxx" ) if(PROTOBUF_FOUND) @@ -104,7 +107,7 @@ endif(NANOMSG_FOUND) set(DEPENDENCIES ${DEPENDENCIES} - boost_thread boost_timer boost_system + boost_thread boost_timer boost_system boost_program_options ) set(LIBRARY_NAME FairMQ) @@ -117,7 +120,10 @@ set(Exe_Names splitter merger sink - proxy) + proxy + example_client + example_server +) # following executables are only for protobuf tests and are not essential part of FairMQ # if(PROTOBUF_FOUND) @@ -137,6 +143,8 @@ set(Exe_Source run/runMerger.cxx run/runSink.cxx run/runProxy.cxx + examples/req-rep/runExampleClient.cxx + examples/req-rep/runExampleServer.cxx ) # following source files are only for protobuf tests and are not essential part of FairMQ diff --git a/fairmq/examples/req-rep/FairMQExampleClient.cxx b/fairmq/examples/req-rep/FairMQExampleClient.cxx new file mode 100644 index 00000000..387d41ab --- /dev/null +++ b/fairmq/examples/req-rep/FairMQExampleClient.cxx @@ -0,0 +1,110 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQExampleClient.cpp + * + * @since 2014-10-10 + * @author A. Rybalchenko + */ + +#include +#include + +#include "FairMQExampleClient.h" +#include "FairMQLogger.h" + +FairMQExampleClient::FairMQExampleClient() +{ +} + +FairMQExampleClient::~FairMQExampleClient() +{ +} + +void FairMQExampleClient::CustomCleanup(void *data, void *hint) +{ + delete (string*)hint; +} + +void FairMQExampleClient::Run() +{ + // boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); + + while (fState == RUNNING) + { + boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + + string* text = new string(fText); + + FairMQMessage* request = fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text); + FairMQMessage* reply = fTransportFactory->CreateMessage(); + + LOG(INFO) << "Sending \"" << fText << "\" to server."; + + fPayloadOutputs->at(0)->Send(request); + fPayloadOutputs->at(0)->Receive(reply); + + LOG(INFO) << "Received reply from server: \"" << string(static_cast(reply->GetData()), reply->GetSize()) << "\""; + + delete reply; + } + + // rateLogger.interrupt(); + // rateLogger.join(); + + FairMQDevice::Shutdown(); + + boost::lock_guard lock(fRunningMutex); + fRunningFinished = true; + fRunningCondition.notify_one(); +} + + +void FairMQExampleClient::SetProperty(const int key, const string& value, const int slot /*= 0*/) +{ + switch (key) + { + case Text: + fText = value; + break; + default: + FairMQDevice::SetProperty(key, value, slot); + break; + } +} + +string FairMQExampleClient::GetProperty(const int key, const string& default_ /*= ""*/, const int slot /*= 0*/) +{ + switch (key) + { + case Text: + return fText; + break; + default: + return FairMQDevice::GetProperty(key, default_, slot); + } +} + +void FairMQExampleClient::SetProperty(const int key, const int value, const int slot /*= 0*/) +{ + switch (key) + { + default: + FairMQDevice::SetProperty(key, value, slot); + break; + } +} + +int FairMQExampleClient::GetProperty(const int key, const int default_ /*= 0*/, const int slot /*= 0*/) +{ + switch (key) + { + default: + return FairMQDevice::GetProperty(key, default_, slot); + } +} diff --git a/fairmq/examples/req-rep/FairMQExampleClient.h b/fairmq/examples/req-rep/FairMQExampleClient.h new file mode 100644 index 00000000..7f01c897 --- /dev/null +++ b/fairmq/examples/req-rep/FairMQExampleClient.h @@ -0,0 +1,48 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQExampleClient.h + * + * @since 2014-10-10 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQEXAMPLECLIENT_H_ +#define FAIRMQEXAMPLECLIENT_H_ + +#include + +#include "FairMQDevice.h" + +using namespace std; + +class FairMQExampleClient : public FairMQDevice +{ + public: + enum + { + Text = FairMQDevice::Last, + Last + }; + FairMQExampleClient(); + virtual ~FairMQExampleClient(); + + static void CustomCleanup(void *data, void* hint); + + virtual void SetProperty(const int key, const string& value, const int slot = 0); + virtual string GetProperty(const int key, const string& default_ = "", const int slot = 0); + virtual void SetProperty(const int key, const int value, const int slot = 0); + virtual int GetProperty(const int key, const int default_ = 0, const int slot = 0); + + protected: + string fText; + + virtual void Run(); +}; + +#endif /* FAIRMQEXAMPLECLIENT_H_ */ diff --git a/fairmq/examples/req-rep/FairMQExampleServer.cxx b/fairmq/examples/req-rep/FairMQExampleServer.cxx new file mode 100644 index 00000000..549bb2e4 --- /dev/null +++ b/fairmq/examples/req-rep/FairMQExampleServer.cxx @@ -0,0 +1,69 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQExampleServer.cxx + * + * @since 2014-10-10 + * @author A. Rybalchenko + */ + +#include +#include + +#include "FairMQExampleServer.h" +#include "FairMQLogger.h" + +using namespace std; + +FairMQExampleServer::FairMQExampleServer() +{ +} + +void FairMQExampleServer::CustomCleanup(void *data, void *hint) +{ + delete (string*)hint; +} + +void FairMQExampleServer::Run() +{ + // boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); + + while (fState == RUNNING) + { + boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + + FairMQMessage* request = fTransportFactory->CreateMessage(); + + fPayloadInputs->at(0)->Receive(request); + + LOG(INFO) << "Received request from client: \"" << string(static_cast(request->GetData()), request->GetSize()) << "\""; + + string* text = new string("Thank you for the \"" + string(static_cast(request->GetData()), request->GetSize()) + "\"!"); + + delete request; + + LOG(INFO) << "Sending reply to client."; + + FairMQMessage* reply = fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text); + + fPayloadInputs->at(0)->Send(reply); + } + + // rateLogger.interrupt(); + // rateLogger.join(); + + FairMQDevice::Shutdown(); + + boost::lock_guard lock(fRunningMutex); + fRunningFinished = true; + fRunningCondition.notify_one(); +} + +FairMQExampleServer::~FairMQExampleServer() +{ +} diff --git a/fairmq/examples/req-rep/FairMQExampleServer.h b/fairmq/examples/req-rep/FairMQExampleServer.h new file mode 100644 index 00000000..754b9c29 --- /dev/null +++ b/fairmq/examples/req-rep/FairMQExampleServer.h @@ -0,0 +1,32 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQExampleServer.h + * + * @since 2014-10-10 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQEXAMPLESERVER_H_ +#define FAIRMQEXAMPLESERVER_H_ + +#include "FairMQDevice.h" + +class FairMQExampleServer : public FairMQDevice +{ + public: + FairMQExampleServer(); + virtual ~FairMQExampleServer(); + + static void CustomCleanup(void *data, void* hint); + + protected: + virtual void Run(); +}; + +#endif /* FAIRMQEXAMPLESERVER_H_ */ diff --git a/fairmq/examples/req-rep/README.md b/fairmq/examples/req-rep/README.md new file mode 100644 index 00000000..e399752e --- /dev/null +++ b/fairmq/examples/req-rep/README.md @@ -0,0 +1,3 @@ +# FairMQ Request-Reply Example + +This example demonstrates usage of the request-reply pattern together with FairMQ. Two processes - example_client and example_server communicate. The client sends a text string and the server respondes by returning the string back to the client. The communication happens over a **single** REP-REP socket. Works both with ZeroMQ and with nanomsg transport. diff --git a/fairmq/examples/req-rep/runExampleClient.cxx b/fairmq/examples/req-rep/runExampleClient.cxx new file mode 100644 index 00000000..f95d14b4 --- /dev/null +++ b/fairmq/examples/req-rep/runExampleClient.cxx @@ -0,0 +1,142 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * runExampleClient.cxx + * + * @since 2013-04-23 + * @author D. Klein, A. Rybalchenko + */ + +#include +#include + +#include "boost/program_options.hpp" + +#include "FairMQLogger.h" +#include "FairMQExampleClient.h" + +#ifdef NANOMSG +#include "FairMQTransportFactoryNN.h" +#else +#include "FairMQTransportFactoryZMQ.h" +#endif + +using namespace std; + +FairMQExampleClient client; + +static void s_signal_handler(int signal) +{ + cout << endl << "Caught signal " << signal << endl; + + client.ChangeState(FairMQExampleClient::STOP); + client.ChangeState(FairMQExampleClient::END); + + cout << "Shutdown complete. Bye!" << endl; + exit(1); +} + +static void s_catch_signals(void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} + +typedef struct DeviceOptions +{ + string text; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("text,t", bpo::value()->default_value("something"), "Text to send to server") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) + { + LOG(INFO) << "EPN" << endl << desc; + return false; + } + + bpo::notify(vm); + + if ( vm.count("text") ) + _options->text = vm["text"].as(); + + return true; +} + +int main(int argc, char** argv) +{ + s_catch_signals(); + + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + + LOG(INFO) << "PID: " << getpid(); + +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + + client.SetTransport(transportFactory); + + client.SetProperty(FairMQExampleClient::Id, "client"); + client.SetProperty(FairMQExampleClient::NumIoThreads, 1); + client.SetProperty(FairMQExampleClient::NumInputs, 0); + client.SetProperty(FairMQExampleClient::NumOutputs, 1); + + client.ChangeState(FairMQExampleClient::INIT); + + client.SetProperty(FairMQExampleClient::OutputSocketType, "req", 0); + client.SetProperty(FairMQExampleClient::OutputSndBufSize, 10000, 0); + client.SetProperty(FairMQExampleClient::OutputMethod, "connect", 0); + client.SetProperty(FairMQExampleClient::OutputAddress, "tcp://localhost:5005", 0); + + client.SetProperty(FairMQExampleClient::Text, options.text); + + client.ChangeState(FairMQExampleClient::SETOUTPUT); + client.ChangeState(FairMQExampleClient::SETINPUT); + client.ChangeState(FairMQExampleClient::RUN); + + // wait until the running thread has finished processing. + boost::unique_lock lock(client.fRunningMutex); + while (!client.fRunningFinished) + { + client.fRunningCondition.wait(lock); + } + + client.ChangeState(FairMQExampleClient::STOP); + client.ChangeState(FairMQExampleClient::END); + + return 0; +} diff --git a/fairmq/examples/req-rep/runExampleServer.cxx b/fairmq/examples/req-rep/runExampleServer.cxx new file mode 100644 index 00000000..3c8bc310 --- /dev/null +++ b/fairmq/examples/req-rep/runExampleServer.cxx @@ -0,0 +1,96 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * runExampleServer.cxx + * + * @since 2013-04-23 + * @author D. Klein, A. Rybalchenko + */ + +#include +#include + +#include "FairMQLogger.h" +#include "FairMQExampleServer.h" + +#ifdef NANOMSG +#include "FairMQTransportFactoryNN.h" +#else +#include "FairMQTransportFactoryZMQ.h" +#endif + +using namespace std; + +FairMQExampleServer server; + +static void s_signal_handler(int signal) +{ + cout << endl << "Caught signal " << signal << endl; + + server.ChangeState(FairMQExampleServer::STOP); + server.ChangeState(FairMQExampleServer::END); + + cout << "Shutdown complete. Bye!" << endl; + exit(1); +} + +static void s_catch_signals(void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} + +int main(int argc, char** argv) +{ + s_catch_signals(); + + LOG(INFO) << "PID: " << getpid(); + +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + + server.SetTransport(transportFactory); + + server.SetProperty(FairMQExampleServer::Id, "server"); + server.SetProperty(FairMQExampleServer::NumIoThreads, 1); + server.SetProperty(FairMQExampleServer::NumInputs, 1); + server.SetProperty(FairMQExampleServer::NumOutputs, 0); + + server.ChangeState(FairMQExampleServer::INIT); + + server.SetProperty(FairMQExampleServer::InputSocketType, "rep", 0); + server.SetProperty(FairMQExampleServer::InputSndBufSize, 10000, 0); + server.SetProperty(FairMQExampleServer::InputMethod, "bind", 0); + server.SetProperty(FairMQExampleServer::InputAddress, "tcp://*:5005", 0); + + server.ChangeState(FairMQExampleServer::SETOUTPUT); + server.ChangeState(FairMQExampleServer::SETINPUT); + + LOG(INFO) << "Listening for requests!"; + + server.ChangeState(FairMQExampleServer::RUN); + + // wait until the running thread has finished processing. + boost::unique_lock lock(server.fRunningMutex); + while (!server.fRunningFinished) + { + server.fRunningCondition.wait(lock); + } + + server.ChangeState(FairMQExampleServer::STOP); + server.ChangeState(FairMQExampleServer::END); + + return 0; +} diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 7c444d45..209f0c6e 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -179,6 +179,10 @@ int FairMQSocketNN::GetConstant(const string& constant) return NN_PUSH; if (constant == "pull") return NN_PULL; + if (constant == "req") + return NN_REQ; + if (constant == "rep") + return NN_REP; if (constant == "snd-hwm") return NN_SNDBUF; if (constant == "rcv-hwm") diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index 4d8b6c38..4bdc03af 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -18,6 +18,7 @@ #include #include #include +#include #include "FairMQSocket.h" diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index 9e7aa7da..a776eaf6 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -224,6 +224,10 @@ int FairMQSocketZMQ::GetConstant(const string& constant) return ZMQ_PUSH; if (constant == "pull") return ZMQ_PULL; + if (constant == "req") + return ZMQ_REQ; + if (constant == "rep") + return ZMQ_REP; if (constant == "snd-hwm") return ZMQ_SNDHWM; if (constant == "rcv-hwm")