From a15d59c725f140bd555de7a79cc3bffa37648e9f Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 8 May 2020 08:19:02 +0200 Subject: [PATCH] Remove nanomsg transport --- CMakeLists.txt | 28 - FairMQTest.cmake | 1 - README.md | 10 +- docs/Device.md | 2 +- docs/Transport.md | 24 +- examples/1-1/CMakeLists.txt | 5 - examples/1-n-1/CMakeLists.txt | 5 - examples/CMakeLists.txt | 4 +- examples/README.md | 2 +- examples/builtin-devices/CMakeLists.txt | 10 - examples/copypush/CMakeLists.txt | 5 - examples/multipart/CMakeLists.txt | 5 - examples/multiple-channels/CMakeLists.txt | 5 - examples/multiple-transports/README.md | 4 +- .../fairmq-start-ex-multiple-transports.sh.in | 4 +- .../test-ex-multiple-transports.sh.in | 4 +- examples/region/CMakeLists.txt | 5 - examples/req-rep/CMakeLists.txt | 5 - fairmq/CMakeLists.txt | 29 +- fairmq/FairMQChannel.h | 6 +- fairmq/FairMQDevice.h | 4 +- fairmq/FairMQTransportFactory.cxx | 13 +- fairmq/Transports.h | 3 - fairmq/nanomsg/FairMQMessageNN.cxx | 227 ------- fairmq/nanomsg/FairMQMessageNN.h | 68 -- fairmq/nanomsg/FairMQPollerNN.cxx | 207 ------ fairmq/nanomsg/FairMQPollerNN.h | 58 -- fairmq/nanomsg/FairMQSocketNN.cxx | 628 ------------------ fairmq/nanomsg/FairMQSocketNN.h | 81 --- fairmq/nanomsg/FairMQTransportFactoryNN.cxx | 100 --- fairmq/nanomsg/FairMQTransportFactoryNN.h | 59 -- fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx | 52 -- fairmq/nanomsg/FairMQUnmanagedRegionNN.h | 41 -- fairmq/plugins/config/Config.cxx | 2 +- fairmq/run/startMQBenchmark.sh.in | 2 +- test/CMakeLists.txt | 4 - test/message/_message.cxx | 14 - test/poller/_poller.cxx | 14 - test/protocols/_pair.cxx | 7 - test/protocols/_pub_sub.cxx | 7 - test/protocols/_push_pull.cxx | 7 - test/protocols/_push_pull_multipart.cxx | 28 - test/protocols/_req_rep.cxx | 7 - test/protocols/config.json.in | 217 ------ test/transport/_options.cxx | 25 +- test/transport/_transfer_timeout.cxx | 7 - 46 files changed, 36 insertions(+), 2009 deletions(-) delete mode 100644 fairmq/nanomsg/FairMQMessageNN.cxx delete mode 100644 fairmq/nanomsg/FairMQMessageNN.h delete mode 100644 fairmq/nanomsg/FairMQPollerNN.cxx delete mode 100644 fairmq/nanomsg/FairMQPollerNN.h delete mode 100644 fairmq/nanomsg/FairMQSocketNN.cxx delete mode 100644 fairmq/nanomsg/FairMQSocketNN.h delete mode 100644 fairmq/nanomsg/FairMQTransportFactoryNN.cxx delete mode 100644 fairmq/nanomsg/FairMQTransportFactoryNN.h delete mode 100644 fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx delete mode 100644 fairmq/nanomsg/FairMQUnmanagedRegionNN.h diff --git a/CMakeLists.txt b/CMakeLists.txt index f141b694..e1279fd4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -35,8 +35,6 @@ fairmq_build_option(BUILD_FAIRMQ "Build FairMQ library and devices." DEFAULT ON) fairmq_build_option(BUILD_TESTING "Build tests." DEFAULT OFF REQUIRES "BUILD_FAIRMQ") -fairmq_build_option(BUILD_NANOMSG_TRANSPORT "Build nanomsg transport." - DEFAULT OFF REQUIRES "BUILD_FAIRMQ") fairmq_build_option(BUILD_OFI_TRANSPORT "Build experimental OFI transport." DEFAULT OFF REQUIRES "BUILD_FAIRMQ") fairmq_build_option(BUILD_SDK_COMMANDS "Build the FairMQ SDK commands." @@ -65,11 +63,6 @@ set(CMAKE_THREAD_PREFER_PTHREAD TRUE) set(THREADS_PREFER_PTHREAD_FLAG TRUE) find_package(Threads REQUIRED) -if(BUILD_NANOMSG_TRANSPORT) - find_package2(PRIVATE nanomsg REQUIRED) - set(PROJECT_nanomsg_VERSION 1.1.3) # Once upstream releases 1.1.5, we should bump again and use version check -endif() - if(BUILD_OFI_TRANSPORT) find_package2(PRIVATE asiofi REQUIRED VERSION 0.3.1 @@ -79,12 +72,6 @@ if(BUILD_OFI_TRANSPORT) ) endif() -if(BUILD_NANOMSG_TRANSPORT) - find_package2(PRIVATE msgpack REQUIRED - VERSION 3.1.0 - ) -endif() - if(BUILD_SDK_COMMANDS) find_package2(PRIVATE Flatbuffers REQUIRED) endif() @@ -215,9 +202,6 @@ endif() if(BUILD_PMIX_PLUGIN) list(APPEND PROJECT_PACKAGE_COMPONENTS pmix_plugin) endif() -if(BUILD_NANOMSG_TRANSPORT) - list(APPEND PROJECT_PACKAGE_COMPONENTS nanomsg_transport) -endif() if(BUILD_OFI_TRANSPORT) list(APPEND PROJECT_PACKAGE_COMPONENTS ofi_transport) endif() @@ -320,16 +304,10 @@ if(PROJECT_PACKAGE_DEPENDENCIES) endif() elseif(${dep} STREQUAL GTest) get_filename_component(prefix ${GTEST_INCLUDE_DIRS}/.. ABSOLUTE) - elseif(${dep} STREQUAL msgpack) - get_target_property(msgpack_include msgpackc-cxx INTERFACE_INCLUDE_DIRECTORIES) - get_filename_component(prefix ${msgpack_include}/.. ABSOLUTE) elseif(${dep} STREQUAL asiofi) set(prefix ${asiofi_ROOT}) elseif(${dep} STREQUAL OFI) get_filename_component(prefix ${${dep}_INCLUDE_DIRS}/.. ABSOLUTE) - elseif(${dep} STREQUAL nanomsg) - get_target_property(nn_include nanomsg INTERFACE_INCLUDE_DIRECTORIES) - get_filename_component(prefix ${nn_include}/.. ABSOLUTE) elseif(${dep} STREQUAL DDS) set(prefix "${DDS_INSTALL_PREFIX}") elseif(${dep} STREQUAL Boost) @@ -378,12 +356,6 @@ else() set(tests_summary "${BRed} NO${CR} (enable with ${BMagenta}-DBUILD_TESTING=ON${CR})") endif() message(STATUS " ${BWhite}tests${CR} ${tests_summary}") -if(BUILD_NANOMSG_TRANSPORT) - set(nn_summary "${BGreen}YES${CR} (disable with ${BMagenta}-DBUILD_NANOMSG_TRANSPORT=OFF${CR})") -else() - set(nn_summary "${BRed} NO${CR} (default, enable with ${BMagenta}-DBUILD_NANOMSG_TRANSPORT=ON${CR})") -endif() -message(STATUS " ${BWhite}nanomsg_transport${CR} ${nn_summary}") if(BUILD_OFI_TRANSPORT) set(ofi_summary "${BGreen}YES${CR} EXPERIMENTAL (requires C++14) (disable with ${BMagenta}-DBUILD_OFI_TRANSPORT=OFF${CR})") else() diff --git a/FairMQTest.cmake b/FairMQTest.cmake index 7e64d4d8..7e822ec5 100644 --- a/FairMQTest.cmake +++ b/FairMQTest.cmake @@ -26,7 +26,6 @@ Set(configure_options "${configure_options};-DCTEST_USE_LAUNCHERS=${CTEST_USE_LA Set(configure_options "${configure_options};-DDISABLE_COLOR=ON") Set(configure_options "${configure_options};-DCMAKE_PREFIX_PATH=$ENV{SIMPATH}") -Set(configure_options "${configure_options};-DBUILD_NANOMSG_TRANSPORT=ON") # Set(configure_options "${configure_options};-DBUILD_OFI_TRANSPORT=ON") Set(configure_options "${configure_options};-DBUILD_DDS_PLUGIN=ON") Set(configure_options "${configure_options};-DBUILD_SDK=ON") diff --git a/README.md b/README.md index 6b23366c..18ee0a94 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ FairMQ is designed to help implementing large-scale data processing workflows ne The core of FairMQ provides an abstract asynchronous message passing API with scalability protocols inspired by [ZeroMQ](https://github.com/zeromq/libzmq) (e.g. PUSH/PULL, PUB/SUB). FairMQ provides multiple implementations for its API (so-called "transports", -e.g. `zeromq`, `shmem`, `nanomsg`, and `ofi` (in development)) to cover a variety of use cases +e.g. `zeromq`, `shmem` and `ofi` (in development)) to cover a variety of use cases (e.g. inter-thread, inter-process, inter-node communication) and machines (e.g. Ethernet, Infiniband). In addition to this core functionality FairMQ provides a framework for creating "devices" - actors which are communicating through message passing. FairMQ does not only allow the user to use different transport but also to mix them; i.e: A Device can communicate using different transport on different channels at the same time. Device execution is modelled as a simple state machine that @@ -47,7 +47,7 @@ cmake --build fairmq_build --target install Please consult the [manpages of your CMake version](https://cmake.org/cmake/help/latest/manual/cmake.1.html) for more options. -If dependencies are not installed in standard system directories, you can hint the installation location via `-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...`. `{DEPENDENCY}` can be `GTEST`, `BOOST`, `FAIRLOGGER`, `ZEROMQ`, `MSGPACK`, `NANOMSG`, `OFI`, `PMIX`, `ASIO`, `ASIOFI` or `DDS` (`*_ROOT` variables can also be environment variables). +If dependencies are not installed in standard system directories, you can hint the installation location via `-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...`. `{DEPENDENCY}` can be `GTEST`, `BOOST`, `FAIRLOGGER`, `ZEROMQ`, `OFI`, `PMIX`, `ASIO`, `ASIOFI` or `DDS` (`*_ROOT` variables can also be environment variables). ## Usage @@ -86,7 +86,7 @@ If your project shares a dependency with FairMQ or if you want to omit a certain Optionally, you can require certain FairMQ package components and a minimum version: ```cmake -find_package(FairMQ 1.1.0 COMPONENTS nanomsg_transport dds_plugin) +find_package(FairMQ 1.1.0 COMPONENTS dds_plugin) ``` When building FairMQ, CMake will print a summary table of all available package components. @@ -101,8 +101,6 @@ When building FairMQ, CMake will print a summary table of all available package * [Doxygen](http://www.doxygen.org/) * [FairLogger](https://github.com/FairRootGroup/FairLogger) * [GTest](https://github.com/google/googletest) (optionally bundled) - * [Msgpack](https://msgpack.org/index.html) - * [nanomsg](http://nanomsg.org/) * [PMIx](https://pmix.org/) * [ZeroMQ](http://zeromq.org/) @@ -117,7 +115,6 @@ On command line: * `-DDISABLE_COLOR=ON` disables coloured console output. * `-DBUILD_TESTING=OFF` disables building of tests. * `-DBUILD_EXAMPLES=OFF` disables building of examples. - * `-DBUILD_NANOMSG_TRANSPORT=ON` enables building of nanomsg transport. * `-DBUILD_OFI_TRANSPORT=ON` enables building of the experimental OFI transport. * `-DBUILD_DDS_PLUGIN=ON` enables building of the DDS plugin. * `-DBUILD_PMIX_PLUGIN=ON` enables building of the PMIx plugin. @@ -182,4 +179,3 @@ After the `find_package(FairMQ)` call the following CMake variables are defined: 1. [DDS](docs/Plugins.md#731-dds) 2. [PMIx](docs/Plugins.md#732-pmix) 8. [Controller SDK](docs/SDK.md) - diff --git a/docs/Device.md b/docs/Device.md index ff8a0d78..e0ce1c9c 100644 --- a/docs/Device.md +++ b/docs/Device.md @@ -18,7 +18,7 @@ Topology configuration is currently happening via setup scripts. This is very ru ## 1.2 Communication Patterns -FairMQ devices communicate via the communication patterns offered by ZeroMQ (or nanomsg): PUSH-PULL, PUB-SUB, REQ-REP, PAIR, [more info here](http://api.zeromq.org/4-0:zmq-socket). Each transport may provide further patterns. +FairMQ devices communicate via the communication patterns offered by ZeroMQ: PUSH-PULL, PUB-SUB, REQ-REP, PAIR, [more info here](http://api.zeromq.org/4-0:zmq-socket). Each transport may provide further patterns. ## 1.3 State Machine diff --git a/docs/Transport.md b/docs/Transport.md index 128fd4bf..29c28db1 100644 --- a/docs/Transport.md +++ b/docs/Transport.md @@ -2,7 +2,7 @@ # 2. Transport Interface -The communication layer is available through the transport interface. Three interface implementations are currently available. Main implementation uses the [ZeroMQ](http://zeromq.org) library. Alternative implementation relies on the [nanomsg](http://nanomsg.org) library. Third transport implementation is using shared memory via boost::interprocess & ZeroMQ combination. +The communication layer is available through the transport interface. Three interface implementations are currently available. Main implementation uses the [ZeroMQ](http://zeromq.org) library. Second transport implementation is using shared memory via boost::interprocess & ZeroMQ combination. Here is an overview to give an idea how the interface is implemented: @@ -10,20 +10,20 @@ Here is an overview to give an idea how the interface is implemented: Currently, the transports have been tested to work with these communication patterns: -| | zeromq | nanomsg | shmem | -| ------------- |--------| ------- | ----- | -| PAIR | yes | yes | yes | -| PUSH/PULL | yes | yes | yes | -| PUB/SUB | yes | yes | no | -| REQ/REP | yes | yes | yes | +| | zeromq | shmem | +| ------------- |--------| ----- | +| PAIR | yes | yes | +| PUSH/PULL | yes | yes | +| PUB/SUB | yes | no | +| REQ/REP | yes | yes | The next table shows the supported address types for each transport implementation: -| | zeromq | nanomsg | shmem | comment | -| ----------- | ------ | ------- | ----- | --------------------------------------------- | -| `inproc://` | yes | yes | yes | in process: useful for unit testing | -| `ipc://` | yes | yes | yes | inter process comm: useful on single machine | -| `tcp://` | yes | yes | yes | useful for any communication, local or remote | +| | zeromq | shmem | comment | +| ----------- | ------ | ----- | --------------------------------------------- | +| `inproc://` | yes | yes | in process: useful for unit testing | +| `ipc://` | yes | yes | inter process comm: useful on single machine | +| `tcp://` | yes | yes | useful for any communication, local or remote | ## 2.1 Message diff --git a/examples/1-1/CMakeLists.txt b/examples/1-1/CMakeLists.txt index 1365f284..882c8815 100644 --- a/examples/1-1/CMakeLists.txt +++ b/examples/1-1/CMakeLists.txt @@ -34,11 +34,6 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-1-1.sh.in ${CMAKE_CURRENT_BIN add_test(NAME Example.1-1.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-1.sh zeromq) set_tests_properties(Example.1-1.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ") -if(BUILD_NANOMSG_TRANSPORT) - add_test(NAME Example.1-1.nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-1.sh nanomsg) - set_tests_properties(Example.1-1.nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ") -endif() - add_test(NAME Example.1-1.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-1.sh shmem) set_tests_properties(Example.1-1.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ") diff --git a/examples/1-n-1/CMakeLists.txt b/examples/1-n-1/CMakeLists.txt index 138e28da..032f5a1a 100644 --- a/examples/1-n-1/CMakeLists.txt +++ b/examples/1-n-1/CMakeLists.txt @@ -42,11 +42,6 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-1-n-1.sh.in ${CMAKE_CURRENT_B add_test(NAME Example.1-n-1.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-n-1.sh zeromq) set_tests_properties(Example.1-n-1.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ") -if(BUILD_NANOMSG_TRANSPORT) - add_test(NAME Example.1-n-1.nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-n-1.sh nanomsg) - set_tests_properties(Example.1-n-1.nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ") -endif() - add_test(NAME Example.1-n-1.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-n-1.sh shmem) set_tests_properties(Example.1-n-1.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ") diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 3856dab9..f9096cbd 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -13,9 +13,7 @@ add_subdirectory(copypush) add_subdirectory(dds) add_subdirectory(multipart) add_subdirectory(multiple-channels) -if(BUILD_NANOMSG_TRANSPORT) - add_subdirectory(multiple-transports) -endif() +add_subdirectory(multiple-transports) add_subdirectory(n-m) add_subdirectory(qc) add_subdirectory(readout) diff --git a/examples/README.md b/examples/README.md index a4855954..e0c52f62 100644 --- a/examples/README.md +++ b/examples/README.md @@ -32,7 +32,7 @@ This example demonstrates how to work with multiple channels and multiplex betwe ## Multiple Transports -This examples shows how to combine different channel transports (zeromq/nanomsg/shmem) inside of one device and/or topology. +This examples shows how to combine different channel transports (zeromq/shmem) inside of one device and/or topology. ## n-m diff --git a/examples/builtin-devices/CMakeLists.txt b/examples/builtin-devices/CMakeLists.txt index e7ef75ee..679c7a8d 100644 --- a/examples/builtin-devices/CMakeLists.txt +++ b/examples/builtin-devices/CMakeLists.txt @@ -17,22 +17,12 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-builtin-devices.sh.in ${CMAKE add_test(NAME Example.BuiltinDevices.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh zeromq) set_tests_properties(Example.BuiltinDevices.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached") -if(BUILD_NANOMSG_TRANSPORT) - add_test(NAME Example.BuiltinDevices.nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh nanomsg) - set_tests_properties(Example.BuiltinDevices.nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached") -endif() - add_test(NAME Example.BuiltinDevices.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh shmem) set_tests_properties(Example.BuiltinDevices.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached") add_test(NAME Example.BuiltinDevices.multipart.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh zeromq true 2) set_tests_properties(Example.BuiltinDevices.multipart.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached") -if(BUILD_NANOMSG_TRANSPORT) - add_test(NAME Example.BuiltinDevices.multipart.nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh nanomsg true 2) - set_tests_properties(Example.BuiltinDevices.multipart.nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached") -endif() - add_test(NAME Example.BuiltinDevices.multipart.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh shmem true 2) set_tests_properties(Example.BuiltinDevices.multipart.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached") diff --git a/examples/copypush/CMakeLists.txt b/examples/copypush/CMakeLists.txt index 279d2def..b27c9619 100644 --- a/examples/copypush/CMakeLists.txt +++ b/examples/copypush/CMakeLists.txt @@ -35,11 +35,6 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-copypush.sh.in ${CMAKE_CURREN add_test(NAME Example.CopyPush.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-copypush.sh zeromq) set_tests_properties(Example.CopyPush.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message: ") -if(BUILD_NANOMSG_TRANSPORT) - add_test(NAME Example.CopyPush.nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-copypush.sh nanomsg) - set_tests_properties(Example.CopyPush.nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message: ") -endif() - add_test(NAME Example.CopyPush.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-copypush.sh shmem) set_tests_properties(Example.CopyPush.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message: ") diff --git a/examples/multipart/CMakeLists.txt b/examples/multipart/CMakeLists.txt index b74b6e66..500c7edd 100644 --- a/examples/multipart/CMakeLists.txt +++ b/examples/multipart/CMakeLists.txt @@ -34,11 +34,6 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-multipart.sh.in ${CMAKE_CURRE add_test(NAME Example.Multipart.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh zeromq) set_tests_properties(Example.Multipart.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 7 parts") -if(BUILD_NANOMSG_TRANSPORT) - add_test(NAME Example.Multipart.nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh nanomsg) - set_tests_properties(Example.Multipart.nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 7 parts") -endif() - add_test(NAME Example.Multipart.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh shmem) set_tests_properties(Example.Multipart.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 7 parts") diff --git a/examples/multiple-channels/CMakeLists.txt b/examples/multiple-channels/CMakeLists.txt index a0664ce4..9a0a02d4 100644 --- a/examples/multiple-channels/CMakeLists.txt +++ b/examples/multiple-channels/CMakeLists.txt @@ -39,11 +39,6 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-multiple-channels.sh.in ${CMA add_test(NAME Example.MultipleChannels.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multiple-channels.sh zeromq) set_tests_properties(Example.MultipleChannels.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received messages from both sources.") -if(BUILD_NANOMSG_TRANSPORT) - add_test(NAME Example.MultipleChannels.nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multiple-channels.sh nanomsg) - set_tests_properties(Example.MultipleChannels.nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received messages from both sources.") -endif() - # install install( diff --git a/examples/multiple-transports/README.md b/examples/multiple-transports/README.md index 490a7ba9..b47405a9 100644 --- a/examples/multiple-transports/README.md +++ b/examples/multiple-transports/README.md @@ -1,12 +1,12 @@ Multiple Transports example =========================== -This example demonstrates use of multiple transports (zeromq/nanomsg/shmem) within the same topology and/or device. It is a simple topology consisting of two samplers and a sink. The devices are connected via 3 channels: +This example demonstrates use of multiple transports (zeromq/shmem) within the same topology and/or device. It is a simple topology consisting of two samplers and a sink. The devices are connected via 3 channels: ![Multiple Transports example](../../docs/images/example_multiple_transports.png?raw=true "Multiple Transports example") Each device has main transport that it uses. By default it is ZeroMQ, and can be overriden via the `--transport` cmd option. The device will initialize additional transports if any of the channels have them configured (e.g. as an option to `--channel-config`). -In this example sampler1 and sink are started with `--transport shmem`, making shared memory their main transport, sampler2 with `--transport nanomsg`. Additionally, the ack channel is configured to use zeromq as its transport. +In this example sampler1 and sink are started with `--transport shmem`, making shared memory their main transport, sampler2 with `--transport zeromq`. Additionally, the ack channel is configured to use zeromq as its transport. The main two things that a transport does is transfer of data and allocation of memory for the messages. By default, new messages are created via the main device transport. If a message has been created with one transport and is to be transferred with another, it has to be copied into a new message of the target transport. This happens automatically behind the scenes. To avoid this copy the device can create messages via `NewMessageFor(const string& channelName, int subChannelIndex, ...)` method, that creates the messages via the transport of the given channel (check sampler1 and sink for an example) or as the channel directly to create a message. diff --git a/examples/multiple-transports/fairmq-start-ex-multiple-transports.sh.in b/examples/multiple-transports/fairmq-start-ex-multiple-transports.sh.in index d79a110b..f24648e0 100755 --- a/examples/multiple-transports/fairmq-start-ex-multiple-transports.sh.in +++ b/examples/multiple-transports/fairmq-start-ex-multiple-transports.sh.in @@ -13,7 +13,7 @@ xterm -geometry 80x30+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER1 & SAMPLER2="fairmq-ex-multiple-transports-sampler2" SAMPLER2+=" --id sampler2" SAMPLER2+=" --severity debug" -SAMPLER2+=" --transport nanomsg" +SAMPLER2+=" --transport shmem" SAMPLER2+=" --channel-config name=data2,type=push,method=bind,address=tcp://127.0.0.1:5556" xterm -geometry 80x30+0+450 -hold -e @EX_BIN_DIR@/$SAMPLER2 & @@ -22,6 +22,6 @@ SINK+=" --id sink1" SINK+=" --severity debug" SINK+=" --transport shmem" SINK+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:5555" -SINK+=" name=data2,type=pull,method=connect,address=tcp://127.0.0.1:5556,transport=nanomsg" +SINK+=" name=data2,type=pull,method=connect,address=tcp://127.0.0.1:5556,transport=shmem" SINK+=" name=ack,type=pub,method=connect,address=tcp://127.0.0.1:5557,transport=zeromq" xterm -geometry 80x30+500+225 -hold -e @EX_BIN_DIR@/$SINK & diff --git a/examples/multiple-transports/test-ex-multiple-transports.sh.in b/examples/multiple-transports/test-ex-multiple-transports.sh.in index 8660cf7e..5a62b0ed 100755 --- a/examples/multiple-transports/test-ex-multiple-transports.sh.in +++ b/examples/multiple-transports/test-ex-multiple-transports.sh.in @@ -14,7 +14,7 @@ SINK+=" --max-iterations 1" SINK+=" --control static --color false" SINK+=" --transport shmem" SINK+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:5555" -SINK+=" name=data2,type=pull,method=connect,address=tcp://127.0.0.1:5556,transport=nanomsg" +SINK+=" name=data2,type=pull,method=connect,address=tcp://127.0.0.1:5556,transport=zeromq" SINK+=" name=ack,type=pub,method=connect,address=tcp://127.0.0.1:5557,transport=zeromq" @CMAKE_CURRENT_BINARY_DIR@/$SINK & SINK_PID=$! @@ -37,7 +37,7 @@ SAMPLER2+=" --session $SESSION" SAMPLER2+=" --verbosity veryhigh" SAMPLER2+=" --max-iterations 1" SAMPLER2+=" --control static --color false" -SAMPLER2+=" --transport nanomsg" +SAMPLER2+=" --transport zeromq" SAMPLER2+=" --channel-config name=data2,type=push,method=bind,address=tcp://127.0.0.1:5556" @CMAKE_CURRENT_BINARY_DIR@/$SAMPLER2 & SAMPLER2_PID=$! diff --git a/examples/region/CMakeLists.txt b/examples/region/CMakeLists.txt index 8876b5d4..b233fc3b 100644 --- a/examples/region/CMakeLists.txt +++ b/examples/region/CMakeLists.txt @@ -34,11 +34,6 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-region.sh.in ${CMAKE_CURRENT_ add_test(NAME Example.Region.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-region.sh zeromq) set_tests_properties(Example.Region.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received ack") -if(BUILD_NANOMSG_TRANSPORT) - add_test(NAME Example.Region.nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-region.sh nanomsg) - set_tests_properties(Example.Region.nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received ack") -endif() - add_test(NAME Example.Region.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-region.sh shmem) set_tests_properties(Example.Region.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received ack") diff --git a/examples/req-rep/CMakeLists.txt b/examples/req-rep/CMakeLists.txt index 733db7ca..df7080d8 100644 --- a/examples/req-rep/CMakeLists.txt +++ b/examples/req-rep/CMakeLists.txt @@ -35,11 +35,6 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-req-rep.sh.in ${CMAKE_CURRENT add_test(NAME Example.ReqRep.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-req-rep.sh zeromq) set_tests_properties(Example.ReqRep.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received reply from server: ") -if(BUILD_NANOMSG_TRANSPORT) - add_test(NAME Example.ReqRep.nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-req-rep.sh nanomsg) - set_tests_properties(Example.ReqRep.nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received reply from server: ") -endif() - add_test(NAME Example.ReqRep.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-req-rep.sh shmem) set_tests_properties(Example.ReqRep.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received reply from server: ") diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index e3136be5..d61825c5 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -197,16 +197,6 @@ if(BUILD_FAIRMQ) zeromq/FairMQTransportFactoryZMQ.h ) - if(BUILD_NANOMSG_TRANSPORT) - set(FAIRMQ_PRIVATE_HEADER_FILES ${FAIRMQ_PRIVATE_HEADER_FILES} - nanomsg/FairMQMessageNN.h - nanomsg/FairMQPollerNN.h - nanomsg/FairMQUnmanagedRegionNN.h - nanomsg/FairMQSocketNN.h - nanomsg/FairMQTransportFactoryNN.h - ) - endif() - if(BUILD_OFI_TRANSPORT) set(FAIRMQ_PRIVATE_HEADER_FILES ${FAIRMQ_PRIVATE_HEADER_FILES} ofi/Context.h @@ -257,16 +247,6 @@ if(BUILD_FAIRMQ) MemoryResources.cxx ) - if(BUILD_NANOMSG_TRANSPORT) - set(FAIRMQ_SOURCE_FILES ${FAIRMQ_SOURCE_FILES} - nanomsg/FairMQMessageNN.cxx - nanomsg/FairMQPollerNN.cxx - nanomsg/FairMQUnmanagedRegionNN.cxx - nanomsg/FairMQSocketNN.cxx - nanomsg/FairMQTransportFactoryNN.cxx - ) - endif() - if(BUILD_OFI_TRANSPORT) set(FAIRMQ_SOURCE_FILES ${FAIRMQ_SOURCE_FILES} ofi/Context.cxx @@ -307,9 +287,6 @@ if(BUILD_FAIRMQ) # preprocessor definitions # ############################ target_compile_definitions(${_target} PUBLIC BOOST_ERROR_CODE_HEADER_ONLY) - if(BUILD_NANOMSG_TRANSPORT) - target_compile_definitions(${_target} PRIVATE BUILD_NANOMSG_TRANSPORT) - endif() if(BUILD_OFI_TRANSPORT) target_compile_definitions(${_target} PRIVATE BUILD_OFI_TRANSPORT) endif() @@ -330,16 +307,13 @@ if(BUILD_FAIRMQ) ################## # link libraries # ################## - if(BUILD_NANOMSG_TRANSPORT) - set(NANOMSG_DEPS nanomsg msgpackc-cxx) - endif() if(BUILD_OFI_TRANSPORT) set(OFI_DEPS asiofi::asiofi Boost::container ) endif() - set(optional_deps ${NANOMSG_DEPS} ${OFI_DEPS}) + set(optional_deps ${OFI_DEPS}) if(optional_deps) list(REMOVE_DUPLICATES optional_deps) endif() @@ -362,7 +336,6 @@ if(BUILD_FAIRMQ) PRIVATE # only libFairMQ links against private dependencies libzmq - ${NANOMSG_DEPS} ${OFI_DEPS} ) set_target_properties(${_target} PROPERTIES diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 8054d8b5..6753699d 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -127,8 +127,8 @@ class FairMQChannel /// @return Returns socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") std::string GetAddress() const; - /// Get channel transport name ("default", "zeromq", "nanomsg" or "shmem") - /// @return Returns channel transport name (e.g. "default", "zeromq", "nanomsg" or "shmem") + /// Get channel transport name ("default", "zeromq" or "shmem") + /// @return Returns channel transport name (e.g. "default", "zeromq" or "shmem") std::string GetTransportName() const; /// Get channel transport type @@ -184,7 +184,7 @@ class FairMQChannel void UpdateAddress(const std::string& address); /// Set channel transport - /// @param transport transport string ("default", "zeromq", "nanomsg" or "shmem") + /// @param transport transport string ("default", "zeromq" or "shmem") void UpdateTransport(const std::string& transport); /// Set socket send buffer size diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index d475d74d..a7d8ffc1 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -302,7 +302,7 @@ class FairMQDevice } /// Adds a transport to the device if it doesn't exist - /// @param transport Transport string ("zeromq"/"nanomsg"/"shmem") + /// @param transport Transport string ("zeromq"/"shmem") std::shared_ptr AddTransport(const fair::mq::Transport transport); /// Assigns config to the device @@ -417,7 +417,7 @@ class FairMQDevice int GetInitTimeoutInS() const { return fConfig->GetProperty("init-timeout", DefaultInitTimeout); } /// Sets the default transport for the device - /// @param transport Transport string ("zeromq"/"nanomsg"/"shmem") + /// @param transport Transport string ("zeromq"/"shmem") void SetTransport(const std::string& transport) { fConfig->SetProperty("transport", transport); } /// Gets the default transport name std::string GetTransportName() const { return fConfig->GetProperty("transport", DefaultTransportName); } diff --git a/fairmq/FairMQTransportFactory.cxx b/fairmq/FairMQTransportFactory.cxx index 4b53a9aa..29ea87ef 100644 --- a/fairmq/FairMQTransportFactory.cxx +++ b/fairmq/FairMQTransportFactory.cxx @@ -9,9 +9,6 @@ #include #include #include -#ifdef BUILD_NANOMSG_TRANSPORT -#include -#endif /* BUILD_NANOMSG_TRANSPORT */ #ifdef BUILD_OFI_TRANSPORT #include #endif @@ -43,11 +40,6 @@ auto FairMQTransportFactory::CreateTransportFactory(const string& type, } else if (type == "shmem") { return make_shared(finalId, config); } -#ifdef BUILD_NANOMSG_TRANSPORT - else if (type == "nanomsg") { - return make_shared(finalId, config); - } -#endif /* BUILD_NANOMSG_TRANSPORT */ #ifdef BUILD_OFI_TRANSPORT else if (type == "ofi") { return make_shared(finalId, config); @@ -57,11 +49,8 @@ auto FairMQTransportFactory::CreateTransportFactory(const string& type, LOG(error) << "Unavailable transport requested: " << "\"" << type << "\"" << ". Available are: " - << "\"zeromq\"" + << "\"zeromq\"," << "\"shmem\"" -#ifdef BUILD_NANOMSG_TRANSPORT - << ", \"nanomsg\"" -#endif /* BUILD_NANOMSG_TRANSPORT */ #ifdef BUILD_OFI_TRANSPORT << ", and \"ofi\"" #endif /* BUILD_OFI_TRANSPORT */ diff --git a/fairmq/Transports.h b/fairmq/Transports.h index 6fe1169e..a0b74d42 100644 --- a/fairmq/Transports.h +++ b/fairmq/Transports.h @@ -24,7 +24,6 @@ enum class Transport { DEFAULT, ZMQ, - NN, SHM, OFI }; @@ -48,7 +47,6 @@ namespace mq static std::unordered_map TransportTypes { { "default", Transport::DEFAULT }, { "zeromq", Transport::ZMQ }, - { "nanomsg", Transport::NN }, { "shmem", Transport::SHM }, { "ofi", Transport::OFI } }; @@ -56,7 +54,6 @@ static std::unordered_map TransportTypes { static std::unordered_map TransportNames { { Transport::DEFAULT, "default" }, { Transport::ZMQ, "zeromq" }, - { Transport::NN, "nanomsg" }, { Transport::SHM, "shmem" }, { Transport::OFI, "ofi" } }; diff --git a/fairmq/nanomsg/FairMQMessageNN.cxx b/fairmq/nanomsg/FairMQMessageNN.cxx deleted file mode 100644 index c5b6a737..00000000 --- a/fairmq/nanomsg/FairMQMessageNN.cxx +++ /dev/null @@ -1,227 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQMessageNN.cxx - * - * @since 2013-12-05 - * @author A. Rybalchenko - */ - -#include -#include - -#include - -#include "FairMQMessageNN.h" -#include "FairMQLogger.h" - -using namespace std; - -fair::mq::Transport FairMQMessageNN::fTransportType = fair::mq::Transport::NN; - -FairMQMessageNN::FairMQMessageNN(FairMQTransportFactory* factory) - : FairMQMessage{factory} - , fMessage(nullptr) - , fSize(0) - , fHint(0) - , fReceiving(false) - , fRegionPtr(nullptr) -{ - fMessage = nn_allocmsg(0, 0); - if (!fMessage) - { - LOG(error) << "failed allocating message, reason: " << nn_strerror(errno); - } -} - -FairMQMessageNN::FairMQMessageNN(const size_t size, FairMQTransportFactory* factory) - : FairMQMessage{factory} - , fMessage(nullptr) - , fSize(0) - , fHint(0) - , fReceiving(false) - , fRegionPtr(nullptr) -{ - fMessage = nn_allocmsg(size, 0); - if (!fMessage) - { - LOG(error) << "failed allocating message, reason: " << nn_strerror(errno); - } - fSize = size; -} - - -/* nanomsg does not offer support for creating a message out of an existing buffer, - * therefore the following method is using memcpy. For more efficient handling, - * create FairMQMessage object only with size parameter and fill it with data. - * possible TODO: make this zero copy (will should then be as efficient as ZeroMQ). -*/ -FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* ffn, void* hint, FairMQTransportFactory* factory) - : FairMQMessage{factory} - , fMessage(nullptr) - , fSize(0) - , fHint(0) - , fReceiving(false) - , fRegionPtr(nullptr) -{ - fMessage = nn_allocmsg(size, 0); - if (!fMessage) - { - LOG(error) << "failed allocating message, reason: " << nn_strerror(errno); - } - else - { - memcpy(fMessage, data, size); - fSize = size; - if (ffn) - { - ffn(data, hint); - } - else - { - free(data); - } - } -} - -FairMQMessageNN::FairMQMessageNN(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint, FairMQTransportFactory* factory) - : FairMQMessage{factory} - , fMessage(data) - , fSize(size) - , fHint(reinterpret_cast(hint)) - , fReceiving(false) - , fRegionPtr(region.get()) -{ - // currently nanomsg will copy the buffer (data) inside nn_sendmsg() -} - -void FairMQMessageNN::Rebuild() -{ - CloseMessage(); - fReceiving = false; -} - -void FairMQMessageNN::Rebuild(const size_t size) -{ - CloseMessage(); - fMessage = nn_allocmsg(size, 0); - if (!fMessage) - { - LOG(error) << "failed allocating message, reason: " << nn_strerror(errno); - } - fSize = size; - fReceiving = false; -} - -void FairMQMessageNN::Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) -{ - CloseMessage(); - fMessage = nn_allocmsg(size, 0); - if (!fMessage) - { - LOG(error) << "failed allocating message, reason: " << nn_strerror(errno); - } - else - { - memcpy(fMessage, data, size); - fSize = size; - fReceiving = false; - - if (ffn) - { - ffn(data, hint); - } - } -} - -void* FairMQMessageNN::GetMessage() const -{ - return fMessage; -} - -void* FairMQMessageNN::GetData() const -{ - return fMessage; -} - -size_t FairMQMessageNN::GetSize() const -{ - return fSize; -} - -bool FairMQMessageNN::SetUsedSize(const size_t size) -{ - if (size <= fSize) - { - // with size smaller than original nanomsg will simply "chop" the data, avoiding reallocation - fMessage = nn_reallocmsg(fMessage, size); - fSize = size; - return true; - } - else - { - LOG(error) << "cannot set used size higher than original."; - return false; - } -} - -void FairMQMessageNN::SetMessage(void* data, const size_t size) -{ - fMessage = data; - fSize = size; -} - -fair::mq::Transport FairMQMessageNN::GetType() const -{ - return fTransportType; -} - -void FairMQMessageNN::Copy(const FairMQMessage& msg) -{ - if (fMessage) - { - if (nn_freemsg(fMessage) < 0) - { - LOG(error) << "failed freeing message, reason: " << nn_strerror(errno); - } - } - - size_t size = msg.GetSize(); - - fMessage = nn_allocmsg(size, 0); - if (!fMessage) - { - LOG(error) << "failed allocating message, reason: " << nn_strerror(errno); - } - else - { - memcpy(fMessage, static_cast(msg).GetMessage(), size); - fSize = size; - } -} - -void FairMQMessageNN::CloseMessage() -{ - if (nn_freemsg(fMessage) < 0) - { - LOG(error) << "failed freeing message, reason: " << nn_strerror(errno); - } - else - { - fMessage = nullptr; - fSize = 0; - } -} - -FairMQMessageNN::~FairMQMessageNN() -{ - if (fReceiving) - { - CloseMessage(); - } -} diff --git a/fairmq/nanomsg/FairMQMessageNN.h b/fairmq/nanomsg/FairMQMessageNN.h deleted file mode 100644 index 762dc13f..00000000 --- a/fairmq/nanomsg/FairMQMessageNN.h +++ /dev/null @@ -1,68 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQMessageNN.h - * - * @since 2013-12-05 - * @author A. Rybalchenko - */ - -#ifndef FAIRMQMESSAGENN_H_ -#define FAIRMQMESSAGENN_H_ - -#include -#include -#include - -#include "FairMQMessage.h" -#include "FairMQUnmanagedRegion.h" - -class FairMQSocketNN; - -class FairMQMessageNN final : public FairMQMessage -{ - friend class FairMQSocketNN; - - public: - FairMQMessageNN(FairMQTransportFactory* factory = nullptr); - FairMQMessageNN(const size_t size, FairMQTransportFactory* factory = nullptr); - FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr); - FairMQMessageNN(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr); - - FairMQMessageNN(const FairMQMessageNN&) = delete; - FairMQMessageNN operator=(const FairMQMessageNN&) = delete; - - void Rebuild() override; - void Rebuild(const size_t size) override; - void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; - - void* GetData() const override; - size_t GetSize() const override; - - bool SetUsedSize(const size_t size) override; - - fair::mq::Transport GetType() const override; - - void Copy(const FairMQMessage& msg) override; - - ~FairMQMessageNN() override; - - private: - void* fMessage; - size_t fSize; - size_t fHint; - bool fReceiving; - FairMQUnmanagedRegion* fRegionPtr; - static fair::mq::Transport fTransportType; - - void* GetMessage() const; - void CloseMessage(); - void SetMessage(void* data, const size_t size); -}; - -#endif /* FAIRMQMESSAGENN_H_ */ diff --git a/fairmq/nanomsg/FairMQPollerNN.cxx b/fairmq/nanomsg/FairMQPollerNN.cxx deleted file mode 100644 index eb268957..00000000 --- a/fairmq/nanomsg/FairMQPollerNN.cxx +++ /dev/null @@ -1,207 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQPollerNN.cxx - * - * @since 2014-01-23 - * @author A. Rybalchenko - */ - -#include -#include -#include -#include -#include - -#include "FairMQPollerNN.h" -#include "FairMQSocketNN.h" -#include "FairMQLogger.h" - -using namespace std; - -FairMQPollerNN::FairMQPollerNN(const vector& channels) - : fItems() - , fNumItems(0) - , fOffsetMap() -{ - fNumItems = channels.size(); - fItems = new nn_pollfd[fNumItems]; - - for (int i = 0; i < fNumItems; ++i) - { - fItems[i].fd = static_cast(&(channels.at(i).GetSocket()))->GetSocket(); - - int type = 0; - size_t sz = sizeof(type); - nn_getsockopt(static_cast(&(channels.at(i).GetSocket()))->GetSocket(), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz); - - SetItemEvents(fItems[i], type); - } -} - -FairMQPollerNN::FairMQPollerNN(const vector& channels) - : fItems() - , fNumItems(0) - , fOffsetMap() -{ - fNumItems = channels.size(); - fItems = new nn_pollfd[fNumItems]; - - for (int i = 0; i < fNumItems; ++i) - { - fItems[i].fd = static_cast(&(channels.at(i)->GetSocket()))->GetSocket(); - - int type = 0; - size_t sz = sizeof(type); - nn_getsockopt(static_cast(&(channels.at(i)->GetSocket()))->GetSocket(), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz); - - SetItemEvents(fItems[i], type); - } -} - -FairMQPollerNN::FairMQPollerNN(const unordered_map>& channelsMap, const vector& channelList) - : fItems() - , fNumItems(0) - , fOffsetMap() -{ - try - { - int offset = 0; - // calculate offsets and the total size of the poll item set - for (string channel : channelList) - { - fOffsetMap[channel] = offset; - offset += channelsMap.at(channel).size(); - fNumItems += channelsMap.at(channel).size(); - } - - fItems = new nn_pollfd[fNumItems]; - - int index = 0; - for (string channel : channelList) - { - for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i) - { - index = fOffsetMap[channel] + i; - fItems[index].fd = static_cast(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(); - - int type = 0; - size_t sz = sizeof(type); - nn_getsockopt(static_cast(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz); - - SetItemEvents(fItems[index], type); - } - } - } - catch (const std::out_of_range& oor) - { - LOG(error) << "at least one of the provided channel keys for poller initialization is invalid"; - LOG(error) << "out of range error: " << oor.what() << '\n'; - exit(EXIT_FAILURE); - } -} - -void FairMQPollerNN::SetItemEvents(nn_pollfd& item, const int type) -{ - if (type == NN_REQ || type == NN_REP || type == NN_PAIR) - { - item.events = NN_POLLIN|NN_POLLOUT; - } - else if (type == NN_PUSH || type == NN_PUB) - { - item.events = NN_POLLOUT; - } - else if (type == NN_PULL || type == NN_SUB) - { - item.events = NN_POLLIN; - } - else - { - LOG(error) << "invalid poller configuration, exiting."; - exit(EXIT_FAILURE); - } -} - -void FairMQPollerNN::Poll(const int timeout) -{ - if (nn_poll(fItems, fNumItems, timeout) < 0) - { - if (errno == ETERM) - { - LOG(debug) << "polling exited, reason: " << nn_strerror(errno); - } - else - { - LOG(error) << "polling failed, reason: " << nn_strerror(errno); - throw std::runtime_error("polling failed"); - } - } -} - -bool FairMQPollerNN::CheckInput(const int index) -{ - if (fItems[index].revents & (NN_POLLIN | NN_POLLOUT)) - { - return true; - } - - return false; -} - -bool FairMQPollerNN::CheckOutput(const int index) -{ - if (fItems[index].revents & NN_POLLOUT) - { - return true; - } - - return false; -} - -bool FairMQPollerNN::CheckInput(const string& channelKey, const int index) -{ - try - { - if (fItems[fOffsetMap.at(channelKey) + index].revents & (NN_POLLIN | NN_POLLOUT)) - { - return true; - } - - return false; - } - catch (const std::out_of_range& oor) - { - LOG(error) << "invalid channel key: \"" << channelKey << "\""; - LOG(error) << "out of range error: " << oor.what() << '\n'; - exit(EXIT_FAILURE); - } -} - -bool FairMQPollerNN::CheckOutput(const string& channelKey, const int index) -{ - try - { - if (fItems[fOffsetMap.at(channelKey) + index].revents & NN_POLLOUT) - { - return true; - } - - return false; - } - catch (const std::out_of_range& oor) - { - LOG(error) << "invalid channel key: \"" << channelKey << "\""; - LOG(error) << "out of range error: " << oor.what() << '\n'; - exit(EXIT_FAILURE); - } -} - -FairMQPollerNN::~FairMQPollerNN() -{ - delete[] fItems; -} diff --git a/fairmq/nanomsg/FairMQPollerNN.h b/fairmq/nanomsg/FairMQPollerNN.h deleted file mode 100644 index e47d2856..00000000 --- a/fairmq/nanomsg/FairMQPollerNN.h +++ /dev/null @@ -1,58 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQPollerNN.h - * - * @since 2014-01-23 - * @author A. Rybalchenko - */ - -#ifndef FAIRMQPOLLERNN_H_ -#define FAIRMQPOLLERNN_H_ - -#include -#include - -#include "FairMQPoller.h" -#include "FairMQChannel.h" -#include "FairMQTransportFactoryNN.h" - -class FairMQChannel; -struct nn_pollfd; - -class FairMQPollerNN final : public FairMQPoller -{ - friend class FairMQChannel; - friend class FairMQTransportFactoryNN; - - public: - FairMQPollerNN(const std::vector& channels); - FairMQPollerNN(const std::vector& channels); - FairMQPollerNN(const std::unordered_map>& channelsMap, const std::vector& channelList); - - FairMQPollerNN(const FairMQPollerNN&) = delete; - FairMQPollerNN operator=(const FairMQPollerNN&) = delete; - - void SetItemEvents(nn_pollfd& item, const int type); - - void Poll(const int timeout) override; - bool CheckInput(const int index) override; - bool CheckOutput(const int index) override; - bool CheckInput(const std::string& channelKey, const int index) override; - bool CheckOutput(const std::string& channelKey, const int index) override; - - ~FairMQPollerNN() override; - - private: - nn_pollfd* fItems; - int fNumItems; - - std::unordered_map fOffsetMap; -}; - -#endif /* FAIRMQPOLLERNN_H_ */ diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx deleted file mode 100644 index 3ba8dca6..00000000 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ /dev/null @@ -1,628 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQSocketNN.cxx - * - * @since 2012-12-05 - * @author A. Rybalchenko - */ - -#include "FairMQSocketNN.h" -#include "FairMQMessageNN.h" -#include "FairMQLogger.h" -#include "FairMQUnmanagedRegionNN.h" -#include - -#include -#include -#include -#include -#include - -#include -#include - -using namespace std; -using namespace fair::mq; - -atomic FairMQSocketNN::fInterrupted(false); - -FairMQSocketNN::FairMQSocketNN(const string& type, const string& name, const string& id /*= ""*/, FairMQTransportFactory* fac /*=nullptr*/) - : FairMQSocket{fac} - , fSocket(-1) - , fId(id + "." + name + "." + type) - , fBytesTx(0) - , fBytesRx(0) - , fMessagesTx(0) - , fMessagesRx(0) - , fSndTimeout(100) - , fRcvTimeout(100) - , fLinger(500) -{ - if (type == "router" || type == "dealer") - { - // Additional info about using the sockets ROUTER and DEALER with nanomsg can be found in: - // http://250bpm.com/blog:14 - // http://www.freelists.org/post/nanomsg/a-stupid-load-balancing-question,1 - fSocket = nn_socket(AF_SP_RAW, GetConstant(type)); - if (fSocket == -1) - { - LOG(error) << "failed creating socket " << fId << ", reason: " << nn_strerror(errno); - exit(EXIT_FAILURE); - } - } - else - { - fSocket = nn_socket(AF_SP, GetConstant(type)); - if (fSocket == -1) - { - LOG(error) << "failed creating socket " << fId << ", reason: " << nn_strerror(errno); - exit(EXIT_FAILURE); - } - if (type == "sub") - { - nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, nullptr, 0); - } - } - - if (nn_setsockopt(fSocket, NN_SOL_SOCKET, NN_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) - { - LOG(error) << "Failed setting NN_SNDTIMEO socket option, reason: " << nn_strerror(errno); - } - - if (nn_setsockopt(fSocket, NN_SOL_SOCKET, NN_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) - { - LOG(error) << "Failed setting NN_RCVTIMEO socket option, reason: " << nn_strerror(errno); - } - -#ifdef NN_RCVMAXSIZE - int rcvSize = -1; - if (nn_setsockopt(fSocket, NN_SOL_SOCKET, NN_RCVMAXSIZE, &rcvSize, sizeof(rcvSize)) != 0) - { - LOG(error) << "Failed setting NN_RCVMAXSIZE socket option, reason: " << nn_strerror(errno); - } -#endif - - LOG(debug) << "Created socket " << GetId(); -} - -bool FairMQSocketNN::Bind(const string& address) -{ - // LOG(info) << "bind socket " << fId << " on " << address; - - if (nn_bind(fSocket, address.c_str()) < 0) - { - LOG(error) << "failed binding socket " << fId << ", reason: " << nn_strerror(errno); - return false; - } - - return true; -} - -bool FairMQSocketNN::Connect(const string& address) -{ - // LOG(info) << "connect socket " << fId << " to " << address; - - if (nn_connect(fSocket, address.c_str()) < 0) - { - LOG(error) << "failed connecting socket " << fId << ", reason: " << nn_strerror(errno); - return false; - } - - return true; -} - -int FairMQSocketNN::Send(FairMQMessagePtr& msg, const int timeout) -{ - int flags = 0; - if (timeout == 0) - { - flags = NN_DONTWAIT; - } - int nbytes = -1; - int elapsed = 0; - - FairMQMessageNN* msgPtr = static_cast(msg.get()); - void* bufPtr = msgPtr->GetMessage(); - - while (true) - { - if (msgPtr->fRegionPtr == nullptr) - { - nbytes = nn_send(fSocket, &bufPtr, NN_MSG, flags); - } - else - { - nbytes = nn_send(fSocket, bufPtr, msg->GetSize(), flags); - // nn_send copies the data, safe to call region callback here - static_cast(msgPtr->fRegionPtr)->fCallback(bufPtr, msg->GetSize(), reinterpret_cast(msgPtr->fHint)); - } - - if (nbytes >= 0) - { - fBytesTx += nbytes; - ++fMessagesTx; - static_cast(msg.get())->fReceiving = false; - - return nbytes; - } - else if (nn_errno() == ETIMEDOUT) - { - if (!fInterrupted && ((flags & NN_DONTWAIT) == 0)) - { - if (timeout > 0) - { - elapsed += fSndTimeout; - if (elapsed >= timeout) - { - return -2; - } - } - continue; - } - else - { - return -2; - } - } - else if (nn_errno() == EAGAIN) - { - return -2; - } - else if (nn_errno() == ETERM) - { - LOG(info) << "terminating socket " << fId; - return -1; - } - else - { - LOG(error) << "Failed sending on socket " << fId << ", reason: " << nn_strerror(errno); - return nbytes; - } - } -} - -int FairMQSocketNN::Receive(FairMQMessagePtr& msg, const int timeout) -{ - int flags = 0; - if (timeout == 0) - { - flags = NN_DONTWAIT; - } - int elapsed = 0; - - FairMQMessageNN* msgPtr = static_cast(msg.get()); - - while (true) - { - void* ptr = nullptr; - int nbytes = nn_recv(fSocket, &ptr, NN_MSG, flags); - if (nbytes >= 0) - { - fBytesRx += nbytes; - ++fMessagesRx; - msgPtr->SetMessage(ptr, nbytes); - msgPtr->fReceiving = true; - return nbytes; - } - else if (nn_errno() == ETIMEDOUT) - { - if (!fInterrupted && ((flags & NN_DONTWAIT) == 0)) - { - if (timeout > 0) - { - elapsed += fRcvTimeout; - if (elapsed >= timeout) - { - return -2; - } - } - continue; - } - else - { - return -2; - } - } - else if (nn_errno() == EAGAIN) - { - return -2; - } - else if (nn_errno() == ETERM) - { - LOG(info) << "terminating socket " << fId; - return -1; - } - else - { - LOG(error) << "Failed receiving on socket " << fId << ", reason: " << nn_strerror(errno); - return nbytes; - } - } -} - -int64_t FairMQSocketNN::Send(vector& msgVec, const int timeout) -{ - int flags = 0; - if (timeout == 0) - { - flags = NN_DONTWAIT; - } - const unsigned int vecSize = msgVec.size(); - int elapsed = 0; - - // create msgpack simple buffer - msgpack::sbuffer sbuf; - // create msgpack packer - msgpack::packer packer(&sbuf); - - // pack all parts into a single msgpack simple buffer - for (unsigned int i = 0; i < vecSize; ++i) - { - FairMQMessageNN* partPtr = static_cast(msgVec[i].get()); - - partPtr->fReceiving = false; - packer.pack_bin(msgVec[i]->GetSize()); - packer.pack_bin_body(static_cast(msgVec[i]->GetData()), msgVec[i]->GetSize()); - // call region callback - if (partPtr->fRegionPtr) - { - static_cast(partPtr->fRegionPtr)->fCallback(partPtr->GetMessage(), partPtr->GetSize(), reinterpret_cast(partPtr->fHint)); - } - } - - while (true) - { - int64_t nbytes = nn_send(fSocket, sbuf.data(), sbuf.size(), flags); - if (nbytes >= 0) - { - fBytesTx += nbytes; - ++fMessagesTx; - return nbytes; - } - else if (nn_errno() == ETIMEDOUT) - { - if (!fInterrupted && ((flags & NN_DONTWAIT) == 0)) - { - if (timeout > 0) - { - elapsed += fSndTimeout; - if (elapsed >= timeout) - { - return -2; - } - } - continue; - } - else - { - return -2; - } - } - else if (nn_errno() == EAGAIN) - { - return -2; - } - else if (nn_errno() == ETERM) - { - LOG(info) << "terminating socket " << fId; - return -1; - } - else - { - LOG(error) << "Failed sending on socket " << fId << ", reason: " << nn_strerror(errno); - return nbytes; - } - } -} - -int64_t FairMQSocketNN::Receive(vector& msgVec, const int timeout) -{ - int flags = 0; - if (timeout == 0) - { - flags = NN_DONTWAIT; - } - // Warn if the vector is filled before Receive() and empty it. - // if (msgVec.size() > 0) - // { - // LOG(warn) << "Message vector contains elements before Receive(), they will be deleted!"; - // msgVec.clear(); - // } - - int elapsed = 0; - - while (true) - { - // pointer to point to received message buffer - char* ptr = nullptr; - // receive the message into a buffer allocated by nanomsg and let ptr point to it - int nbytes = nn_recv(fSocket, &ptr, NN_MSG, flags); - if (nbytes >= 0) // if no errors or non-blocking timeouts - { - // store statistics on how many bytes received - fBytesRx += nbytes; - // store statistics on how many messages received (count messages instead of parts) - ++fMessagesRx; - - // offset to be used by msgpack to handle separate chunks - size_t offset = 0; - while (offset != static_cast(nbytes)) // continue until all parts have been read - { - // vector of chars to hold blob (unlike char*/void* this type can be converted to by msgpack) - vector buf; - - // unpack and convert chunk - msgpack::unpacked result; - unpack(result, ptr, nbytes, offset); - msgpack::object object(result.get()); - object.convert(buf); - // get the single message size - size_t size = buf.size() * sizeof(char); - FairMQMessagePtr part(new FairMQMessageNN(size, GetTransport())); - static_cast(part.get())->fReceiving = true; - memcpy(part->GetData(), buf.data(), size); - msgVec.push_back(move(part)); - } - - nn_freemsg(ptr); - return nbytes; - } - else if (nn_errno() == ETIMEDOUT) - { - if (!fInterrupted && ((flags & NN_DONTWAIT) == 0)) - { - if (timeout > 0) - { - elapsed += fRcvTimeout; - if (elapsed >= timeout) - { - return -2; - } - } - continue; - } - else - { - return -2; - } - } - else if (nn_errno() == EAGAIN) - { - return -2; - } - else if (nn_errno() == ETERM) - { - LOG(info) << "terminating socket " << fId; - return -1; - } - else - { - LOG(error) << "Failed receiving on socket " << fId << ", reason: " << nn_strerror(errno); - return nbytes; - } - } -} - -void FairMQSocketNN::Close() -{ - nn_close(fSocket); -} - -void FairMQSocketNN::Interrupt() -{ - fInterrupted = true; -} - -void FairMQSocketNN::Resume() -{ - fInterrupted = false; -} - -int FairMQSocketNN::GetSocket() const -{ - return fSocket; -} - -void FairMQSocketNN::SetOption(const string& option, const void* value, size_t valueSize) -{ - if (option == "snd-size" || option == "rcv-size") - { - int val = *(static_cast(const_cast(value))); - if (val <= 0) - { - LOG(warn) << "value for sndKernelSize/rcvKernelSize should be greater than 0, leaving unchanged."; - return; - } - } - - if (option == "snd-hwm" || option == "rcv-hwm") - { - return; - } - - if (option == "linger") - { - fLinger = *static_cast(const_cast(value)); - return; - } - - int rc = nn_setsockopt(fSocket, NN_SOL_SOCKET, GetConstant(option), value, valueSize); - if (rc < 0) - { - LOG(error) << "failed setting socket option, reason: " << nn_strerror(errno); - } -} - -void FairMQSocketNN::GetOption(const string& option, void* value, size_t* valueSize) -{ - if (option == "linger") - { - *static_cast(value) = fLinger; - return; - } - - if (option == "snd-hwm" || option == "rcv-hwm") - { - *static_cast(value) = -1; - return; - } - - - int rc = nn_getsockopt(fSocket, NN_SOL_SOCKET, GetConstant(option), value, valueSize); - if (rc < 0) - { - LOG(error) << "failed getting socket option, reason: " << nn_strerror(errno); - } -} - -void FairMQSocketNN::SetLinger(const int value) -{ - fLinger = value; -} - -int FairMQSocketNN::GetLinger() const -{ - return fLinger; -} - -void FairMQSocketNN::SetSndBufSize(const int /* value */) -{ - // not used in nanomsg -} - -int FairMQSocketNN::GetSndBufSize() const -{ - // not used in nanomsg - return -1; -} - -void FairMQSocketNN::SetRcvBufSize(const int /* value */) -{ - // not used in nanomsg -} - -int FairMQSocketNN::GetRcvBufSize() const -{ - // not used in nanomsg - return -1; -} - -void FairMQSocketNN::SetSndKernelSize(const int value) -{ - if (nn_setsockopt(fSocket, NN_SOL_SOCKET, NN_SNDBUF, &value, sizeof(value)) < 0) { - throw SocketError(tools::ToString("failed setting NN_SNDBUF, reason: ", nn_strerror(errno))); - } -} - -int FairMQSocketNN::GetSndKernelSize() const -{ - int value = 0; - size_t valueSize; - if (nn_getsockopt(fSocket, NN_SOL_SOCKET, NN_SNDBUF, &value, &valueSize) < 0) { - throw SocketError(tools::ToString("failed getting NN_SNDBUF, reason: ", nn_strerror(errno))); - } - return value; -} - -void FairMQSocketNN::SetRcvKernelSize(const int value) -{ - if (nn_setsockopt(fSocket, NN_SOL_SOCKET, NN_RCVBUF, &value, sizeof(value)) < 0) { - throw SocketError(tools::ToString("failed setting NN_RCVBUF, reason: ", nn_strerror(errno))); - } -} - -int FairMQSocketNN::GetRcvKernelSize() const -{ - int value = 0; - size_t valueSize; - if (nn_getsockopt(fSocket, NN_SOL_SOCKET, NN_RCVBUF, &value, &valueSize) < 0) { - throw SocketError(tools::ToString("failed getting NN_RCVBUF, reason: ", nn_strerror(errno))); - } - return value; -} - - -unsigned long FairMQSocketNN::GetBytesTx() const -{ - return fBytesTx; -} - -unsigned long FairMQSocketNN::GetBytesRx() const -{ - return fBytesRx; -} - -unsigned long FairMQSocketNN::GetMessagesTx() const -{ - return fMessagesTx; -} - -unsigned long FairMQSocketNN::GetMessagesRx() const -{ - return fMessagesRx; -} - -int FairMQSocketNN::GetConstant(const string& constant) -{ - if (constant == "") - return 0; - if (constant == "sub") - return NN_SUB; - if (constant == "pub") - return NN_PUB; - if (constant == "xsub") - return NN_SUB; - if (constant == "xpub") - return NN_PUB; - if (constant == "push") - return NN_PUSH; - if (constant == "pull") - return NN_PULL; - if (constant == "req") - return NN_REQ; - if (constant == "rep") - return NN_REP; - if (constant == "dealer") - return NN_REQ; - if (constant == "router") - return NN_REP; - if (constant == "pair") - return NN_PAIR; - - if (constant == "snd-hwm") - return NN_SNDBUF; - if (constant == "rcv-hwm") - return NN_RCVBUF; - if (constant == "snd-size") - return NN_SNDBUF; - if (constant == "rcv-size") - return NN_RCVBUF; - if (constant == "snd-more") - { - LOG(error) << "Multipart messages functionality currently not supported by nanomsg!"; - return -1; - } - if (constant == "rcv-more") - { - LOG(error) << "Multipart messages functionality currently not supported by nanomsg!"; - return -1; - } - - if (constant == "linger") - return NN_LINGER; - if (constant == "no-block") - return NN_DONTWAIT; - - return -1; -} - -FairMQSocketNN::~FairMQSocketNN() -{ - Close(); -} diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h deleted file mode 100644 index 6905383f..00000000 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ /dev/null @@ -1,81 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#ifndef FAIRMQSOCKETNN_H_ -#define FAIRMQSOCKETNN_H_ - -#include -#include - -#include "FairMQSocket.h" -#include "FairMQMessage.h" -class FairMQTransportFactory; - -class FairMQSocketNN final : public FairMQSocket -{ - public: - FairMQSocketNN(const std::string& type, const std::string& name, const std::string& id = "", FairMQTransportFactory* fac = nullptr); - FairMQSocketNN(const FairMQSocketNN&) = delete; - FairMQSocketNN operator=(const FairMQSocketNN&) = delete; - - std::string GetId() const override { return fId; } - - bool Bind(const std::string& address) override; - bool Connect(const std::string& address) override; - - int Send(FairMQMessagePtr& msg, const int timeout = -1) override; - int Receive(FairMQMessagePtr& msg, const int timeout = -1) override; - int64_t Send(std::vector>& msgVec, const int timeout = -1) override; - int64_t Receive(std::vector>& msgVec, const int timeout = -1) override; - - int GetSocket() const; - - void Close() override; - - static void Interrupt(); - static void Resume(); - - void SetOption(const std::string& option, const void* value, size_t valueSize) override; - void GetOption(const std::string& option, void* value, size_t* valueSize) override; - - void SetLinger(const int value) override; - int GetLinger() const override; - void SetSndBufSize(const int value) override; - int GetSndBufSize() const override; - void SetRcvBufSize(const int value) override; - int GetRcvBufSize() const override; - void SetSndKernelSize(const int value) override; - int GetSndKernelSize() const override; - void SetRcvKernelSize(const int value) override; - int GetRcvKernelSize() const override; - - unsigned long GetBytesTx() const override; - unsigned long GetBytesRx() const override; - unsigned long GetMessagesTx() const override; - unsigned long GetMessagesRx() const override; - - static int GetConstant(const std::string& constant); - - ~FairMQSocketNN() override; - - private: - int fSocket; - std::string fId; - std::atomic fBytesTx; - std::atomic fBytesRx; - std::atomic fMessagesTx; - std::atomic fMessagesRx; - - static std::atomic fInterrupted; - - int fSndTimeout; - int fRcvTimeout; - int fLinger; -}; - -#endif /* FAIRMQSOCKETNN_H_ */ diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx deleted file mode 100644 index 7b706313..00000000 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ /dev/null @@ -1,100 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014-2017 GSI Helmholtzzentrum fuer Schwerionenforschung Gmb * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#include "FairMQTransportFactoryNN.h" - -#include -#include -#include -#include - -using namespace std; - -fair::mq::Transport FairMQTransportFactoryNN::fTransportType = fair::mq::Transport::NN; - -FairMQTransportFactoryNN::FairMQTransportFactoryNN(const string& id, const fair::mq::ProgOptions* /*config*/) - : FairMQTransportFactory(id) - , fRegionCounter(0) -{ - LOG(debug) << "Transport: Using nanomsg library"; -} - -FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage() -{ - return unique_ptr(new FairMQMessageNN(this)); -} - -FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(const size_t size) -{ - return unique_ptr(new FairMQMessageNN(size, this)); -} - -FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) -{ - return unique_ptr(new FairMQMessageNN(data, size, ffn, hint, this)); -} - -FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) -{ - return unique_ptr(new FairMQMessageNN(region, data, size, hint, this)); -} - -FairMQSocketPtr FairMQTransportFactoryNN::CreateSocket(const string& type, const string& name) -{ - unique_ptr socket(new FairMQSocketNN(type, name, GetId(), this)); - fSockets.push_back(socket.get()); - return socket; -} - -FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const vector& channels) const -{ - return unique_ptr(new FairMQPollerNN(channels)); -} - -FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const std::vector& channels) const -{ - return unique_ptr(new FairMQPollerNN(channels)); -} - -FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const unordered_map>& channelsMap, const vector& channelList) const -{ - return unique_ptr(new FairMQPollerNN(channelsMap, channelList)); -} - -FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) -{ - return unique_ptr(new FairMQUnmanagedRegionNN(++fRegionCounter, size, callback, path, flags, this)); -} - -FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) -{ - return unique_ptr(new FairMQUnmanagedRegionNN(size, userFlags, callback, path, flags, this)); -} - -fair::mq::Transport FairMQTransportFactoryNN::GetType() const -{ - return fTransportType; -} - -void FairMQTransportFactoryNN::Reset() -{ - auto it = max_element(fSockets.begin(), fSockets.end(), [](FairMQSocket* s1, FairMQSocket* s2) { - return static_cast(s1)->GetLinger() < static_cast(s2)->GetLinger(); - }); - if (it != fSockets.end()) { - this_thread::sleep_for(chrono::milliseconds(static_cast(*it)->GetLinger())); - } - fSockets.clear(); -} - -FairMQTransportFactoryNN::~FairMQTransportFactoryNN() -{ - LOG(debug) << "Destroying Shared Memory transport..."; - // nn_term(); - // see https://www.freelists.org/post/nanomsg/Getting-rid-of-nn-init-and-nn-term,8 -} diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h deleted file mode 100644 index a0fe995f..00000000 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ /dev/null @@ -1,59 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#ifndef FAIRMQTRANSPORTFACTORYNN_H_ -#define FAIRMQTRANSPORTFACTORYNN_H_ - -#include "FairMQTransportFactory.h" -#include "FairMQMessageNN.h" -#include "FairMQSocketNN.h" -#include "FairMQPollerNN.h" -#include "FairMQUnmanagedRegionNN.h" -#include - -#include -#include - -class FairMQTransportFactoryNN final : public FairMQTransportFactory -{ - public: - FairMQTransportFactoryNN(const std::string& id = "", const fair::mq::ProgOptions* config = nullptr); - ~FairMQTransportFactoryNN() override; - - FairMQMessagePtr CreateMessage() override; - FairMQMessagePtr CreateMessage(const size_t size) override; - FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; - FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) override; - - FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) override; - - FairMQPollerPtr CreatePoller(const std::vector& channels) const override; - FairMQPollerPtr CreatePoller(const std::vector& channels) const override; - FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; - - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) override; - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override; - - void SubscribeToRegionEvents(FairMQRegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for nanomsg"; } - bool SubscribedToRegionEvents() override { LOG(error) << "Region event subscriptions not yet implemented for nanomsg"; return false; } - void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for nanomsg"; } - std::vector GetRegionInfo() override { LOG(error) << "GetRegionInfo not yet implemented for nanomsg, returning empty vector"; return std::vector(); } - - fair::mq::Transport GetType() const override; - - void Interrupt() override { FairMQSocketNN::Interrupt(); } - void Resume() override { FairMQSocketNN::Resume(); } - void Reset() override; - - private: - static fair::mq::Transport fTransportType; - uint64_t fRegionCounter; - mutable std::vector fSockets; -}; - -#endif /* FAIRMQTRANSPORTFACTORYNN_H_ */ diff --git a/fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx b/fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx deleted file mode 100644 index 95c9fb6a..00000000 --- a/fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx +++ /dev/null @@ -1,52 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#include "FairMQUnmanagedRegionNN.h" -#include "FairMQLogger.h" - -using namespace std; - -FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(uint64_t id, const size_t size, FairMQRegionCallback callback, const std::string& /*path = "" */, int /*flags = 0 */, FairMQTransportFactory* factory /* = nullptr */) - : FairMQUnmanagedRegion(factory) - , fId(id) - , fBuffer(malloc(size)) - , fSize(size) - , fCallback(callback) -{ -} - -FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(uint64_t id, const size_t size, const int64_t /*userFlags*/, FairMQRegionCallback callback, const std::string& /*path = "" */, int /*flags = 0 */, FairMQTransportFactory* factory /* = nullptr */) - : FairMQUnmanagedRegion(factory) - , fId(id) - , fBuffer(malloc(size)) - , fSize(size) - , fCallback(callback) -{ -} - -void* FairMQUnmanagedRegionNN::GetData() const -{ - return fBuffer; -} - -size_t FairMQUnmanagedRegionNN::GetSize() const -{ - return fSize; -} - -uint64_t FairMQUnmanagedRegionNN::GetId() const -{ - return fId; -} - - -FairMQUnmanagedRegionNN::~FairMQUnmanagedRegionNN() -{ - LOG(debug) << "destroying region"; - free(fBuffer); -} diff --git a/fairmq/nanomsg/FairMQUnmanagedRegionNN.h b/fairmq/nanomsg/FairMQUnmanagedRegionNN.h deleted file mode 100644 index ecc3d117..00000000 --- a/fairmq/nanomsg/FairMQUnmanagedRegionNN.h +++ /dev/null @@ -1,41 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#ifndef FAIRMQUNMANAGEDREGIONNN_H_ -#define FAIRMQUNMANAGEDREGIONNN_H_ - -#include "FairMQUnmanagedRegion.h" - -#include // size_t -#include - -class FairMQUnmanagedRegionNN final : public FairMQUnmanagedRegion -{ - friend class FairMQSocketNN; - - public: - FairMQUnmanagedRegionNN(uint64_t id, const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0, FairMQTransportFactory* factory = nullptr); - FairMQUnmanagedRegionNN(uint64_t id, const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0, FairMQTransportFactory* factory = nullptr); - - FairMQUnmanagedRegionNN(const FairMQUnmanagedRegionNN&) = delete; - FairMQUnmanagedRegionNN operator=(const FairMQUnmanagedRegionNN&) = delete; - - void* GetData() const override; - size_t GetSize() const override; - uint64_t GetId() const override; - - virtual ~FairMQUnmanagedRegionNN(); - - private: - uint64_t fId; - void* fBuffer; - size_t fSize; - FairMQRegionCallback fCallback; -}; - -#endif /* FAIRMQUNMANAGEDREGIONNN_H_ */ diff --git a/fairmq/plugins/config/Config.cxx b/fairmq/plugins/config/Config.cxx index ea0e1ac2..f4f577f7 100644 --- a/fairmq/plugins/config/Config.cxx +++ b/fairmq/plugins/config/Config.cxx @@ -63,7 +63,7 @@ Plugin::ProgOptions ConfigPluginProgramOptions() pluginOptions.add_options() ("id", po::value()->default_value(""), "Device ID.") ("io-threads", po::value()->default_value(1), "Number of I/O threads.") - ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg'/'shmem').") + ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'shmem').") ("network-interface", po::value()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).") ("init-timeout", po::value()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).") ("max-run-time", po::value()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).") diff --git a/fairmq/run/startMQBenchmark.sh.in b/fairmq/run/startMQBenchmark.sh.in index 2b7b68b6..62105aed 100755 --- a/fairmq/run/startMQBenchmark.sh.in +++ b/fairmq/run/startMQBenchmark.sh.in @@ -59,7 +59,7 @@ else fi echo "" -echo "Usage: startBenchmark [message size=1000000] [number of iterations=0] [transport=zeromq/nanomsg/shmem] [affinity=false]" +echo "Usage: startBenchmark [message size=1000000] [number of iterations=0] [transport=zeromq/shmem] [affinity=false]" SAMPLER="fairmq-bsampler" SAMPLER+=" --id bsampler1" diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d84fbaac..53df7fa7 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -32,10 +32,6 @@ add_testhelper(runTestDevice LINKS FairMQ ) -if(BUILD_NANOMSG_TRANSPORT) - list(APPEND definitions BUILD_NANOMSG_TRANSPORT) -endif() - if(BUILD_OFI_TRANSPORT) LIST(APPEND definitions BUILD_OFI_TRANSPORT) endif() diff --git a/test/message/_message.cxx b/test/message/_message.cxx index ab23f704..0c2f8d80 100644 --- a/test/message/_message.cxx +++ b/test/message/_message.cxx @@ -87,13 +87,6 @@ TEST(Resize, shmem) RunPushPullWithMsgResize("shmem", "ipc://test_message_resize"); } -#ifdef BUILD_NANOMSG_TRANSPORT -TEST(Resize, nanomsg) -{ - RunPushPullWithMsgResize("nanomsg", "ipc://test_message_resize"); -} -#endif /* BUILD_NANOMSG_TRANSPORT */ - TEST(Rebuild, zeromq) { RunMsgRebuild("zeromq"); @@ -104,11 +97,4 @@ TEST(Rebuild, shmem) RunMsgRebuild("shmem"); } -#ifdef BUILD_NANOMSG_TRANSPORT -TEST(Rebuild, nanomsg) -{ - RunMsgRebuild("nanomsg"); -} -#endif /* BUILD_NANOMSG_TRANSPORT */ - } // namespace diff --git a/test/poller/_poller.cxx b/test/poller/_poller.cxx index 9c43263d..9a3d5f41 100644 --- a/test/poller/_poller.cxx +++ b/test/poller/_poller.cxx @@ -55,13 +55,6 @@ TEST(Subchannel, zeromq) EXPECT_EXIT(RunPoller("zeromq", 0), ::testing::ExitedWithCode(0), "POLL test successfull"); } -#ifdef BUILD_NANOMSG_TRANSPORT -TEST(Subchannel, nanomsg) -{ - EXPECT_EXIT(RunPoller("nanomsg", 0), ::testing::ExitedWithCode(0), "POLL test successfull"); -} -#endif /* BUILD_NANOMSG_TRANSPORT */ - TEST(Subchannel, shmem) { EXPECT_EXIT(RunPoller("shmem", 0), ::testing::ExitedWithCode(0), "POLL test successfull"); @@ -72,13 +65,6 @@ TEST(Channel, zeromq) EXPECT_EXIT(RunPoller("zeromq", 1), ::testing::ExitedWithCode(0), "POLL test successfull"); } -#ifdef BUILD_NANOMSG_TRANSPORT -TEST(Channel, nanomsg) -{ - EXPECT_EXIT(RunPoller("nanomsg", 1), ::testing::ExitedWithCode(0), "POLL test successfull"); -} -#endif /* BUILD_NANOMSG_TRANSPORT */ - TEST(Channel, shmem) { EXPECT_EXIT(RunPoller("shmem", 1), ::testing::ExitedWithCode(0), "POLL test successfull"); diff --git a/test/protocols/_pair.cxx b/test/protocols/_pair.cxx index 9dcc26c1..d6891ecb 100644 --- a/test/protocols/_pair.cxx +++ b/test/protocols/_pair.cxx @@ -56,13 +56,6 @@ TEST(Pair, SingleMsg_MP_tcp_shmem) EXPECT_EXIT(RunPair("shmem"), ::testing::ExitedWithCode(0), "PAIR test successfull"); } -#ifdef BUILD_NANOMSG_TRANSPORT -TEST(Pair, SingleMsg_MP_tcp_nanomsg) -{ - EXPECT_EXIT(RunPair("nanomsg"), ::testing::ExitedWithCode(0), "PAIR test successfull"); -} -#endif /* BUILD_NANOMSG_TRANSPORT */ - #ifdef BUILD_OFI_TRANSPORT TEST(Pair, SingleMsg_MP_tcp_ofi) { diff --git a/test/protocols/_pub_sub.cxx b/test/protocols/_pub_sub.cxx index 23eb77f5..7e0ba0cc 100644 --- a/test/protocols/_pub_sub.cxx +++ b/test/protocols/_pub_sub.cxx @@ -60,11 +60,4 @@ TEST(PubSub, zeromq) EXPECT_EXIT(RunPubSub("zeromq"), ::testing::ExitedWithCode(0), "PUB-SUB test successfull"); } -#ifdef BUILD_NANOMSG_TRANSPORT -TEST(PubSub, nanomsg) -{ - EXPECT_EXIT(RunPubSub("nanomsg"), ::testing::ExitedWithCode(0), "PUB-SUB test successfull"); -} -#endif /* BUILD_NANOMSG_TRANSPORT */ - } // namespace diff --git a/test/protocols/_push_pull.cxx b/test/protocols/_push_pull.cxx index 7a23ffca..5e9df414 100644 --- a/test/protocols/_push_pull.cxx +++ b/test/protocols/_push_pull.cxx @@ -56,11 +56,4 @@ TEST(PushPull, SingleMsg_MP_tcp_shmem) EXPECT_EXIT(RunPushPull("shmem"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull"); } -#ifdef BUILD_NANOMSG_TRANSPORT -TEST(PushPull, SingleMsg_MP_tcp_nanomsg) -{ - EXPECT_EXIT(RunPushPull("nanomsg"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull"); -} -#endif /* BUILD_NANOMSG_TRANSPORT */ - } // namespace diff --git a/test/protocols/_push_pull_multipart.cxx b/test/protocols/_push_pull_multipart.cxx index 84246981..bb1994ad 100644 --- a/test/protocols/_push_pull_multipart.cxx +++ b/test/protocols/_push_pull_multipart.cxx @@ -158,13 +158,6 @@ TEST(PushPull, Multipart_ST_inproc_shmem) RunSingleThreadedMultipart("shmem", "inproc://test1", "inproc://test2"); } -#ifdef BUILD_NANOMSG_TRANSPORT -TEST(PushPull, Multipart_ST_inproc_nanomsg) -{ - RunSingleThreadedMultipart("nanomsg", "inproc://test1", "inproc://test2"); -} -#endif /* BUILD_NANOMSG_TRANSPORT */ - TEST(PushPull, Multipart_ST_ipc_zeromq) { RunSingleThreadedMultipart("zeromq", "ipc://test_Multipart_ST_ipc_zeromq_1", "ipc://test_Multipart_ST_ipc_zeromq_2"); @@ -175,13 +168,6 @@ TEST(PushPull, Multipart_ST_ipc_shmen) RunSingleThreadedMultipart("shmem", "ipc://test_Multipart_ST_ipc_shmen_1", "ipc://test_Multipart_ST_ipc_shmen_2"); } -#ifdef BUILD_NANOMSG_TRANSPORT -TEST(PushPull, Multipart_ST_ipc_nanomsg) -{ - RunSingleThreadedMultipart("nanomsg", "ipc://test_Multipart_ST_ipc_nanomsg_1", "ipc://test_Multipart_ST_ipc_nanomsg_2"); -} -#endif /* BUILD_NANOMSG_TRANSPORT */ - TEST(PushPull, Multipart_MT_inproc_zeromq) { RunMultiThreadedMultipart("zeromq", "inproc://test_1"); @@ -192,13 +178,6 @@ TEST(PushPull, Multipart_MT_inproc_shmem) RunMultiThreadedMultipart("shmem", "inproc://test_1"); } -#ifdef BUILD_NANOMSG_TRANSPORT -TEST(PushPull, Multipart_MT_inproc_nanomsg) -{ - RunMultiThreadedMultipart("nanomsg", "inproc://test_1"); -} -#endif /* BUILD_NANOMSG_TRANSPORT */ - TEST(PushPull, Multipart_MT_ipc_zeromq) { RunMultiThreadedMultipart("zeromq", "ipc://test_Multipart_MT_ipc_zeromq_1"); @@ -209,11 +188,4 @@ TEST(PushPull, Multipart_MT_ipc_shmem) RunMultiThreadedMultipart("shmem", "ipc://test_Multipart_MT_ipc_shmem_1"); } -#ifdef BUILD_NANOMSG_TRANSPORT -TEST(PushPull, Multipart_MT_ipc_nanomsg) -{ - RunMultiThreadedMultipart("nanomsg", "ipc://test_Multipart_MT_ipc_nanomsg_1"); -} -#endif /* BUILD_NANOMSG_TRANSPORT */ - } // namespace diff --git a/test/protocols/_req_rep.cxx b/test/protocols/_req_rep.cxx index df4c9a3f..a6a3ae4b 100644 --- a/test/protocols/_req_rep.cxx +++ b/test/protocols/_req_rep.cxx @@ -65,11 +65,4 @@ TEST(ReqRep, shmem) EXPECT_EXIT(RunReqRep("shmem"), ::testing::ExitedWithCode(0), "REQ-REP test successfull"); } -#ifdef BUILD_NANOMSG_TRANSPORT -TEST(ReqRep, nanomsg) -{ - EXPECT_EXIT(RunReqRep("nanomsg"), ::testing::ExitedWithCode(0), "REQ-REP test successfull"); -} -#endif /* BUILD_NANOMSG_TRANSPORT */ - } // namespace diff --git a/test/protocols/config.json.in b/test/protocols/config.json.in index 32dd00f9..a76cec17 100644 --- a/test/protocols/config.json.in +++ b/test/protocols/config.json.in @@ -27,32 +27,6 @@ } ] }, - { - "id": "pairleft_nanomsg", - "channels": [ - { - "address": "tcp://127.0.0.1:5757", - "method": "bind", - "name": "data", - "rateLogging": 0, - "transport": "nanomsg", - "type": "pair" - } - ] - }, - { - "id": "pairright_nanomsg", - "channels": [ - { - "address": "tcp://127.0.0.1:5757", - "method": "connect", - "name": "data", - "rateLogging": 0, - "transport": "nanomsg", - "type": "pair" - } - ] - }, { "id": "pairleft_shmem", "channels": [ @@ -131,32 +105,6 @@ } ] }, - { - "id": "push_nanomsg", - "channels": [ - { - "address": "tcp://127.0.0.1:5757", - "method": "bind", - "name": "data", - "rateLogging": 0, - "transport": "nanomsg", - "type": "push" - } - ] - }, - { - "id": "pull_nanomsg", - "channels": [ - { - "address": "tcp://127.0.0.1:5757", - "method": "connect", - "name": "data", - "rateLogging": 0, - "transport": "nanomsg", - "type": "pull" - } - ] - }, { "id": "push_shmem", "channels": [ @@ -204,27 +152,6 @@ } ] }, - { - "id": "pub_nanomsg", - "channels": [ - { - "address": "tcp://127.0.0.1:5756", - "method": "bind", - "name": "data", - "rateLogging": 0, - "transport": "nanomsg", - "type": "pub" - }, - { - "address": "tcp://127.0.0.1:5755", - "method": "bind", - "name": "control", - "rateLogging": 0, - "transport": "nanomsg", - "type": "pull" - } - ] - }, { "id": "sub_1zeromq", "channels": [ @@ -267,48 +194,6 @@ } ] }, - { - "id": "sub_1nanomsg", - "channels": [ - { - "address": "tcp://127.0.0.1:5756", - "method": "connect", - "name": "data", - "rateLogging": 0, - "transport": "nanomsg", - "type": "sub" - }, - { - "address": "tcp://127.0.0.1:5755", - "method": "connect", - "name": "control", - "rateLogging": 0, - "transport": "nanomsg", - "type": "push" - } - ] - }, - { - "id": "sub_2nanomsg", - "channels": [ - { - "address": "tcp://127.0.0.1:5756", - "method": "connect", - "name": "data", - "rateLogging": 0, - "transport": "nanomsg", - "type": "sub" - }, - { - "address": "tcp://127.0.0.1:5755", - "method": "connect", - "name": "control", - "rateLogging": 0, - "transport": "nanomsg", - "type": "push" - } - ] - }, { "id": "req_1zeromq", "channels": [ @@ -322,19 +207,6 @@ } ] }, - { - "id": "req_1nanomsg", - "channels": [ - { - "address": "tcp://127.0.0.1:5758", - "method": "connect", - "name": "data", - "rateLogging": 0, - "transport": "nanomsg", - "type": "req" - } - ] - }, { "id": "req_2zeromq", "channels": [ @@ -348,19 +220,6 @@ } ] }, - { - "id": "req_2nanomsg", - "channels": [ - { - "address": "tcp://127.0.0.1:5758", - "method": "connect", - "name": "data", - "rateLogging": 0, - "transport": "nanomsg", - "type": "req" - } - ] - }, { "id": "req_1shmem", "channels": [ @@ -400,19 +259,6 @@ } ] }, - { - "id": "rep_nanomsg", - "channels": [ - { - "address": "tcp://127.0.0.1:5758", - "method": "bind", - "name": "data", - "rateLogging": 0, - "transport": "nanomsg", - "type": "rep" - } - ] - }, { "id": "rep_shmem", "channels": [ @@ -468,27 +314,6 @@ } ] }, - { - "id": "transfer_timeout_nanomsg", - "channels": [ - { - "address": "tcp://127.0.0.1:5759", - "method": "bind", - "name": "data-in", - "rateLogging": 0, - "transport": "nanomsg", - "type": "pull" - }, - { - "address": "tcp://127.0.0.1:5560", - "method": "bind", - "name": "data-out", - "rateLogging": 0, - "transport": "nanomsg", - "type": "push" - } - ] - }, { "id": "pollout_zeromq", "channels": [ @@ -531,48 +356,6 @@ } ] }, - { - "id": "pollout_nanomsg", - "channels": [ - { - "address": "tcp://127.0.0.1:6002", - "method": "bind", - "name": "data1", - "rateLogging": 0, - "transport": "nanomsg", - "type": "push" - }, - { - "address": "tcp://127.0.0.1:6003", - "method": "bind", - "name": "data2", - "rateLogging": 0, - "transport": "nanomsg", - "type": "push" - } - ] - }, - { - "id": "pollin_nanomsg", - "channels": [ - { - "address": "tcp://127.0.0.1:6002", - "method": "connect", - "name": "data1", - "rateLogging": 0, - "transport": "nanomsg", - "type": "pull" - }, - { - "address": "tcp://127.0.0.1:6003", - "method": "connect", - "name": "data2", - "rateLogging": 0, - "transport": "nanomsg", - "type": "pull" - } - ] - }, { "id": "pollout_shmem", "channels": [ diff --git a/test/transport/_options.cxx b/test/transport/_options.cxx index 957b8d33..3a40e5ed 100644 --- a/test/transport/_options.cxx +++ b/test/transport/_options.cxx @@ -32,11 +32,7 @@ void CheckOldOptionInterface(FairMQChannel& channel, const string& option, const value = 0; size_t valueSize = sizeof(value); channel.GetSocket().GetOption(option, &value, &valueSize); - if (transport == "nanomsg" && (option == "snd-hwm" || option == "rcv-hwm")) { - ASSERT_EQ(value, -1); - } else { - ASSERT_EQ(value, 500); - } + ASSERT_EQ(value, 500); } void RunOptionsTest(const string& transport) @@ -58,18 +54,10 @@ void RunOptionsTest(const string& transport) ASSERT_EQ(channel.GetSocket().GetLinger(), 300); channel.GetSocket().SetSndBufSize(500); - if (transport == "nanomsg") { // nanomsg doesn't use this option and the getter always returns -1 - ASSERT_EQ(channel.GetSocket().GetSndBufSize(), -1); - } else { - ASSERT_EQ(channel.GetSocket().GetSndBufSize(), 500); - } + ASSERT_EQ(channel.GetSocket().GetSndBufSize(), 500); channel.GetSocket().SetRcvBufSize(500); - if (transport == "nanomsg") { // nanomsg doesn't use this option and the getter always returns -1 - ASSERT_EQ(channel.GetSocket().GetRcvBufSize(), -1); - } else { - ASSERT_EQ(channel.GetSocket().GetRcvBufSize(), 500); - } + ASSERT_EQ(channel.GetSocket().GetRcvBufSize(), 500); channel.GetSocket().SetSndKernelSize(8000); ASSERT_EQ(channel.GetSocket().GetSndKernelSize(), 8000); @@ -88,11 +76,4 @@ TEST(Options, shmem) RunOptionsTest("shmem"); } -#ifdef BUILD_NANOMSG_TRANSPORT -TEST(Options, nanomsg) -{ - RunOptionsTest("nanomsg"); -} -#endif /* BUILD_NANOMSG_TRANSPORT */ - } // namespace diff --git a/test/transport/_transfer_timeout.cxx b/test/transport/_transfer_timeout.cxx index bfed6089..a63de9eb 100644 --- a/test/transport/_transfer_timeout.cxx +++ b/test/transport/_transfer_timeout.cxx @@ -41,11 +41,4 @@ TEST(TransferTimeout, shmem) EXPECT_EXIT(RunTransferTimeout("shmem"), ::testing::ExitedWithCode(0), "Transfer timeout test successfull"); } -#ifdef BUILD_NANOMSG_TRANSPORT -TEST(TransferTimeout, nanomsg) -{ - EXPECT_EXIT(RunTransferTimeout("nanomsg"), ::testing::ExitedWithCode(0), "Transfer timeout test successfull"); -} -#endif /* BUILD_NANOMSG_TRANSPORT */ - } // namespace