Compare commits

...

21 Commits

Author SHA1 Message Date
Dennis Klein
2457094b6c Require correct asiofi version v0.3.1 2019-03-21 18:03:32 +01:00
Dennis Klein
54b7742d85 Drop obsolete dependency to AZMQ 2019-03-21 18:03:32 +01:00
Dennis Klein
195644f132 Add some debug output 2019-03-21 18:03:32 +01:00
Dennis Klein
f17dade8f8 Fix example after rebase 2019-03-21 18:03:32 +01:00
Dennis Klein
cc8fd73025 Fix recv logic 2019-03-21 18:03:32 +01:00
Dennis Klein
90fdcc26bb Run multipart example with ofi 2019-03-21 18:03:32 +01:00
Dennis Klein
b45e4da2a9 Implement linger for ofi
This reduces test runtime significantly for most transports
2019-03-21 18:03:32 +01:00
Dennis Klein
a1b7efa2f4 Unify implementation of multi part and single part message interfaces 2019-03-21 18:03:32 +01:00
Dennis Klein
6ee7e5fbf0 Improve error handling 2019-03-21 18:03:32 +01:00
Alexey Rybalchenko
99ffb732f4 Use process tools for WaitFor test 2019-03-19 18:09:01 +01:00
Alexey Rybalchenko
6809d60fad Fix mismatch of docs/API, wrong return value 2019-03-19 18:09:01 +01:00
Alexey Rybalchenko
ef4d6a3310 Process tools: add print helper, support signals 2019-03-19 18:09:01 +01:00
Matthias Richter
696257fd4f Extending FairMQParts by a constructor taking list of FairMQMessagePtr
This introduces a little helper to create a FairMQParts object in place from a
variable list of arguments. As a side effect also AddParts is extended to support
more than one FairMQMessagePtr.
2019-03-19 12:48:02 +01:00
Alexey Rybalchenko
cdc1ba084c Fix broken pipe errors in tools::execute 2019-03-15 15:51:50 +01:00
Alexey Rybalchenko
922f7e9a92 Use Asio to launch processes in fair::mq::tools::execute 2019-03-14 18:15:17 +01:00
Dennis Klein
a8f1a4dfdb Try to reconnect on refused connection 2019-03-13 18:04:49 +01:00
Dennis Klein
fb42b1e2f0 Adapt to new asiofi release 2019-03-13 18:04:49 +01:00
Alexey Rybalchenko
1a00f3edbd Remove controller input for several tests
The affected tests have independent shutdown
  conditions that race with the controller inputs.
2019-03-12 17:51:33 +01:00
Alexey Rybalchenko
74881d27e3 Remove obsolete state machine code 2019-03-12 12:10:47 +01:00
Alexey Rybalchenko
dd02c01c36 Extend tests of error cases
- test raising SIGINT in every state
 - test going to Error state from every state
 - add new states (bind/connect) to exception tests
2019-03-12 12:10:47 +01:00
Alexey Rybalchenko
44a9946ea6 Allow creating region with a callback with default transport 2019-03-11 20:20:22 +01:00
33 changed files with 1165 additions and 887 deletions

View File

