mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 09:31:45 +00:00
Compare commits
23 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
3582091b1c | ||
|
2457094b6c | ||
|
54b7742d85 | ||
|
195644f132 | ||
|
f17dade8f8 | ||
|
cc8fd73025 | ||
|
90fdcc26bb | ||
|
b45e4da2a9 | ||
|
a1b7efa2f4 | ||
|
6ee7e5fbf0 | ||
|
99ffb732f4 | ||
|
6809d60fad | ||
|
ef4d6a3310 | ||
|
696257fd4f | ||
|
cdc1ba084c | ||
|
922f7e9a92 | ||
|
a8f1a4dfdb | ||
|
fb42b1e2f0 | ||
|
1a00f3edbd | ||
|
74881d27e3 | ||
|
dd02c01c36 | ||
|
44a9946ea6 | ||
|
95ec56dcf0 |
@@ -61,14 +61,12 @@ endif()
|
||||
|
||||
if(BUILD_OFI_TRANSPORT)
|
||||
find_package2(PRIVATE asiofi REQUIRED
|
||||
VERSION 0.2.0
|
||||
VERSION 0.3.1
|
||||
)
|
||||
find_package2(PRIVATE OFI REQUIRED
|
||||
VERSION ${asiofi_OFI_VERSION}
|
||||
COMPONENTS ${asiofi_OFI_COMPONENTS}
|
||||
)
|
||||
find_package2(PRIVATE AZMQ REQUIRED)
|
||||
set(PROJECT_AZMQ_VERSION 1.0.2)
|
||||
endif()
|
||||
|
||||
if(BUILD_NANOMSG_TRANSPORT)
|
||||
@@ -194,11 +192,6 @@ if(BUILD_DDS_PLUGIN)
|
||||
DESTINATION ${PROJECT_INSTALL_CMAKEMODDIR}
|
||||
)
|
||||
endif()
|
||||
if(BUILD_OFI_TRANSPORT)
|
||||
install(FILES cmake/FindAZMQ.cmake
|
||||
DESTINATION ${PROJECT_INSTALL_CMAKEMODDIR}
|
||||
)
|
||||
endif()
|
||||
if(BUILD_DOCS)
|
||||
install(DIRECTORY ${CMAKE_BINARY_DIR}/doxygen/html
|
||||
DESTINATION ${PROJECT_INSTALL_DATADIR}/docs
|
||||
|
@@ -38,7 +38,7 @@ a simulation, reconstruction and analysis framework.
|
||||
* PUBLIC: [**Boost**](https://www.boost.org/), [**FairLogger**](https://github.com/FairRootGroup/FairLogger)
|
||||
* BUILD: [CMake](https://cmake.org/), [GTest](https://github.com/google/googletest), [Doxygen](http://www.doxygen.org/)
|
||||
* PRIVATE: [ZeroMQ](http://zeromq.org/), [Msgpack](https://msgpack.org/index.html), [nanomsg](http://nanomsg.org/),
|
||||
[asiofi](https://github.com/FairRootGroup/asiofi), [DDS](http://dds.gsi.de), [PMIx](https://pmix.org/), [AZMQ](https://github.com/zeromq/azmq), [asiofi](https://github.com/FairRootGroup/asiofi)
|
||||
[asiofi](https://github.com/FairRootGroup/asiofi), [DDS](http://dds.gsi.de), [PMIx](https://pmix.org/)
|
||||
|
||||
Supported platforms: Linux and MacOS.
|
||||
|
||||
@@ -51,7 +51,7 @@ cmake -DCMAKE_INSTALL_PREFIX=./fairmq_install ../fairmq
|
||||
cmake --build . --target install
|
||||
```
|
||||
|
||||
If dependencies are not installed in standard system directories, you can hint the installation location via `-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...`. `{DEPENDENCY}` can be `GTEST`, `BOOST`, `FAIRLOGGER`, `ZEROMQ`, `MSGPACK`, `NANOMSG`, `OFI`, `PMIX`, `ASIOFI`, `AZMQ` or `DDS` (`*_ROOT` variables can also be environment variables).
|
||||
If dependencies are not installed in standard system directories, you can hint the installation location via `-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...`. `{DEPENDENCY}` can be `GTEST`, `BOOST`, `FAIRLOGGER`, `ZEROMQ`, `MSGPACK`, `NANOMSG`, `OFI`, `PMIX`, `ASIOFI` or `DDS` (`*_ROOT` variables can also be environment variables).
|
||||
|
||||
## Usage
|
||||
|
||||
|
@@ -300,7 +300,6 @@ macro(find_package2 qualifier pkgname)
|
||||
set(old_CPP ${CMAKE_PREFIX_PATH})
|
||||
set(CMAKE_PREFIX_PATH ${${pkgname_upper}_ROOT} $ENV{${pkgname_upper}_ROOT} ${CMAKE_PREFIX_PATH})
|
||||
unset(__version__)
|
||||
unset(__components__)
|
||||
if(ARGS_VERSION)
|
||||
list(GET ARGS_VERSION 0 __version__)
|
||||
list(LENGTH ARGS_VERSION __length__)
|
||||
@@ -322,22 +321,21 @@ macro(find_package2 qualifier pkgname)
|
||||
if(${pkgname}_FOUND)
|
||||
if(${qualifier} STREQUAL PRIVATE)
|
||||
set(PROJECT_${pkgname}_VERSION ${__version__})
|
||||
set(PROJECT_${pkgname}_COMPONENTS ${__components__})
|
||||
set(PROJECT_${pkgname}_COMPONENTS ${ARGS_COMPONENTS})
|
||||
set(PROJECT_PACKAGE_DEPENDENCIES ${PROJECT_PACKAGE_DEPENDENCIES} ${pkgname})
|
||||
elseif(${qualifier} STREQUAL PUBLIC)
|
||||
set(PROJECT_${pkgname}_VERSION ${__version__})
|
||||
set(PROJECT_${pkgname}_COMPONENTS ${__components__})
|
||||
set(PROJECT_${pkgname}_COMPONENTS ${ARGS_COMPONENTS})
|
||||
set(PROJECT_PACKAGE_DEPENDENCIES ${PROJECT_PACKAGE_DEPENDENCIES} ${pkgname})
|
||||
set(PROJECT_INTERFACE_${pkgname}_VERSION ${__version__})
|
||||
set(PROJECT_INTERFACE_${pkgname}_COMPONENTS ${__components__})
|
||||
set(PROJECT_INTERFACE_${pkgname}_COMPONENTS ${ARGS_COMPONENTS})
|
||||
set(PROJECT_INTERFACE_PACKAGE_DEPENDENCIES ${PROJECT_INTERFACE_PACKAGE_DEPENDENCIES} ${pkgname})
|
||||
elseif(${qualifier} STREQUAL INTERFACE)
|
||||
set(PROJECT_INTERFACE_${pkgname}_VERSION ${__version__})
|
||||
set(PROJECT_INTERFACE_${pkgname}_COMPONENTS ${__components__})
|
||||
set(PROJECT_INTERFACE_${pkgname}_COMPONENTS ${ARGS_COMPONENTS})
|
||||
set(PROJECT_INTERFACE_PACKAGE_DEPENDENCIES ${PROJECT_INTERFACE_PACKAGE_DEPENDENCIES} ${pkgname})
|
||||
endif()
|
||||
endif()
|
||||
|
||||
unset(__components__)
|
||||
unset(__version__)
|
||||
endmacro()
|
||||
|
@@ -1,57 +0,0 @@
|
||||
################################################################################
|
||||
# Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# #
|
||||
# This software is distributed under the terms of the #
|
||||
# GNU Lesser General Public Licence (LGPL) version 3, #
|
||||
# copied verbatim in the file "LICENSE" #
|
||||
################################################################################
|
||||
#
|
||||
# ###########################
|
||||
# # Locate the AZMQ library #
|
||||
# ###########################
|
||||
#
|
||||
#
|
||||
# Usage:
|
||||
#
|
||||
# find_package(AZMQ [version] [QUIET] [REQUIRED])
|
||||
#
|
||||
#
|
||||
# Defines the following variables:
|
||||
#
|
||||
# AZMQ_FOUND - Found the ZeroMQ library
|
||||
# AZMQ_INCLUDE_DIR (CMake cache) - Include directory
|
||||
#
|
||||
#
|
||||
# Accepts the following variables as hints for installation directories:
|
||||
#
|
||||
# AZMQ_ROOT (CMake var, ENV var)
|
||||
#
|
||||
#
|
||||
# If the above variables are not defined, or if ZeroMQ could not be found there,
|
||||
# it will look for it in the system directories. Custom ZeroMQ installations
|
||||
# will always have priority over system ones.
|
||||
#
|
||||
|
||||
if(NOT AZMQ_ROOT)
|
||||
set(AZMQ_ROOT $ENV{AZMQ_ROOT})
|
||||
endif()
|
||||
|
||||
find_path(AZMQ_INCLUDE_DIR
|
||||
NAMES azmq/socket.hpp
|
||||
HINTS ${AZMQ_ROOT} $ENV{AZMQ_ROOT}
|
||||
PATH_SUFFIXES include
|
||||
DOC "AZMQ include directory"
|
||||
)
|
||||
|
||||
include(FindPackageHandleStandardArgs)
|
||||
find_package_handle_standard_args(AZMQ
|
||||
REQUIRED_VARS AZMQ_INCLUDE_DIR
|
||||
)
|
||||
|
||||
if(AZMQ_FOUND AND NOT TARGET AZMQ::AZMQ)
|
||||
add_library(AZMQ::AZMQ INTERFACE IMPORTED)
|
||||
set_target_properties(AZMQ::AZMQ PROPERTIES
|
||||
INTERFACE_INCLUDE_DIRECTORIES ${AZMQ_INCLUDE_DIR}
|
||||
INTERFACE_LINK_LIBRARIES "libzmq;Boost::boost;Boost::container;Boost::system"
|
||||
)
|
||||
endif()
|
@@ -32,15 +32,20 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multipart.sh.in ${CMA
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-multipart.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh)
|
||||
|
||||
add_test(NAME Example.Multipart.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh zeromq)
|
||||
set_tests_properties(Example.Multipart.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 2 parts")
|
||||
set_tests_properties(Example.Multipart.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 5 parts")
|
||||
|
||||
if(BUILD_NANOMSG_TRANSPORT)
|
||||
add_test(NAME Example.Multipart.nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh nanomsg)
|
||||
set_tests_properties(Example.Multipart.nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 2 parts")
|
||||
set_tests_properties(Example.Multipart.nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 5 parts")
|
||||
endif()
|
||||
|
||||
add_test(NAME Example.Multipart.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh shmem)
|
||||
set_tests_properties(Example.Multipart.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 2 parts")
|
||||
set_tests_properties(Example.Multipart.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 5 parts")
|
||||
|
||||
if(BUILD_OFI_TRANSPORT)
|
||||
add_test(NAME Example.Multipart.ofi COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh ofi)
|
||||
set_tests_properties(Example.Multipart.ofi PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 5 parts")
|
||||
endif()
|
||||
|
||||
# install
|
||||
|
||||
|
@@ -53,6 +53,13 @@ bool Sampler::ConditionalRun()
|
||||
parts.AddPart(NewSimpleMessage(header));
|
||||
parts.AddPart(NewMessage(1000));
|
||||
|
||||
// create more data parts, testing the FairMQParts in-place constructor
|
||||
FairMQParts auxData{ NewMessage(500), NewMessage(600), NewMessage(700) };
|
||||
assert(auxData.Size() == 3);
|
||||
parts.AddPart(std::move(auxData));
|
||||
assert(auxData.Size() == 0);
|
||||
assert(parts.Size() == 5);
|
||||
|
||||
LOG(info) << "Sending body of size: " << parts.At(1)->GetSize();
|
||||
|
||||
Send(parts, "data");
|
||||
@@ -74,4 +81,4 @@ Sampler::~Sampler()
|
||||
{
|
||||
}
|
||||
|
||||
} // namespace example_multipart
|
||||
} // namespace example_multipart
|
||||
|
@@ -20,7 +20,7 @@ SAMPLER+=" --verbosity veryhigh"
|
||||
SAMPLER+=" --session $SESSION"
|
||||
SAMPLER+=" --max-iterations 1"
|
||||
SAMPLER+=" --control static --color false"
|
||||
SAMPLER+=" --channel-config name=data,type=push,method=connect,rateLogging=0,address=tcp://127.0.0.1:5555"
|
||||
SAMPLER+=" --channel-config name=data,type=pair,method=connect,rateLogging=0,address=tcp://127.0.0.1:5555"
|
||||
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
|
||||
SAMPLER_PID=$!
|
||||
|
||||
@@ -30,7 +30,7 @@ SINK+=" --transport $transport"
|
||||
SINK+=" --verbosity veryhigh"
|
||||
SINK+=" --session $SESSION"
|
||||
SINK+=" --control static --color false"
|
||||
SINK+=" --channel-config name=data,type=pull,method=bind,rateLogging=0,address=tcp://127.0.0.1:5555"
|
||||
SINK+=" --channel-config name=data,type=pair,method=bind,rateLogging=0,address=tcp://127.0.0.1:5555"
|
||||
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
|
||||
SINK_PID=$!
|
||||
|
||||
|
@@ -224,7 +224,6 @@ if(BUILD_OFI_TRANSPORT)
|
||||
set(OFI_DEPS
|
||||
asiofi::asiofi
|
||||
Boost::container
|
||||
AZMQ::AZMQ
|
||||
)
|
||||
endif()
|
||||
set(optional_deps ${NANOMSG_DEPS} ${OFI_DEPS})
|
||||
|
@@ -345,6 +345,11 @@ class FairMQChannel
|
||||
return Transport()->NewStaticMessage(data);
|
||||
}
|
||||
|
||||
FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr)
|
||||
{
|
||||
return Transport()->CreateUnmanagedRegion(size, callback);
|
||||
}
|
||||
|
||||
private:
|
||||
std::shared_ptr<FairMQTransportFactory> fTransportFactory;
|
||||
fair::mq::Transport fTransportType;
|
||||
|
@@ -191,50 +191,58 @@ class FairMQDevice
|
||||
return fTransportFactory.get();
|
||||
}
|
||||
|
||||
// creates message with the default device transport
|
||||
template<typename... Args>
|
||||
FairMQMessagePtr NewMessage(Args&&... args)
|
||||
{
|
||||
return Transport()->CreateMessage(std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
// creates message with the transport of the specified channel
|
||||
template<typename... Args>
|
||||
FairMQMessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args)
|
||||
{
|
||||
return GetChannel(channel, index).NewMessage(std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
// creates a message that will not be cleaned up after transfer, with the default device transport
|
||||
template<typename T>
|
||||
FairMQMessagePtr NewStaticMessage(const T& data)
|
||||
{
|
||||
return Transport()->NewStaticMessage(data);
|
||||
}
|
||||
|
||||
// creates a message that will not be cleaned up after transfer, with the transport of the specified channel
|
||||
template<typename T>
|
||||
FairMQMessagePtr NewStaticMessageFor(const std::string& channel, int index, const T& data)
|
||||
{
|
||||
return GetChannel(channel, index).NewStaticMessage(data);
|
||||
}
|
||||
|
||||
// creates a message with a copy of the provided data, with the default device transport
|
||||
template<typename T>
|
||||
FairMQMessagePtr NewSimpleMessage(const T& data)
|
||||
{
|
||||
return Transport()->NewSimpleMessage(data);
|
||||
}
|
||||
|
||||
// creates a message with a copy of the provided data, with the transport of the specified channel
|
||||
template<typename T>
|
||||
FairMQMessagePtr NewSimpleMessageFor(const std::string& channel, int index, const T& data)
|
||||
{
|
||||
return GetChannel(channel, index).NewSimpleMessage(data);
|
||||
}
|
||||
|
||||
FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size)
|
||||
// creates unamanaged region with the default device transport
|
||||
FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr)
|
||||
{
|
||||
return Transport()->CreateUnmanagedRegion(size);
|
||||
return Transport()->CreateUnmanagedRegion(size, callback);
|
||||
}
|
||||
|
||||
// creates unmanaged region with the transport of the specified channel
|
||||
FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel, int index, const size_t size, FairMQRegionCallback callback = nullptr)
|
||||
{
|
||||
return GetChannel(channel, index).Transport()->CreateUnmanagedRegion(size, callback);
|
||||
return GetChannel(channel, index).NewUnmanagedRegion(size, callback);
|
||||
}
|
||||
|
||||
template<typename ...Ts>
|
||||
@@ -422,7 +430,7 @@ class FairMQDevice
|
||||
template<class Rep, class Period>
|
||||
bool WaitFor(std::chrono::duration<Rep, Period> const& duration)
|
||||
{
|
||||
return fStateMachine.WaitForPendingStateFor(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
|
||||
return !fStateMachine.WaitForPendingStateFor(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
|
||||
}
|
||||
|
||||
protected:
|
||||
|
@@ -31,6 +31,9 @@ class FairMQParts
|
||||
FairMQParts(FairMQParts&& p) = default;
|
||||
/// Assignment operator
|
||||
FairMQParts& operator=(const FairMQParts&) = delete;
|
||||
/// Constructor from argument pack of std::unique_ptr<FairMQMessage> rvalues
|
||||
template <typename... Ts>
|
||||
FairMQParts(Ts&&... messages) : fParts() {AddPart(std::forward<Ts>(messages)...);}
|
||||
/// Default destructor
|
||||
~FairMQParts() {};
|
||||
|
||||
@@ -41,14 +44,6 @@ class FairMQParts
|
||||
fParts.push_back(std::unique_ptr<FairMQMessage>(msg));
|
||||
}
|
||||
|
||||
/// Adds part (std::unique_ptr<FairMQMessage>&) to the container (move)
|
||||
/// @param msg unique pointer to FairMQMessage
|
||||
/// lvalue ref (move not required when passing argument)
|
||||
// inline void AddPart(std::unique_ptr<FairMQMessage>& msg)
|
||||
// {
|
||||
// fParts.push_back(std::move(msg));
|
||||
// }
|
||||
|
||||
/// Adds part (std::unique_ptr<FairMQMessage>&) to the container (move)
|
||||
/// @param msg unique pointer to FairMQMessage
|
||||
/// rvalue ref (move required when passing argument)
|
||||
@@ -57,6 +52,23 @@ class FairMQParts
|
||||
fParts.push_back(std::move(msg));
|
||||
}
|
||||
|
||||
/// Add variable list of parts to the container (move)
|
||||
template <typename... Ts>
|
||||
void AddPart(std::unique_ptr<FairMQMessage>&& first, Ts&&... remaining)
|
||||
{
|
||||
AddPart(std::move(first));
|
||||
AddPart(std::forward<Ts>(remaining)...);
|
||||
}
|
||||
|
||||
/// Add content of another object by move
|
||||
void AddPart(FairMQParts&& other)
|
||||
{
|
||||
container parts = std::move(other.fParts);
|
||||
for (auto& part : parts) {
|
||||
fParts.push_back(std::move(part));
|
||||
}
|
||||
}
|
||||
|
||||
/// Get reference to part in the container at index (without bounds check)
|
||||
/// @param index container index
|
||||
FairMQMessage& operator[](const int index) { return *(fParts[index]); }
|
||||
|
@@ -28,6 +28,7 @@
|
||||
#include <array>
|
||||
#include <unordered_map>
|
||||
#include <mutex>
|
||||
#include <stdexcept>
|
||||
|
||||
using namespace std;
|
||||
using namespace boost::msm;
|
||||
@@ -167,7 +168,6 @@ struct Machine_ : public state_machine_def<Machine_>
|
||||
Machine_()
|
||||
: fLastTransitionResult(true)
|
||||
, fNewStatePending(false)
|
||||
, fWorkOngoing(false)
|
||||
{}
|
||||
|
||||
virtual ~Machine_() {}
|
||||
@@ -258,9 +258,7 @@ struct Machine_ : public state_machine_def<Machine_>
|
||||
|
||||
mutex fStateMtx;
|
||||
atomic<bool> fNewStatePending;
|
||||
atomic<bool> fWorkOngoing;
|
||||
condition_variable fNewStatePendingCV;
|
||||
condition_variable fWorkDoneCV;
|
||||
|
||||
boost::signals2::signal<void(const State)> fStateChangeSignal;
|
||||
boost::signals2::signal<void(const State)> fStateHandleSignal;
|
||||
@@ -283,7 +281,6 @@ struct Machine_ : public state_machine_def<Machine_>
|
||||
LOG(state) << fState << " ---> " << fNewState;
|
||||
fState = static_cast<State>(fNewState);
|
||||
fNewStatePending = false;
|
||||
fWorkOngoing = true;
|
||||
|
||||
if (fState == State::Exiting || fState == State::Error) {
|
||||
stop = true;
|
||||
@@ -292,12 +289,10 @@ struct Machine_ : public state_machine_def<Machine_>
|
||||
|
||||
CallStateChangeCallbacks(fState);
|
||||
CallStateHandler(fState);
|
||||
}
|
||||
|
||||
{
|
||||
lock_guard<mutex> lock(fStateMtx);
|
||||
fWorkOngoing = false;
|
||||
fWorkDoneCV.notify_one();
|
||||
}
|
||||
if (fState == State::Error) {
|
||||
throw StateMachine::ErrorStateException("Device transitioned to error state");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -463,13 +458,15 @@ void StateMachine::ProcessWork()
|
||||
try {
|
||||
fsm->CallStateChangeCallbacks(State::Idle);
|
||||
fsm->ProcessWork();
|
||||
} catch(ErrorStateException& ese) {
|
||||
LOG(trace) << "ErrorStateException caught in ProcessWork(), rethrowing";
|
||||
throw;
|
||||
} catch(...) {
|
||||
LOG(debug) << "Exception caught in ProcessWork(), going to Error state and rethrowing";
|
||||
{
|
||||
lock_guard<mutex> lock(fsm->fStateMtx);
|
||||
fsm->fState = State::Error;
|
||||
fsm->CallStateChangeCallbacks(State::Error);
|
||||
fsm->fWorkOngoing = false;
|
||||
fsm->fWorkDoneCV.notify_one();
|
||||
}
|
||||
ChangeState(Transition::ErrorFound);
|
||||
throw;
|
||||
@@ -480,3 +477,4 @@ string StateMachine::GetStateName(const State state) { return stateNames.at(stat
|
||||
string StateMachine::GetTransitionName(const Transition transition) { return transitionNames.at(static_cast<int>(transition)); }
|
||||
State StateMachine::GetState(const string& state) { return stateNumbers.at(state); }
|
||||
Transition StateMachine::GetTransition(const string& transition) { return transitionNumbers.at(transition); }
|
||||
|
||||
|
@@ -94,6 +94,8 @@ class StateMachine
|
||||
static State GetState(const std::string& state);
|
||||
static Transition GetTransition(const std::string& transition);
|
||||
|
||||
struct ErrorStateException : std::runtime_error { using std::runtime_error::runtime_error; };
|
||||
|
||||
private:
|
||||
std::shared_ptr<void> fFsm;
|
||||
};
|
||||
|
@@ -37,6 +37,7 @@ Context::Context(FairMQTransportFactory& sendFactory,
|
||||
: fIoWork(fIoContext)
|
||||
, fReceiveFactory(receiveFactory)
|
||||
, fSendFactory(sendFactory)
|
||||
, fSizeHint(0)
|
||||
{
|
||||
InitThreadPool(numberIoThreads);
|
||||
}
|
||||
@@ -47,15 +48,24 @@ auto Context::InitThreadPool(int numberIoThreads) -> void
|
||||
|
||||
for (int i = 1; i <= numberIoThreads; ++i) {
|
||||
fThreadPool.emplace_back([&, i, numberIoThreads]{
|
||||
LOG(debug) << "OFI transport: I/O thread #" << i << " of " << numberIoThreads << " started";
|
||||
fIoContext.run();
|
||||
LOG(debug) << "OFI transport: I/O thread #" << i << " of " << numberIoThreads << " stopped";
|
||||
try {
|
||||
LOG(debug) << "OFI transport: I/O thread #" << i << " of " << numberIoThreads << " started";
|
||||
fIoContext.run();
|
||||
LOG(debug) << "OFI transport: I/O thread #" << i << " of " << numberIoThreads << " stopped";
|
||||
} catch (const std::exception& e) {
|
||||
LOG(error) << "OFI transport: Uncaught exception in I/O thread #" << i << ": " << e.what();
|
||||
} catch (...) {
|
||||
LOG(error) << "OFI transport: Uncaught exception in I/O thread #" << i;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
auto Context::Reset() -> void
|
||||
{
|
||||
// TODO "Linger", rethink this
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
|
||||
|
||||
fIoContext.stop();
|
||||
}
|
||||
|
||||
|
@@ -72,6 +72,8 @@ class Context
|
||||
auto Reset() -> void;
|
||||
auto MakeReceiveMessage(size_t size) -> MessagePtr;
|
||||
auto MakeSendMessage(size_t size) -> MessagePtr;
|
||||
auto GetSizeHint() -> size_t { return fSizeHint; }
|
||||
auto SetSizeHint(size_t size) -> void { fSizeHint = size; }
|
||||
|
||||
private:
|
||||
boost::asio::io_context fIoContext;
|
||||
@@ -79,6 +81,7 @@ class Context
|
||||
std::vector<std::thread> fThreadPool;
|
||||
FairMQTransportFactory& fReceiveFactory;
|
||||
FairMQTransportFactory& fSendFactory;
|
||||
size_t fSizeHint;
|
||||
|
||||
auto InitThreadPool(int numberIoThreads) -> void;
|
||||
}; /* class Context */
|
||||
|
@@ -35,59 +35,76 @@ namespace ofi {
|
||||
|
||||
enum class ControlMessageType
|
||||
{
|
||||
DataAddressAnnouncement = 1,
|
||||
Empty = 1,
|
||||
PostBuffer,
|
||||
PostBufferAcknowledgement
|
||||
PostMultiPartStartBuffer
|
||||
};
|
||||
|
||||
struct Empty
|
||||
{};
|
||||
|
||||
struct PostBuffer
|
||||
{
|
||||
uint64_t size; // buffer size (size_t)
|
||||
};
|
||||
|
||||
struct PostMultiPartStartBuffer
|
||||
{
|
||||
uint32_t numParts; // buffer size (size_t)
|
||||
uint64_t size; // buffer size (size_t)
|
||||
};
|
||||
|
||||
union ControlMessageContent
|
||||
{
|
||||
PostBuffer postBuffer;
|
||||
PostMultiPartStartBuffer postMultiPartStartBuffer;
|
||||
};
|
||||
|
||||
struct ControlMessage
|
||||
{
|
||||
ControlMessageType type;
|
||||
};
|
||||
|
||||
struct DataAddressAnnouncement : ControlMessage
|
||||
{
|
||||
uint32_t ipv4; // in_addr_t from <netinet/in.h>
|
||||
uint32_t port; // in_port_t from <netinet/in.h>
|
||||
};
|
||||
|
||||
struct PostBuffer : ControlMessage
|
||||
{
|
||||
uint64_t size; // buffer size (size_t)
|
||||
ControlMessageContent msg;
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
using unique_ptr = std::unique_ptr<T, std::function<void(T*)>>;
|
||||
|
||||
template<typename T, typename... Args>
|
||||
auto MakeControlMessageWithPmr(boost::container::pmr::memory_resource* pmr, Args&&... args)
|
||||
-> ofi::unique_ptr<T>
|
||||
auto MakeControlMessageWithPmr(boost::container::pmr::memory_resource& pmr, Args&&... args)
|
||||
-> ofi::unique_ptr<ControlMessage>
|
||||
{
|
||||
void* mem = pmr->allocate(sizeof(T));
|
||||
T* ctrl = new (mem) T(std::forward<Args>(args)...);
|
||||
void* mem = pmr.allocate(sizeof(ControlMessage));
|
||||
ControlMessage* ctrl = new (mem) ControlMessage();
|
||||
|
||||
if (std::is_same<T, DataAddressAnnouncement>::value) {
|
||||
ctrl->type = ControlMessageType::DataAddressAnnouncement;
|
||||
} else if (std::is_same<T, PostBuffer>::value) {
|
||||
if (std::is_same<T, PostBuffer>::value) {
|
||||
ctrl->type = ControlMessageType::PostBuffer;
|
||||
ctrl->msg.postBuffer = PostBuffer(std::forward<Args>(args)...);
|
||||
} else if (std::is_same<T, PostMultiPartStartBuffer>::value) {
|
||||
ctrl->type = ControlMessageType::PostMultiPartStartBuffer;
|
||||
ctrl->msg.postMultiPartStartBuffer = PostMultiPartStartBuffer(std::forward<Args>(args)...);
|
||||
} else if (std::is_same<T, Empty>::value) {
|
||||
ctrl->type = ControlMessageType::Empty;
|
||||
}
|
||||
|
||||
return ofi::unique_ptr<T>(ctrl, [=](T* p) {
|
||||
p->~T();
|
||||
pmr->deallocate(p, sizeof(T));
|
||||
return ofi::unique_ptr<ControlMessage>(ctrl, [&pmr](ControlMessage* p) {
|
||||
p->~ControlMessage();
|
||||
pmr.deallocate(p, sizeof(T));
|
||||
});
|
||||
}
|
||||
|
||||
template<typename T, typename... Args>
|
||||
auto MakeControlMessage(Args&&... args) -> T
|
||||
auto MakeControlMessage(Args&&... args) -> ControlMessage
|
||||
{
|
||||
T ctrl = T(std::forward<Args>(args)...);
|
||||
ControlMessage ctrl;
|
||||
|
||||
if (std::is_same<T, DataAddressAnnouncement>::value) {
|
||||
ctrl.type = ControlMessageType::DataAddressAnnouncement;
|
||||
} else if (std::is_same<T, PostBuffer>::value) {
|
||||
if (std::is_same<T, PostBuffer>::value) {
|
||||
ctrl.type = ControlMessageType::PostBuffer;
|
||||
} else if (std::is_same<T, PostMultiPartStartBuffer>::value) {
|
||||
ctrl.type = ControlMessageType::PostMultiPartStartBuffer;
|
||||
} else if (std::is_same<T, Empty>::value) {
|
||||
ctrl.type = ControlMessageType::Empty;
|
||||
}
|
||||
ctrl.msg = T(std::forward<Args>(args)...);
|
||||
|
||||
return ctrl;
|
||||
}
|
||||
|
File diff suppressed because it is too large
Load Diff
@@ -18,10 +18,10 @@
|
||||
#include <asiofi/memory_resources.hpp>
|
||||
#include <asiofi/passive_endpoint.hpp>
|
||||
#include <asiofi/semaphore.hpp>
|
||||
#include <azmq/socket.hpp>
|
||||
#include <boost/asio.hpp>
|
||||
#include <memory> // unique_ptr
|
||||
#include <netinet/in.h>
|
||||
#include <mutex>
|
||||
|
||||
|
||||
namespace fair
|
||||
{
|
||||
@@ -97,28 +97,24 @@ class Socket final : public fair::mq::Socket
|
||||
Address fLocalAddr;
|
||||
int fSndTimeout;
|
||||
int fRcvTimeout;
|
||||
azmq::socket fSendQueueWrite, fSendQueueRead;
|
||||
azmq::socket fRecvQueueWrite, fRecvQueueRead;
|
||||
asiofi::semaphore fSendSem, fRecvSem;
|
||||
std::mutex fSendQueueMutex, fRecvQueueMutex;
|
||||
std::queue<std::vector<MessagePtr>> fSendQueue, fRecvQueue;
|
||||
std::vector<MessagePtr> fInflightMultiPartMessage;
|
||||
int64_t fMultiPartRecvCounter;
|
||||
asiofi::synchronized_semaphore fSendPushSem, fSendPopSem, fRecvPushSem, fRecvPopSem;
|
||||
std::atomic<bool> fNeedOfiMemoryRegistration;
|
||||
|
||||
auto SendQueueReader() -> void;
|
||||
auto OnSend(azmq::message& msg, size_t bytes_transferred) -> void;
|
||||
auto RecvControlQueueReader() -> void;
|
||||
auto OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void;
|
||||
auto OnReceive() -> void;
|
||||
auto ReceiveImpl(MessagePtr& msg, const int flags, const int timeout) -> int;
|
||||
auto SendImpl(std::vector<MessagePtr>& msgVec, const int flags, const int timeout) -> int64_t;
|
||||
auto ReceiveImpl(std::vector<MessagePtr>& msgVec, const int flags, const int timeout) -> int64_t;
|
||||
|
||||
// auto WaitForControlPeer() -> void;
|
||||
// auto AnnounceDataAddress() -> void;
|
||||
auto InitOfi(Address addr) -> void;
|
||||
auto BindControlEndpoint() -> void;
|
||||
auto BindDataEndpoint() -> void;
|
||||
enum class Band { Control, Data };
|
||||
auto ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoint, Band type) -> void;
|
||||
// auto ReceiveDataAddressAnnouncement() -> void;
|
||||
auto SendQueueReader() -> void;
|
||||
auto SendQueueReaderStatic() -> void;
|
||||
auto RecvControlQueueReader() -> void;
|
||||
auto RecvQueueReaderStatic() -> void;
|
||||
auto OnRecvControl(ofi::unique_ptr<ControlMessage> ctrl) -> void;
|
||||
auto DataMessageReceived(MessagePtr msg) -> void;
|
||||
}; /* class Socket */
|
||||
|
||||
struct SilentSocketError : SocketError { using SocketError::SocketError; };
|
||||
|
@@ -23,12 +23,15 @@ namespace ofi
|
||||
|
||||
using namespace std;
|
||||
|
||||
TransportFactory::TransportFactory(const string& id, const FairMQProgOptions* /*config*/)
|
||||
TransportFactory::TransportFactory(const string& id, const FairMQProgOptions* config)
|
||||
try : FairMQTransportFactory(id)
|
||||
, fContext(*this, *this, 1)
|
||||
{
|
||||
LOG(debug) << "OFI transport: Using AZMQ & "
|
||||
<< "asiofi (" << fContext.GetAsiofiVersion() << ")";
|
||||
LOG(debug) << "OFI transport: asiofi (" << fContext.GetAsiofiVersion() << ")";
|
||||
|
||||
if (config) {
|
||||
fContext.SetSizeHint(config->GetValue<size_t>("ofi-size-hint"));
|
||||
}
|
||||
} catch (ContextError& e) {
|
||||
throw TransportFactoryError{e.what()};
|
||||
}
|
||||
|
@@ -66,6 +66,7 @@ FairMQProgOptions::FairMQProgOptions()
|
||||
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
|
||||
("shm-segment-size", po::value<size_t >()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).")
|
||||
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
|
||||
("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.")
|
||||
("rate", po::value<float >()->default_value(0.), "Rate for conditional run loop (Hz).")
|
||||
("session", po::value<string >()->default_value("default"), "Session name.");
|
||||
|
||||
|
@@ -29,8 +29,7 @@ namespace
|
||||
++gSignalCount;
|
||||
gLastSignal = signal;
|
||||
|
||||
if (gSignalCount > 1)
|
||||
{
|
||||
if (gSignalCount > 1) {
|
||||
std::abort();
|
||||
}
|
||||
}
|
||||
@@ -57,11 +56,18 @@ Control::Control(const string& name, const Plugin::Version version, const string
|
||||
{
|
||||
SubscribeToDeviceStateChange([&](DeviceState newState) {
|
||||
LOG(trace) << "control plugin notified on new state: " << newState;
|
||||
|
||||
{
|
||||
lock_guard<mutex> lock{fEventsMutex};
|
||||
fEvents.push(newState);
|
||||
}
|
||||
fNewEvent.notify_one();
|
||||
|
||||
if (newState == DeviceState::Error) {
|
||||
fPluginShutdownRequested = true;
|
||||
fDeviceShutdownRequested = true;
|
||||
// throw DeviceErrorState("Controlled device transitioned to error state.");
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
@@ -110,6 +116,25 @@ auto Control::RunStartupSequence() -> void
|
||||
while (WaitForNextState() != DeviceState::Running) {}
|
||||
}
|
||||
|
||||
auto Control::WaitForNextState() -> DeviceState
|
||||
{
|
||||
unique_lock<mutex> lock{fEventsMutex};
|
||||
while (fEvents.empty()) {
|
||||
fNewEvent.wait_for(lock, chrono::milliseconds(50));
|
||||
}
|
||||
|
||||
auto result = fEvents.front();
|
||||
|
||||
if (result == DeviceState::Error) {
|
||||
ReleaseDeviceControl();
|
||||
throw DeviceErrorState("Controlled device transitioned to error state.");
|
||||
}
|
||||
|
||||
fEvents.pop();
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
auto ControlPluginProgramOptions() -> Plugin::ProgOptions
|
||||
{
|
||||
namespace po = boost::program_options;
|
||||
@@ -164,7 +189,7 @@ try {
|
||||
bool keepRunning = true;
|
||||
|
||||
while (keepRunning) {
|
||||
if (poll(cinfd, 1, 500)) {
|
||||
if (poll(cinfd, 1, 100)) {
|
||||
if (fDeviceShutdownRequested) {
|
||||
break;
|
||||
}
|
||||
@@ -248,6 +273,7 @@ try {
|
||||
}
|
||||
|
||||
if (GetCurrentDeviceState() == DeviceState::Error) {
|
||||
ReleaseDeviceControl();
|
||||
throw DeviceErrorState("Controlled device transitioned to error state.");
|
||||
}
|
||||
|
||||
@@ -363,64 +389,38 @@ void Control::PrintStateMachine()
|
||||
cout << ss.str() << flush;
|
||||
}
|
||||
|
||||
auto Control::WaitForNextState() -> DeviceState
|
||||
{
|
||||
unique_lock<mutex> lock{fEventsMutex};
|
||||
while (fEvents.empty()) {
|
||||
fNewEvent.wait_for(lock, chrono::milliseconds(50));
|
||||
}
|
||||
|
||||
auto result = fEvents.front();
|
||||
|
||||
if (result == DeviceState::Error) {
|
||||
throw DeviceErrorState("Controlled device transitioned to error state.");
|
||||
}
|
||||
|
||||
fEvents.pop();
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
auto Control::StaticMode() -> void
|
||||
try
|
||||
{
|
||||
try {
|
||||
RunStartupSequence();
|
||||
|
||||
{
|
||||
// Wait for next state, which is DeviceState::Ready,
|
||||
// or for device shutdown request (Ctrl-C)
|
||||
unique_lock<mutex> lock{fEventsMutex};
|
||||
while (fEvents.empty() && !fDeviceShutdownRequested)
|
||||
{
|
||||
while (fEvents.empty() && !fDeviceShutdownRequested) {
|
||||
fNewEvent.wait_for(lock, chrono::milliseconds(50));
|
||||
}
|
||||
|
||||
if (fEvents.front() == DeviceState::Error)
|
||||
{
|
||||
if (fEvents.front() == DeviceState::Error) {
|
||||
ReleaseDeviceControl();
|
||||
throw DeviceErrorState("Controlled device transitioned to error state.");
|
||||
}
|
||||
}
|
||||
|
||||
RunShutdownSequence();
|
||||
}
|
||||
catch (PluginServices::DeviceControlError& e)
|
||||
{
|
||||
} catch (PluginServices::DeviceControlError& e) {
|
||||
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
|
||||
LOG(debug) << e.what();
|
||||
}
|
||||
catch (DeviceErrorState&)
|
||||
{
|
||||
} catch (DeviceErrorState&) {
|
||||
}
|
||||
|
||||
auto Control::SignalHandler() -> void
|
||||
{
|
||||
while (gSignalCount == 0 && !fPluginShutdownRequested)
|
||||
{
|
||||
while (gSignalCount == 0 && !fPluginShutdownRequested) {
|
||||
this_thread::sleep_for(chrono::milliseconds(100));
|
||||
}
|
||||
|
||||
if (!fPluginShutdownRequested)
|
||||
{
|
||||
if (!fPluginShutdownRequested) {
|
||||
LOG(info) << "Received device shutdown request (signal " << gLastSignal << ").";
|
||||
LOG(info) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately.";
|
||||
|
||||
@@ -431,20 +431,14 @@ auto Control::SignalHandler() -> void
|
||||
if (fControllerThread.joinable()) fControllerThread.join();
|
||||
}
|
||||
|
||||
if (!fDeviceHasShutdown)
|
||||
{
|
||||
if (!fDeviceHasShutdown) {
|
||||
// Take over control and attempt graceful shutdown
|
||||
StealDeviceControl();
|
||||
try
|
||||
{
|
||||
try {
|
||||
RunShutdownSequence();
|
||||
}
|
||||
catch (PluginServices::DeviceControlError& e)
|
||||
{
|
||||
} catch (PluginServices::DeviceControlError& e) {
|
||||
LOG(info) << "Graceful device shutdown failed: " << e.what() << " If hanging, hit Ctrl-C again to abort immediately.";
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
} catch (...) {
|
||||
LOG(info) << "Graceful device shutdown failed. If hanging, hit Ctrl-C again to abort immediately.";
|
||||
}
|
||||
}
|
||||
|
@@ -7,8 +7,13 @@
|
||||
********************************************************************************/
|
||||
|
||||
#include <fairmq/tools/Process.h>
|
||||
#include <fairmq/tools/Strings.h>
|
||||
|
||||
#include <boost/process.hpp>
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
||||
|
||||
#include <signal.h> // kill, signals
|
||||
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
@@ -17,6 +22,28 @@
|
||||
|
||||
using namespace std;
|
||||
namespace bp = boost::process;
|
||||
namespace ba = boost::asio;
|
||||
namespace bs = boost::system;
|
||||
|
||||
class LinePrinter
|
||||
{
|
||||
public:
|
||||
LinePrinter(stringstream& out, const string& prefix)
|
||||
: fOut(out)
|
||||
, fPrefix(prefix)
|
||||
{}
|
||||
|
||||
// prints line with prefix on both cout (thread-safe) and output stream
|
||||
void Print(const string& line)
|
||||
{
|
||||
cout << fair::mq::tools::ToString(fPrefix, line, "\n") << flush;
|
||||
fOut << fPrefix << line << endl;
|
||||
}
|
||||
|
||||
private:
|
||||
stringstream& fOut;
|
||||
const string fPrefix;
|
||||
};
|
||||
|
||||
namespace fair
|
||||
{
|
||||
@@ -33,57 +60,111 @@ namespace tools
|
||||
* @param[in] log_prefix How to prefix each captured output line with
|
||||
* @return Captured stdout output and exit code
|
||||
*/
|
||||
execute_result execute(const string& cmd, const string& prefix, const string& input)
|
||||
execute_result execute(const string& cmd, const string& prefix, const string& input, int sig)
|
||||
{
|
||||
execute_result result;
|
||||
stringstream out;
|
||||
|
||||
// print full line thread-safe
|
||||
stringstream printCmd;
|
||||
printCmd << prefix << " " << cmd << "\n";
|
||||
cout << printCmd.str() << flush;
|
||||
LinePrinter p(out, prefix);
|
||||
|
||||
out << prefix << cmd << endl;
|
||||
p.Print(cmd);
|
||||
|
||||
// Execute command and capture stdout, add prefix line by line
|
||||
bp::ipstream c_stdout;
|
||||
bp::opstream c_stdin;
|
||||
bp::child c(cmd, bp::std_out > c_stdout, bp::std_in < c_stdin);
|
||||
ba::io_service ios;
|
||||
|
||||
while (c.valid() && !c.running()) {
|
||||
;
|
||||
}
|
||||
// containers for std_in
|
||||
ba::const_buffer inputBuffer(ba::buffer(input));
|
||||
bp::async_pipe inputPipe(ios);
|
||||
// containers for std_out
|
||||
ba::streambuf outputBuffer;
|
||||
bp::async_pipe outputPipe(ios);
|
||||
// containers for std_err
|
||||
ba::streambuf errorBuffer;
|
||||
bp::async_pipe errorPipe(ios);
|
||||
|
||||
if (!c.valid()) {
|
||||
throw runtime_error("Can't execute the given process.");
|
||||
}
|
||||
const string delimiter = "\n";
|
||||
ba::deadline_timer inputTimer(ios, boost::posix_time::milliseconds(100));
|
||||
ba::deadline_timer signalTimer(ios, boost::posix_time::milliseconds(100));
|
||||
|
||||
// Optionally, write to stdin of the child
|
||||
// child process
|
||||
bp::child c(cmd, bp::std_out > outputPipe, bp::std_err > errorPipe, bp::std_in < inputPipe);
|
||||
int pid = c.id();
|
||||
p.Print(ToString("fair::mq::tools::execute: pid: ", pid));
|
||||
|
||||
// handle std_in with a delay
|
||||
if (input != "") {
|
||||
this_thread::sleep_for(chrono::milliseconds(100));
|
||||
c_stdin << input;
|
||||
c_stdin.flush();
|
||||
inputTimer.async_wait([&](const bs::error_code& ec1) {
|
||||
if (!ec1) {
|
||||
ba::async_write(inputPipe, inputBuffer, [&](const bs::error_code& ec2, size_t /* n */) {
|
||||
if (!ec2) {
|
||||
// inputPipe.async_close();
|
||||
} else {
|
||||
p.Print(ToString("error in boost::asio::async_write: ", ec2.message()));
|
||||
}
|
||||
});
|
||||
} else {
|
||||
p.Print(ToString("error in boost::asio::deadline_timer.async_wait: ", ec1.message()));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
string line;
|
||||
while (c.running() && getline(c_stdout, line)) {
|
||||
// print full line thread-safe
|
||||
stringstream printLine;
|
||||
printLine << prefix << line << "\n";
|
||||
cout << printLine.str() << flush;
|
||||
|
||||
out << prefix << line << "\n";
|
||||
if (sig != -1) {
|
||||
signalTimer.async_wait([&](const bs::error_code& ec1) {
|
||||
if (!ec1) {
|
||||
kill(pid, sig);
|
||||
} else {
|
||||
p.Print(ToString("error in boost::asio::deadline_timer.async_wait: ", ec1.message()));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// handle std_out line by line
|
||||
function<void(const bs::error_code&, size_t)> onStdOut = [&](const bs::error_code& ec, size_t /* n */) {
|
||||
if (!ec) {
|
||||
istream is(&outputBuffer);
|
||||
string line;
|
||||
getline(is, line);
|
||||
|
||||
p.Print(line);
|
||||
|
||||
ba::async_read_until(outputPipe, outputBuffer, delimiter, onStdOut);
|
||||
} else {
|
||||
if (ec == ba::error::eof) {
|
||||
// outputPipe.async_close();
|
||||
} else {
|
||||
p.Print(ec.message());
|
||||
}
|
||||
}
|
||||
};
|
||||
ba::async_read_until(outputPipe, outputBuffer, delimiter, onStdOut);
|
||||
|
||||
// handle std_err line by line
|
||||
function<void(const bs::error_code&, size_t)> onStdErr = [&](const bs::error_code& ec, size_t /* n */) {
|
||||
if (!ec) {
|
||||
istream is(&errorBuffer);
|
||||
string line;
|
||||
getline(is, line);
|
||||
|
||||
p.Print(ToString("error: ", line));
|
||||
|
||||
ba::async_read_until(errorPipe, errorBuffer, delimiter, onStdErr);
|
||||
} else {
|
||||
if (ec == ba::error::eof) {
|
||||
// errorPipe.async_close();
|
||||
} else {
|
||||
p.Print(ec.message());
|
||||
}
|
||||
}
|
||||
};
|
||||
ba::async_read_until(errorPipe, errorBuffer, delimiter, onStdErr);
|
||||
|
||||
ios.run();
|
||||
c.wait();
|
||||
|
||||
// Capture exit code
|
||||
result.exit_code = c.exit_code();
|
||||
out << prefix << " Exit code: " << result.exit_code << endl;
|
||||
|
||||
p.Print(ToString("fair::mq::tools::execute: exit code: ", result.exit_code));
|
||||
result.console_out = out.str();
|
||||
|
||||
// Return result
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@@ -38,7 +38,8 @@ struct execute_result
|
||||
*/
|
||||
execute_result execute(const std::string& cmd,
|
||||
const std::string& prefix = "",
|
||||
const std::string& input = "");
|
||||
const std::string& input = "",
|
||||
int sig = -1);
|
||||
|
||||
} /* namespace tools */
|
||||
} /* namespace mq */
|
||||
|
@@ -105,6 +105,8 @@ add_testsuite(Device
|
||||
device/_config.cxx
|
||||
device/_waitfor.cxx
|
||||
device/_exceptions.cxx
|
||||
device/_error_state.cxx
|
||||
device/_signals.cxx
|
||||
|
||||
LINKS FairMQ
|
||||
DEPENDS testhelper_runTestDevice
|
||||
|
@@ -38,12 +38,9 @@ class Receiver : public FairMQDevice
|
||||
auto Run() -> void override
|
||||
{
|
||||
auto msg = FairMQMessagePtr{NewMessage()};
|
||||
if (Receive(msg, fChannelName) >= 0)
|
||||
{
|
||||
if (Receive(msg, fChannelName) >= 0) {
|
||||
LOG(info) << "received empty message";
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
LOG(error) << "fair::mq::test::Receiver::Run(): Receive(msg, fChannelName) < 0";
|
||||
}
|
||||
};
|
||||
|
@@ -38,12 +38,9 @@ class Sender : public FairMQDevice
|
||||
auto Run() -> void override
|
||||
{
|
||||
auto msg = FairMQMessagePtr{NewMessage()};
|
||||
if (Send(msg, fChannelName) >= 0)
|
||||
{
|
||||
if (Send(msg, fChannelName) >= 0) {
|
||||
LOG(info) << "sent empty message";
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
LOG(error) << "fair::mq::test::Sender::Run(): Send(msg, fChannelName) < 0";
|
||||
}
|
||||
};
|
||||
|
121
test/device/_error_state.cxx
Normal file
121
test/device/_error_state.cxx
Normal file
@@ -0,0 +1,121 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include "runner.h"
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <boost/process.hpp>
|
||||
#include <fairmq/Tools.h>
|
||||
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <iostream>
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
using namespace std;
|
||||
using namespace fair::mq::test;
|
||||
using namespace fair::mq::tools;
|
||||
|
||||
void RunErrorStateIn(const string& state, const string& control, const string& input = "")
|
||||
{
|
||||
size_t session{fair::mq::tools::UuidHash()};
|
||||
|
||||
execute_result result{"", 100};
|
||||
thread device_thread([&]() {
|
||||
stringstream cmd;
|
||||
cmd << runTestDevice
|
||||
<< " --id error_state_" << state << "_"
|
||||
<< " --control " << control
|
||||
<< " --session " << session
|
||||
<< " --color false";
|
||||
result = execute(cmd.str(), "[ErrorFound IN " + state + "]", input);
|
||||
});
|
||||
|
||||
device_thread.join();
|
||||
|
||||
ASSERT_NE(string::npos, result.console_out.find("going to change to Error state from " + state + "()"));
|
||||
|
||||
exit(result.exit_code);
|
||||
}
|
||||
|
||||
TEST(ErrorState, static_InInit)
|
||||
{
|
||||
EXPECT_EXIT(RunErrorStateIn("Init", "static"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(ErrorState, static_InBind)
|
||||
{
|
||||
EXPECT_EXIT(RunErrorStateIn("Bind", "static"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(ErrorState, static_InConnect)
|
||||
{
|
||||
EXPECT_EXIT(RunErrorStateIn("Connect", "static"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(ErrorState, static_InInitTask)
|
||||
{
|
||||
EXPECT_EXIT(RunErrorStateIn("InitTask", "static"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(ErrorState, static_InPreRun)
|
||||
{
|
||||
EXPECT_EXIT(RunErrorStateIn("PreRun", "static"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(ErrorState, static_InRun)
|
||||
{
|
||||
EXPECT_EXIT(RunErrorStateIn("Run", "static"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(ErrorState, static_InPostRun)
|
||||
{
|
||||
EXPECT_EXIT(RunErrorStateIn("PostRun", "static"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(ErrorState, static_InResetTask)
|
||||
{
|
||||
EXPECT_EXIT(RunErrorStateIn("ResetTask", "static"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(ErrorState, static_InReset)
|
||||
{
|
||||
EXPECT_EXIT(RunErrorStateIn("Reset", "static"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(ErrorState, interactive_InInit)
|
||||
{
|
||||
EXPECT_EXIT(RunErrorStateIn("Init", "interactive"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(ErrorState, interactive_InBind)
|
||||
{
|
||||
EXPECT_EXIT(RunErrorStateIn("Bind", "interactive"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(ErrorState, interactive_InConnect)
|
||||
{
|
||||
EXPECT_EXIT(RunErrorStateIn("Connect", "interactive"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(ErrorState, interactive_InInitTask)
|
||||
{
|
||||
EXPECT_EXIT(RunErrorStateIn("InitTask", "interactive"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(ErrorState, interactive_InPreRun)
|
||||
{
|
||||
EXPECT_EXIT(RunErrorStateIn("PreRun", "interactive"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(ErrorState, interactive_InRun)
|
||||
{
|
||||
EXPECT_EXIT(RunErrorStateIn("Run", "interactive"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(ErrorState, interactive_InPostRun)
|
||||
{
|
||||
EXPECT_EXIT(RunErrorStateIn("PostRun", "interactive"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(ErrorState, interactive_InResetTask)
|
||||
{
|
||||
EXPECT_EXIT(RunErrorStateIn("ResetTask", "interactive", "q"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(ErrorState, interactive_InReset)
|
||||
{
|
||||
EXPECT_EXIT(RunErrorStateIn("Reset", "interactive", "q"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
|
||||
} // namespace
|
@@ -23,7 +23,7 @@ using namespace std;
|
||||
using namespace fair::mq::test;
|
||||
using namespace fair::mq::tools;
|
||||
|
||||
void RunExceptionIn(const std::string& state, const std::string& input = "")
|
||||
void RunExceptionIn(const string& state, const string& control, const string& input = "")
|
||||
{
|
||||
size_t session{fair::mq::tools::UuidHash()};
|
||||
|
||||
@@ -32,7 +32,7 @@ void RunExceptionIn(const std::string& state, const std::string& input = "")
|
||||
stringstream cmd;
|
||||
cmd << runTestDevice
|
||||
<< " --id exceptions_" << state << "_"
|
||||
<< " --control " << ((input == "") ? "static" : "interactive")
|
||||
<< " --control " << control
|
||||
<< " --session " << session
|
||||
<< " --color false";
|
||||
result = execute(cmd.str(), "[EXCEPTION IN " + state + "]", input);
|
||||
@@ -40,86 +40,82 @@ void RunExceptionIn(const std::string& state, const std::string& input = "")
|
||||
|
||||
device_thread.join();
|
||||
|
||||
ASSERT_NE(std::string::npos, result.console_out.find("exception in " + state + "()"));
|
||||
ASSERT_NE(string::npos, result.console_out.find("exception in " + state + "()"));
|
||||
|
||||
exit(result.exit_code);
|
||||
}
|
||||
|
||||
TEST(Exceptions, static_InInit)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("Init"), ::testing::ExitedWithCode(1), "");
|
||||
EXPECT_EXIT(RunExceptionIn("Init", "static"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, static_InBind)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("Bind", "static"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, static_InConnect)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("Connect", "static"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, static_InInitTask)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("InitTask"), ::testing::ExitedWithCode(1), "");
|
||||
EXPECT_EXIT(RunExceptionIn("InitTask", "static"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, static_InPreRun)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("PreRun"), ::testing::ExitedWithCode(1), "");
|
||||
EXPECT_EXIT(RunExceptionIn("PreRun", "static"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, static_InRun)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("Run"), ::testing::ExitedWithCode(1), "");
|
||||
EXPECT_EXIT(RunExceptionIn("Run", "static"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, static_InPostRun)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("PostRun"), ::testing::ExitedWithCode(1), "");
|
||||
EXPECT_EXIT(RunExceptionIn("PostRun", "static"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, static_InResetTask)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("ResetTask"), ::testing::ExitedWithCode(1), "");
|
||||
EXPECT_EXIT(RunExceptionIn("ResetTask", "static"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, static_InReset)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("Reset"), ::testing::ExitedWithCode(1), "");
|
||||
EXPECT_EXIT(RunExceptionIn("Reset", "static"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, interactive_InInit)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("Init", "q"), ::testing::ExitedWithCode(1), "");
|
||||
EXPECT_EXIT(RunExceptionIn("Init", "interactive"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, interactive_InBind)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("Bind", "interactive"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, interactive_InConnect)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("Connect", "interactive"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, interactive_InInitTask)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("InitTask", "q"), ::testing::ExitedWithCode(1), "");
|
||||
EXPECT_EXIT(RunExceptionIn("InitTask", "interactive"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, interactive_InPreRun)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("PreRun", "q"), ::testing::ExitedWithCode(1), "");
|
||||
EXPECT_EXIT(RunExceptionIn("PreRun", "interactive"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, interactive_InRun)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("Run", "q"), ::testing::ExitedWithCode(1), "");
|
||||
EXPECT_EXIT(RunExceptionIn("Run", "interactive"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, interactive_InPostRun)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("PostRun", "q"), ::testing::ExitedWithCode(1), "");
|
||||
EXPECT_EXIT(RunExceptionIn("PostRun", "interactive"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, interactive_InResetTask)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("ResetTask", "q"), ::testing::ExitedWithCode(1), "");
|
||||
EXPECT_EXIT(RunExceptionIn("ResetTask", "interactive", "q"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, interactive_InReset)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("Reset", "q"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, interactive_invalid_InInit)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("Init", "_"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, interactive_invalid_InInitTask)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("InitTask", "_"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, interactive_invalid_InPreRun)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("PreRun", "_"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, interactive_invalid_InRun)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("Run", "_"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
TEST(Exceptions, interactive_invalid_InPostRun)
|
||||
{
|
||||
EXPECT_EXIT(RunExceptionIn("PostRun", "_"), ::testing::ExitedWithCode(1), "");
|
||||
EXPECT_EXIT(RunExceptionIn("Reset", "interactive", "q"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
121
test/device/_signals.cxx
Normal file
121
test/device/_signals.cxx
Normal file
@@ -0,0 +1,121 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include "runner.h"
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <boost/process.hpp>
|
||||
#include <fairmq/Tools.h>
|
||||
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <iostream>
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
using namespace std;
|
||||
using namespace fair::mq::test;
|
||||
using namespace fair::mq::tools;
|
||||
|
||||
void RunSignalIn(const string& state, const string& control, const string& input = "")
|
||||
{
|
||||
size_t session{fair::mq::tools::UuidHash()};
|
||||
|
||||
execute_result result{"", 100};
|
||||
thread device_thread([&]() {
|
||||
stringstream cmd;
|
||||
cmd << runTestDevice
|
||||
<< " --id signals_" << state << "_"
|
||||
<< " --control " << control
|
||||
<< " --session " << session
|
||||
<< " --color false";
|
||||
result = execute(cmd.str(), "[SIGINT IN " + state + "]", input);
|
||||
});
|
||||
|
||||
device_thread.join();
|
||||
|
||||
ASSERT_NE(string::npos, result.console_out.find("raising SIGINT from " + state + "()"));
|
||||
|
||||
exit(result.exit_code);
|
||||
}
|
||||
|
||||
TEST(Signal_SIGINT, static_InInit)
|
||||
{
|
||||
EXPECT_EXIT(RunSignalIn("Init", "static"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(Signal_SIGINT, static_InBind)
|
||||
{
|
||||
EXPECT_EXIT(RunSignalIn("Bind", "static"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(Signal_SIGINT, static_InConnect)
|
||||
{
|
||||
EXPECT_EXIT(RunSignalIn("Connect", "static"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(Signal_SIGINT, static_InInitTask)
|
||||
{
|
||||
EXPECT_EXIT(RunSignalIn("InitTask", "static"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(Signal_SIGINT, static_InPreRun)
|
||||
{
|
||||
EXPECT_EXIT(RunSignalIn("PreRun", "static"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(Signal_SIGINT, static_InRun)
|
||||
{
|
||||
EXPECT_EXIT(RunSignalIn("Run", "static"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(Signal_SIGINT, static_InPostRun)
|
||||
{
|
||||
EXPECT_EXIT(RunSignalIn("PostRun", "static"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(Signal_SIGINT, static_InResetTask)
|
||||
{
|
||||
EXPECT_EXIT(RunSignalIn("ResetTask", "static"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(Signal_SIGINT, static_InReset)
|
||||
{
|
||||
EXPECT_EXIT(RunSignalIn("Reset", "static"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(Signal_SIGINT, interactive_InInit)
|
||||
{
|
||||
EXPECT_EXIT(RunSignalIn("Init", "interactive"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(Signal_SIGINT, interactive_InBind)
|
||||
{
|
||||
EXPECT_EXIT(RunSignalIn("Bind", "interactive"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(Signal_SIGINT, interactive_InConnect)
|
||||
{
|
||||
EXPECT_EXIT(RunSignalIn("Connect", "interactive"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(Signal_SIGINT, interactive_InInitTask)
|
||||
{
|
||||
EXPECT_EXIT(RunSignalIn("InitTask", "interactive"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(Signal_SIGINT, interactive_InPreRun)
|
||||
{
|
||||
EXPECT_EXIT(RunSignalIn("PreRun", "interactive"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(Signal_SIGINT, interactive_InRun)
|
||||
{
|
||||
EXPECT_EXIT(RunSignalIn("Run", "interactive"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(Signal_SIGINT, interactive_InPostRun)
|
||||
{
|
||||
EXPECT_EXIT(RunSignalIn("PostRun", "interactive"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(Signal_SIGINT, interactive_InResetTask)
|
||||
{
|
||||
EXPECT_EXIT(RunSignalIn("ResetTask", "interactive", "q"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(Signal_SIGINT, interactive_InReset)
|
||||
{
|
||||
EXPECT_EXIT(RunSignalIn("Reset", "interactive", "q"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
|
||||
} // namespace
|
@@ -9,66 +9,65 @@
|
||||
#include "runner.h"
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <boost/process.hpp>
|
||||
#include <fairmq/Tools.h>
|
||||
|
||||
#include <sys/types.h>
|
||||
#include <signal.h>
|
||||
#include <signal.h> // kill
|
||||
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <iostream>
|
||||
#include <future> // std::async, std::future
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
using namespace std;
|
||||
using namespace fair::mq::test;
|
||||
using namespace fair::mq::tools;
|
||||
|
||||
void RunWaitFor()
|
||||
void RunWaitFor(const string& state, const string& control)
|
||||
{
|
||||
std::mutex mtx;
|
||||
std::condition_variable cv;
|
||||
|
||||
int pid = 0;
|
||||
int exit_code = 0;
|
||||
|
||||
thread deviceThread([&]() {
|
||||
execute_result result;
|
||||
thread device_thread([&] {
|
||||
stringstream cmd;
|
||||
cmd << runTestDevice << " --id waitfor_" << " --control static " << " --severity nolog";
|
||||
|
||||
boost::process::ipstream stdout;
|
||||
boost::process::child c(cmd.str(), boost::process::std_out > stdout);
|
||||
string line;
|
||||
getline(stdout, line);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mtx);
|
||||
pid = c.id();
|
||||
}
|
||||
cv.notify_one();
|
||||
|
||||
c.wait();
|
||||
|
||||
exit_code = c.exit_code();
|
||||
cmd << runTestDevice
|
||||
<< " --id waitfor_" << state
|
||||
<< " --control " << control
|
||||
<< " --session " << UuidHash()
|
||||
<< " --severity debug"
|
||||
<< " --color false";
|
||||
result = execute(cmd.str(), "[WaitFor]", "", SIGINT);
|
||||
});
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mtx);
|
||||
cv.wait(lock, [&pid]{ return pid != 0; });
|
||||
}
|
||||
device_thread.join();
|
||||
|
||||
kill(pid, SIGINT);
|
||||
ASSERT_NE(string::npos, result.console_out.find("Sleeping Done. Interrupted."));
|
||||
|
||||
deviceThread.join();
|
||||
|
||||
exit(exit_code);
|
||||
exit(result.exit_code);
|
||||
}
|
||||
|
||||
TEST(Device, WaitFor)
|
||||
TEST(WaitFor, static_InPreRun)
|
||||
{
|
||||
EXPECT_EXIT(RunWaitFor(), ::testing::ExitedWithCode(0), "");
|
||||
EXPECT_EXIT(RunWaitFor("PreRun", "static"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(WaitFor, static_InRun)
|
||||
{
|
||||
EXPECT_EXIT(RunWaitFor("Run", "static"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(WaitFor, static_InPostRun)
|
||||
{
|
||||
EXPECT_EXIT(RunWaitFor("PostRun", "static"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(WaitFor, interactive_InPreRun)
|
||||
{
|
||||
EXPECT_EXIT(RunWaitFor("PreRun", "interactive"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(WaitFor, interactive_InRun)
|
||||
{
|
||||
EXPECT_EXIT(RunWaitFor("Run", "interactive"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
TEST(WaitFor, interactive_InPostRun)
|
||||
{
|
||||
EXPECT_EXIT(RunWaitFor("PostRun", "interactive"), ::testing::ExitedWithCode(0), "");
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
113
test/helper/devices/TestErrorState.h
Normal file
113
test/helper/devices/TestErrorState.h
Normal file
@@ -0,0 +1,113 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#ifndef FAIR_MQ_TEST_ERROR_STATE_H
|
||||
#define FAIR_MQ_TEST_ERROR_STATE_H
|
||||
|
||||
#include <FairMQDevice.h>
|
||||
#include <FairMQLogger.h>
|
||||
|
||||
#include <iostream>
|
||||
|
||||
namespace fair
|
||||
{
|
||||
namespace mq
|
||||
{
|
||||
namespace test
|
||||
{
|
||||
|
||||
class ErrorState : public FairMQDevice
|
||||
{
|
||||
public:
|
||||
void Init() override
|
||||
{
|
||||
std::string state("Init");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "going to change to Error state from " << state << "()";
|
||||
ChangeState(fair::mq::Transition::ErrorFound);
|
||||
}
|
||||
}
|
||||
|
||||
void Bind() override
|
||||
{
|
||||
std::string state("Bind");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "going to change to Error state from " << state << "()";
|
||||
ChangeState(fair::mq::Transition::ErrorFound);
|
||||
}
|
||||
}
|
||||
|
||||
void Connect() override
|
||||
{
|
||||
std::string state("Connect");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "going to change to Error state from " << state << "()";
|
||||
ChangeState(fair::mq::Transition::ErrorFound);
|
||||
}
|
||||
}
|
||||
|
||||
void InitTask() override
|
||||
{
|
||||
std::string state("InitTask");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "going to change to Error state from " << state << "()";
|
||||
ChangeState(fair::mq::Transition::ErrorFound);
|
||||
}
|
||||
}
|
||||
|
||||
void PreRun() override
|
||||
{
|
||||
std::string state("PreRun");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "going to change to Error state from " << state << "()";
|
||||
ChangeState(fair::mq::Transition::ErrorFound);
|
||||
}
|
||||
}
|
||||
|
||||
void Run() override
|
||||
{
|
||||
std::string state("Run");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "going to change to Error state from " << state << "()";
|
||||
ChangeState(fair::mq::Transition::ErrorFound);
|
||||
}
|
||||
}
|
||||
|
||||
void PostRun() override
|
||||
{
|
||||
std::string state("PostRun");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "going to change to Error state from " << state << "()";
|
||||
ChangeState(fair::mq::Transition::ErrorFound);
|
||||
}
|
||||
}
|
||||
|
||||
void ResetTask() override
|
||||
{
|
||||
std::string state("ResetTask");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "going to change to Error state from " << state << "()";
|
||||
ChangeState(fair::mq::Transition::ErrorFound);
|
||||
}
|
||||
}
|
||||
|
||||
void Reset() override
|
||||
{
|
||||
std::string state("Reset");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "going to change to Error state from " << state << "()";
|
||||
ChangeState(fair::mq::Transition::ErrorFound);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace test
|
||||
} // namespace mq
|
||||
} // namespace fair
|
||||
|
||||
#endif /* FAIR_MQ_TEST_ERROR_STATE_H */
|
@@ -33,6 +33,22 @@ class Exceptions : public FairMQDevice
|
||||
}
|
||||
}
|
||||
|
||||
auto Bind() -> void override
|
||||
{
|
||||
std::string state("Bind");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
throw std::runtime_error("exception in " + state + "()");
|
||||
}
|
||||
}
|
||||
|
||||
auto Connect() -> void override
|
||||
{
|
||||
std::string state("Connect");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
throw std::runtime_error("exception in " + state + "()");
|
||||
}
|
||||
}
|
||||
|
||||
auto InitTask() -> void override
|
||||
{
|
||||
std::string state("InitTask");
|
||||
|
@@ -57,6 +57,7 @@ class PairLeft : public FairMQDevice
|
||||
if (counter == 6) LOG(info) << "Simple message with short text data successfull";
|
||||
|
||||
assert(counter == 6);
|
||||
if (counter == 6) LOG(info) << "PAIR test successfull.";
|
||||
};
|
||||
};
|
||||
|
||||
|
@@ -55,9 +55,6 @@ class PairRight : public FairMQDevice
|
||||
auto msg6(NewSimpleMessageFor("data", 0, "testdata1234"));
|
||||
if (Send(msg6, "data") >= 0) counter++;
|
||||
if (counter == 6) LOG(info) << "Simple message with short text data successfull";
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
|
||||
if (counter == 6) LOG(info) << "PAIR test successfull.";
|
||||
};
|
||||
};
|
||||
|
||||
@@ -65,4 +62,4 @@ class PairRight : public FairMQDevice
|
||||
} // namespace mq
|
||||
} // namespace fair
|
||||
|
||||
#endif /* FAIR_MQ_TEST_PAIRRIGHT_H */
|
||||
#endif /* FAIR_MQ_TEST_PAIRRIGHT_H */
|
||||
|
112
test/helper/devices/TestSignals.h
Normal file
112
test/helper/devices/TestSignals.h
Normal file
@@ -0,0 +1,112 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#ifndef FAIR_MQ_TEST_SIGNALS_H
|
||||
#define FAIR_MQ_TEST_SIGNALS_H
|
||||
|
||||
#include <FairMQDevice.h>
|
||||
#include <FairMQLogger.h>
|
||||
|
||||
#include <iostream>
|
||||
#include <csignal>
|
||||
|
||||
namespace fair
|
||||
{
|
||||
namespace mq
|
||||
{
|
||||
namespace test
|
||||
{
|
||||
|
||||
class Signals : public FairMQDevice
|
||||
{
|
||||
public:
|
||||
void Init() override
|
||||
{
|
||||
std::string state("Init");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "raising SIGINT from " << state << "()";
|
||||
raise(SIGINT);
|
||||
}
|
||||
}
|
||||
void Bind() override
|
||||
{
|
||||
std::string state("Bind");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "raising SIGINT from " << state << "()";
|
||||
raise(SIGINT);
|
||||
}
|
||||
}
|
||||
void Connect() override
|
||||
{
|
||||
std::string state("Connect");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "raising SIGINT from " << state << "()";
|
||||
raise(SIGINT);
|
||||
}
|
||||
}
|
||||
|
||||
void InitTask() override
|
||||
{
|
||||
std::string state("InitTask");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "raising SIGINT from " << state << "()";
|
||||
raise(SIGINT);
|
||||
}
|
||||
}
|
||||
|
||||
void PreRun() override
|
||||
{
|
||||
std::string state("PreRun");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "raising SIGINT from " << state << "()";
|
||||
raise(SIGINT);
|
||||
}
|
||||
}
|
||||
|
||||
void Run() override
|
||||
{
|
||||
std::string state("Run");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "raising SIGINT from " << state << "()";
|
||||
raise(SIGINT);
|
||||
}
|
||||
}
|
||||
|
||||
void PostRun() override
|
||||
{
|
||||
std::string state("PostRun");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "raising SIGINT from " << state << "()";
|
||||
raise(SIGINT);
|
||||
}
|
||||
}
|
||||
|
||||
void ResetTask() override
|
||||
{
|
||||
std::string state("ResetTask");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "raising SIGINT from " << state << "()";
|
||||
raise(SIGINT);
|
||||
}
|
||||
}
|
||||
|
||||
void Reset() override
|
||||
{
|
||||
std::string state("Reset");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "raising SIGINT from " << state << "()";
|
||||
raise(SIGINT);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace test
|
||||
} // namespace mq
|
||||
} // namespace fair
|
||||
|
||||
#endif /* FAIR_MQ_TEST_SIGNALS_H */
|
@@ -24,10 +24,34 @@ namespace test
|
||||
class TestWaitFor : public FairMQDevice
|
||||
{
|
||||
public:
|
||||
void Run()
|
||||
void PreRun() override
|
||||
{
|
||||
std::cout << "hello" << std::endl;
|
||||
WaitFor(std::chrono::seconds(60));
|
||||
std::string state("PreRun");
|
||||
if (std::string::npos != GetId().find("_" + state)) {
|
||||
LOG(info) << "Going to sleep via WaitFor() in " << state << "...";
|
||||
bool result = WaitFor(std::chrono::seconds(60));
|
||||
LOG(info) << (result == true ? "Sleeping Done. Not interrupted." : "Sleeping Done. Interrupted.");
|
||||
}
|
||||
}
|
||||
|
||||
void Run() override
|
||||
{
|
||||
std::string state("Run");
|
||||
if (std::string::npos != GetId().find("_" + state)) {
|
||||
LOG(info) << "Going to sleep via WaitFor() in " << state << "...";
|
||||
bool result = WaitFor(std::chrono::seconds(60));
|
||||
LOG(info) << (result == true ? "Sleeping Done. Not interrupted." : "Sleeping Done. Interrupted.");
|
||||
}
|
||||
}
|
||||
|
||||
void PostRun() override
|
||||
{
|
||||
std::string state("PostRun");
|
||||
if (std::string::npos != GetId().find("_" + state)) {
|
||||
LOG(info) << "Going to sleep via WaitFor() in " << state << "...";
|
||||
bool result = WaitFor(std::chrono::seconds(60));
|
||||
LOG(info) << (result == true ? "Sleeping Done. Not interrupted." : "Sleeping Done. Interrupted.");
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
@@ -19,6 +19,8 @@
|
||||
#include "devices/TestTransferTimeout.h"
|
||||
#include "devices/TestWaitFor.h"
|
||||
#include "devices/TestExceptions.h"
|
||||
#include "devices/TestErrorState.h"
|
||||
#include "devices/TestSignals.h"
|
||||
|
||||
#include <runFairMQDevice.h>
|
||||
|
||||
@@ -40,60 +42,38 @@ auto getDevice(const FairMQProgOptions& config) -> FairMQDevicePtr
|
||||
using namespace fair::mq::test;
|
||||
|
||||
auto id = config.GetValue<std::string>("id");
|
||||
if (0 == id.find("pull_"))
|
||||
{
|
||||
|
||||
if (0 == id.find("pull_")) {
|
||||
return new Pull;
|
||||
}
|
||||
else if (0 == id.find("push_"))
|
||||
{
|
||||
} else if (0 == id.find("push_")) {
|
||||
return new Push;
|
||||
}
|
||||
else if (0 == id.find("sub_"))
|
||||
{
|
||||
} else if (0 == id.find("sub_")) {
|
||||
return new Sub;
|
||||
}
|
||||
else if (0 == id.find("pub_"))
|
||||
{
|
||||
} else if (0 == id.find("pub_")) {
|
||||
return new Pub;
|
||||
}
|
||||
else if (0 == id.find("req_"))
|
||||
{
|
||||
} else if (0 == id.find("req_")) {
|
||||
return new Req;
|
||||
}
|
||||
else if (0 == id.find("rep_"))
|
||||
{
|
||||
} else if (0 == id.find("rep_")) {
|
||||
return new Rep;
|
||||
}
|
||||
else if (0 == id.find("transfer_timeout_"))
|
||||
{
|
||||
} else if (0 == id.find("transfer_timeout_")) {
|
||||
return new TransferTimeout;
|
||||
}
|
||||
else if (0 == id.find("pollout_"))
|
||||
{
|
||||
} else if (0 == id.find("pollout_")) {
|
||||
return new PollOut;
|
||||
}
|
||||
else if (0 == id.find("pollin_"))
|
||||
{
|
||||
} else if (0 == id.find("pollin_")) {
|
||||
return new PollIn;
|
||||
}
|
||||
else if (0 == id.find("pairleft_"))
|
||||
{
|
||||
} else if (0 == id.find("pairleft_")) {
|
||||
return new PairLeft;
|
||||
}
|
||||
else if (0 == id.find("pairright_"))
|
||||
{
|
||||
} else if (0 == id.find("pairright_")) {
|
||||
return new PairRight;
|
||||
}
|
||||
else if (0 == id.find("waitfor_"))
|
||||
{
|
||||
} else if (0 == id.find("waitfor_")) {
|
||||
return new TestWaitFor;
|
||||
}
|
||||
else if (0 == id.find("exceptions_"))
|
||||
{
|
||||
} else if (0 == id.find("exceptions_")) {
|
||||
return new Exceptions;
|
||||
}
|
||||
else
|
||||
{
|
||||
} else if (0 == id.find("error_state_")) {
|
||||
return new ErrorState;
|
||||
} else if (0 == id.find("signals_")) {
|
||||
return new Signals;
|
||||
} else {
|
||||
cerr << "Don't know id '" << id << "'" << endl;
|
||||
return nullptr;
|
||||
}
|
||||
|
Reference in New Issue
Block a user