Compare commits

..

6 Commits

Author SHA1 Message Date
Alexey Rybalchenko
92632a022c Support region callbacks when no channel is specified 2019-03-08 11:06:30 +01:00
Alexey Rybalchenko
bd5105d609 Remove hint parameter from builder 2019-03-06 16:35:02 +01:00
Alexey Rybalchenko
080dd0a9df Remove wrong readme file 2019-03-06 16:35:02 +01:00
Alexey Rybalchenko
a9dfe39bf7 Add a hack to set the expected msg size via cmd option 2019-03-06 16:35:02 +01:00
Alexey Rybalchenko
e1b1e5e21b Temporary remove the OFI control band 2019-03-06 16:35:02 +01:00
Alexey Rybalchenko
763c21ffdd Remove azmq on send, make connect/bind blocking 2019-03-06 16:35:02 +01:00
42 changed files with 628 additions and 1381 deletions

View File

@@ -61,12 +61,14 @@ endif()
if(BUILD_OFI_TRANSPORT)
find_package2(PRIVATE asiofi REQUIRED
VERSION 0.3.1
VERSION 0.2.0
)
find_package2(PRIVATE OFI REQUIRED
VERSION ${asiofi_OFI_VERSION}
COMPONENTS ${asiofi_OFI_COMPONENTS}
)
find_package2(PRIVATE AZMQ REQUIRED)
set(PROJECT_AZMQ_VERSION 1.0.2)
endif()
if(BUILD_NANOMSG_TRANSPORT)
@@ -192,6 +194,11 @@ if(BUILD_DDS_PLUGIN)
DESTINATION ${PROJECT_INSTALL_CMAKEMODDIR}
)
endif()
if(BUILD_OFI_TRANSPORT)
install(FILES cmake/FindAZMQ.cmake
DESTINATION ${PROJECT_INSTALL_CMAKEMODDIR}
)
endif()
if(BUILD_DOCS)
install(DIRECTORY ${CMAKE_BINARY_DIR}/doxygen/html
DESTINATION ${PROJECT_INSTALL_DATADIR}/docs

View File

@@ -38,7 +38,7 @@ a simulation, reconstruction and analysis framework.
* PUBLIC: [**Boost**](https://www.boost.org/), [**FairLogger**](https://github.com/FairRootGroup/FairLogger)
* BUILD: [CMake](https://cmake.org/), [GTest](https://github.com/google/googletest), [Doxygen](http://www.doxygen.org/)
* PRIVATE: [ZeroMQ](http://zeromq.org/), [Msgpack](https://msgpack.org/index.html), [nanomsg](http://nanomsg.org/),
[asiofi](https://github.com/FairRootGroup/asiofi), [DDS](http://dds.gsi.de), [PMIx](https://pmix.org/)
[asiofi](https://github.com/FairRootGroup/asiofi), [DDS](http://dds.gsi.de), [PMIx](https://pmix.org/), [AZMQ](https://github.com/zeromq/azmq), [asiofi](https://github.com/FairRootGroup/asiofi)
Supported platforms: Linux and MacOS.
@@ -51,7 +51,7 @@ cmake -DCMAKE_INSTALL_PREFIX=./fairmq_install ../fairmq
cmake --build . --target install
```
If dependencies are not installed in standard system directories, you can hint the installation location via `-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...`. `{DEPENDENCY}` can be `GTEST`, `BOOST`, `FAIRLOGGER`, `ZEROMQ`, `MSGPACK`, `NANOMSG`, `OFI`, `PMIX`, `ASIOFI` or `DDS` (`*_ROOT` variables can also be environment variables).
If dependencies are not installed in standard system directories, you can hint the installation location via `-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...`. `{DEPENDENCY}` can be `GTEST`, `BOOST`, `FAIRLOGGER`, `ZEROMQ`, `MSGPACK`, `NANOMSG`, `OFI`, `PMIX`, `ASIOFI`, `AZMQ` or `DDS` (`*_ROOT` variables can also be environment variables).
## Usage

57
cmake/FindAZMQ.cmake Normal file
View File

@@ -0,0 +1,57 @@
################################################################################
# Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
#
# ###########################
# # Locate the AZMQ library #
# ###########################
#
#
# Usage:
#
# find_package(AZMQ [version] [QUIET] [REQUIRED])
#
#
# Defines the following variables:
#
# AZMQ_FOUND - Found the ZeroMQ library
# AZMQ_INCLUDE_DIR (CMake cache) - Include directory
#
#
# Accepts the following variables as hints for installation directories:
#
# AZMQ_ROOT (CMake var, ENV var)
#
#
# If the above variables are not defined, or if ZeroMQ could not be found there,
# it will look for it in the system directories. Custom ZeroMQ installations
# will always have priority over system ones.
#
if(NOT AZMQ_ROOT)
set(AZMQ_ROOT $ENV{AZMQ_ROOT})
endif()
find_path(AZMQ_INCLUDE_DIR
NAMES azmq/socket.hpp
HINTS ${AZMQ_ROOT} $ENV{AZMQ_ROOT}
PATH_SUFFIXES include
DOC "AZMQ include directory"
)
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(AZMQ
REQUIRED_VARS AZMQ_INCLUDE_DIR
)
if(AZMQ_FOUND AND NOT TARGET AZMQ::AZMQ)
add_library(AZMQ::AZMQ INTERFACE IMPORTED)
set_target_properties(AZMQ::AZMQ PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES ${AZMQ_INCLUDE_DIR}
INTERFACE_LINK_LIBRARIES "libzmq;Boost::boost;Boost::container;Boost::system"
)
endif()

View File

@@ -32,20 +32,15 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multipart.sh.in ${CMA
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-multipart.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh)
add_test(NAME Example.Multipart.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh zeromq)
set_tests_properties(Example.Multipart.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 5 parts")
set_tests_properties(Example.Multipart.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 2 parts")
if(BUILD_NANOMSG_TRANSPORT)
add_test(NAME Example.Multipart.nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh nanomsg)
set_tests_properties(Example.Multipart.nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 5 parts")
set_tests_properties(Example.Multipart.nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 2 parts")
endif()
add_test(NAME Example.Multipart.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh shmem)
set_tests_properties(Example.Multipart.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 5 parts")
if(BUILD_OFI_TRANSPORT)
add_test(NAME Example.Multipart.ofi COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh ofi)
set_tests_properties(Example.Multipart.ofi PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 5 parts")
endif()
set_tests_properties(Example.Multipart.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 2 parts")
# install

View File

@@ -53,13 +53,6 @@ bool Sampler::ConditionalRun()
parts.AddPart(NewSimpleMessage(header));
parts.AddPart(NewMessage(1000));
// create more data parts, testing the FairMQParts in-place constructor
FairMQParts auxData{ NewMessage(500), NewMessage(600), NewMessage(700) };
assert(auxData.Size() == 3);
parts.AddPart(std::move(auxData));
assert(auxData.Size() == 0);
assert(parts.Size() == 5);
LOG(info) << "Sending body of size: " << parts.At(1)->GetSize();
Send(parts, "data");
@@ -81,4 +74,4 @@ Sampler::~Sampler()
{
}
} // namespace example_multipart
} // namespace example_multipart

View File

@@ -20,7 +20,7 @@ SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --session $SESSION"
SAMPLER+=" --max-iterations 1"
SAMPLER+=" --control static --color false"
SAMPLER+=" --channel-config name=data,type=pair,method=connect,rateLogging=0,address=tcp://127.0.0.1:5555"
SAMPLER+=" --channel-config name=data,type=push,method=connect,rateLogging=0,address=tcp://127.0.0.1:5555"
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
SAMPLER_PID=$!
@@ -30,7 +30,7 @@ SINK+=" --transport $transport"
SINK+=" --verbosity veryhigh"
SINK+=" --session $SESSION"
SINK+=" --control static --color false"
SINK+=" --channel-config name=data,type=pair,method=bind,rateLogging=0,address=tcp://127.0.0.1:5555"
SINK+=" --channel-config name=data,type=pull,method=bind,rateLogging=0,address=tcp://127.0.0.1:5555"
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
SINK_PID=$!

View File

@@ -25,7 +25,7 @@ target_link_libraries(fairmq-ex-readout-builder PRIVATE ExampleReadoutLib)
add_executable(fairmq-ex-readout-sink runSink.cxx)
target_link_libraries(fairmq-ex-readout-sink PRIVATE ExampleReadoutLib)
add_custom_target(Examplereadout DEPENDS fairmq-ex-readout-sampler fairmq-ex-readout-sink)
add_custom_target(ExampleReadout DEPENDS fairmq-ex-readout-sampler fairmq-ex-readout-builder fairmq-ex-readout-sink)
set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR})
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
@@ -36,6 +36,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout.sh.in ${CMAKE
install(
TARGETS
fairmq-ex-readout-sampler
fairmq-ex-readout-builder
fairmq-ex-readout-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}

View File

@@ -1,5 +0,0 @@
Region example
==============
This example demonstrates the use of a more advanced feature - UnmanagedRegion, that can be used to create a buffer through one of FairMQ transports. The contents of this buffer are managed by the user, who can also create messages out of sub-buffers of the created buffer. Such feature can be interesting in environments that have special requirements by the hardware that writes the data, to keep the transfer efficient (e.g. shared memory).

View File

@@ -35,28 +35,23 @@ void Sampler::InitTask()
fMsgSize = fConfig->GetValue<int>("msg-size");
fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data1",
0,
10000000,
[this](void* /*data*/, size_t /*size*/, void* /*hint*/) { // callback to be called when message buffers no longer needed by transport
--fNumUnackedMsgs;
if (fMaxIterations > 0)
{
LOG(debug) << "Received ack";
}
}
));
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegion(10000000,
[this](void* /*data*/, size_t /*size*/, void* /*hint*/) { // callback to be called when message buffers no longer needed by transport
--fNumUnackedMsgs;
if (fMaxIterations > 0)
{
LOG(debug) << "Received ack";
}
}));
}
bool Sampler::ConditionalRun()
{
FairMQMessagePtr msg(NewMessageFor("data1", // channel
0, // sub-channel
fRegion, // region
fRegion->GetData(), // ptr within region
fMsgSize, // offset from ptr
nullptr // hint
));
FairMQMessagePtr msg(NewMessage(fRegion, // region
fRegion->GetData(), // ptr within region
fMsgSize, // offset from ptr
nullptr // hint
));
if (Send(msg, "data1", 0) > 0)
{

View File

@@ -2,7 +2,7 @@
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
msgSize="1000000"
msgSize="2000000"
if [[ $1 =~ ^[0-9]+$ ]]; then
msgSize=$1
@@ -11,6 +11,7 @@ fi
SAMPLER="fairmq-ex-readout-sampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --severity debug"
SAMPLER+=" --transport shmem"
SAMPLER+=" --msg-size $msgSize"
# SAMPLER+=" --rate 10"
SAMPLER+=" --channel-config name=data1,type=pair,method=bind,address=tcp://127.0.0.1:7777,transport=shmem"
@@ -26,5 +27,6 @@ xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$BUILDER &
SINK="fairmq-ex-readout-sink"
SINK+=" --id sink1"
SINK+=" --severity debug"
SINK+=" --ofi-size-hint $msgSize"
SINK+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:7778,transport=ofi"
xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SINK &

View File

@@ -224,6 +224,7 @@ if(BUILD_OFI_TRANSPORT)
set(OFI_DEPS
asiofi::asiofi
Boost::container
AZMQ::AZMQ
)
endif()
set(optional_deps ${NANOMSG_DEPS} ${OFI_DEPS})

View File

@@ -345,11 +345,6 @@ class FairMQChannel
return Transport()->NewStaticMessage(data);
}
FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr)
{
return Transport()->CreateUnmanagedRegion(size, callback);
}
private:
std::shared_ptr<FairMQTransportFactory> fTransportFactory;
fair::mq::Transport fTransportType;

View File

@@ -191,58 +191,50 @@ class FairMQDevice
return fTransportFactory.get();
}
// creates message with the default device transport
template<typename... Args>
FairMQMessagePtr NewMessage(Args&&... args)
{
return Transport()->CreateMessage(std::forward<Args>(args)...);
}
// creates message with the transport of the specified channel
template<typename... Args>
FairMQMessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args)
{
return GetChannel(channel, index).NewMessage(std::forward<Args>(args)...);
}
// creates a message that will not be cleaned up after transfer, with the default device transport
template<typename T>
FairMQMessagePtr NewStaticMessage(const T& data)
{
return Transport()->NewStaticMessage(data);
}
// creates a message that will not be cleaned up after transfer, with the transport of the specified channel
template<typename T>
FairMQMessagePtr NewStaticMessageFor(const std::string& channel, int index, const T& data)
{
return GetChannel(channel, index).NewStaticMessage(data);
}
// creates a message with a copy of the provided data, with the default device transport
template<typename T>
FairMQMessagePtr NewSimpleMessage(const T& data)
{
return Transport()->NewSimpleMessage(data);
}
// creates a message with a copy of the provided data, with the transport of the specified channel
template<typename T>
FairMQMessagePtr NewSimpleMessageFor(const std::string& channel, int index, const T& data)
{
return GetChannel(channel, index).NewSimpleMessage(data);
}
// creates unamanaged region with the default device transport
FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr)
{
return Transport()->CreateUnmanagedRegion(size, callback);
}
// creates unmanaged region with the transport of the specified channel
FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel, int index, const size_t size, FairMQRegionCallback callback = nullptr)
{
return GetChannel(channel, index).NewUnmanagedRegion(size, callback);
return GetChannel(channel, index).Transport()->CreateUnmanagedRegion(size, callback);
}
template<typename ...Ts>
@@ -430,7 +422,7 @@ class FairMQDevice
template<class Rep, class Period>
bool WaitFor(std::chrono::duration<Rep, Period> const& duration)
{
return !fStateMachine.WaitForPendingStateFor(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
return fStateMachine.WaitForPendingStateFor(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
}
protected:

View File

@@ -31,9 +31,6 @@ class FairMQParts
FairMQParts(FairMQParts&& p) = default;
/// Assignment operator
FairMQParts& operator=(const FairMQParts&) = delete;
/// Constructor from argument pack of std::unique_ptr<FairMQMessage> rvalues
template <typename... Ts>
FairMQParts(Ts&&... messages) : fParts() {AddPart(std::forward<Ts>(messages)...);}
/// Default destructor
~FairMQParts() {};
@@ -44,6 +41,14 @@ class FairMQParts
fParts.push_back(std::unique_ptr<FairMQMessage>(msg));
}
/// Adds part (std::unique_ptr<FairMQMessage>&) to the container (move)
/// @param msg unique pointer to FairMQMessage
/// lvalue ref (move not required when passing argument)
// inline void AddPart(std::unique_ptr<FairMQMessage>& msg)
// {
// fParts.push_back(std::move(msg));
// }
/// Adds part (std::unique_ptr<FairMQMessage>&) to the container (move)
/// @param msg unique pointer to FairMQMessage
/// rvalue ref (move required when passing argument)
@@ -52,23 +57,6 @@ class FairMQParts
fParts.push_back(std::move(msg));
}
/// Add variable list of parts to the container (move)
template <typename... Ts>
void AddPart(std::unique_ptr<FairMQMessage>&& first, Ts&&... remaining)
{
AddPart(std::move(first));
AddPart(std::forward<Ts>(remaining)...);
}
/// Add content of another object by move
void AddPart(FairMQParts&& other)
{
container parts = std::move(other.fParts);
for (auto& part : parts) {
fParts.push_back(std::move(part));
}
}
/// Get reference to part in the container at index (without bounds check)
/// @param index container index
FairMQMessage& operator[](const int index) { return *(fParts[index]); }

View File

@@ -28,7 +28,6 @@
#include <array>
#include <unordered_map>
#include <mutex>
#include <stdexcept>
using namespace std;
using namespace boost::msm;
@@ -168,6 +167,7 @@ struct Machine_ : public state_machine_def<Machine_>
Machine_()
: fLastTransitionResult(true)
, fNewStatePending(false)
, fWorkOngoing(false)
{}
virtual ~Machine_() {}
@@ -258,7 +258,9 @@ struct Machine_ : public state_machine_def<Machine_>
mutex fStateMtx;
atomic<bool> fNewStatePending;
atomic<bool> fWorkOngoing;
condition_variable fNewStatePendingCV;
condition_variable fWorkDoneCV;
boost::signals2::signal<void(const State)> fStateChangeSignal;
boost::signals2::signal<void(const State)> fStateHandleSignal;
@@ -281,6 +283,7 @@ struct Machine_ : public state_machine_def<Machine_>
LOG(state) << fState << " ---> " << fNewState;
fState = static_cast<State>(fNewState);
fNewStatePending = false;
fWorkOngoing = true;
if (fState == State::Exiting || fState == State::Error) {
stop = true;
@@ -289,10 +292,12 @@ struct Machine_ : public state_machine_def<Machine_>
CallStateChangeCallbacks(fState);
CallStateHandler(fState);
}
if (fState == State::Error) {
throw StateMachine::ErrorStateException("Device transitioned to error state");
{
lock_guard<mutex> lock(fStateMtx);
fWorkOngoing = false;
fWorkDoneCV.notify_one();
}
}
}
@@ -458,15 +463,13 @@ void StateMachine::ProcessWork()
try {
fsm->CallStateChangeCallbacks(State::Idle);
fsm->ProcessWork();
} catch(ErrorStateException& ese) {
LOG(trace) << "ErrorStateException caught in ProcessWork(), rethrowing";
throw;
} catch(...) {
LOG(debug) << "Exception caught in ProcessWork(), going to Error state and rethrowing";
{
lock_guard<mutex> lock(fsm->fStateMtx);
fsm->fState = State::Error;
fsm->CallStateChangeCallbacks(State::Error);
fsm->fWorkOngoing = false;
fsm->fWorkDoneCV.notify_one();
}
ChangeState(Transition::ErrorFound);
throw;
@@ -477,4 +480,3 @@ string StateMachine::GetStateName(const State state) { return stateNames.at(stat
string StateMachine::GetTransitionName(const Transition transition) { return transitionNames.at(static_cast<int>(transition)); }
State StateMachine::GetState(const string& state) { return stateNumbers.at(state); }
Transition StateMachine::GetTransition(const string& transition) { return transitionNumbers.at(transition); }

View File

@@ -94,8 +94,6 @@ class StateMachine
static State GetState(const std::string& state);
static Transition GetTransition(const std::string& transition);
struct ErrorStateException : std::runtime_error { using std::runtime_error::runtime_error; };
private:
std::shared_ptr<void> fFsm;
};

View File

@@ -37,7 +37,7 @@ Context::Context(FairMQTransportFactory& sendFactory,
: fIoWork(fIoContext)
, fReceiveFactory(receiveFactory)
, fSendFactory(sendFactory)
, fSizeHint(0)
, fSizeHint(2000000) // temporary hack to provide expected message size for receive
{
InitThreadPool(numberIoThreads);
}
@@ -48,24 +48,15 @@ auto Context::InitThreadPool(int numberIoThreads) -> void
for (int i = 1; i <= numberIoThreads; ++i) {
fThreadPool.emplace_back([&, i, numberIoThreads]{
try {
LOG(debug) << "OFI transport: I/O thread #" << i << " of " << numberIoThreads << " started";
fIoContext.run();
LOG(debug) << "OFI transport: I/O thread #" << i << " of " << numberIoThreads << " stopped";
} catch (const std::exception& e) {
LOG(error) << "OFI transport: Uncaught exception in I/O thread #" << i << ": " << e.what();
} catch (...) {
LOG(error) << "OFI transport: Uncaught exception in I/O thread #" << i;
}
LOG(debug) << "OFI transport: I/O thread #" << i << " of " << numberIoThreads << " started";
fIoContext.run();
LOG(debug) << "OFI transport: I/O thread #" << i << " of " << numberIoThreads << " stopped";
});
}
}
auto Context::Reset() -> void
{
// TODO "Linger", rethink this
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
fIoContext.stop();
}

View File

@@ -72,8 +72,8 @@ class Context
auto Reset() -> void;
auto MakeReceiveMessage(size_t size) -> MessagePtr;
auto MakeSendMessage(size_t size) -> MessagePtr;
auto GetSizeHint() -> size_t { return fSizeHint; }
auto SetSizeHint(size_t size) -> void { fSizeHint = size; }
size_t GetSizeHint() { return fSizeHint; } // temporary hack to provide expected message size for receive
void SetSizeHint(size_t size) { fSizeHint = size; } // temporary hack to provide expected message size for receive
private:
boost::asio::io_context fIoContext;
@@ -81,7 +81,8 @@ class Context
std::vector<std::thread> fThreadPool;
FairMQTransportFactory& fReceiveFactory;
FairMQTransportFactory& fSendFactory;
size_t fSizeHint;
size_t fSizeHint; // temporary hack to provide expected message size for receive
auto InitThreadPool(int numberIoThreads) -> void;
}; /* class Context */

View File

@@ -35,76 +35,59 @@ namespace ofi {
enum class ControlMessageType
{
Empty = 1,
DataAddressAnnouncement = 1,
PostBuffer,
PostMultiPartStartBuffer
};
struct Empty
{};
struct PostBuffer
{
uint64_t size; // buffer size (size_t)
};
struct PostMultiPartStartBuffer
{
uint32_t numParts; // buffer size (size_t)
uint64_t size; // buffer size (size_t)
};
union ControlMessageContent
{
PostBuffer postBuffer;
PostMultiPartStartBuffer postMultiPartStartBuffer;
PostBufferAcknowledgement
};
struct ControlMessage
{
ControlMessageType type;
ControlMessageContent msg;
};
struct DataAddressAnnouncement : ControlMessage
{
uint32_t ipv4; // in_addr_t from <netinet/in.h>
uint32_t port; // in_port_t from <netinet/in.h>
};
struct PostBuffer : ControlMessage
{
uint64_t size; // buffer size (size_t)
};
template<typename T>
using unique_ptr = std::unique_ptr<T, std::function<void(T*)>>;
template<typename T, typename... Args>
auto MakeControlMessageWithPmr(boost::container::pmr::memory_resource& pmr, Args&&... args)
-> ofi::unique_ptr<ControlMessage>
auto MakeControlMessageWithPmr(boost::container::pmr::memory_resource* pmr, Args&&... args)
-> ofi::unique_ptr<T>
{
void* mem = pmr.allocate(sizeof(ControlMessage));
ControlMessage* ctrl = new (mem) ControlMessage();
void* mem = pmr->allocate(sizeof(T));
T* ctrl = new (mem) T(std::forward<Args>(args)...);
if (std::is_same<T, PostBuffer>::value) {
if (std::is_same<T, DataAddressAnnouncement>::value) {
ctrl->type = ControlMessageType::DataAddressAnnouncement;
} else if (std::is_same<T, PostBuffer>::value) {
ctrl->type = ControlMessageType::PostBuffer;
ctrl->msg.postBuffer = PostBuffer(std::forward<Args>(args)...);
} else if (std::is_same<T, PostMultiPartStartBuffer>::value) {
ctrl->type = ControlMessageType::PostMultiPartStartBuffer;
ctrl->msg.postMultiPartStartBuffer = PostMultiPartStartBuffer(std::forward<Args>(args)...);
} else if (std::is_same<T, Empty>::value) {
ctrl->type = ControlMessageType::Empty;
}
return ofi::unique_ptr<ControlMessage>(ctrl, [&pmr](ControlMessage* p) {
p->~ControlMessage();
pmr.deallocate(p, sizeof(T));
return ofi::unique_ptr<T>(ctrl, [=](T* p) {
p->~T();
pmr->deallocate(p, sizeof(T));
});
}
template<typename T, typename... Args>
auto MakeControlMessage(Args&&... args) -> ControlMessage
auto MakeControlMessage(Args&&... args) -> T
{
ControlMessage ctrl;
T ctrl = T(std::forward<Args>(args)...);
if (std::is_same<T, PostBuffer>::value) {
if (std::is_same<T, DataAddressAnnouncement>::value) {
ctrl.type = ControlMessageType::DataAddressAnnouncement;
} else if (std::is_same<T, PostBuffer>::value) {
ctrl.type = ControlMessageType::PostBuffer;
} else if (std::is_same<T, PostMultiPartStartBuffer>::value) {
ctrl.type = ControlMessageType::PostMultiPartStartBuffer;
} else if (std::is_same<T, Empty>::value) {
ctrl.type = ControlMessageType::Empty;
}
ctrl.msg = T(std::forward<Args>(args)...);
return ctrl;
}

View File

@@ -43,6 +43,8 @@ Message::Message(boost::container::pmr::memory_resource* pmr, const size_t size)
, fPmr(pmr)
{
if (size) {
// static void* buffer = fPmr->allocate(size);
// fData = buffer;
fData = fPmr->allocate(size);
assert(fData);
}

View File

@@ -13,17 +13,20 @@
#include <FairMQLogger.h>
#include <asiofi.hpp>
#include <azmq/message.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/post.hpp>
#include <chrono>
#include <cstring>
#include <functional>
#include <memory>
#include <sstream>
#include <string.h>
#include <sys/socket.h>
#include <thread>
#include <mutex>
#include <queue>
#include <condition_variable>
namespace fair
{
@@ -41,7 +44,6 @@ Socket::Socket(Context& context, const string& type, const string& name, const s
, fOfiDomain(nullptr)
, fPassiveEndpoint(nullptr)
, fDataEndpoint(nullptr)
, fControlEndpoint(nullptr)
, fId(id + "." + name + "." + type)
, fBytesTx(0)
, fBytesRx(0)
@@ -49,15 +51,33 @@ Socket::Socket(Context& context, const string& type, const string& name, const s
, fMessagesRx(0)
, fSndTimeout(100)
, fRcvTimeout(100)
, fMultiPartRecvCounter(-1)
, fSendPushSem(fContext.GetIoContext(), 384)
, fSendPopSem(fContext.GetIoContext(), 0)
, fRecvPushSem(fContext.GetIoContext(), 384)
, fRecvPopSem(fContext.GetIoContext(), 0)
, fRecvQueueWrite(fContext.GetIoContext(), ZMQ_PUSH)
, fRecvQueueRead(fContext.GetIoContext(), ZMQ_PULL)
, fSendSem(fContext.GetIoContext(), 300)
, fRecvSem(fContext.GetIoContext(), 300)
, fNeedOfiMemoryRegistration(false)
, fBound(false)
, fConnected(false)
{
if (type != "pair") {
throw SocketError{tools::ToString("Socket type '", type, "' not implemented for ofi transport.")};
} else {
// TODO wire this up with config
azmq::socket::snd_hwm send_max(300);
azmq::socket::rcv_hwm recv_max(300);
fRecvQueueRead.set_option(send_max);
fRecvQueueRead.set_option(recv_max);
fRecvQueueWrite.set_option(send_max);
fRecvQueueWrite.set_option(recv_max);
// Setup internal queue
auto hashed_id = hash<string>()(fId);
auto queue_id = tools::ToString("inproc://TXQUEUE", hashed_id);
queue_id = tools::ToString("inproc://RXQUEUE", hashed_id);
LOG(debug) << "OFI transport (" << fId << "): " << "Binding RQR: " << queue_id;
fRecvQueueRead.bind(queue_id);
LOG(debug) << "OFI transport (" << fId << "): " << "Connecting RQW: " << queue_id;
fRecvQueueWrite.connect(queue_id);
}
}
@@ -74,12 +94,12 @@ auto Socket::InitOfi(Address addr) -> void
hints.set_provider("verbs");
}
if (fRemoteAddr == addr) {
fOfiInfo = tools::make_unique<asiofi::info>(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), 0, hints);
fOfiInfo = tools::make_unique<asiofi::info>(addr.Ip.c_str(), to_string(addr.Port).c_str(), 0, hints);
} else {
fOfiInfo = tools::make_unique<asiofi::info>(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), FI_SOURCE, hints);
fOfiInfo = tools::make_unique<asiofi::info>(addr.Ip.c_str(), to_string(addr.Port).c_str(), FI_SOURCE, hints);
}
LOG(debug) << "OFI transport (" << fId << "): " << *fOfiInfo;
LOG(debug) << "OFI transport: " << *fOfiInfo;
fOfiFabric = tools::make_unique<asiofi::fabric>(*fOfiInfo);
@@ -89,6 +109,7 @@ auto Socket::InitOfi(Address addr) -> void
auto Socket::Bind(const string& addr) -> bool
try {
fBound = false;
fLocalAddr = Context::VerifyAddress(addr);
if (fLocalAddr.Protocol == "verbs") {
fNeedOfiMemoryRegistration = true;
@@ -99,80 +120,39 @@ try {
fPassiveEndpoint = tools::make_unique<asiofi::passive_endpoint>(fContext.GetIoContext(), *fOfiFabric);
//fPassiveEndpoint->set_local_address(Context::ConvertAddress(fLocalAddr));
BindControlEndpoint();
return true;
}
// TODO catch the correct ofi error
catch (const SilentSocketError& e)
{
// do not print error in this case, this is handled by FairMQDevice
// in case no connection could be established after trying a number of random ports from a range.
return false;
}
catch (const std::exception& e)
{
LOG(error) << "OFI transport: " << e.what();
return false;
}
catch (...)
{
LOG(error) << "OFI transport: Unknown exception in ofi::Socket::Bind";
return false;
}
auto Socket::BindControlEndpoint() -> void
{
assert(!fControlEndpoint);
fPassiveEndpoint->listen([&](asiofi::info&& info) {
LOG(debug) << "OFI transport (" << fId
<< "): control band connection request received. Accepting ...";
fControlEndpoint = tools::make_unique<asiofi::connected_endpoint>(
fContext.GetIoContext(), *fOfiDomain, info);
fControlEndpoint->enable();
fControlEndpoint->accept([&]() {
LOG(debug) << "OFI transport (" << fId << "): control band connection accepted.";
BindDataEndpoint();
});
});
LOG(debug) << "OFI transport (" << fId << "): control band bound to " << fLocalAddr;
}
auto Socket::BindDataEndpoint() -> void
{
assert(!fDataEndpoint);
fPassiveEndpoint->listen([&](asiofi::info&& info) {
LOG(debug) << "OFI transport (" << fId
<< "): data band connection request received. Accepting ...";
fDataEndpoint = tools::make_unique<asiofi::connected_endpoint>(
fContext.GetIoContext(), *fOfiDomain, info);
LOG(debug) << "OFI transport (" << fId << "): data band connection request received. Accepting ...";
fDataEndpoint = tools::make_unique<asiofi::connected_endpoint>(fContext.GetIoContext(), *fOfiDomain, info);
fDataEndpoint->enable();
fDataEndpoint->accept([&]() {
LOG(debug) << "OFI transport (" << fId << "): data band connection accepted.";
if (fContext.GetSizeHint()) {
boost::asio::post(fContext.GetIoContext(),
std::bind(&Socket::SendQueueReaderStatic, this));
boost::asio::post(fContext.GetIoContext(),
std::bind(&Socket::RecvQueueReaderStatic, this));
} else {
boost::asio::post(fContext.GetIoContext(),
std::bind(&Socket::SendQueueReader, this));
boost::asio::post(fContext.GetIoContext(),
std::bind(&Socket::RecvControlQueueReader, this));
}
boost::asio::post(fContext.GetIoContext(), bind(&Socket::RecvQueueReader, this));
fBound = true;
});
});
LOG(debug) << "OFI transport (" << fId << "): data band bound to " << fLocalAddr;
while (!fBound) {
this_thread::sleep_for(chrono::milliseconds(100));
}
return true;
} catch (const SilentSocketError& e) {// TODO catch the correct ofi error
// do not print error in this case, this is handled by FairMQDevice
// in case no connection could be established after trying a number of random ports from a range.
return false;
} catch (const SocketError& e) {
LOG(error) << "OFI transport: " << e.what();
return false;
}
auto Socket::Connect(const string& address) -> bool
try {
fConnected = false;
fRemoteAddr = Context::VerifyAddress(address);
if (fRemoteAddr.Protocol == "verbs") {
fNeedOfiMemoryRegistration = true;
@@ -180,421 +160,185 @@ try {
InitOfi(fRemoteAddr);
ConnectEndpoint(fControlEndpoint, Band::Control);
ConnectEndpoint(fDataEndpoint, Band::Data);
assert(!fDataEndpoint);
if (fContext.GetSizeHint()) {
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReaderStatic, this));
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvQueueReaderStatic, this));
} else {
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this));
fDataEndpoint = tools::make_unique<asiofi::connected_endpoint>(fContext.GetIoContext(), *fOfiDomain);
fDataEndpoint->enable();
LOG(debug) << "OFI transport (" << fId << "): Sending data band connection request to " << fRemoteAddr;
fDataEndpoint->connect(Context::ConvertAddress(fRemoteAddr), [&](asiofi::eq::event event) {
LOG(debug) << "OFI transport (" << fId << "): data band conn event happened";
if (event == asiofi::eq::event::connected) {
LOG(debug) << "OFI transport (" << fId << "): data band connected.";
boost::asio::post(fContext.GetIoContext(), bind(&Socket::RecvQueueReader, this));
fConnected = true;
} else {
LOG(error) << "Could not connect on the first try";
}
});
while (!fConnected) {
this_thread::sleep_for(chrono::milliseconds(100));
}
return true;
}
catch (const SilentSocketError& e)
{
} catch (const SilentSocketError& e) {
// do not print error in this case, this is handled by FairMQDevice
return false;
}
catch (const std::exception& e)
{
} catch (const exception& e) {
LOG(error) << "OFI transport: " << e.what();
return false;
}
catch (...)
{
LOG(error) << "OFI transport: Unknown exception in ofi::Socket::Connect";
return false;
}
auto Socket::ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoint, Band type) -> void
{
assert(!endpoint);
std::string band(type == Band::Control ? "control" : "data");
endpoint = tools::make_unique<asiofi::connected_endpoint>(fContext.GetIoContext(), *fOfiDomain);
endpoint->enable();
LOG(debug) << "OFI transport (" << fId << "): Sending " << band << " band connection request to " << fRemoteAddr;
std::mutex mtx;
std::condition_variable cv;
bool notified(false), connected(false);
while (true) {
endpoint->connect(Context::ConvertAddress(fRemoteAddr), [&, band](asiofi::eq::event event) {
// LOG(debug) << "OFI transport (" << fId << "): " << band << " band conn event happened";
std::unique_lock<std::mutex> lk2(mtx);
notified = true;
if (event == asiofi::eq::event::connected) {
LOG(debug) << "OFI transport (" << fId << "): " << band << " band connected.";
connected = true;
} else {
// LOG(debug) << "OFI transport (" << fId << "): " << band << " band connection refused. Trying again.";
}
lk2.unlock();
cv.notify_one();
});
{
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk, [&] { return notified; });
if (connected) {
break;
} else {
notified = false;
lk.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
}
}
auto Socket::Send(MessagePtr& msg, const int /*timeout*/) -> int
{
// timeout argument not yet implemented
// LOG(debug) << "OFI transport (" << fId << "): ENTER Send: data=" << msg->GetData() << ",size=" << msg->GetSize();
std::vector<MessagePtr> msgVec;
msgVec.reserve(1);
msgVec.emplace_back(std::move(msg));
return Send(msgVec);
}
auto Socket::Send(std::vector<MessagePtr>& msgVec, const int /*timeout*/) -> int64_t
try {
// timeout argument not yet implemented
int size(0);
for (auto& msg : msgVec) {
size += msg->GetSize();
}
fSendPushSem.wait();
{
std::lock_guard<std::mutex> lk(fSendQueueMutex);
fSendQueue.emplace(std::move(msgVec));
try {
fSendSem.wait();
size_t size = msg->GetSize();
OnSend(msg);
return size;
} catch (const exception& e) {
LOG(error) << e.what();
return -1;
} catch (const boost::system::error_code& e) {
LOG(error) << e;
return -1;
}
fSendPopSem.signal();
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return -1;
}
auto Socket::SendQueueReader() -> void
auto Socket::OnSend(MessagePtr& msg) -> void
{
fSendPopSem.async_wait([&] {
// Read msg from send queue
std::unique_lock<std::mutex> lk(fSendQueueMutex);
std::vector<MessagePtr> msgVec(std::move(fSendQueue.front()));
fSendQueue.pop();
lk.unlock();
// LOG(debug) << "OFI transport (" << fId << "): ENTER OnSend";
bool postMultiPartStartBuffer = msgVec.size() > 1;
for (auto& msg : msgVec) {
// Create control message
ofi::unique_ptr<ControlMessage> ctrl(nullptr);
if (postMultiPartStartBuffer) {
postMultiPartStartBuffer = false;
ctrl = MakeControlMessageWithPmr<PostMultiPartStartBuffer>(fControlMemPool);
ctrl->msg.postMultiPartStartBuffer.numParts = msgVec.size();
ctrl->msg.postMultiPartStartBuffer.size = msg->GetSize();
} else {
ctrl = MakeControlMessageWithPmr<PostBuffer>(fControlMemPool);
ctrl->msg.postBuffer.size = msg->GetSize();
}
auto size = msg->GetSize();
// Send control message
boost::asio::mutable_buffer ctrlMsg(ctrl.get(), sizeof(ControlMessage));
// LOG(debug) << "OFI transport (" << fId << "): OnSend: data=" << msg->GetData() << ",size=" << msg->GetSize();
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, ctrlMsg, asiofi::mr::access::send);
auto desc = mr.desc();
fControlEndpoint->send(ctrlMsg,
desc,
[&, ctrl2 = std::move(ctrlMsg), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable {});
} else {
fControlEndpoint->send(
ctrlMsg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable {});
}
boost::asio::mutable_buffer buffer(msg->GetData(), size);
// Send data message
const auto size = msg->GetSize();
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send);
auto desc = mr.desc();
if (size) {
boost::asio::mutable_buffer buffer(msg->GetData(), size);
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send);
auto desc = mr.desc();
fDataEndpoint->send(buffer,
desc,
[&, size, msg2 = std::move(msg), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable {
fBytesTx += size;
fMessagesTx++;
fSendPushSem.signal();
});
} else {
fDataEndpoint->send(
buffer, [&, size, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable {
fBytesTx += size;
fMessagesTx++;
fSendPushSem.signal();
});
fDataEndpoint->send(buffer, desc, [&, size, msg2 = move(msg), mr2 = move(mr)](boost::asio::mutable_buffer) mutable {
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Data buffer sent";
fBytesTx += size;
fMessagesTx++;
fSendSem.async_signal([&](const boost::system::error_code& ec) {
if (!ec) {
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value();
}
} else {
++fMessagesTx;
fSendPushSem.signal();
}
}
});
});
} else {
fDataEndpoint->send(buffer, [&, size, msg2 = move(msg)](boost::asio::mutable_buffer) mutable {
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Data buffer sent";
fBytesTx += size;
fMessagesTx++;
fSendSem.async_signal([&](const boost::system::error_code& ec) {
if (!ec) {
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value();
}
});
});
}
boost::asio::dispatch(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
});
}
auto Socket::SendQueueReaderStatic() -> void
{
fSendPopSem.async_wait([&] {
// Read msg from send queue
std::unique_lock<std::mutex> lk(fSendQueueMutex);
std::vector<MessagePtr> msgVec(std::move(fSendQueue.front()));
fSendQueue.pop();
lk.unlock();
bool postMultiPartStartBuffer = msgVec.size() > 1;
if (postMultiPartStartBuffer) {
throw SocketError{tools::ToString("Multipart API not supported in static size mode.")};
}
MessagePtr& msg = msgVec[0];
// Send data message
const auto size = msg->GetSize();
if (size) {
boost::asio::mutable_buffer buffer(msg->GetData(), size);
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send);
auto desc = mr.desc();
fDataEndpoint->send(buffer,
desc,
[&, size, msg2 = std::move(msg), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable {
fBytesTx += size;
fMessagesTx++;
fSendPushSem.signal();
});
} else {
fDataEndpoint->send(
buffer, [&, size, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable {
fBytesTx += size;
fMessagesTx++;
fSendPushSem.signal();
});
}
} else {
++fMessagesTx;
fSendPushSem.signal();
}
boost::asio::dispatch(fContext.GetIoContext(), std::bind(&Socket::SendQueueReaderStatic, this));
});
// LOG(debug) << "OFI transport (" << fId << "): LEAVE OnSend";
}
auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int
try {
// timeout argument not yet implemented
// LOG(debug) << "OFI transport (" << fId << "): ENTER Receive";
azmq::message zmsg;
auto recv = fRecvQueueRead.receive(zmsg);
fRecvPopSem.wait();
{
std::lock_guard<std::mutex> lk(fRecvQueueMutex);
msg = std::move(fRecvQueue.front().front());
fRecvQueue.pop();
size_t size = 0;
if (recv > 0) {
msg = move(*(static_cast<MessagePtr*>(zmsg.buffer().data())));
size = msg->GetSize();
}
fRecvPushSem.signal();
int size(msg->GetSize());
fBytesRx += size;
++fMessagesRx;
fMessagesRx++;
// LOG(debug) << "OFI transport (" << fId << "): LEAVE Receive";
return size;
} catch (const std::exception& e) {
} catch (const exception& e) {
LOG(error) << e.what();
return -1;
} catch (const boost::system::error_code& e) {
LOG(error) << e;
return -1;
}
auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int /*timeout*/) -> int64_t
try {
// timeout argument not yet implemented
fRecvPopSem.wait();
{
std::lock_guard<std::mutex> lk(fRecvQueueMutex);
msgVec = std::move(fRecvQueue.front());
fRecvQueue.pop();
}
fRecvPushSem.signal();
int64_t size(0);
for (auto& msg : msgVec) {
size += msg->GetSize();
}
fBytesRx += size;
++fMessagesRx;
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return -1;
auto Socket::Receive(vector<MessagePtr>& msgVec, const int timeout) -> int64_t
{
return ReceiveImpl(msgVec, 0, timeout);
}
auto Socket::RecvControlQueueReader() -> void
auto Socket::RecvQueueReader() -> void
{
fRecvPushSem.async_wait([&] {
// Receive control message
ofi::unique_ptr<ControlMessage> ctrl(MakeControlMessageWithPmr<Empty>(fControlMemPool));
boost::asio::mutable_buffer ctrlMsg(ctrl.get(), sizeof(ControlMessage));
fRecvSem.async_wait([&](const boost::system::error_code& ec) {
if (!ec) {
static size_t size = fContext.GetSizeHint(); // temporary hack to provide expected message size for receive
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, ctrlMsg, asiofi::mr::access::recv);
auto desc = mr.desc();
fControlEndpoint->recv(
ctrlMsg,
desc,
[&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable { OnRecvControl(std::move(ctrl2)); });
} else {
fControlEndpoint->recv(
ctrlMsg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable {
OnRecvControl(std::move(ctrl2));
});
}
});
}
auto Socket::OnRecvControl(ofi::unique_ptr<ControlMessage> ctrl) -> void
{
// Check control message type
auto size(0);
if (ctrl->type == ControlMessageType::PostMultiPartStartBuffer) {
size = ctrl->msg.postMultiPartStartBuffer.size;
if (fMultiPartRecvCounter == -1) {
fMultiPartRecvCounter = ctrl->msg.postMultiPartStartBuffer.numParts;
assert(fInflightMultiPartMessage.empty());
fInflightMultiPartMessage.reserve(ctrl->msg.postMultiPartStartBuffer.numParts);
} else {
throw SocketError{tools::ToString(
"OFI transport: Received control start of new multi part message without completed "
"reception of previous multi part message. Number of parts missing: ",
fMultiPartRecvCounter)};
}
} else if (ctrl->type == ControlMessageType::PostBuffer) {
size = ctrl->msg.postBuffer.size;
} else {
throw SocketError{tools::ToString("OFI transport: Unknown control message type: '",
static_cast<int>(ctrl->type))};
}
// Receive data
auto msg = fContext.MakeReceiveMessage(size);
if (size) {
boost::asio::mutable_buffer buffer(msg->GetData(), size);
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::recv);
auto desc = mr.desc();
fDataEndpoint->recv(
buffer,
desc,
[&, msg2 = std::move(msg), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable { DataMessageReceived(std::move(msg2)); });
} else {
fDataEndpoint->recv(buffer,
[&, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable {
DataMessageReceived(std::move(msg2));
});
}
} else {
DataMessageReceived(std::move(msg));
}
boost::asio::dispatch(fContext.GetIoContext(),
std::bind(&Socket::RecvControlQueueReader, this));
}
auto Socket::RecvQueueReaderStatic() -> void
{
fRecvPushSem.async_wait([&] {
static size_t size = fContext.GetSizeHint();
// Receive data
auto msg = fContext.MakeReceiveMessage(size);
if (size) {
auto msg = fContext.MakeReceiveMessage(size);
boost::asio::mutable_buffer buffer(msg->GetData(), size);
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::recv);
auto desc = mr.desc();
fDataEndpoint->recv(buffer,
desc,
[&, msg2 = std::move(msg), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable {
DataMessageReceived(std::move(msg2));
});
} else {
fDataEndpoint->recv(
buffer, [&, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable {
DataMessageReceived(std::move(msg2));
fDataEndpoint->recv(buffer, desc, [&, msg2 = move(msg), mr2 = move(mr)](boost::asio::mutable_buffer) mutable {
MessagePtr* msgptr(new std::unique_ptr<Message>(move(msg2)));
fRecvQueueWrite.async_send(azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))), [&](const boost::system::error_code& ec2, size_t /*bytes_transferred2*/) {
if (!ec2) {
// LOG(debug) << "OFI transport (" << fId << "): <<<<< Data buffer received, bytes_transferred2=" << bytes_transferred2;
fRecvSem.async_signal([&](const boost::system::error_code& ec3) {
if (!ec3) {
// LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
}
});
}
});
});
} else {
fDataEndpoint->recv(buffer, [&, msg2 = move(msg)](boost::asio::mutable_buffer) mutable {
MessagePtr* msgptr(new std::unique_ptr<Message>(move(msg2)));
fRecvQueueWrite.async_send(azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))), [&](const boost::system::error_code& ec2, size_t /*bytes_transferred2*/) {
if (!ec2) {
// LOG(debug) << "OFI transport (" << fId << "): <<<<< Data buffer received, bytes_transferred2=" << bytes_transferred2;
fRecvSem.async_signal([&](const boost::system::error_code& ec3) {
if (!ec3) {
// LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
}
});
}
});
});
}
} else {
DataMessageReceived(std::move(msg));
}
boost::asio::dispatch(fContext.GetIoContext(),
std::bind(&Socket::RecvQueueReaderStatic, this));
boost::asio::post(fContext.GetIoContext(), bind(&Socket::RecvQueueReader, this));
}
});
}
auto Socket::DataMessageReceived(MessagePtr msg) -> void
auto Socket::Send(vector<MessagePtr>& msgVec, const int timeout) -> int64_t
{
if (fMultiPartRecvCounter > 0) {
--fMultiPartRecvCounter;
fInflightMultiPartMessage.push_back(std::move(msg));
}
return SendImpl(msgVec, 0, timeout);
}
if (fMultiPartRecvCounter == 0) {
std::unique_lock<std::mutex> lk(fRecvQueueMutex);
fRecvQueue.push(std::move(fInflightMultiPartMessage));
lk.unlock();
fMultiPartRecvCounter = -1;
fRecvPopSem.signal();
} else if (fMultiPartRecvCounter == -1) {
std::vector<MessagePtr> msgVec;
msgVec.push_back(std::move(msg));
std::unique_lock<std::mutex> lk(fRecvQueueMutex);
fRecvQueue.push(std::move(msgVec));
lk.unlock();
fRecvPopSem.signal();
}
auto Socket::SendImpl(vector<FairMQMessagePtr>& /*msgVec*/, const int /*flags*/, const int /*timeout*/) -> int64_t
{
throw SocketError{"Not yet implemented."};
}
auto Socket::ReceiveImpl(vector<FairMQMessagePtr>& /*msgVec*/, const int /*flags*/, const int /*timeout*/) -> int64_t
{
throw SocketError{"Not yet implemented."};
}
auto Socket::Close() -> void {}
@@ -615,62 +359,121 @@ auto Socket::GetOption(const string& /*option*/, void* /*value*/, size_t* /*valu
void Socket::SetLinger(const int /*value*/)
{
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
// azmq::socket::linger opt(value);
// fControlEndpoint.set_option(opt);
}
int Socket::GetLinger() const
{
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
// azmq::socket::linger opt(0);
// fControlEndpoint.get_option(opt);
// return opt.value();
return 0;
}
void Socket::SetSndBufSize(const int /*value*/)
{
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
// azmq::socket::snd_hwm opt(value);
// fControlEndpoint.set_option(opt);
}
int Socket::GetSndBufSize() const
{
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
// azmq::socket::snd_hwm opt(0);
// fControlEndpoint.get_option(opt);
// return opt.value();
return 0;
}
void Socket::SetRcvBufSize(const int /*value*/)
{
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
// azmq::socket::rcv_hwm opt(value);
// fControlEndpoint.set_option(opt);
}
int Socket::GetRcvBufSize() const
{
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
// azmq::socket::rcv_hwm opt(0);
// fControlEndpoint.get_option(opt);
// return opt.value();
return 0;
}
void Socket::SetSndKernelSize(const int /*value*/)
{
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
// azmq::socket::snd_buf opt(value);
// fControlEndpoint.set_option(opt);
}
int Socket::GetSndKernelSize() const
{
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
// azmq::socket::snd_buf opt(0);
// fControlEndpoint.get_option(opt);
// return opt.value();
return 0;
}
void Socket::SetRcvKernelSize(const int /*value*/)
{
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
// azmq::socket::rcv_buf opt(value);
// fControlEndpoint.set_option(opt);
}
int Socket::GetRcvKernelSize() const
{
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
// azmq::socket::rcv_buf opt(0);
// fControlEndpoint.get_option(opt);
// return opt.value();
return 0;
}
auto Socket::GetConstant(const string& /*constant*/) -> int
auto Socket::GetConstant(const string& constant) -> int
{
LOG(debug) << "OFI transport: Not yet implemented.";
if (constant == "")
return 0;
if (constant == "sub")
return ZMQ_SUB;
if (constant == "pub")
return ZMQ_PUB;
if (constant == "xsub")
return ZMQ_XSUB;
if (constant == "xpub")
return ZMQ_XPUB;
if (constant == "push")
return ZMQ_PUSH;
if (constant == "pull")
return ZMQ_PULL;
if (constant == "req")
return ZMQ_REQ;
if (constant == "rep")
return ZMQ_REP;
if (constant == "dealer")
return ZMQ_DEALER;
if (constant == "router")
return ZMQ_ROUTER;
if (constant == "pair")
return ZMQ_PAIR;
if (constant == "snd-hwm")
return ZMQ_SNDHWM;
if (constant == "rcv-hwm")
return ZMQ_RCVHWM;
if (constant == "snd-size")
return ZMQ_SNDBUF;
if (constant == "rcv-size")
return ZMQ_RCVBUF;
if (constant == "snd-more")
return ZMQ_SNDMORE;
if (constant == "rcv-more")
return ZMQ_RCVMORE;
if (constant == "linger")
return ZMQ_LINGER;
if (constant == "no-block")
return ZMQ_DONTWAIT;
if (constant == "snd-more no-block")
return ZMQ_DONTWAIT|ZMQ_SNDMORE;
return -1;
}

View File

@@ -18,10 +18,10 @@
#include <asiofi/memory_resources.hpp>
#include <asiofi/passive_endpoint.hpp>
#include <asiofi/semaphore.hpp>
#include <azmq/socket.hpp>
#include <boost/asio.hpp>
#include <memory> // unique_ptr
#include <mutex>
#include <netinet/in.h>
namespace fair
{
@@ -82,12 +82,11 @@ class Socket final : public fair::mq::Socket
private:
Context& fContext;
asiofi::allocated_pool_resource fControlMemPool;
std::unique_ptr<asiofi::info> fOfiInfo;
std::unique_ptr<asiofi::fabric> fOfiFabric;
std::unique_ptr<asiofi::domain> fOfiDomain;
std::unique_ptr<asiofi::passive_endpoint> fPassiveEndpoint;
std::unique_ptr<asiofi::connected_endpoint> fDataEndpoint, fControlEndpoint;
std::unique_ptr<asiofi::connected_endpoint> fDataEndpoint;
std::string fId;
std::atomic<unsigned long> fBytesTx;
std::atomic<unsigned long> fBytesRx;
@@ -97,24 +96,22 @@ class Socket final : public fair::mq::Socket
Address fLocalAddr;
int fSndTimeout;
int fRcvTimeout;
std::mutex fSendQueueMutex, fRecvQueueMutex;
std::queue<std::vector<MessagePtr>> fSendQueue, fRecvQueue;
std::vector<MessagePtr> fInflightMultiPartMessage;
int64_t fMultiPartRecvCounter;
asiofi::synchronized_semaphore fSendPushSem, fSendPopSem, fRecvPushSem, fRecvPopSem;
azmq::socket fRecvQueueWrite, fRecvQueueRead;
asiofi::semaphore fSendSem, fRecvSem;
std::atomic<bool> fNeedOfiMemoryRegistration;
std::atomic<bool> fBound;
std::atomic<bool> fConnected;
auto OnSend(MessagePtr& msg) -> void;
auto RecvQueueReader() -> void;
auto ReceiveImpl(MessagePtr& msg, const int flags, const int timeout) -> int;
auto SendImpl(std::vector<MessagePtr>& msgVec, const int flags, const int timeout) -> int64_t;
auto ReceiveImpl(std::vector<MessagePtr>& msgVec, const int flags, const int timeout) -> int64_t;
// auto WaitForControlPeer() -> void;
// auto AnnounceDataAddress() -> void;
auto InitOfi(Address addr) -> void;
auto BindControlEndpoint() -> void;
auto BindDataEndpoint() -> void;
enum class Band { Control, Data };
auto ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoint, Band type) -> void;
auto SendQueueReader() -> void;
auto SendQueueReaderStatic() -> void;
auto RecvControlQueueReader() -> void;
auto RecvQueueReaderStatic() -> void;
auto OnRecvControl(ofi::unique_ptr<ControlMessage> ctrl) -> void;
auto DataMessageReceived(MessagePtr msg) -> void;
// auto ReceiveDataAddressAnnouncement() -> void;
}; /* class Socket */
struct SilentSocketError : SocketError { using SocketError::SocketError; };

View File

@@ -27,10 +27,10 @@ TransportFactory::TransportFactory(const string& id, const FairMQProgOptions* co
try : FairMQTransportFactory(id)
, fContext(*this, *this, 1)
{
LOG(debug) << "OFI transport: asiofi (" << fContext.GetAsiofiVersion() << ")";
LOG(debug) << "OFI transport: Using AZMQ & "
<< "asiofi (" << fContext.GetAsiofiVersion() << ")";
if (config) {
fContext.SetSizeHint(config->GetValue<size_t>("ofi-size-hint"));
fContext.SetSizeHint(config->GetValue<size_t>("ofi-size-hint")); // temporary hack to provide expected message size for receive
}
} catch (ContextError& e) {
throw TransportFactoryError{e.what()};

View File

@@ -66,7 +66,7 @@ FairMQProgOptions::FairMQProgOptions()
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t >()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).")
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.")
("ofi-size-hint", po::value<size_t >()->default_value(2000000), "EXPERIMENTAL: OFI size hint for the allocator.")
("rate", po::value<float >()->default_value(0.), "Rate for conditional run loop (Hz).")
("session", po::value<string >()->default_value("default"), "Session name.");

View File

@@ -29,7 +29,8 @@ namespace
++gSignalCount;
gLastSignal = signal;
if (gSignalCount > 1) {
if (gSignalCount > 1)
{
std::abort();
}
}
@@ -56,18 +57,11 @@ Control::Control(const string& name, const Plugin::Version version, const string
{
SubscribeToDeviceStateChange([&](DeviceState newState) {
LOG(trace) << "control plugin notified on new state: " << newState;
{
lock_guard<mutex> lock{fEventsMutex};
fEvents.push(newState);
}
fNewEvent.notify_one();
if (newState == DeviceState::Error) {
fPluginShutdownRequested = true;
fDeviceShutdownRequested = true;
// throw DeviceErrorState("Controlled device transitioned to error state.");
}
});
try {
@@ -116,25 +110,6 @@ auto Control::RunStartupSequence() -> void
while (WaitForNextState() != DeviceState::Running) {}
}
auto Control::WaitForNextState() -> DeviceState
{
unique_lock<mutex> lock{fEventsMutex};
while (fEvents.empty()) {
fNewEvent.wait_for(lock, chrono::milliseconds(50));
}
auto result = fEvents.front();
if (result == DeviceState::Error) {
ReleaseDeviceControl();
throw DeviceErrorState("Controlled device transitioned to error state.");
}
fEvents.pop();
return result;
}
auto ControlPluginProgramOptions() -> Plugin::ProgOptions
{
namespace po = boost::program_options;
@@ -189,7 +164,7 @@ try {
bool keepRunning = true;
while (keepRunning) {
if (poll(cinfd, 1, 100)) {
if (poll(cinfd, 1, 500)) {
if (fDeviceShutdownRequested) {
break;
}
@@ -273,7 +248,6 @@ try {
}
if (GetCurrentDeviceState() == DeviceState::Error) {
ReleaseDeviceControl();
throw DeviceErrorState("Controlled device transitioned to error state.");
}
@@ -389,38 +363,64 @@ void Control::PrintStateMachine()
cout << ss.str() << flush;
}
auto Control::WaitForNextState() -> DeviceState
{
unique_lock<mutex> lock{fEventsMutex};
while (fEvents.empty()) {
fNewEvent.wait_for(lock, chrono::milliseconds(50));
}
auto result = fEvents.front();
if (result == DeviceState::Error) {
throw DeviceErrorState("Controlled device transitioned to error state.");
}
fEvents.pop();
return result;
}
auto Control::StaticMode() -> void
try {
try
{
RunStartupSequence();
{
// Wait for next state, which is DeviceState::Ready,
// or for device shutdown request (Ctrl-C)
unique_lock<mutex> lock{fEventsMutex};
while (fEvents.empty() && !fDeviceShutdownRequested) {
while (fEvents.empty() && !fDeviceShutdownRequested)
{
fNewEvent.wait_for(lock, chrono::milliseconds(50));
}
if (fEvents.front() == DeviceState::Error) {
ReleaseDeviceControl();
if (fEvents.front() == DeviceState::Error)
{
throw DeviceErrorState("Controlled device transitioned to error state.");
}
}
RunShutdownSequence();
} catch (PluginServices::DeviceControlError& e) {
}
catch (PluginServices::DeviceControlError& e)
{
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
LOG(debug) << e.what();
} catch (DeviceErrorState&) {
}
catch (DeviceErrorState&)
{
}
auto Control::SignalHandler() -> void
{
while (gSignalCount == 0 && !fPluginShutdownRequested) {
while (gSignalCount == 0 && !fPluginShutdownRequested)
{
this_thread::sleep_for(chrono::milliseconds(100));
}
if (!fPluginShutdownRequested) {
if (!fPluginShutdownRequested)
{
LOG(info) << "Received device shutdown request (signal " << gLastSignal << ").";
LOG(info) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately.";
@@ -431,14 +431,20 @@ auto Control::SignalHandler() -> void
if (fControllerThread.joinable()) fControllerThread.join();
}
if (!fDeviceHasShutdown) {
if (!fDeviceHasShutdown)
{
// Take over control and attempt graceful shutdown
StealDeviceControl();
try {
try
{
RunShutdownSequence();
} catch (PluginServices::DeviceControlError& e) {
}
catch (PluginServices::DeviceControlError& e)
{
LOG(info) << "Graceful device shutdown failed: " << e.what() << " If hanging, hit Ctrl-C again to abort immediately.";
} catch (...) {
}
catch (...)
{
LOG(info) << "Graceful device shutdown failed. If hanging, hit Ctrl-C again to abort immediately.";
}
}

View File

@@ -72,6 +72,7 @@ SINK+=" --id sink1"
#SINK+=" --io-threads 2"
#SINK+=" --control static"
SINK+=" --transport $transport"
SINK+=" --ofi-size-hint $msgSize"
SINK+=" --severity debug"
SINK+=" --multipart false"
SINK+=" --max-iterations $maxIterations"

View File

@@ -7,13 +7,8 @@
********************************************************************************/
#include <fairmq/tools/Process.h>
#include <fairmq/tools/Strings.h>
#include <boost/process.hpp>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <signal.h> // kill, signals
#include <iostream>
#include <sstream>
@@ -22,28 +17,6 @@
using namespace std;
namespace bp = boost::process;
namespace ba = boost::asio;
namespace bs = boost::system;
class LinePrinter
{
public:
LinePrinter(stringstream& out, const string& prefix)
: fOut(out)
, fPrefix(prefix)
{}
// prints line with prefix on both cout (thread-safe) and output stream
void Print(const string& line)
{
cout << fair::mq::tools::ToString(fPrefix, line, "\n") << flush;
fOut << fPrefix << line << endl;
}
private:
stringstream& fOut;
const string fPrefix;
};
namespace fair
{
@@ -60,111 +33,57 @@ namespace tools
* @param[in] log_prefix How to prefix each captured output line with
* @return Captured stdout output and exit code
*/
execute_result execute(const string& cmd, const string& prefix, const string& input, int sig)
execute_result execute(const string& cmd, const string& prefix, const string& input)
{
execute_result result;
stringstream out;
LinePrinter p(out, prefix);
// print full line thread-safe
stringstream printCmd;
printCmd << prefix << " " << cmd << "\n";
cout << printCmd.str() << flush;
p.Print(cmd);
out << prefix << cmd << endl;
ba::io_service ios;
// Execute command and capture stdout, add prefix line by line
bp::ipstream c_stdout;
bp::opstream c_stdin;
bp::child c(cmd, bp::std_out > c_stdout, bp::std_in < c_stdin);
// containers for std_in
ba::const_buffer inputBuffer(ba::buffer(input));
bp::async_pipe inputPipe(ios);
// containers for std_out
ba::streambuf outputBuffer;
bp::async_pipe outputPipe(ios);
// containers for std_err
ba::streambuf errorBuffer;
bp::async_pipe errorPipe(ios);
while (c.valid() && !c.running()) {
;
}
const string delimiter = "\n";
ba::deadline_timer inputTimer(ios, boost::posix_time::milliseconds(100));
ba::deadline_timer signalTimer(ios, boost::posix_time::milliseconds(100));
if (!c.valid()) {
throw runtime_error("Can't execute the given process.");
}
// child process
bp::child c(cmd, bp::std_out > outputPipe, bp::std_err > errorPipe, bp::std_in < inputPipe);
int pid = c.id();
p.Print(ToString("fair::mq::tools::execute: pid: ", pid));
// handle std_in with a delay
// Optionally, write to stdin of the child
if (input != "") {
inputTimer.async_wait([&](const bs::error_code& ec1) {
if (!ec1) {
ba::async_write(inputPipe, inputBuffer, [&](const bs::error_code& ec2, size_t /* n */) {
if (!ec2) {
// inputPipe.async_close();
} else {
p.Print(ToString("error in boost::asio::async_write: ", ec2.message()));
}
});
} else {
p.Print(ToString("error in boost::asio::deadline_timer.async_wait: ", ec1.message()));
}
});
this_thread::sleep_for(chrono::milliseconds(100));
c_stdin << input;
c_stdin.flush();
}
if (sig != -1) {
signalTimer.async_wait([&](const bs::error_code& ec1) {
if (!ec1) {
kill(pid, sig);
} else {
p.Print(ToString("error in boost::asio::deadline_timer.async_wait: ", ec1.message()));
}
});
string line;
while (c.running() && getline(c_stdout, line)) {
// print full line thread-safe
stringstream printLine;
printLine << prefix << line << "\n";
cout << printLine.str() << flush;
out << prefix << line << "\n";
}
// handle std_out line by line
function<void(const bs::error_code&, size_t)> onStdOut = [&](const bs::error_code& ec, size_t /* n */) {
if (!ec) {
istream is(&outputBuffer);
string line;
getline(is, line);
p.Print(line);
ba::async_read_until(outputPipe, outputBuffer, delimiter, onStdOut);
} else {
if (ec == ba::error::eof) {
// outputPipe.async_close();
} else {
p.Print(ec.message());
}
}
};
ba::async_read_until(outputPipe, outputBuffer, delimiter, onStdOut);
// handle std_err line by line
function<void(const bs::error_code&, size_t)> onStdErr = [&](const bs::error_code& ec, size_t /* n */) {
if (!ec) {
istream is(&errorBuffer);
string line;
getline(is, line);
p.Print(ToString("error: ", line));
ba::async_read_until(errorPipe, errorBuffer, delimiter, onStdErr);
} else {
if (ec == ba::error::eof) {
// errorPipe.async_close();
} else {
p.Print(ec.message());
}
}
};
ba::async_read_until(errorPipe, errorBuffer, delimiter, onStdErr);
ios.run();
c.wait();
// Capture exit code
result.exit_code = c.exit_code();
out << prefix << " Exit code: " << result.exit_code << endl;
p.Print(ToString("fair::mq::tools::execute: exit code: ", result.exit_code));
result.console_out = out.str();
// Return result
return result;
}

View File

@@ -38,8 +38,7 @@ struct execute_result
*/
execute_result execute(const std::string& cmd,
const std::string& prefix = "",
const std::string& input = "",
int sig = -1);
const std::string& input = "");
} /* namespace tools */
} /* namespace mq */

View File

@@ -105,8 +105,6 @@ add_testsuite(Device
device/_config.cxx
device/_waitfor.cxx
device/_exceptions.cxx
device/_error_state.cxx
device/_signals.cxx
LINKS FairMQ
DEPENDS testhelper_runTestDevice

View File

@@ -38,9 +38,12 @@ class Receiver : public FairMQDevice
auto Run() -> void override
{
auto msg = FairMQMessagePtr{NewMessage()};
if (Receive(msg, fChannelName) >= 0) {
if (Receive(msg, fChannelName) >= 0)
{
LOG(info) << "received empty message";
} else {
}
else
{
LOG(error) << "fair::mq::test::Receiver::Run(): Receive(msg, fChannelName) < 0";
}
};

View File

@@ -38,9 +38,12 @@ class Sender : public FairMQDevice
auto Run() -> void override
{
auto msg = FairMQMessagePtr{NewMessage()};
if (Send(msg, fChannelName) >= 0) {
if (Send(msg, fChannelName) >= 0)
{
LOG(info) << "sent empty message";
} else {
}
else
{
LOG(error) << "fair::mq::test::Sender::Run(): Send(msg, fChannelName) < 0";
}
};

View File

@@ -1,121 +0,0 @@
/********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runner.h"
#include <gtest/gtest.h>
#include <boost/process.hpp>
#include <fairmq/Tools.h>
#include <string>
#include <thread>
#include <iostream>
namespace
{
using namespace std;
using namespace fair::mq::test;
using namespace fair::mq::tools;
void RunErrorStateIn(const string& state, const string& control, const string& input = "")
{
size_t session{fair::mq::tools::UuidHash()};
execute_result result{"", 100};
thread device_thread([&]() {
stringstream cmd;
cmd << runTestDevice
<< " --id error_state_" << state << "_"
<< " --control " << control
<< " --session " << session
<< " --color false";
result = execute(cmd.str(), "[ErrorFound IN " + state + "]", input);
});
device_thread.join();
ASSERT_NE(string::npos, result.console_out.find("going to change to Error state from " + state + "()"));
exit(result.exit_code);
}
TEST(ErrorState, static_InInit)
{
EXPECT_EXIT(RunErrorStateIn("Init", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, static_InBind)
{
EXPECT_EXIT(RunErrorStateIn("Bind", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, static_InConnect)
{
EXPECT_EXIT(RunErrorStateIn("Connect", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, static_InInitTask)
{
EXPECT_EXIT(RunErrorStateIn("InitTask", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, static_InPreRun)
{
EXPECT_EXIT(RunErrorStateIn("PreRun", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, static_InRun)
{
EXPECT_EXIT(RunErrorStateIn("Run", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, static_InPostRun)
{
EXPECT_EXIT(RunErrorStateIn("PostRun", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, static_InResetTask)
{
EXPECT_EXIT(RunErrorStateIn("ResetTask", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, static_InReset)
{
EXPECT_EXIT(RunErrorStateIn("Reset", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, interactive_InInit)
{
EXPECT_EXIT(RunErrorStateIn("Init", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, interactive_InBind)
{
EXPECT_EXIT(RunErrorStateIn("Bind", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, interactive_InConnect)
{
EXPECT_EXIT(RunErrorStateIn("Connect", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, interactive_InInitTask)
{
EXPECT_EXIT(RunErrorStateIn("InitTask", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, interactive_InPreRun)
{
EXPECT_EXIT(RunErrorStateIn("PreRun", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, interactive_InRun)
{
EXPECT_EXIT(RunErrorStateIn("Run", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, interactive_InPostRun)
{
EXPECT_EXIT(RunErrorStateIn("PostRun", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, interactive_InResetTask)
{
EXPECT_EXIT(RunErrorStateIn("ResetTask", "interactive", "q"), ::testing::ExitedWithCode(1), "");
}
TEST(ErrorState, interactive_InReset)
{
EXPECT_EXIT(RunErrorStateIn("Reset", "interactive", "q"), ::testing::ExitedWithCode(1), "");
}
} // namespace

View File

@@ -23,7 +23,7 @@ using namespace std;
using namespace fair::mq::test;
using namespace fair::mq::tools;
void RunExceptionIn(const string& state, const string& control, const string& input = "")
void RunExceptionIn(const std::string& state, const std::string& input = "")
{
size_t session{fair::mq::tools::UuidHash()};
@@ -32,7 +32,7 @@ void RunExceptionIn(const string& state, const string& control, const string& in
stringstream cmd;
cmd << runTestDevice
<< " --id exceptions_" << state << "_"
<< " --control " << control
<< " --control " << ((input == "") ? "static" : "interactive")
<< " --session " << session
<< " --color false";
result = execute(cmd.str(), "[EXCEPTION IN " + state + "]", input);
@@ -40,82 +40,86 @@ void RunExceptionIn(const string& state, const string& control, const string& in
device_thread.join();
ASSERT_NE(string::npos, result.console_out.find("exception in " + state + "()"));
ASSERT_NE(std::string::npos, result.console_out.find("exception in " + state + "()"));
exit(result.exit_code);
}
TEST(Exceptions, static_InInit)
{
EXPECT_EXIT(RunExceptionIn("Init", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, static_InBind)
{
EXPECT_EXIT(RunExceptionIn("Bind", "static"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, static_InConnect)
{
EXPECT_EXIT(RunExceptionIn("Connect", "static"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("Init"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, static_InInitTask)
{
EXPECT_EXIT(RunExceptionIn("InitTask", "static"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("InitTask"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, static_InPreRun)
{
EXPECT_EXIT(RunExceptionIn("PreRun", "static"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("PreRun"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, static_InRun)
{
EXPECT_EXIT(RunExceptionIn("Run", "static"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("Run"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, static_InPostRun)
{
EXPECT_EXIT(RunExceptionIn("PostRun", "static"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("PostRun"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, static_InResetTask)
{
EXPECT_EXIT(RunExceptionIn("ResetTask", "static"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("ResetTask"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, static_InReset)
{
EXPECT_EXIT(RunExceptionIn("Reset", "static"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("Reset"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_InInit)
{
EXPECT_EXIT(RunExceptionIn("Init", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_InBind)
{
EXPECT_EXIT(RunExceptionIn("Bind", "interactive"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_InConnect)
{
EXPECT_EXIT(RunExceptionIn("Connect", "interactive"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("Init", "q"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_InInitTask)
{
EXPECT_EXIT(RunExceptionIn("InitTask", "interactive"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("InitTask", "q"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_InPreRun)
{
EXPECT_EXIT(RunExceptionIn("PreRun", "interactive"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("PreRun", "q"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_InRun)
{
EXPECT_EXIT(RunExceptionIn("Run", "interactive"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("Run", "q"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_InPostRun)
{
EXPECT_EXIT(RunExceptionIn("PostRun", "interactive"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("PostRun", "q"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_InResetTask)
{
EXPECT_EXIT(RunExceptionIn("ResetTask", "interactive", "q"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("ResetTask", "q"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_InReset)
{
EXPECT_EXIT(RunExceptionIn("Reset", "interactive", "q"), ::testing::ExitedWithCode(1), "");
EXPECT_EXIT(RunExceptionIn("Reset", "q"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_invalid_InInit)
{
EXPECT_EXIT(RunExceptionIn("Init", "_"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_invalid_InInitTask)
{
EXPECT_EXIT(RunExceptionIn("InitTask", "_"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_invalid_InPreRun)
{
EXPECT_EXIT(RunExceptionIn("PreRun", "_"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_invalid_InRun)
{
EXPECT_EXIT(RunExceptionIn("Run", "_"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, interactive_invalid_InPostRun)
{
EXPECT_EXIT(RunExceptionIn("PostRun", "_"), ::testing::ExitedWithCode(1), "");
}
} // namespace

View File

@@ -1,121 +0,0 @@
/********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runner.h"
#include <gtest/gtest.h>
#include <boost/process.hpp>
#include <fairmq/Tools.h>
#include <string>
#include <thread>
#include <iostream>
namespace
{
using namespace std;
using namespace fair::mq::test;
using namespace fair::mq::tools;
void RunSignalIn(const string& state, const string& control, const string& input = "")
{
size_t session{fair::mq::tools::UuidHash()};
execute_result result{"", 100};
thread device_thread([&]() {
stringstream cmd;
cmd << runTestDevice
<< " --id signals_" << state << "_"
<< " --control " << control
<< " --session " << session
<< " --color false";
result = execute(cmd.str(), "[SIGINT IN " + state + "]", input);
});
device_thread.join();
ASSERT_NE(string::npos, result.console_out.find("raising SIGINT from " + state + "()"));
exit(result.exit_code);
}
TEST(Signal_SIGINT, static_InInit)
{
EXPECT_EXIT(RunSignalIn("Init", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, static_InBind)
{
EXPECT_EXIT(RunSignalIn("Bind", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, static_InConnect)
{
EXPECT_EXIT(RunSignalIn("Connect", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, static_InInitTask)
{
EXPECT_EXIT(RunSignalIn("InitTask", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, static_InPreRun)
{
EXPECT_EXIT(RunSignalIn("PreRun", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, static_InRun)
{
EXPECT_EXIT(RunSignalIn("Run", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, static_InPostRun)
{
EXPECT_EXIT(RunSignalIn("PostRun", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, static_InResetTask)
{
EXPECT_EXIT(RunSignalIn("ResetTask", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, static_InReset)
{
EXPECT_EXIT(RunSignalIn("Reset", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, interactive_InInit)
{
EXPECT_EXIT(RunSignalIn("Init", "interactive"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, interactive_InBind)
{
EXPECT_EXIT(RunSignalIn("Bind", "interactive"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, interactive_InConnect)
{
EXPECT_EXIT(RunSignalIn("Connect", "interactive"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, interactive_InInitTask)
{
EXPECT_EXIT(RunSignalIn("InitTask", "interactive"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, interactive_InPreRun)
{
EXPECT_EXIT(RunSignalIn("PreRun", "interactive"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, interactive_InRun)
{
EXPECT_EXIT(RunSignalIn("Run", "interactive"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, interactive_InPostRun)
{
EXPECT_EXIT(RunSignalIn("PostRun", "interactive"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, interactive_InResetTask)
{
EXPECT_EXIT(RunSignalIn("ResetTask", "interactive", "q"), ::testing::ExitedWithCode(0), "");
}
TEST(Signal_SIGINT, interactive_InReset)
{
EXPECT_EXIT(RunSignalIn("Reset", "interactive", "q"), ::testing::ExitedWithCode(0), "");
}
} // namespace

View File

@@ -9,65 +9,66 @@
#include "runner.h"
#include <gtest/gtest.h>
#include <fairmq/Tools.h>
#include <boost/process.hpp>
#include <signal.h> // kill
#include <sys/types.h>
#include <signal.h>
#include <string>
#include <thread>
#include <chrono>
#include <iostream>
#include <future> // std::async, std::future
namespace
{
using namespace std;
using namespace fair::mq::test;
using namespace fair::mq::tools;
void RunWaitFor(const string& state, const string& control)
void RunWaitFor()
{
execute_result result;
thread device_thread([&] {
std::mutex mtx;
std::condition_variable cv;
int pid = 0;
int exit_code = 0;
thread deviceThread([&]() {
stringstream cmd;
cmd << runTestDevice
<< " --id waitfor_" << state
<< " --control " << control
<< " --session " << UuidHash()
<< " --severity debug"
<< " --color false";
result = execute(cmd.str(), "[WaitFor]", "", SIGINT);
cmd << runTestDevice << " --id waitfor_" << " --control static " << " --severity nolog";
boost::process::ipstream stdout;
boost::process::child c(cmd.str(), boost::process::std_out > stdout);
string line;
getline(stdout, line);
{
std::lock_guard<std::mutex> lock(mtx);
pid = c.id();
}
cv.notify_one();
c.wait();
exit_code = c.exit_code();
});
device_thread.join();
{
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [&pid]{ return pid != 0; });
}
ASSERT_NE(string::npos, result.console_out.find("Sleeping Done. Interrupted."));
kill(pid, SIGINT);
exit(result.exit_code);
deviceThread.join();
exit(exit_code);
}
TEST(WaitFor, static_InPreRun)
TEST(Device, WaitFor)
{
EXPECT_EXIT(RunWaitFor("PreRun", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(WaitFor, static_InRun)
{
EXPECT_EXIT(RunWaitFor("Run", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(WaitFor, static_InPostRun)
{
EXPECT_EXIT(RunWaitFor("PostRun", "static"), ::testing::ExitedWithCode(0), "");
}
TEST(WaitFor, interactive_InPreRun)
{
EXPECT_EXIT(RunWaitFor("PreRun", "interactive"), ::testing::ExitedWithCode(0), "");
}
TEST(WaitFor, interactive_InRun)
{
EXPECT_EXIT(RunWaitFor("Run", "interactive"), ::testing::ExitedWithCode(0), "");
}
TEST(WaitFor, interactive_InPostRun)
{
EXPECT_EXIT(RunWaitFor("PostRun", "interactive"), ::testing::ExitedWithCode(0), "");
EXPECT_EXIT(RunWaitFor(), ::testing::ExitedWithCode(0), "");
}
} // namespace

View File

@@ -1,113 +0,0 @@
/********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_TEST_ERROR_STATE_H
#define FAIR_MQ_TEST_ERROR_STATE_H
#include <FairMQDevice.h>
#include <FairMQLogger.h>
#include <iostream>
namespace fair
{
namespace mq
{
namespace test
{
class ErrorState : public FairMQDevice
{
public:
void Init() override
{
std::string state("Init");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "going to change to Error state from " << state << "()";
ChangeState(fair::mq::Transition::ErrorFound);
}
}
void Bind() override
{
std::string state("Bind");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "going to change to Error state from " << state << "()";
ChangeState(fair::mq::Transition::ErrorFound);
}
}
void Connect() override
{
std::string state("Connect");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "going to change to Error state from " << state << "()";
ChangeState(fair::mq::Transition::ErrorFound);
}
}
void InitTask() override
{
std::string state("InitTask");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "going to change to Error state from " << state << "()";
ChangeState(fair::mq::Transition::ErrorFound);
}
}
void PreRun() override
{
std::string state("PreRun");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "going to change to Error state from " << state << "()";
ChangeState(fair::mq::Transition::ErrorFound);
}
}
void Run() override
{
std::string state("Run");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "going to change to Error state from " << state << "()";
ChangeState(fair::mq::Transition::ErrorFound);
}
}
void PostRun() override
{
std::string state("PostRun");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "going to change to Error state from " << state << "()";
ChangeState(fair::mq::Transition::ErrorFound);
}
}
void ResetTask() override
{
std::string state("ResetTask");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "going to change to Error state from " << state << "()";
ChangeState(fair::mq::Transition::ErrorFound);
}
}
void Reset() override
{
std::string state("Reset");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "going to change to Error state from " << state << "()";
ChangeState(fair::mq::Transition::ErrorFound);
}
}
};
} // namespace test
} // namespace mq
} // namespace fair
#endif /* FAIR_MQ_TEST_ERROR_STATE_H */

View File

@@ -33,22 +33,6 @@ class Exceptions : public FairMQDevice
}
}
auto Bind() -> void override
{
std::string state("Bind");
if (std::string::npos != GetId().find("_" + state + "_")) {
throw std::runtime_error("exception in " + state + "()");
}
}
auto Connect() -> void override
{
std::string state("Connect");
if (std::string::npos != GetId().find("_" + state + "_")) {
throw std::runtime_error("exception in " + state + "()");
}
}
auto InitTask() -> void override
{
std::string state("InitTask");

View File

@@ -57,7 +57,6 @@ class PairLeft : public FairMQDevice
if (counter == 6) LOG(info) << "Simple message with short text data successfull";
assert(counter == 6);
if (counter == 6) LOG(info) << "PAIR test successfull.";
};
};

View File

@@ -55,6 +55,9 @@ class PairRight : public FairMQDevice
auto msg6(NewSimpleMessageFor("data", 0, "testdata1234"));
if (Send(msg6, "data") >= 0) counter++;
if (counter == 6) LOG(info) << "Simple message with short text data successfull";
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
if (counter == 6) LOG(info) << "PAIR test successfull.";
};
};
@@ -62,4 +65,4 @@ class PairRight : public FairMQDevice
} // namespace mq
} // namespace fair
#endif /* FAIR_MQ_TEST_PAIRRIGHT_H */
#endif /* FAIR_MQ_TEST_PAIRRIGHT_H */

View File

@@ -1,112 +0,0 @@
/********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_TEST_SIGNALS_H
#define FAIR_MQ_TEST_SIGNALS_H
#include <FairMQDevice.h>
#include <FairMQLogger.h>
#include <iostream>
#include <csignal>
namespace fair
{
namespace mq
{
namespace test
{
class Signals : public FairMQDevice
{
public:
void Init() override
{
std::string state("Init");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "raising SIGINT from " << state << "()";
raise(SIGINT);
}
}
void Bind() override
{
std::string state("Bind");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "raising SIGINT from " << state << "()";
raise(SIGINT);
}
}
void Connect() override
{
std::string state("Connect");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "raising SIGINT from " << state << "()";
raise(SIGINT);
}
}
void InitTask() override
{
std::string state("InitTask");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "raising SIGINT from " << state << "()";
raise(SIGINT);
}
}
void PreRun() override
{
std::string state("PreRun");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "raising SIGINT from " << state << "()";
raise(SIGINT);
}
}
void Run() override
{
std::string state("Run");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "raising SIGINT from " << state << "()";
raise(SIGINT);
}
}
void PostRun() override
{
std::string state("PostRun");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "raising SIGINT from " << state << "()";
raise(SIGINT);
}
}
void ResetTask() override
{
std::string state("ResetTask");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "raising SIGINT from " << state << "()";
raise(SIGINT);
}
}
void Reset() override
{
std::string state("Reset");
if (std::string::npos != GetId().find("_" + state + "_")) {
LOG(debug) << "raising SIGINT from " << state << "()";
raise(SIGINT);
}
}
};
} // namespace test
} // namespace mq
} // namespace fair
#endif /* FAIR_MQ_TEST_SIGNALS_H */

View File

@@ -24,34 +24,10 @@ namespace test
class TestWaitFor : public FairMQDevice
{
public:
void PreRun() override
void Run()
{
std::string state("PreRun");
if (std::string::npos != GetId().find("_" + state)) {
LOG(info) << "Going to sleep via WaitFor() in " << state << "...";
bool result = WaitFor(std::chrono::seconds(60));
LOG(info) << (result == true ? "Sleeping Done. Not interrupted." : "Sleeping Done. Interrupted.");
}
}
void Run() override
{
std::string state("Run");
if (std::string::npos != GetId().find("_" + state)) {
LOG(info) << "Going to sleep via WaitFor() in " << state << "...";
bool result = WaitFor(std::chrono::seconds(60));
LOG(info) << (result == true ? "Sleeping Done. Not interrupted." : "Sleeping Done. Interrupted.");
}
}
void PostRun() override
{
std::string state("PostRun");
if (std::string::npos != GetId().find("_" + state)) {
LOG(info) << "Going to sleep via WaitFor() in " << state << "...";
bool result = WaitFor(std::chrono::seconds(60));
LOG(info) << (result == true ? "Sleeping Done. Not interrupted." : "Sleeping Done. Interrupted.");
}
std::cout << "hello" << std::endl;
WaitFor(std::chrono::seconds(60));
}
};

View File

@@ -19,8 +19,6 @@
#include "devices/TestTransferTimeout.h"
#include "devices/TestWaitFor.h"
#include "devices/TestExceptions.h"
#include "devices/TestErrorState.h"
#include "devices/TestSignals.h"
#include <runFairMQDevice.h>
@@ -42,38 +40,60 @@ auto getDevice(const FairMQProgOptions& config) -> FairMQDevicePtr
using namespace fair::mq::test;
auto id = config.GetValue<std::string>("id");
if (0 == id.find("pull_")) {
if (0 == id.find("pull_"))
{
return new Pull;
} else if (0 == id.find("push_")) {
}
else if (0 == id.find("push_"))
{
return new Push;
} else if (0 == id.find("sub_")) {
}
else if (0 == id.find("sub_"))
{
return new Sub;
} else if (0 == id.find("pub_")) {
}
else if (0 == id.find("pub_"))
{
return new Pub;
} else if (0 == id.find("req_")) {
}
else if (0 == id.find("req_"))
{
return new Req;
} else if (0 == id.find("rep_")) {
}
else if (0 == id.find("rep_"))
{
return new Rep;
} else if (0 == id.find("transfer_timeout_")) {
}
else if (0 == id.find("transfer_timeout_"))
{
return new TransferTimeout;
} else if (0 == id.find("pollout_")) {
}
else if (0 == id.find("pollout_"))
{
return new PollOut;
} else if (0 == id.find("pollin_")) {
}
else if (0 == id.find("pollin_"))
{
return new PollIn;
} else if (0 == id.find("pairleft_")) {
}
else if (0 == id.find("pairleft_"))
{
return new PairLeft;
} else if (0 == id.find("pairright_")) {
}
else if (0 == id.find("pairright_"))
{
return new PairRight;
} else if (0 == id.find("waitfor_")) {
}
else if (0 == id.find("waitfor_"))
{
return new TestWaitFor;
} else if (0 == id.find("exceptions_")) {
}
else if (0 == id.find("exceptions_"))
{
return new Exceptions;
} else if (0 == id.find("error_state_")) {
return new ErrorState;
} else if (0 == id.find("signals_")) {
return new Signals;
} else {
}
else
{
cerr << "Don't know id '" << id << "'" << endl;
return nullptr;
}