@@ -61,14 +61,12 @@ endif()
if(BUILD_OFI_TRANSPORT)
find_package2(PRIVATE asiofi REQUIRED
VERSION 0.2.0
VERSION 0.3.1
)
find_package2(PRIVATE OFI REQUIRED
VERSION ${asiofi_OFI_VERSION}
COMPONENTS ${asiofi_OFI_COMPONENTS}
)
find_package2(PRIVATE AZMQ REQUIRED)
set(PROJECT_AZMQ_VERSION 1.0.2)
endif()
if(BUILD_NANOMSG_TRANSPORT)
@@ -194,11 +192,6 @@ if(BUILD_DDS_PLUGIN)
DESTINATION ${PROJECT_INSTALL_CMAKEMODDIR}
)
endif()
if(BUILD_OFI_TRANSPORT)
install(FILES cmake/FindAZMQ.cmake
DESTINATION ${PROJECT_INSTALL_CMAKEMODDIR}
)
endif()
if(BUILD_DOCS)
install(DIRECTORY ${CMAKE_BINARY_DIR}/doxygen/html
DESTINATION ${PROJECT_INSTALL_DATADIR}/docs

View File

@@ -38,7 +38,7 @@ a simulation, reconstruction and analysis framework.
* PUBLIC: [**Boost**](https://www.boost.org/), [**FairLogger**](https://github.com/FairRootGroup/FairLogger)
* BUILD: [CMake](https://cmake.org/), [GTest](https://github.com/google/googletest), [Doxygen](http://www.doxygen.org/)
* PRIVATE: [ZeroMQ](http://zeromq.org/), [Msgpack](https://msgpack.org/index.html), [nanomsg](http://nanomsg.org/),
[asiofi](https://github.com/FairRootGroup/asiofi), [DDS](http://dds.gsi.de), [PMIx](https://pmix.org/), [AZMQ](https://github.com/zeromq/azmq), [asiofi](https://github.com/FairRootGroup/asiofi)
[asiofi](https://github.com/FairRootGroup/asiofi), [DDS](http://dds.gsi.de), [PMIx](https://pmix.org/)
Supported platforms: Linux and MacOS.
@@ -51,7 +51,7 @@ cmake -DCMAKE_INSTALL_PREFIX=./fairmq_install ../fairmq
cmake --build . --target install
```
If dependencies are not installed in standard system directories, you can hint the installation location via `-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...`. `{DEPENDENCY}` can be `GTEST`, `BOOST`, `FAIRLOGGER`, `ZEROMQ`, `MSGPACK`, `NANOMSG`, `OFI`, `PMIX`, `ASIOFI`, `AZMQ` or `DDS` (`*_ROOT` variables can also be environment variables).
If dependencies are not installed in standard system directories, you can hint the installation location via `-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...`. `{DEPENDENCY}` can be `GTEST`, `BOOST`, `FAIRLOGGER`, `ZEROMQ`, `MSGPACK`, `NANOMSG`, `OFI`, `PMIX`, `ASIOFI` or `DDS` (`*_ROOT` variables can also be environment variables).
## Usage

View File

@@ -1,57 +0,0 @@
################################################################################
# Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
#
# ###########################
# # Locate the AZMQ library #
# ###########################
#
#
# Usage:
#
# find_package(AZMQ [version] [QUIET] [REQUIRED])
#
#
# Defines the following variables:
#
# AZMQ_FOUND - Found the ZeroMQ library
# AZMQ_INCLUDE_DIR (CMake cache) - Include directory
#
#
# Accepts the following variables as hints for installation directories:
#
# AZMQ_ROOT (CMake var, ENV var)
#
#
# If the above variables are not defined, or if ZeroMQ could not be found there,
# it will look for it in the system directories. Custom ZeroMQ installations
# will always have priority over system ones.
#
if(NOT AZMQ_ROOT)
set(AZMQ_ROOT $ENV{AZMQ_ROOT})
endif()
find_path(AZMQ_INCLUDE_DIR
NAMES azmq/socket.hpp
HINTS ${AZMQ_ROOT} $ENV{AZMQ_ROOT}
PATH_SUFFIXES include
DOC "AZMQ include directory"
)
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(AZMQ
REQUIRED_VARS AZMQ_INCLUDE_DIR
)
if(AZMQ_FOUND AND NOT TARGET AZMQ::AZMQ)
add_library(AZMQ::AZMQ INTERFACE IMPORTED)
set_target_properties(AZMQ::AZMQ PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES ${AZMQ_INCLUDE_DIR}
INTERFACE_LINK_LIBRARIES "libzmq;Boost::boost;Boost::container;Boost::system"
)
endif()

View File

@@ -32,15 +32,20 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multipart.sh.in ${CMA
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-multipart.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh)
add_test(NAME Example.Multipart.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh zeromq)
set_tests_properties(Example.Multipart.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 2 parts")
set_tests_properties(Example.Multipart.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 5 parts")
if(BUILD_NANOMSG_TRANSPORT)
add_test(NAME Example.Multipart.nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh nanomsg)
set_tests_properties(Example.Multipart.nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 2 parts")
set_tests_properties(Example.Multipart.nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 5 parts")
endif()
add_test(NAME Example.Multipart.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh shmem)
set_tests_properties(Example.Multipart.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 2 parts")
set_tests_properties(Example.Multipart.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 5 parts")
if(BUILD_OFI_TRANSPORT)
add_test(NAME Example.Multipart.ofi COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh ofi)
set_tests_properties(Example.Multipart.ofi PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 5 parts")
endif()
# install

View File

@@ -53,6 +53,13 @@ bool Sampler::ConditionalRun()
parts.AddPart(NewSimpleMessage(header));
parts.AddPart(NewMessage(1000));
// create more data parts, testing the FairMQParts in-place constructor
FairMQParts auxData{ NewMessage(500), NewMessage(600), NewMessage(700) };
assert(auxData.Size() == 3);
parts.AddPart(std::move(auxData));
assert(auxData.Size() == 0);
assert(parts.Size() == 5);
LOG(info) << "Sending body of size: " << parts.At(1)->GetSize();
Send(parts, "data");
@@ -74,4 +81,4 @@ Sampler::~Sampler()
{
}
} // namespace example_multipart
} // namespace example_multipart

View File

@@ -20,7 +20,7 @@ SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --session $SESSION"
SAMPLER+=" --max-iterations 1"
SAMPLER+=" --control static --color false"
SAMPLER+=" --channel-config name=data,type=push,method=connect,rateLogging=0,address=tcp://127.0.0.1:5555"
SAMPLER+=" --channel-config name=data,type=pair,method=connect,rateLogging=0,address=tcp://127.0.0.1:5555"
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
SAMPLER_PID=$!
@@ -30,7 +30,7 @@ SINK+=" --transport $transport"
SINK+=" --verbosity veryhigh"
SINK+=" --session $SESSION"
SINK+=" --control static --color false"
SINK+=" --channel-config name=data,type=pull,method=bind,rateLogging=0,address=tcp://127.0.0.1:5555"
SINK+=" --channel-config name=data,type=pair,method=bind,rateLogging=0,address=tcp://127.0.0.1:5555"
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
SINK_PID=$!

View File

@@ -224,7 +224,6 @@ if(BUILD_OFI_TRANSPORT)
set(OFI_DEPS
asiofi::asiofi
Boost::container
AZMQ::AZMQ
)
endif()
set(optional_deps ${NANOMSG_DEPS} ${OFI_DEPS})

View File

@@ -345,6 +345,11 @@ class FairMQChannel
return Transport()->NewStaticMessage(data);
}
FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr)
{
return Transport()->CreateUnmanagedRegion(size, callback);
}
private:
std::shared_ptr<FairMQTransportFactory> fTransportFactory;
fair::mq::Transport fTransportType;

View File

@@ -191,50 +191,58 @@ class FairMQDevice
return fTransportFactory.get();
}
// creates message with the default device transport
template<typename... Args>
FairMQMessagePtr NewMessage(Args&&... args)
{
return Transport()->CreateMessage(std::forward<Args>(args)...);
}
// creates message with the transport of the specified channel
template<typename... Args>
FairMQMessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args)
{
return GetChannel(channel, index).NewMessage(std::forward<Args>(args)...);
}
// creates a message that will not be cleaned up after transfer, with the default device transport
template<typename T>
FairMQMessagePtr NewStaticMessage(const T& data)
{
return Transport()->NewStaticMessage(data);
}
// creates a message that will not be cleaned up after transfer, with the transport of the specified channel
template<typename T>
FairMQMessagePtr NewStaticMessageFor(const std::string& channel, int index, const T& data)
{
return GetChannel(channel, index).NewStaticMessage(data);
}
// creates a message with a copy of the provided data, with the default device transport
template<typename T>
FairMQMessagePtr NewSimpleMessage(const T& data)
{
return Transport()->NewSimpleMessage(data);
}
// creates a message with a copy of the provided data, with the transport of the specified channel
template<typename T>
FairMQMessagePtr NewSimpleMessageFor(const std::string& channel, int index, const T& data)
{
return GetChannel(channel, index).NewSimpleMessage(data);
}
FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size)
// creates unamanaged region with the default device transport
FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr)
{
return Transport()->CreateUnmanagedRegion(size);
return Transport()->CreateUnmanagedRegion(size, callback);
}
// creates unmanaged region with the transport of the specified channel
FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel, int index, const size_t size, FairMQRegionCallback callback = nullptr)
{
return GetChannel(channel, index).Transport()->CreateUnmanagedRegion(size, callback);
return GetChannel(channel, index).NewUnmanagedRegion(size, callback);
}
template<typename ...Ts>
@@ -422,7 +430,7 @@ class FairMQDevice
template<class Rep, class Period>
bool WaitFor(std::chrono::duration<Rep, Period> const& duration)
{
return fStateMachine.WaitForPendingStateFor(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
return !fStateMachine.WaitForPendingStateFor(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
}
protected:

View File

@@ -31,6 +31,9 @@ class FairMQParts
FairMQParts(FairMQParts&& p) = default;
/// Assignment operator
FairMQParts& operator=(const FairMQParts&) = delete;
/// Constructor from argument pack of std::unique_ptr<FairMQMessage> rvalues
template <typename... Ts>
FairMQParts(Ts&&... messages) : fParts() {AddPart(std::forward<Ts>(messages)...);}
/// Default destructor
~FairMQParts() {};
@@ -41,14 +44,6 @@ class FairMQParts
fParts.push_back(std::unique_ptr<FairMQMessage>(msg));
}
/// Adds part (std::unique_ptr<FairMQMessage>&) to the container (move)
/// @param msg unique pointer to FairMQMessage
/// lvalue ref (move not required when passing argument)
// inline void AddPart(std::unique_ptr<FairMQMessage>& msg)
// {
// fParts.push_back(std::move(msg));
// }
/// Adds part (std::unique_ptr<FairMQMessage>&) to the container (move)
/// @param msg unique pointer to FairMQMessage
/// rvalue ref (move required when passing argument)
@@ -57,6 +52,23 @@ class FairMQParts
fParts.push_back(std::move(msg));
}
/// Add variable list of parts to the container (move)
template <typename... Ts>
void AddPart(std::unique_ptr<FairMQMessage>&& first, Ts&&... remaining)
{
AddPart(std::move(first));
AddPart(std::forward<Ts>(remaining)...);
}
/// Add content of another object by move
void AddPart(FairMQParts&& other)
{
container parts = std::move(other.fParts);
for (auto& part : parts) {
fParts.push_back(std::move(part));
}
}
/// Get reference to part in the container at index (without bounds check)
/// @param index container index
FairMQMessage& operator[](const int index) { return *(fParts[index]); }

View File

@@ -28,6 +28,7 @@
#include <array>
#include <unordered_map>
#include <mutex>
#include <stdexcept>
using namespace std;
using namespace boost::msm;
@@ -167,7 +168,6 @@ struct Machine_ : public state_machine_def<Machine_>
Machine_()
: fLastTransitionResult(true)
, fNewStatePending(false)
, fWorkOngoing(false)
{}
virtual ~Machine_() {}
@@ -258,9 +258,7 @@ struct Machine_ : public state_machine_def<Machine_>
mutex fStateMtx;
atomic<bool> fNewStatePending;
atomic<bool> fWorkOngoing;
condition_variable fNewStatePendingCV;
condition_variable fWorkDoneCV;
boost::signals2::signal<void(const State)> fStateChangeSignal;
boost::signals2::signal<void(const State)> fStateHandleSignal;
@@ -283,7 +281,6 @@ struct Machine_ : public state_machine_def<Machine_>
LOG(state) << fState << " ---> " << fNewState;
fState = static_cast<State>(fNewState);
fNewStatePending = false;
fWorkOngoing = true;
if (fState == State::Exiting || fState == State::Error) {
stop = true;
@@ -292,12 +289,10 @@ struct Machine_ : public state_machine_def<Machine_>
CallStateChangeCallbacks(fState);
CallStateHandler(fState);
}
{
lock_guard<mutex> lock(fStateMtx);
fWorkOngoing = false;
fWorkDoneCV.notify_one();
}
if (fState == State::Error) {
throw StateMachine::ErrorStateException("Device transitioned to error state");
}
}
@@ -463,13 +458,15 @@ void StateMachine::ProcessWork()
try {
fsm->CallStateChangeCallbacks(State::Idle);
fsm->ProcessWork();
} catch(ErrorStateException& ese) {
LOG(trace) << "ErrorStateException caught in ProcessWork(), rethrowing";
throw;
} catch(...) {
LOG(debug) << "Exception caught in ProcessWork(), going to Error state and rethrowing";
{
lock_guard<mutex> lock(fsm->fStateMtx);
fsm->fState = State::Error;
fsm->CallStateChangeCallbacks(State::Error);
fsm->fWorkOngoing = false;
fsm->fWorkDoneCV.notify_one();
}
ChangeState(Transition::ErrorFound);
throw;
@@ -480,3 +477,4 @@ string StateMachine::GetStateName(const State state) { return stateNames.at(stat
string StateMachine::GetTransitionName(const Transition transition) { return transitionNames.at(static_cast<int>(transition)); }
State StateMachine::GetState(const string& state) { return stateNumbers.at(state); }
Transition StateMachine::GetTransition(const string& transition) { return transitionNumbers.at(transition); }

View File

@@ -94,6 +94,8 @@ class StateMachine
static State GetState(const std::string& state);
static Transition GetTransition(const std::string& transition);
struct ErrorStateException : std::runtime_error { using std::runtime_error::runtime_error; };
private:
std::shared_ptr<void> fFsm;
};

View File

@@ -47,15 +47,24 @@ auto Context::InitThreadPool(int numberIoThreads) -> void
for (int i = 1; i <= numberIoThreads; ++i) {
fThreadPool.emplace_back([&, i, numberIoThreads]{
LOG(debug) << "OFI transport: I/O thread #" << i << " of " << numberIoThreads << " started";
fIoContext.run();
LOG(debug) << "OFI transport: I/O thread #" << i << " of " << numberIoThreads << " stopped";
try {
LOG(debug) << "OFI transport: I/O thread #" << i << " of " << numberIoThreads << " started";
fIoContext.run();
LOG(debug) << "OFI transport: I/O thread #" << i << " of " << numberIoThreads << " stopped";
} catch (const std::exception& e) {
LOG(error) << "OFI transport: Uncaught exception in I/O thread #" << i << ": " << e.what();
} catch (...) {
LOG(error) << "OFI transport: Uncaught exception in I/O thread #" << i;
}
});
}
}
auto Context::Reset() -> void
{
// TODO "Linger", rethink this
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
fIoContext.stop();
}

View File

@@ -35,59 +35,76 @@ namespace ofi {
enum class ControlMessageType
{
DataAddressAnnouncement = 1,
Empty = 1,
PostBuffer,
PostBufferAcknowledgement
PostMultiPartStartBuffer
};
struct Empty
{};
struct PostBuffer
{
uint64_t size; // buffer size (size_t)
};
struct PostMultiPartStartBuffer
{
uint32_t numParts; // buffer size (size_t)
uint64_t size; // buffer size (size_t)
};
union ControlMessageContent
{
PostBuffer postBuffer;
PostMultiPartStartBuffer postMultiPartStartBuffer;
};
struct ControlMessage
{
ControlMessageType type;
};
struct DataAddressAnnouncement : ControlMessage
{
uint32_t ipv4; // in_addr_t from <netinet/in.h>
uint32_t port; // in_port_t from <netinet/in.h>
};
struct PostBuffer : ControlMessage
{
uint64_t size; // buffer size (size_t)
ControlMessageContent msg;
};
template<typename T>
using unique_ptr = std::unique_ptr<T, std::function<void(T*)>>;
template<typename T, typename... Args>
auto MakeControlMessageWithPmr(boost::container::pmr::memory_resource* pmr, Args&&... args)
-> ofi::unique_ptr<T>
auto MakeControlMessageWithPmr(boost::container::pmr::memory_resource& pmr, Args&&... args)
-> ofi::unique_ptr<ControlMessage>
{
void* mem = pmr->allocate(sizeof(T));
T* ctrl = new (mem) T(std::forward<Args>(args)...);
void* mem = pmr.allocate(sizeof(ControlMessage));
ControlMessage* ctrl = new (mem) ControlMessage();
if (std::is_same<T, DataAddressAnnouncement>::value) {
ctrl->type = ControlMessageType::DataAddressAnnouncement;
} else if (std::is_same<T, PostBuffer>::value) {
if (std::is_same<T, PostBuffer>::value) {
ctrl->type = ControlMessageType::PostBuffer;
ctrl->msg.postBuffer = PostBuffer(std::forward<Args>(args)...);
} else if (std::is_same<T, PostMultiPartStartBuffer>::value) {
ctrl->type = ControlMessageType::PostMultiPartStartBuffer;
ctrl->msg.postMultiPartStartBuffer = PostMultiPartStartBuffer(std::forward<Args>(args)...);
} else if (std::is_same<T, Empty>::value) {
ctrl->type = ControlMessageType::Empty;
}
return ofi::unique_ptr<T>(ctrl, [=](T* p) {
p->~T();
pmr->deallocate(p, sizeof(T));
return ofi::unique_ptr<ControlMessage>(ctrl, [&pmr](ControlMessage* p) {
p->~ControlMessage();
pmr.deallocate(p, sizeof(T));
});
}
template<typename T, typename... Args>
auto MakeControlMessage(Args&&... args) -> T
auto MakeControlMessage(Args&&... args) -> ControlMessage
{
T ctrl = T(std::forward<Args>(args)...);
ControlMessage ctrl;
if (std::is_same<T, DataAddressAnnouncement>::value) {
ctrl.type = ControlMessageType::DataAddressAnnouncement;
} else if (std::is_same<T, PostBuffer>::value) {
if (std::is_same<T, PostBuffer>::value) {
ctrl.type = ControlMessageType::PostBuffer;
} else if (std::is_same<T, PostMultiPartStartBuffer>::value) {
ctrl.type = ControlMessageType::PostMultiPartStartBuffer;
} else if (std::is_same<T, Empty>::value) {
ctrl.type = ControlMessageType::Empty;
}
ctrl.msg = T(std::forward<Args>(args)...);
return ctrl;
}

View File

@@ -13,20 +13,17 @@
#include <FairMQLogger.h>
#include <asiofi.hpp>
#include <azmq/message.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/post.hpp>
#include <chrono>
#include <cstring>
#include <functional>
#include <memory>
#include <sstream>
#include <string.h>
#include <sys/socket.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
namespace fair
{
@@ -52,41 +49,15 @@ Socket::Socket(Context& context, const string& type, const string& name, const s
, fMessagesRx(0)
, fSndTimeout(100)
, fRcvTimeout(100)
, fSendQueueWrite(fContext.GetIoContext(), ZMQ_PUSH)
, fSendQueueRead(fContext.GetIoContext(), ZMQ_PULL)
, fRecvQueueWrite(fContext.GetIoContext(), ZMQ_PUSH)
, fRecvQueueRead(fContext.GetIoContext(), ZMQ_PULL)
, fSendSem(fContext.GetIoContext(), 300)
, fRecvSem(fContext.GetIoContext(), 300)
, fMultiPartRecvCounter(-1)
, fSendPushSem(fContext.GetIoContext(), 384)
, fSendPopSem(fContext.GetIoContext(), 0)
, fRecvPushSem(fContext.GetIoContext(), 384)
, fRecvPopSem(fContext.GetIoContext(), 0)
, fNeedOfiMemoryRegistration(false)
{
if (type != "pair") {
throw SocketError{tools::ToString("Socket type '", type, "' not implemented for ofi transport.")};
} else {
// TODO wire this up with config
azmq::socket::snd_hwm send_max(300);
azmq::socket::rcv_hwm recv_max(300);
fSendQueueRead.set_option(send_max);
fSendQueueRead.set_option(recv_max);
fSendQueueWrite.set_option(send_max);
fSendQueueWrite.set_option(recv_max);
fRecvQueueRead.set_option(send_max);
fRecvQueueRead.set_option(recv_max);
fRecvQueueWrite.set_option(send_max);
fRecvQueueWrite.set_option(recv_max);
// Setup internal queue
auto hashed_id = std::hash<std::string>()(fId);
auto queue_id = tools::ToString("inproc://TXQUEUE", hashed_id);
LOG(debug) << "OFI transport (" << fId << "): " << "Binding SQR: " << queue_id;
fSendQueueRead.bind(queue_id);
LOG(debug) << "OFI transport (" << fId << "): " << "Connecting SQW: " << queue_id;
fSendQueueWrite.connect(queue_id);
queue_id = tools::ToString("inproc://RXQUEUE", hashed_id);
LOG(debug) << "OFI transport (" << fId << "): " << "Binding RQR: " << queue_id;
fRecvQueueRead.bind(queue_id);
LOG(debug) << "OFI transport (" << fId << "): " << "Connecting RQW: " << queue_id;
fRecvQueueWrite.connect(queue_id);
}
}
@@ -108,7 +79,7 @@ auto Socket::InitOfi(Address addr) -> void
fOfiInfo = tools::make_unique<asiofi::info>(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), FI_SOURCE, hints);
}
LOG(debug) << "OFI transport: " << *fOfiInfo;
LOG(debug) << "OFI transport (" << fId << "): " << *fOfiInfo;
fOfiFabric = tools::make_unique<asiofi::fabric>(*fOfiInfo);
@@ -139,11 +110,16 @@ catch (const SilentSocketError& e)
// in case no connection could be established after trying a number of random ports from a range.
return false;
}
catch (const SocketError& e)
catch (const std::exception& e)
{
LOG(error) << "OFI transport: " << e.what();
return false;
}
catch (...)
{
LOG(error) << "OFI transport: Unknown exception in ofi::Socket::Bind";
return false;
}
auto Socket::BindControlEndpoint() -> void
{
@@ -196,6 +172,10 @@ try {
InitOfi(fRemoteAddr);
ConnectEndpoint(fControlEndpoint, Band::Control);
ConnectEndpoint(fDataEndpoint, Band::Data);
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this));
return true;
}
@@ -209,6 +189,11 @@ catch (const std::exception& e)
LOG(error) << "OFI transport: " << e.what();
return false;
}
catch (...)
{
LOG(error) << "OFI transport: Unknown exception in ofi::Socket::Connect";
return false;
}
auto Socket::ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoint, Band type) -> void
{
@@ -221,247 +206,246 @@ auto Socket::ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoi
LOG(debug) << "OFI transport (" << fId << "): Sending " << band << " band connection request to " << fRemoteAddr;
endpoint->connect(Context::ConvertAddress(fRemoteAddr), [&, band, type](asiofi::eq::event event) {
LOG(debug) << "OFI transport (" << fId << "): " << band << " band conn event happened";
if (event == asiofi::eq::event::connected) {
LOG(debug) << "OFI transport (" << fId << "): " << band << " band connected.";
if (type == Band::Control) {
ConnectEndpoint(fDataEndpoint, Band::Data);
} else {
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this));
}
} else {
LOG(error) << "asdf";
}
});
}
std::mutex mtx;
std::condition_variable cv;
bool notified(false), connected(false);
// auto Socket::ReceiveDataAddressAnnouncement() -> void
// {
// azmq::message ctrl;
// auto recv = fControlEndpoint.receive(ctrl);
// assert(recv == sizeof(DataAddressAnnouncement)); (void)recv;
// auto daa(static_cast<const DataAddressAnnouncement*>(ctrl.data()));
// assert(daa->type == ControlMessageType::DataAddressAnnouncement);
//
// sockaddr_in remoteAddr;
// remoteAddr.sin_family = AF_INET;
// remoteAddr.sin_port = daa->port;
// remoteAddr.sin_addr.s_addr = daa->ipv4;
//
// auto addr = Context::ConvertAddress(remoteAddr);
// addr.Protocol = fRemoteDataAddr.Protocol;
// LOG(debug) << "OFI transport (" << fId << "): Data address announcement of remote endpoint received: " << addr;
// fRemoteDataAddr = addr;
// }
//
// auto Socket::AnnounceDataAddress() -> void
// {
// fLocalDataAddr = fDataEndpoint->get_local_address();
// LOG(debug) << "Address of local ofi endpoint in socket " << fId << ": " << Context::ConvertAddress(fLocalDataAddr);
//
// Create new data address announcement message
// auto daa = MakeControlMessage<DataAddressAnnouncement>();
// auto addr = Context::ConvertAddress(fLocalDataAddr);
// daa.ipv4 = addr.sin_addr.s_addr;
// daa.port = addr.sin_port;
//
// auto sent = fControlEndpoint.send(boost::asio::buffer(daa));
// assert(sent == sizeof(addr)); (void)sent;
//
// LOG(debug) << "OFI transport (" << fId << "): data band address " << fLocalDataAddr << " announced.";
// }
while (true) {
endpoint->connect(Context::ConvertAddress(fRemoteAddr), [&, band](asiofi::eq::event event) {
// LOG(debug) << "OFI transport (" << fId << "): " << band << " band conn event happened";
std::unique_lock<std::mutex> lk2(mtx);
notified = true;
if (event == asiofi::eq::event::connected) {
LOG(debug) << "OFI transport (" << fId << "): " << band << " band connected.";
connected = true;
} else {
// LOG(debug) << "OFI transport (" << fId << "): " << band << " band connection refused. Trying again.";
}
lk2.unlock();
cv.notify_one();
});
{
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk, [&] { return notified; });
if (connected) {
break;
} else {
notified = false;
lk.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
}
}
auto Socket::Send(MessagePtr& msg, const int /*timeout*/) -> int
{
// LOG(debug) << "OFI transport (" << fId << "): ENTER Send: data=" << msg->GetData() << ",size=" << msg->GetSize();
// timeout argument not yet implemented
MessagePtr* msgptr(new std::unique_ptr<Message>(std::move(msg)));
try {
auto res = fSendQueueWrite.send(boost::asio::const_buffer(msgptr, sizeof(MessagePtr)), 0);
std::vector<MessagePtr> msgVec;
msgVec.reserve(1);
msgVec.emplace_back(std::move(msg));
// LOG(debug) << "OFI transport (" << fId << "): LEAVE Send";
return res;
} catch (const std::exception& e) {
msg = std::move(*msgptr);
LOG(error) << e.what();
return -1;
} catch (const boost::system::error_code& e) {
msg = std::move(*msgptr);
LOG(error) << e;
return -1;
return Send(msgVec);
}
auto Socket::Send(std::vector<MessagePtr>& msgVec, const int /*timeout*/) -> int64_t
try {
// timeout argument not yet implemented
int size(0);
for (auto& msg : msgVec) {
size += msg->GetSize();
}
fSendPushSem.wait();
{
std::lock_guard<std::mutex> lk(fSendQueueMutex);
fSendQueue.emplace(std::move(msgVec));
}
}
fSendPopSem.signal();
auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int
{
// LOG(debug) << "OFI transport (" << fId << "): ENTER Receive";
try {
azmq::message zmsg;
auto recv = fRecvQueueRead.receive(zmsg);
size_t size(0);
if (recv > 0) {
msg = std::move(*(static_cast<MessagePtr*>(zmsg.buffer().data())));
size = msg->GetSize();
}
fBytesRx += size;
fMessagesRx++;
// LOG(debug) << "OFI transport (" << fId << "): LEAVE Receive";
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return -1;
} catch (const boost::system::error_code& e) {
LOG(error) << e;
return -1;
}
}
auto Socket::Send(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t
{
return SendImpl(msgVec, 0, timeout);
}
auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t
{
return ReceiveImpl(msgVec, 0, timeout);
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return -1;
}
auto Socket::SendQueueReader() -> void
{
fSendSem.async_wait([&](const boost::system::error_code& ec) {
if (!ec) {
// LOG(debug) << "OFI transport (" << fId << "): < Wait fSendSem=" <<
// fSendSem.get_value();
fSendQueueRead.async_receive([&](const boost::system::error_code& ec2,
azmq::message& zmsg,
size_t bytes_transferred) {
if (!ec2) {
OnSend(zmsg, bytes_transferred);
fSendPopSem.async_wait([&] {
// Read msg from send queue
std::unique_lock<std::mutex> lk(fSendQueueMutex);
std::vector<MessagePtr> msgVec(std::move(fSendQueue.front()));
fSendQueue.pop();
lk.unlock();
bool postMultiPartStartBuffer = msgVec.size() > 1;
for (auto& msg : msgVec) {
// Create control message
ofi::unique_ptr<ControlMessage> ctrl(nullptr);
if (postMultiPartStartBuffer) {
postMultiPartStartBuffer = false;
ctrl = MakeControlMessageWithPmr<PostMultiPartStartBuffer>(fControlMemPool);
ctrl->msg.postMultiPartStartBuffer.numParts = msgVec.size();
ctrl->msg.postMultiPartStartBuffer.size = msg->GetSize();
} else {
ctrl = MakeControlMessageWithPmr<PostBuffer>(fControlMemPool);
ctrl->msg.postBuffer.size = msg->GetSize();
}
// Send control message
boost::asio::mutable_buffer ctrlMsg(ctrl.get(), sizeof(ControlMessage));
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, ctrlMsg, asiofi::mr::access::send);
auto desc = mr.desc();
fControlEndpoint->send(ctrlMsg,
desc,
[&, ctrl2 = std::move(ctrlMsg), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable {});
} else {
fControlEndpoint->send(
ctrlMsg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable {});
}
// Send data message
const auto size = msg->GetSize();
if (size) {
boost::asio::mutable_buffer buffer(msg->GetData(), size);
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send);
auto desc = mr.desc();
fDataEndpoint->send(buffer,
desc,
[&, size, msg2 = std::move(msg), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable {
fBytesTx += size;
fMessagesTx++;
fSendPushSem.signal();
});
} else {
fDataEndpoint->send(
buffer, [&, size, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable {
fBytesTx += size;
fMessagesTx++;
fSendPushSem.signal();
});
}
});
} else {
++fMessagesTx;
fSendPushSem.signal();
}
}
boost::asio::dispatch(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
});
}
auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void
{
// LOG(debug) << "OFI transport (" << fId << "): ENTER OnSend: bytes_transferred=" << bytes_transferred;
auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int
try {
// timeout argument not yet implemented
MessagePtr msg(std::move(*(static_cast<MessagePtr*>(zmsg.buffer().data()))));
auto size = msg->GetSize();
// LOG(debug) << "OFI transport (" << fId << "): OnSend: data=" << msg->GetData() << ",size=" << msg->GetSize();
// Create and send control message
auto ctrl = MakeControlMessageWithPmr<PostBuffer>(&fControlMemPool);
ctrl->size = size;
auto ctrl_msg = boost::asio::mutable_buffer(ctrl.get(), sizeof(PostBuffer));
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, ctrl_msg, asiofi::mr::access::send);
auto desc = mr.desc();
fControlEndpoint->send(
ctrl_msg, desc, [&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)](boost::asio::mutable_buffer) mutable {
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Control message sent";
});
} else {
fControlEndpoint->send(ctrl_msg,
[&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable {
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Control
// message sent";
});
fRecvPopSem.wait();
{
std::lock_guard<std::mutex> lk(fRecvQueueMutex);
msg = std::move(fRecvQueue.front().front());
fRecvQueue.pop();
}
fRecvPushSem.signal();
if (size) {
boost::asio::mutable_buffer buffer(msg->GetData(), size);
int size(msg->GetSize());
fBytesRx += size;
++fMessagesRx;
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send);
auto desc = mr.desc();
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return -1;
}
fDataEndpoint->send(buffer,
desc,
[&, size, msg2 = std::move(msg), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable {
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Data
// buffer sent";
fBytesTx += size;
fMessagesTx++;
fSendSem.async_signal([&](const boost::system::error_code& ec) {
if (!ec) {
// LOG(debug) << "OFI transport (" << fId << "): >
// Signal fSendSem=" << fSendSem.get_value();
}
});
});
auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int /*timeout*/) -> int64_t
try {
// timeout argument not yet implemented
} else {
fDataEndpoint->send(
buffer, [&, size, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable {
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Data buffer sent";
fBytesTx += size;
fMessagesTx++;
fSendSem.async_signal([&](const boost::system::error_code& ec) {
if (!ec) {
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem="
// << fSendSem.get_value();
}
});
});
}
} else {
++fMessagesTx;
fSendSem.async_signal([&](const boost::system::error_code& ec) {
if (!ec) {
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value();
}
});
fRecvPopSem.wait();
{
std::lock_guard<std::mutex> lk(fRecvQueueMutex);
msgVec = std::move(fRecvQueue.front());
fRecvQueue.pop();
}
fRecvPushSem.signal();
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
// LOG(debug) << "OFI transport (" << fId << "): LEAVE OnSend";
int64_t size(0);
for (auto& msg : msgVec) {
size += msg->GetSize();
}
fBytesRx += size;
++fMessagesRx;
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return -1;
}
auto Socket::RecvControlQueueReader() -> void
{
fRecvSem.async_wait([&](const boost::system::error_code& ec) {
if (!ec) {
auto ctrl = MakeControlMessageWithPmr<PostBuffer>(&fControlMemPool);
auto ctrl_msg = boost::asio::mutable_buffer(ctrl.get(), sizeof(PostBuffer));
fRecvPushSem.async_wait([&] {
// Receive control message
ofi::unique_ptr<ControlMessage> ctrl(MakeControlMessageWithPmr<Empty>(fControlMemPool));
boost::asio::mutable_buffer ctrlMsg(ctrl.get(), sizeof(ControlMessage));
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, ctrl_msg, asiofi::mr::access::recv);
auto desc = mr.desc();
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, ctrlMsg, asiofi::mr::access::recv);
auto desc = mr.desc();
fControlEndpoint->recv(
ctrl_msg, desc, [&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)](boost::asio::mutable_buffer) mutable {
OnRecvControl(std::move(ctrl2));
});
} else {
fControlEndpoint->recv(
ctrl_msg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable {
OnRecvControl(std::move(ctrl2));
});
}
fControlEndpoint->recv(
ctrlMsg,
desc,
[&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable { OnRecvControl(std::move(ctrl2)); });
} else {
fControlEndpoint->recv(
ctrlMsg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable {
OnRecvControl(std::move(ctrl2));
});
}
});
}
auto Socket::OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void
auto Socket::OnRecvControl(ofi::unique_ptr<ControlMessage> ctrl) -> void
{
// LOG(debug) << "OFI transport (" << fId << "): ENTER OnRecvControl";
auto size = ctrl->size;
// LOG(debug) << "OFI transport (" << fId << "): OnRecvControl: PostBuffer.size=" << size;
// Check control message type
auto size(0);
if (ctrl->type == ControlMessageType::PostMultiPartStartBuffer) {
size = ctrl->msg.postMultiPartStartBuffer.size;
if (fMultiPartRecvCounter == -1) {
fMultiPartRecvCounter = ctrl->msg.postMultiPartStartBuffer.numParts;
assert(fInflightMultiPartMessage.empty());
fInflightMultiPartMessage.reserve(ctrl->msg.postMultiPartStartBuffer.numParts);
} else {
throw SocketError{tools::ToString(
"OFI transport: Received control start of new multi part message without completed "
"reception of previous multi part message. Number of parts missing: ",
fMultiPartRecvCounter)};
}
} else if (ctrl->type == ControlMessageType::PostBuffer) {
size = ctrl->msg.postBuffer.size;
} else {
throw SocketError{tools::ToString("OFI transport: Unknown control message type: '",
static_cast<int>(ctrl->type))};
}
// Receive data
auto msg = fContext.MakeReceiveMessage(size);
if (size) {
auto msg = fContext.MakeReceiveMessage(size);
boost::asio::mutable_buffer buffer(msg->GetData(), size);
if (fNeedOfiMemoryRegistration) {
@@ -472,235 +456,44 @@ auto Socket::OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void
buffer,
desc,
[&, msg2 = std::move(msg), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable {
MessagePtr* msgptr(new std::unique_ptr<Message>(std::move(msg2)));
fRecvQueueWrite.async_send(
azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))),
[&](const boost::system::error_code& ec, size_t /*bytes_transferred2*/) {
if (!ec) {
// LOG(debug) << "OFI transport (" << fId
// << "): <<<<< Data buffer received, bytes_transferred2="
// << bytes_transferred2;
fRecvSem.async_signal([&](const boost::system::error_code& ec2) {
if (!ec2) {
//LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
}
});
}
});
});
boost::asio::mutable_buffer) mutable { DataMessageReceived(std::move(msg2)); });
} else {
fDataEndpoint->recv(
buffer, [&, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable {
MessagePtr* msgptr(new std::unique_ptr<Message>(std::move(msg2)));
fRecvQueueWrite.async_send(
azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))),
[&](const boost::system::error_code& ec, size_t /*bytes_transferred2*/) {
if (!ec) {
// LOG(debug) << "OFI transport (" << fId
// << "): <<<<< Data buffer received, bytes_transferred2="
// << bytes_transferred2;
fRecvSem.async_signal([&](const boost::system::error_code& ec2) {
if (!ec2) {
// LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
}
fDataEndpoint->recv(buffer,
[&, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable {
DataMessageReceived(std::move(msg2));
});
}
});
});
}
} else {
fRecvQueueWrite.async_send(
azmq::message(boost::asio::const_buffer(nullptr, 0)),
[&](const boost::system::error_code& ec, size_t /*bytes_transferred2*/) {
if (!ec) {
// LOG(debug) << "OFI transport (" << fId
// << "): <<<<< Data buffer received, bytes_transferred2="
// << bytes_transferred2;
fRecvSem.async_signal([&](const boost::system::error_code& ec2) {
if (!ec2) {
// LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
}
});
}
});
DataMessageReceived(std::move(msg));
}
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this));
// LOG(debug) << "OFI transport (" << fId << "): LEAVE OnRecvControl";
boost::asio::dispatch(fContext.GetIoContext(),
std::bind(&Socket::RecvControlQueueReader, this));
}
auto Socket::SendImpl(vector<FairMQMessagePtr>& /*msgVec*/, const int /*flags*/, const int /*timeout*/) -> int64_t
auto Socket::DataMessageReceived(MessagePtr msg) -> void
{
throw SocketError{"Not yet implemented."};
// const unsigned int vecSize = msgVec.size();
// int elapsed = 0;
//
// // Sending vector typicaly handles more then one part
// if (vecSize > 1)
// {
// int64_t totalSize = 0;
// int nbytes = -1;
// bool repeat = false;
//
// while (true && !fInterrupted)
// {
// for (unsigned int i = 0; i < vecSize; ++i)
// {
// nbytes = zmq_msg_send(static_cast<FairMQMessageSHM*>(msgVec[i].get())->GetMessage(),
// fSocket,
// (i < vecSize - 1) ? ZMQ_SNDMORE|flags : flags);
// if (nbytes >= 0)
// {
// static_cast<FairMQMessageSHM*>(msgVec[i].get())->fQueued = true;
// size_t size = msgVec[i]->GetSize();
//
// totalSize += size;
// }
// else
// {
// // according to ZMQ docs, this can only occur for the first part
// if (zmq_errno() == EAGAIN)
// {
// if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
// {
// if (timeout)
// {
// elapsed += fSndTimeout;
// if (elapsed >= timeout)
// {
// return -2;
// }
// }
// repeat = true;
// break;
// }
// else
// {
// return -2;
// }
// }
// if (zmq_errno() == ETERM)
// {
// LOG(info) << "terminating socket " << fId;
// return -1;
// }
// LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
// return nbytes;
// }
// }
//
// if (repeat)
// {
// continue;
// }
//
// // store statistics on how many messages have been sent (handle all parts as a single message)
// ++fMessagesTx;
// fBytesTx += totalSize;
// return totalSize;
// }
//
// return -1;
// } // If there's only one part, send it as a regular message
// else if (vecSize == 1)
// {
// return Send(msgVec.back(), flags);
// }
// else // if the vector is empty, something might be wrong
// {
// LOG(warn) << "Will not send empty vector";
// return -1;
// }
}
if (fMultiPartRecvCounter > 0) {
--fMultiPartRecvCounter;
fInflightMultiPartMessage.push_back(std::move(msg));
}
if (fMultiPartRecvCounter == 0) {
std::unique_lock<std::mutex> lk(fRecvQueueMutex);
fRecvQueue.push(std::move(fInflightMultiPartMessage));
lk.unlock();
fMultiPartRecvCounter = -1;
fRecvPopSem.signal();
} else if (fMultiPartRecvCounter == -1) {
std::vector<MessagePtr> msgVec;
msgVec.push_back(std::move(msg));
std::unique_lock<std::mutex> lk(fRecvQueueMutex);
fRecvQueue.push(std::move(msgVec));
lk.unlock();
fRecvPopSem.signal();
}
auto Socket::ReceiveImpl(vector<FairMQMessagePtr>& /*msgVec*/, const int /*flags*/, const int /*timeout*/) -> int64_t
{
throw SocketError{"Not yet implemented."};
// int64_t totalSize = 0;
// int64_t more = 0;
// bool repeat = false;
// int elapsed = 0;
//
// while (true)
// {
// // Warn if the vector is filled before Receive() and empty it.
// // if (msgVec.size() > 0)
// // {
// // LOG(warn) << "Message vector contains elements before Receive(), they will be deleted!";
// // msgVec.clear();
// // }
//
// totalSize = 0;
// more = 0;
// repeat = false;
//
// do
// {
// FairMQMessagePtr part(new FairMQMessageSHM(fManager, GetTransport()));
// zmq_msg_t* msgPtr = static_cast<FairMQMessageSHM*>(part.get())->GetMessage();
//
// int nbytes = zmq_msg_recv(msgPtr, fSocket, flags);
// if (nbytes == 0)
// {
// msgVec.push_back(move(part));
// }
// else if (nbytes > 0)
// {
// MetaHeader* hdr = static_cast<MetaHeader*>(zmq_msg_data(msgPtr));
// size_t size = 0;
// static_cast<FairMQMessageSHM*>(part.get())->fHandle = hdr->fHandle;
// static_cast<FairMQMessageSHM*>(part.get())->fSize = hdr->fSize;
// static_cast<FairMQMessageSHM*>(part.get())->fRegionId = hdr->fRegionId;
// static_cast<FairMQMessageSHM*>(part.get())->fHint = hdr->fHint;
// size = part->GetSize();
//
// msgVec.push_back(move(part));
//
// totalSize += size;
// }
// else if (zmq_errno() == EAGAIN)
// {
// if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
// {
// if (timeout)
// {
// elapsed += fSndTimeout;
// if (elapsed >= timeout)
// {
// return -2;
// }
// }
// repeat = true;
// break;
// }
// else
// {
// return -2;
// }
// }
// else
// {
// return nbytes;
// }
//
// size_t more_size = sizeof(more);
// zmq_getsockopt(fSocket, ZMQ_RCVMORE, &more, &more_size);
// }
// while (more);
//
// if (repeat)
// {
// continue;
// }
//
// // store statistics on how many messages have been received (handle all parts as a single message)
// ++fMessagesRx;
// fBytesRx += totalSize;
// return totalSize;
// }
}
auto Socket::Close() -> void {}
@@ -721,121 +514,62 @@ auto Socket::GetOption(const string& /*option*/, void* /*value*/, size_t* /*valu
void Socket::SetLinger(const int /*value*/)
{
// azmq::socket::linger opt(value);
// fControlEndpoint.set_option(opt);
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
}
int Socket::GetLinger() const
{
// azmq::socket::linger opt(0);
// fControlEndpoint.get_option(opt);
// return opt.value();
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
return 0;
}
void Socket::SetSndBufSize(const int /*value*/)
{
// azmq::socket::snd_hwm opt(value);
// fControlEndpoint.set_option(opt);
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
}
int Socket::GetSndBufSize() const
{
// azmq::socket::snd_hwm opt(0);
// fControlEndpoint.get_option(opt);
// return opt.value();
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
return 0;
}
void Socket::SetRcvBufSize(const int /*value*/)
{
// azmq::socket::rcv_hwm opt(value);
// fControlEndpoint.set_option(opt);
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
}
int Socket::GetRcvBufSize() const
{
// azmq::socket::rcv_hwm opt(0);
// fControlEndpoint.get_option(opt);
// return opt.value();
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
return 0;
}
void Socket::SetSndKernelSize(const int /*value*/)
{
// azmq::socket::snd_buf opt(value);
// fControlEndpoint.set_option(opt);
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
}
int Socket::GetSndKernelSize() const
{
// azmq::socket::snd_buf opt(0);
// fControlEndpoint.get_option(opt);
// return opt.value();
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
return 0;
}
void Socket::SetRcvKernelSize(const int /*value*/)
{
// azmq::socket::rcv_buf opt(value);
// fControlEndpoint.set_option(opt);
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
}
int Socket::GetRcvKernelSize() const
{
// azmq::socket::rcv_buf opt(0);
// fControlEndpoint.get_option(opt);
// return opt.value();
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
return 0;
}
auto Socket::GetConstant(const string& constant) -> int
auto Socket::GetConstant(const string& /*constant*/) -> int
{
if (constant == "")
return 0;
if (constant == "sub")
return ZMQ_SUB;
if (constant == "pub")
return ZMQ_PUB;
if (constant == "xsub")
return ZMQ_XSUB;
if (constant == "xpub")
return ZMQ_XPUB;
if (constant == "push")
return ZMQ_PUSH;
if (constant == "pull")
return ZMQ_PULL;
if (constant == "req")
return ZMQ_REQ;
if (constant == "rep")
return ZMQ_REP;
if (constant == "dealer")
return ZMQ_DEALER;
if (constant == "router")
return ZMQ_ROUTER;
if (constant == "pair")
return ZMQ_PAIR;
if (constant == "snd-hwm")
return ZMQ_SNDHWM;
if (constant == "rcv-hwm")
return ZMQ_RCVHWM;
if (constant == "snd-size")
return ZMQ_SNDBUF;
if (constant == "rcv-size")
return ZMQ_RCVBUF;
if (constant == "snd-more")
return ZMQ_SNDMORE;
if (constant == "rcv-more")
return ZMQ_RCVMORE;
if (constant == "linger")
return ZMQ_LINGER;
if (constant == "no-block")
return ZMQ_DONTWAIT;
if (constant == "snd-more no-block")
return ZMQ_DONTWAIT|ZMQ_SNDMORE;
LOG(debug) << "OFI transport: Not yet implemented.";
return -1;
}

View File

@@ -18,10 +18,10 @@
#include <asiofi/memory_resources.hpp>
#include <asiofi/passive_endpoint.hpp>
#include <asiofi/semaphore.hpp>
#include <azmq/socket.hpp>
#include <boost/asio.hpp>
#include <memory> // unique_ptr
#include <netinet/in.h>
#include <mutex>
namespace fair
{
@@ -97,28 +97,22 @@ class Socket final : public fair::mq::Socket
Address fLocalAddr;
int fSndTimeout;
int fRcvTimeout;
azmq::socket fSendQueueWrite, fSendQueueRead;
azmq::socket fRecvQueueWrite, fRecvQueueRead;
asiofi::semaphore fSendSem, fRecvSem;
std::mutex fSendQueueMutex, fRecvQueueMutex;
std::queue<std::vector<MessagePtr>> fSendQueue, fRecvQueue;
std::vector<MessagePtr> fInflightMultiPartMessage;
int64_t fMultiPartRecvCounter;
asiofi::synchronized_semaphore fSendPushSem, fSendPopSem, fRecvPushSem, fRecvPopSem;
std::atomic<bool> fNeedOfiMemoryRegistration;
auto SendQueueReader() -> void;
auto OnSend(azmq::message& msg, size_t bytes_transferred) -> void;
auto RecvControlQueueReader() -> void;
auto OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void;
auto OnReceive() -> void;
auto ReceiveImpl(MessagePtr& msg, const int flags, const int timeout) -> int;
auto SendImpl(std::vector<MessagePtr>& msgVec, const int flags, const int timeout) -> int64_t;
auto ReceiveImpl(std::vector<MessagePtr>& msgVec, const int flags, const int timeout) -> int64_t;
// auto WaitForControlPeer() -> void;
// auto AnnounceDataAddress() -> void;
auto InitOfi(Address addr) -> void;
auto BindControlEndpoint() -> void;
auto BindDataEndpoint() -> void;
enum class Band { Control, Data };
auto ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoint, Band type) -> void;
// auto ReceiveDataAddressAnnouncement() -> void;
auto SendQueueReader() -> void;
auto RecvControlQueueReader() -> void;
auto OnRecvControl(ofi::unique_ptr<ControlMessage> ctrl) -> void;
auto DataMessageReceived(MessagePtr msg) -> void;
}; /* class Socket */
struct SilentSocketError : SocketError { using SocketError::SocketError; };

View File

@@ -29,8 +29,7 @@ namespace
++gSignalCount;
gLastSignal = signal;
if (gSignalCount > 1)
{
if (gSignalCount > 1) {
std::abort();
}
}
@@ -57,11 +56,18 @@ Control::Control(const string& name, const Plugin::Version version, const string
{
SubscribeToDeviceStateChange([&](DeviceState newState) {
LOG(trace) << "control plugin notified on new state: " << newState;
{
lock_guard<mutex> lock{fEventsMutex};
fEvents.push(newState);
}
fNewEvent.notify_one();
if (newState == DeviceState::Error) {
fPluginShutdownRequested = true;
fDeviceShutdownRequested = true;
// throw DeviceErrorState("Controlled device transitioned to error state.");
}
});
try {
@@ -110,6 +116,25 @@ auto Control::RunStartupSequence() -> void
while (WaitForNextState() != DeviceState::Running) {}
}
auto Control::WaitForNextState() -> DeviceState
{
unique_lock<mutex> lock{fEventsMutex};
while (fEvents.empty()) {
fNewEvent.wait_for(lock, chrono::milliseconds(50));
}
auto result = fEvents.front();
if (result == DeviceState::Error) {
ReleaseDeviceControl();
throw DeviceErrorState("Controlled device transitioned to error state.");
}
fEvents.pop();
return result;
}
auto ControlPluginProgramOptions() -> Plugin::ProgOptions
{
namespace po = boost::program_options;
@@ -164,7 +189,7 @@ try {
bool keepRunning = true;
while (keepRunning) {
if (poll(cinfd, 1, 500)) {
if (poll(cinfd, 1, 100)) {
if (fDeviceShutdownRequested) {
break;
}
@@ -248,6 +273,7 @@ try {
}
if (GetCurrentDeviceState() == DeviceState::Error) {
ReleaseDeviceControl();
throw DeviceErrorState("Controlled device transitioned to error state.");
}
@@ -363,64 +389,38 @@ void Control::PrintStateMachine()
cout << ss.str() << flush;
}
auto Control::WaitForNextState() -> DeviceState
{
unique_lock<mutex> lock{fEventsMutex};
while (fEvents.empty()) {
fNewEvent.wait_for(lock, chrono::milliseconds(50));
}
auto result = fEvents.front();
if (result == DeviceState::Error) {
throw DeviceErrorState("Controlled device transitioned to error state.");
}
fEvents.pop();
return result;
}
auto Control::StaticMode() -> void
try
{
try {
RunStartupSequence();
{
// Wait for next state, which is DeviceState::Ready,
// or for device shutdown request (Ctrl-C)
unique_lock<mutex> lock{fEventsMutex};
while (fEvents.empty() && !fDeviceShutdownRequested)
{
while (fEvents.empty() && !fDeviceShutdownRequested) {
fNewEvent.wait_for(lock, chrono::milliseconds(50));
}
if (fEvents.front() == DeviceState::Error)
{
if (fEvents.front() == DeviceState::Error) {
ReleaseDeviceControl();
throw DeviceErrorState("Controlled device transitioned to error state.");
}
}
RunShutdownSequence();
}
catch (PluginServices::DeviceControlError& e)
{
} catch (PluginServices::DeviceControlError& e) {
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
LOG(debug) << e.what();
}
catch (DeviceErrorState&)
{
} catch (DeviceErrorState&) {
}
auto Control::SignalHandler() -> void
{
while (gSignalCount == 0 && !fPluginShutdownRequested)
{
while (gSignalCount == 0 && !fPluginShutdownRequested) {
this_thread::sleep_for(chrono::milliseconds(100));
}
if (!fPluginShutdownRequested)
{
if (!fPluginShutdownRequested) {
LOG(info) << "Received device shutdown request (signal " << gLastSignal << ").";
LOG(info) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately.";
@@ -431,20 +431,14 @@ auto Control::SignalHandler() -> void
if (fControllerThread.joinable()) fControllerThread.join();
}
if (!fDeviceHasShutdown)
{
if (!fDeviceHasShutdown) {
// Take over control and attempt graceful shutdown
StealDeviceControl();
try
{
try {
RunShutdownSequence();
}
catch (PluginServices::DeviceControlError& e)
{
} catch (PluginServices::DeviceControlError& e) {
LOG(info) << "Graceful device shutdown failed: " << e.what() << " If hanging, hit Ctrl-C again to abort immediately.";
}
catch (...)
{
} catch (...) {
LOG(info) << "Graceful device shutdown failed. If hanging, hit Ctrl-C again to abort immediately.";
}
}

View File

@@ -7,8 +7,13 @@
********************************************************************************/
#include <fairmq/tools/Process.h>
#include <fairmq/tools/Strings.h>
#include <boost/process.hpp>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <signal.h> // kill, signals
#include <iostream>
#include <sstream>
@@ -17,6 +22,28 @@
using namespace std;
namespace bp = boost::process;
namespace ba = boost::asio;
namespace bs = boost::system;
class LinePrinter
{
public:
LinePrinter(stringstream& out, const string& prefix)
: fOut(out)
, fPrefix(prefix)
{}
// prints line with prefix on both cout (thread-safe) and output stream
void Print(const string& line)
{
cout << fair::mq::tools::ToString(fPrefix, line, "\n") << flush;
fOut << fPrefix << line << endl;
}
private:
stringstream& fOut;
const string fPrefix;
};
namespace fair
{
@@ -33,57 +60,111 @@ namespace tools
* @param[in] log_prefix How to prefix each captured output line with
* @return Captured stdout output and exit code
*/
execute_result execute(const string& cmd, const string& prefix, const string& input)
execute_result execute(const string& cmd, const string& prefix, const string& input, int sig)
{
execute_result result;
stringstream out;
// print full line thread-safe
stringstream printCmd;
printCmd << prefix << " " << cmd << "\n";
cout << printCmd.str() << flush;
LinePrinter p(out, prefix);
out << prefix << cmd << endl;
p.Print(cmd);
// Execute command and capture stdout, add prefix line by line
bp::ipstream c_stdout;
bp::opstream c_stdin;
bp::child c(cmd, bp::std_out > c_stdout, bp::std_in < c_stdin);
ba::io_service ios;
while (c.valid() && !c.running()) {
;
}
// containers for std_in
ba::const_buffer inputBuffer(ba::buffer(input));
bp::async_pipe inputPipe(ios);
// containers for std_out
ba::streambuf outputBuffer;
bp::async_pipe outputPipe(ios);
// containers for std_err
ba::streambuf errorBuffer;
bp::async_pipe errorPipe(ios);
if (!c.valid()) {
throw runtime_error("Can't execute the given process.");
}
const string delimiter = "\n";
ba::deadline_timer inputTimer(ios, boost::posix_time::milliseconds(100));
ba::deadline_timer signalTimer(ios, boost::posix_time::milliseconds(100));
// Optionally, write to stdin of the child
// child process
bp::child c(cmd, bp::std_out > outputPipe, bp::std_err > errorPipe, bp::std_in < inputPipe);
int pid = c.id();
p.Print(ToString("fair::mq::tools::execute: pid: ", pid));
// handle std_in with a delay
if (input != "") {
this_thread::sleep_for(chrono::milliseconds(100));
c_stdin << input;
c_stdin.flush();
inputTimer.async_wait([&](const bs::error_code& ec1) {
if (!ec1) {
ba::async_write(inputPipe, inputBuffer, [&](const bs::error_code& ec2, size_t /* n */) {
if (!ec2) {
// inputPipe.async_close();
} else {
p.Print(ToString("error in boost::asio::async_write: ", ec2.message()));
}
});
} else {
p.Print(ToString("error in boost::asio::deadline_timer.async_wait: ", ec1.message()));
}
});
}
string line;
while (c.running() && getline(c_stdout, line)) {
// print full line thread-safe
stringstream printLine;
printLine << prefix << line << "\n";
cout << printLine.str() << flush;
out << prefix << line << "\n";
if (sig != -1) {
signalTimer.async_wait([&](const bs::error_code& ec1) {
if (!ec1) {
kill(pid, sig);
} else {
p.Print(ToString("error in boost::asio::deadline_timer.async_wait: ", ec1.message()));
}
});
}
// handle std_out line by line
function<void(const bs::error_code&, size_t)> onStdOut = [&](const bs::error_code& ec, size_t /* n */) {
if (!ec) {
istream is(&outputBuffer);
string line;
getline(is, line);
p.Print(line);
ba::async_read_until(outputPipe, outputBuffer, delimiter, onStdOut);
} else {
if (ec == ba::error::eof) {
// outputPipe.async_close();
} else {
p.Print(ec.message());
}
}
};
ba::async_read_until(outputPipe, outputBuffer, delimiter, onStdOut);
// handle std_err line by line
function<void(const bs::error_code&, size_t)> onStdErr = [&](const bs::error_code& ec, size_t /* n */) {
if (!ec) {
istream is(&errorBuffer);
string line;
getline(is, line);
p.Print(ToString("error: ", line));
ba::async_read_until(errorPipe, errorBuffer, delimiter, onStdErr);
} else {
if (ec == ba::error::eof) {
// errorPipe.async_close();
} else {
p.Print(ec.message());
}
}
};
ba::async_read_until(errorPipe, errorBuffer, delimiter, onStdErr);
ios.run();
c.wait();
// Capture exit code
result.exit_code = c.exit_code();
out << prefix << " Exit code: " << result.exit_code << endl;
p.Print(ToString("fair::mq::tools::execute: exit code: ", result.exit_code));
result.console_out = out.str();
// Return result
return result;
}

View File

@@ -38,7 +38,8 @@ struct execute_result
*/
execute_result execute(const std::string& cmd,
const std::string& prefix = "",
const std::string& input = "");
const std::string& input = "",
int sig = -1);
} /* namespace tools */
} /* namespace mq */

View File

@@ -105,6 +105,8 @@ add_testsuite(Device
device/_config.cxx
device/_waitfor.cxx
device/_exceptions.cxx
device/_error_state.cxx
device/_signals.cxx
LINKS FairMQ
DEPENDS testhelper_runTestDevice

View File

@@ -38,12 +38,9 @@ class Receiver : public FairMQDevice
auto Run() -> void override
{
auto msg = FairMQMessagePtr{NewMessage()};
if (Receive(msg, fChannelName) >= 0)
{
if (Receive(msg, fChannelName) >= 0) {
LOG(info) << "received empty message";
}
else
{
} else {
LOG(error) << "fair::mq::test::Receiver::Run(): Receive(msg, fChannelName) < 0";
}
};

View File

@@ -38,12 +38,9 @@ class Sender : public FairMQDevice
auto Run() -> void override
{
auto msg = FairMQMessagePtr{NewMessage()};
if (Send(msg, fChannelName) >= 0)
{
if (Send(msg, fChannelName) >= 0) {
LOG(info) << "sent empty message";
}
else
{
} else {
LOG(error) << "fair::mq::test::Sender::Run(): Send(msg, fChannelName) < 0";
}
};

View File

@@ -0,0 +1,121 @@
/********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runner.h"
#include <gtest/gtest.h>
#include <boost/process.hpp>
#include <fairmq/Tools.h>
#include <string>
#include <thread>
#include <iostream>
namespace
{
using namespace std;
using namespace fair::mq::test;
using namespace fair::mq::tools;
void RunErrorStateIn(const string& state, const string& control, const string& input = "")
{
size_t session{fair::mq::tools::UuidHash()};
execute_result result{"", 100};
thread device_thread([&]() {
stringstream cmd;
cmd << runTestDevice
<< " --id error_state_" << state << "_"
<< " --control " << control
<< " --session " << session
<< " --color false";
result = execute(cmd.str(), "[ErrorFound IN " + state + "]", input);
});
device_thread.join();
ASSERT_NE(string::npos, result.console_out.find("going to change to Error state from " + state + "()"));
exit(result.exit_code);
}
TEST(ErrorState, static_InInit)
{
EXPECT_EXIT(RunErrorStateIn("Init", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, static_InBind)
{
EXPECT_EXIT(RunErrorStateIn("Bind", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, static_InConnect)
{
EXPECT_EXIT(RunErrorStateIn("Connect", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, static_InInitTask)
{
EXPECT_EXIT(RunErrorStateIn("InitTask", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, static_InPreRun)
{
EXPECT_EXIT(RunErrorStateIn("PreRun", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, static_InRun)
{
EXPECT_EXIT(RunErrorStateIn("Run", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, static_InPostRun)
{
EXPECT_EXIT(RunErrorStateIn("PostRun", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, static_InResetTask)
{
EXPECT_EXIT(RunErrorStateIn("ResetTask", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, static_InReset)
{
EXPECT_EXIT(RunErrorStateIn("Reset", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, interactive_InInit)
{
EXPECT_EXIT(RunErrorStateIn("Init", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, interactive_InBind)
{
EXPECT_EXIT(RunErrorStateIn("Bind", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, interactive_InConnect)
{
EXPECT_EXIT(RunErrorStateIn("Connect", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, interactive_InInitTask)
{
EXPECT_EXIT(RunErrorStateIn("InitTask", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, interactive_InPreRun)
{
EXPECT_EXIT(RunErrorStateIn("PreRun", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, interactive_InRun)
{
EXPECT_EXIT(RunErrorStateIn("Run", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, interactive_InPostRun)
{
EXPECT_EXIT(RunErrorStateIn("PostRun", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, interactive_InResetTask)
{
EXPECT_EXIT(RunErrorStateIn("ResetTask", "interactive", "q"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, interactive_InReset)
{
EXPECT_EXIT(RunErrorStateIn("Reset", "interactive", "q"), ::testing::ExitedWithCode(1), "");
}
} // namespace

View File

@@ -23,7 +23,7 @@ using namespace std;
using namespace fair::mq::test;
using namespace fair::mq::tools;
void RunExceptionIn(const std::string& state, const std::string& input = "")
void RunExceptionIn(const string& state, const string& control, const string& input = "")
{
size_t session{fair::mq::tools::UuidHash()};
@@ -32,7 +32,7 @@ void RunExceptionIn(const std::string& state, const std::string& input = "")
stringstream cmd;
cmd << runTestDevice
<< " --id exceptions_" << state << "_"
<< " --control " << ((input == "") ? "static" : "interactive")
<< " --control " << control
<< " --session " << session
<< " --color false";
result = execute(cmd.str(), "[EXCEPTION IN " + state + "]", input);
@@ -40,86 +40,82 @@ void RunExceptionIn(const std::string& state, const std::string& input = "")
device_thread.join();
ASSERT_NE(std::string::npos, result.console_out.find("exception in " + state + "()"));
ASSERT_NE(string::npos, result.console_out.find("exception in " + state + "()"));
exit(result.exit_code);
}
TEST(Exceptions, static_InInit)
{
EXPECT_EXIT(RunExceptionIn("Init"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("Init", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, static_InBind)
{
EXPECT_EXIT(RunExceptionIn("Bind", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, static_InConnect)
{
EXPECT_EXIT(RunExceptionIn("Connect", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, static_InInitTask)
{
EXPECT_EXIT(RunExceptionIn("InitTask"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("InitTask", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, static_InPreRun)
{
EXPECT_EXIT(RunExceptionIn("PreRun"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("PreRun", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, static_InRun)
{
EXPECT_EXIT(RunExceptionIn("Run"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("Run", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, static_InPostRun)
{
EXPECT_EXIT(RunExceptionIn("PostRun"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("PostRun", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, static_InResetTask)
{
EXPECT_EXIT(RunExceptionIn("ResetTask"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("ResetTask", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, static_InReset)
{
EXPECT_EXIT(RunExceptionIn("Reset"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("Reset", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_InInit)
{
EXPECT_EXIT(RunExceptionIn("Init", "q"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("Init", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_InBind)
{
EXPECT_EXIT(RunExceptionIn("Bind", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_InConnect)
{
EXPECT_EXIT(RunExceptionIn("Connect", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_InInitTask)
{
EXPECT_EXIT(RunExceptionIn("InitTask", "q"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("InitTask", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_InPreRun)
{
EXPECT_EXIT(RunExceptionIn("PreRun", "q"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("PreRun", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_InRun)
{
EXPECT_EXIT(RunExceptionIn("Run", "q"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("Run", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_InPostRun)
{
EXPECT_EXIT(RunExceptionIn("PostRun", "q"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("PostRun", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_InResetTask)
{
EXPECT_EXIT(RunExceptionIn("ResetTask", "q"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("ResetTask", "interactive", "q"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_InReset)
{
EXPECT_EXIT(RunExceptionIn("Reset", "q"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_invalid_InInit)
{
EXPECT_EXIT(RunExceptionIn("Init", "_"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_invalid_InInitTask)
{
EXPECT_EXIT(RunExceptionIn("InitTask", "_"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_invalid_InPreRun)
{
EXPECT_EXIT(RunExceptionIn("PreRun", "_"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_invalid_InRun)
{
EXPECT_EXIT(RunExceptionIn("Run", "_"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_invalid_InPostRun)
{
EXPECT_EXIT(RunExceptionIn("PostRun", "_"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("Reset", "interactive", "q"), ::testing::ExitedWithCode(1), "");
}
} // namespace

121
test/device/_signals.cxx Normal file
View File

@@ -0,0 +1,121 @@
/********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runner.h"
#include <gtest/gtest.h>
#include <boost/process.hpp>
#include <fairmq/Tools.h>
#include <string>
#include <thread>
#include <iostream>
namespace
{
using namespace std;
using namespace fair::mq::test;
using namespace fair::mq::tools;
void RunSignalIn(const string& state, const string& control, const string& input = "")
{
size_t session{fair::mq::tools::UuidHash()};
execute_result result{"", 100};
thread device_thread([&]() {
stringstream cmd;
cmd << runTestDevice
<< " --id signals_" << state << "_"
<< " --control " << control
<< " --session " << session
<< " --color false";
result = execute(cmd.str(), "[SIGINT IN " + state + "]", input);
});
device_thread.join();
ASSERT_NE(string::npos, result.console_out.find("raising SIGINT from " + state + "()"));
exit(result.exit_code);
}
TEST(Signal_SIGINT, static_InInit)
{
EXPECT_EXIT(RunSignalIn("Init", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, static_InBind)
{
EXPECT_EXIT(RunSignalIn("Bind", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, static_InConnect)
{
EXPECT_EXIT(RunSignalIn("Connect", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, static_InInitTask)
{
EXPECT_EXIT(RunSignalIn("InitTask", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, static_InPreRun)
{
EXPECT_EXIT(RunSignalIn("PreRun", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, static_InRun)
{
EXPECT_EXIT(RunSignalIn("Run", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, static_InPostRun)
{
EXPECT_EXIT(RunSignalIn("PostRun", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, static_InResetTask)
{
EXPECT_EXIT(RunSignalIn("ResetTask", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, static_InReset)
{
EXPECT_EXIT(RunSignalIn("Reset", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, interactive_InInit)
{
EXPECT_EXIT(RunSignalIn("Init", "interactive"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, interactive_InBind)
{
EXPECT_EXIT(RunSignalIn("Bind", "interactive"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, interactive_InConnect)
{
EXPECT_EXIT(RunSignalIn("Connect", "interactive"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, interactive_InInitTask)
{
EXPECT_EXIT(RunSignalIn("InitTask", "interactive"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, interactive_InPreRun)
{
EXPECT_EXIT(RunSignalIn("PreRun", "interactive"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, interactive_InRun)
{
EXPECT_EXIT(RunSignalIn("Run", "interactive"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, interactive_InPostRun)
{
EXPECT_EXIT(RunSignalIn("PostRun", "interactive"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, interactive_InResetTask)
{
EXPECT_EXIT(RunSignalIn("ResetTask", "interactive", "q"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, interactive_InReset)
{
EXPECT_EXIT(RunSignalIn("Reset", "interactive", "q"), ::testing::ExitedWithCode(0), "");
}
} // namespace

View File

@@ -9,66 +9,65 @@
#include "runner.h"
#include <gtest/gtest.h>
#include <boost/process.hpp>
#include <fairmq/Tools.h>
#include <sys/types.h>
#include <signal.h>
#include <signal.h> // kill
#include <string>
#include <thread>
#include <chrono>
#include <iostream>
#include <future> // std::async, std::future
namespace
{
using namespace std;
using namespace fair::mq::test;
using namespace fair::mq::tools;
void RunWaitFor()
void RunWaitFor(const string& state, const string& control)
{
std::mutex mtx;
std::condition_variable cv;
int pid = 0;
int exit_code = 0;
thread deviceThread([&]() {
execute_result result;
thread device_thread([&] {
stringstream cmd;
cmd << runTestDevice << " --id waitfor_" << " --control static " << " --severity nolog";
boost::process::ipstream stdout;
boost::process::child c(cmd.str(), boost::process::std_out > stdout);
string line;
getline(stdout, line);
{
std::lock_guard<std::mutex> lock(mtx);
pid = c.id();
}
cv.notify_one();
c.wait();
exit_code = c.exit_code();
cmd << runTestDevice
<< " --id waitfor_" << state
<< " --control " << control
<< " --session " << UuidHash()
<< " --severity debug"
<< " --color false";
result = execute(cmd.str(), "[WaitFor]", "", SIGINT);
});
{
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [&pid]{ return pid != 0; });
}
device_thread.join();
kill(pid, SIGINT);
ASSERT_NE(string::npos, result.console_out.find("Sleeping Done. Interrupted."));
deviceThread.join();
exit(exit_code);
exit(result.exit_code);
}
TEST(Device, WaitFor)
TEST(WaitFor, static_InPreRun)
{
EXPECT_EXIT(RunWaitFor(), ::testing::ExitedWithCode(0), "");
EXPECT_EXIT(RunWaitFor("PreRun", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(WaitFor, static_InRun)
{
EXPECT_EXIT(RunWaitFor("Run", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(WaitFor, static_InPostRun)
{
EXPECT_EXIT(RunWaitFor("PostRun", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(WaitFor, interactive_InPreRun)
{
EXPECT_EXIT(RunWaitFor("PreRun", "interactive"), ::testing::ExitedWithCode(0), "");
}
TEST(WaitFor, interactive_InRun)
{
EXPECT_EXIT(RunWaitFor("Run", "interactive"), ::testing::ExitedWithCode(0), "");
}
TEST(WaitFor, interactive_InPostRun)
{
EXPECT_EXIT(RunWaitFor("PostRun", "interactive"), ::testing::ExitedWithCode(0), "");
}
} // namespace

View File

@@ -0,0 +1,113 @@
/********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_TEST_ERROR_STATE_H
#define FAIR_MQ_TEST_ERROR_STATE_H
#include <FairMQDevice.h>
#include <FairMQLogger.h>
#include <iostream>
namespace fair
{
namespace mq
{
namespace test
{
class ErrorState : public FairMQDevice
{
public:
void Init() override
{
std::string state("Init");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "going to change to Error state from " << state << "()";
ChangeState(fair::mq::Transition::ErrorFound);
}
}
void Bind() override
{
std::string state("Bind");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "going to change to Error state from " << state << "()";
ChangeState(fair::mq::Transition::ErrorFound);
}
}
void Connect() override
{
std::string state("Connect");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "going to change to Error state from " << state << "()";
ChangeState(fair::mq::Transition::ErrorFound);
}
}
void InitTask() override
{
std::string state("InitTask");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "going to change to Error state from " << state << "()";
ChangeState(fair::mq::Transition::ErrorFound);
}
}
void PreRun() override
{
std::string state("PreRun");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "going to change to Error state from " << state << "()";
ChangeState(fair::mq::Transition::ErrorFound);
}
}
void Run() override
{
std::string state("Run");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "going to change to Error state from " << state << "()";
ChangeState(fair::mq::Transition::ErrorFound);
}
}
void PostRun() override
{
std::string state("PostRun");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "going to change to Error state from " << state << "()";
ChangeState(fair::mq::Transition::ErrorFound);
}
}
void ResetTask() override
{
std::string state("ResetTask");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "going to change to Error state from " << state << "()";
ChangeState(fair::mq::Transition::ErrorFound);
}
}
void Reset() override
{
std::string state("Reset");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "going to change to Error state from " << state << "()";
ChangeState(fair::mq::Transition::ErrorFound);
}
}
};
} // namespace test
} // namespace mq
} // namespace fair
#endif /* FAIR_MQ_TEST_ERROR_STATE_H */

View File

@@ -33,6 +33,22 @@ class Exceptions : public FairMQDevice
}
}
auto Bind() -> void override
{
std::string state("Bind");
if (std::string::npos != GetId().find("_" + state + "_")) {
throw std::runtime_error("exception in " + state + "()");
}
}
auto Connect() -> void override
{
std::string state("Connect");
if (std::string::npos != GetId().find("_" + state + "_")) {
throw std::runtime_error("exception in " + state + "()");
}
}
auto InitTask() -> void override
{
std::string state("InitTask");

View File

@@ -57,6 +57,7 @@ class PairLeft : public FairMQDevice
if (counter == 6) LOG(info) << "Simple message with short text data successfull";
assert(counter == 6);
if (counter == 6) LOG(info) << "PAIR test successfull.";
};
};

View File

@@ -55,9 +55,6 @@ class PairRight : public FairMQDevice
auto msg6(NewSimpleMessageFor("data", 0, "testdata1234"));
if (Send(msg6, "data") >= 0) counter++;
if (counter == 6) LOG(info) << "Simple message with short text data successfull";
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
if (counter == 6) LOG(info) << "PAIR test successfull.";
};
};
@@ -65,4 +62,4 @@ class PairRight : public FairMQDevice
} // namespace mq
} // namespace fair
#endif /* FAIR_MQ_TEST_PAIRRIGHT_H */
#endif /* FAIR_MQ_TEST_PAIRRIGHT_H */

View File

@@ -0,0 +1,112 @@
/********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_TEST_SIGNALS_H
#define FAIR_MQ_TEST_SIGNALS_H
#include <FairMQDevice.h>
#include <FairMQLogger.h>
#include <iostream>
#include <csignal>
namespace fair
{
namespace mq
{
namespace test
{
class Signals : public FairMQDevice
{
public:
void Init() override
{
std::string state("Init");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "raising SIGINT from " << state << "()";
raise(SIGINT);
}
}
void Bind() override
{
std::string state("Bind");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "raising SIGINT from " << state << "()";
raise(SIGINT);
}
}
void Connect() override
{
std::string state("Connect");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "raising SIGINT from " << state << "()";
raise(SIGINT);
}
}
void InitTask() override
{
std::string state("InitTask");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "raising SIGINT from " << state << "()";
raise(SIGINT);
}
}
void PreRun() override
{
std::string state("PreRun");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "raising SIGINT from " << state << "()";
raise(SIGINT);
}
}
void Run() override
{
std::string state("Run");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "raising SIGINT from " << state << "()";
raise(SIGINT);
}
}
void PostRun() override
{
std::string state("PostRun");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "raising SIGINT from " << state << "()";
raise(SIGINT);
}
}
void ResetTask() override
{
std::string state("ResetTask");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "raising SIGINT from " << state << "()";
raise(SIGINT);
}
}
void Reset() override
{
std::string state("Reset");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "raising SIGINT from " << state << "()";
raise(SIGINT);
}
}
};
} // namespace test
} // namespace mq
} // namespace fair
#endif /* FAIR_MQ_TEST_SIGNALS_H */

View File

@@ -24,10 +24,34 @@ namespace test
class TestWaitFor : public FairMQDevice
{
public:
void Run()
void PreRun() override
{
std::cout << "hello" << std::endl;
WaitFor(std::chrono::seconds(60));
std::string state("PreRun");
if (std::string::npos != GetId().find("_" + state)) {
LOG(info) << "Going to sleep via WaitFor() in " << state << "...";
bool result = WaitFor(std::chrono::seconds(60));
LOG(info) << (result == true ? "Sleeping Done. Not interrupted." : "Sleeping Done. Interrupted.");
}
}
void Run() override
{
std::string state("Run");
if (std::string::npos != GetId().find("_" + state)) {
LOG(info) << "Going to sleep via WaitFor() in " << state << "...";
bool result = WaitFor(std::chrono::seconds(60));
LOG(info) << (result == true ? "Sleeping Done. Not interrupted." : "Sleeping Done. Interrupted.");
}
}
void PostRun() override
{
std::string state("PostRun");
if (std::string::npos != GetId().find("_" + state)) {
LOG(info) << "Going to sleep via WaitFor() in " << state << "...";
bool result = WaitFor(std::chrono::seconds(60));
LOG(info) << (result == true ? "Sleeping Done. Not interrupted." : "Sleeping Done. Interrupted.");
}
}
};

View File

@@ -19,6 +19,8 @@
#include "devices/TestTransferTimeout.h"
#include "devices/TestWaitFor.h"
#include "devices/TestExceptions.h"
#include "devices/TestErrorState.h"
#include "devices/TestSignals.h"
#include <runFairMQDevice.h>
@@ -40,60 +42,38 @@ auto getDevice(const FairMQProgOptions& config) -> FairMQDevicePtr
using namespace fair::mq::test;
auto id = config.GetValue<std::string>("id");
if (0 == id.find("pull_"))
{
if (0 == id.find("pull_")) {
return new Pull;
}
else if (0 == id.find("push_"))
{
} else if (0 == id.find("push_")) {
return new Push;
}
else if (0 == id.find("sub_"))
{
} else if (0 == id.find("sub_")) {
return new Sub;
}
else if (0 == id.find("pub_"))
{
} else if (0 == id.find("pub_")) {
return new Pub;
}
else if (0 == id.find("req_"))
{
} else if (0 == id.find("req_")) {
return new Req;
}
else if (0 == id.find("rep_"))
{
} else if (0 == id.find("rep_")) {
return new Rep;
}
else if (0 == id.find("transfer_timeout_"))
{
} else if (0 == id.find("transfer_timeout_")) {
return new TransferTimeout;
}
else if (0 == id.find("pollout_"))
{
} else if (0 == id.find("pollout_")) {
return new PollOut;
}
else if (0 == id.find("pollin_"))
{
} else if (0 == id.find("pollin_")) {
return new PollIn;
}
else if (0 == id.find("pairleft_"))
{
} else if (0 == id.find("pairleft_")) {
return new PairLeft;
}
else if (0 == id.find("pairright_"))
{
} else if (0 == id.find("pairright_")) {
return new PairRight;
}
else if (0 == id.find("waitfor_"))
{
} else if (0 == id.find("waitfor_")) {
return new TestWaitFor;
}
else if (0 == id.find("exceptions_"))
{
} else if (0 == id.find("exceptions_")) {
return new Exceptions;
}
else
{
} else if (0 == id.find("error_state_")) {
return new ErrorState;
} else if (0 == id.find("signals_")) {
return new Signals;
} else {
cerr << "Don't know id '" << id << "'" << endl;
return nullptr;
}