Compare commits

..

7 Commits

Author SHA1 Message Date
Alexey Rybalchenko
35c7959c53 Workaround Cpp17MoveInsertable issue on xcode 12 2020-09-22 05:33:29 +02:00
Alexey Rybalchenko
5ea8ffeb34 Update command format in PMIx plugin 2020-09-17 14:22:03 +02:00
Alexey Rybalchenko
04ee1db8e5 Avoid default session id in shmem tests 2020-09-17 14:22:03 +02:00
Dennis Klein
4a15a38dd4 Tests.Device: Set correct log level for FairLogger 1.[7-8] 2020-09-16 15:43:58 +02:00
Dennis Klein
0f5e1b6815 Tests.SDK: Reduce timeout by factor 1000 because new machines can be fast enough to complete within 1ms 2020-09-16 15:43:58 +02:00
Dennis Klein
5e6ad47223 CI: Run macOS checks on newer environment 2020-09-16 15:43:58 +02:00
Alexey Rybalchenko
6932f88c84 Adjust transfer methods behaviour when interrupted
A transer is attempted even if the transport has been interrupted
(with a timeout). When the timeout is reached, transfer methods will
return TransferResult::interrupted (-3).
2020-09-16 15:43:58 +02:00
14 changed files with 259 additions and 140 deletions

11
Jenkinsfile vendored
View File

@@ -8,11 +8,15 @@ def jobMatrix(String prefix, List specs, Closure callback) {
def nodes = [:]
for (spec in specs) {
def label = specToLabel(spec)
def node_tag = label
if (spec.os =~ /macOS/) {
node_tag = spec.os
}
def fairsoft = spec.fairsoft
def os = spec.os
def compiler = spec.compiler
nodes["${prefix}/${label}"] = {
node(label) {
node(node_tag) {
githubNotify(context: "${prefix}/${label}", description: 'Building ...', status: 'PENDING')
try {
deleteDir()
@@ -29,7 +33,7 @@ def jobMatrix(String prefix, List specs, Closure callback) {
echo "module load compiler/gcc/9.1.0" >> Dart.cfg
'''
}
if (os =~ /MacOS/) {
if (os =~ /[Mm]acOS/) {
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=clang++'\" >> Dart.cfg"
} else {
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=g++'\" >> Dart.cfg"
@@ -71,8 +75,7 @@ pipeline{
script {
def build_jobs = jobMatrix('build', [
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'],
[os: 'MacOS10.13', arch: 'x86_64', compiler: 'AppleLLVM10.0.0', fairsoft: 'fairmq_dev'],
[os: 'MacOS10.14', arch: 'x86_64', compiler: 'AppleLLVM10.0.0', fairsoft: 'fairmq_dev'],
[os: 'macOS10.15', arch: 'x86_64', compiler: 'AppleLLVM11.0.3', fairsoft: 'fairmq_dev'],
]) { spec, label ->
sh './Dart.sh alfa_ci Dart.cfg'
}

View File

@@ -66,13 +66,13 @@ class FairMQChannel
FairMQChannel(const FairMQChannel&, const std::string& name);
/// Move constructor
FairMQChannel(FairMQChannel&&) = delete;
// FairMQChannel(FairMQChannel&&) = delete;
/// Assignment operator
FairMQChannel& operator=(const FairMQChannel&);
/// Move assignment operator
FairMQChannel& operator=(FairMQChannel&&) = delete;
// FairMQChannel& operator=(FairMQChannel&&) = delete;
/// Destructor
virtual ~FairMQChannel()
@@ -250,7 +250,7 @@ class FairMQChannel
/// Sends a message to the socket queue.
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msg);
@@ -260,7 +260,7 @@ class FairMQChannel
/// Receives a message from the socket queue.
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msg);
@@ -270,7 +270,7 @@ class FairMQChannel
/// Send a vector of messages
/// @param msgVec message vector reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Send(std::vector<FairMQMessagePtr>& msgVec, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msgVec);
@@ -280,7 +280,7 @@ class FairMQChannel
/// Receive a vector of messages
/// @param msgVec message vector reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(std::vector<FairMQMessagePtr>& msgVec, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msgVec);
@@ -290,7 +290,7 @@ class FairMQChannel
/// Send FairMQParts
/// @param parts FairMQParts reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQParts& parts, int sndTimeoutInMs = -1)
{
return Send(parts.fParts, sndTimeoutInMs);
@@ -299,7 +299,7 @@ class FairMQChannel
/// Receive FairMQParts
/// @param parts FairMQParts reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs = -1)
{
return Receive(parts.fParts, rcvTimeoutInMs);

View File

@@ -129,7 +129,7 @@ class FairMQDevice
/// @param chan channel name
/// @param i channel index
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int Send(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
{
return GetChannel(channel, index).Send(msg, sndTimeoutInMs);
@@ -140,7 +140,7 @@ class FairMQDevice
/// @param chan channel name
/// @param i channel index
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int Receive(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
{
return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs);
@@ -151,7 +151,7 @@ class FairMQDevice
/// @param chan channel name
/// @param i channel index
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQParts& parts, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
{
return GetChannel(channel, index).Send(parts.fParts, sndTimeoutInMs);
@@ -162,7 +162,7 @@ class FairMQDevice
/// @param chan channel name
/// @param i channel index
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQParts& parts, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
{
return GetChannel(channel, index).Receive(parts.fParts, rcvTimeoutInMs);

View File

@@ -9,14 +9,31 @@
#ifndef FAIRMQSOCKET_H_
#define FAIRMQSOCKET_H_
#include "FairMQMessage.h"
#include <memory>
#include <ostream>
#include <stdexcept>
#include <string>
#include <vector>
#include "FairMQMessage.h"
class FairMQTransportFactory;
namespace fair
{
namespace mq
{
enum class TransferResult : int
{
error = -1,
timeout = -2,
interrupted = -3
};
} // namespace mq
} // namespace fair
class FairMQSocket
{
public:

View File

@@ -272,7 +272,7 @@ try {
int size(0);
for (auto& msg : msgVec) {
size += msg->GetSize();
}
}
fSendPushSem.wait();
{
@@ -284,7 +284,7 @@ try {
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return -1;
return TransferResult::error;
}
auto Socket::SendQueueReader() -> void
@@ -431,7 +431,7 @@ try {
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return -1;
return TransferResult::error;
}
auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int /*timeout*/) -> int64_t
@@ -449,14 +449,14 @@ try {
int64_t size(0);
for (auto& msg : msgVec) {
size += msg->GetSize();
}
}
fBytesRx += size;
++fMessagesRx;
return size;
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return -1;
return TransferResult::error;
}
auto Socket::RecvControlQueueReader() -> void

View File

@@ -148,12 +148,12 @@ auto PMIxPlugin::SubscribeForCommands() -> void
Transition transition = static_cast<ChangeState&>(*cmd).GetTransition();
if (ChangeDeviceState(transition)) {
fCommands.Send(
Cmds(make<TransitionStatus>(fDeviceId, 0, Result::Ok, transition))
Cmds(make<TransitionStatus>(fDeviceId, 0, Result::Ok, transition, GetCurrentDeviceState()))
.Serialize(Format::JSON),
{sender});
} else {
fCommands.Send(
Cmds(make<TransitionStatus>(fDeviceId, 0, Result::Failure, transition))
Cmds(make<TransitionStatus>(fDeviceId, 0, Result::Failure, transition, GetCurrentDeviceState()))
.Serialize(Format::JSON),
{sender});
}

View File

@@ -464,7 +464,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
}
}
using Duration = std::chrono::milliseconds;
using Duration = std::chrono::microseconds;
using ChangeStateCompletionSignature = void(std::error_code, TopologyState);
private:

View File

@@ -130,7 +130,7 @@ class Socket final : public fair::mq::Socket
bool ShouldRetry(int flags, int timeout, int& elapsed) const
{
if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
if ((flags & ZMQ_DONTWAIT) == 0) {
if (timeout > 0) {
elapsed += fTimeout;
if (elapsed >= timeout) {
@@ -147,10 +147,10 @@ class Socket final : public fair::mq::Socket
{
if (zmq_errno() == ETERM) {
LOG(debug) << "Terminating socket " << fId;
return -1;
return static_cast<int>(TransferResult::error);
} else {
LOG(error) << "Failed transfer on socket " << fId << ", reason: " << zmq_strerror(errno);
return -1;
return static_cast<int>(TransferResult::error);
}
}
@@ -166,7 +166,7 @@ class Socket final : public fair::mq::Socket
ZMsg zmqMsg(sizeof(MetaHeader));
std::memcpy(zmqMsg.Data(), &(shmMsg->fMeta), sizeof(MetaHeader));
while (true && !fManager.Interrupted()) {
while (true) {
int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) {
shmMsg->fQueued = true;
@@ -175,17 +175,19 @@ class Socket final : public fair::mq::Socket
fBytesTx += size;
return size;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) {
if (fManager.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return -2;
return static_cast<int>(TransferResult::timeout);
}
} else {
return HandleErrors();
}
}
return -1;
return static_cast<int>(TransferResult::error);
}
int Receive(MessagePtr& msg, const int timeout = -1) override
@@ -218,10 +220,12 @@ class Socket final : public fair::mq::Socket
++fMessagesRx;
return size;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) {
if (fManager.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return -2;
return static_cast<int>(TransferResult::timeout);
}
} else {
return HandleErrors();
@@ -249,7 +253,7 @@ class Socket final : public fair::mq::Socket
std::memcpy(metas++, &(shmMsg->fMeta), sizeof(MetaHeader));
}
while (!fManager.Interrupted()) {
while (true) {
int64_t totalSize = 0;
int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) {
@@ -267,17 +271,19 @@ class Socket final : public fair::mq::Socket
return totalSize;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) {
if (fManager.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return -2;
return static_cast<int>(TransferResult::timeout);
}
} else {
return HandleErrors();
}
}
return -1;
return static_cast<int>(TransferResult::error);
}
int64_t Receive(std::vector<MessagePtr>& msgVec, const int timeout = -1) override
@@ -290,7 +296,7 @@ class Socket final : public fair::mq::Socket
ZMsg zmqMsg;
while (!fManager.Interrupted()) {
while (true) {
int64_t totalSize = 0;
int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) {
@@ -321,17 +327,19 @@ class Socket final : public fair::mq::Socket
return totalSize;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) {
if (fManager.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return -2;
return static_cast<int>(TransferResult::timeout);
}
} else {
return HandleErrors();
}
}
return -1;
return static_cast<int>(TransferResult::error);
}
void* GetSocket() const { return fSocket; }
@@ -498,7 +506,7 @@ class Socket final : public fair::mq::Socket
if (constant == "pollout")
return ZMQ_POLLOUT;
return -1;
throw SocketError(tools::ToString("GetConstant called with an invalid argument: ", constant));
}
~Socket() override { Close(); }

View File

@@ -108,7 +108,7 @@ class Socket final : public fair::mq::Socket
bool ShouldRetry(int flags, int timeout, int& elapsed) const
{
if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
if ((flags & ZMQ_DONTWAIT) == 0) {
if (timeout > 0) {
elapsed += fTimeout;
if (elapsed >= timeout) {
@@ -125,10 +125,10 @@ class Socket final : public fair::mq::Socket
{
if (zmq_errno() == ETERM) {
LOG(debug) << "Terminating socket " << fId;
return -1;
return static_cast<int>(TransferResult::error);
} else {
LOG(error) << "Failed transfer on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno);
return -1;
return static_cast<int>(TransferResult::error);
}
}
@@ -149,10 +149,12 @@ class Socket final : public fair::mq::Socket
++fMessagesTx;
return nbytes;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) {
if (fCtx.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return -2;
return static_cast<int>(TransferResult::timeout);
}
} else {
return HandleErrors();
@@ -175,10 +177,12 @@ class Socket final : public fair::mq::Socket
++fMessagesRx;
return nbytes;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) {
if (fCtx.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return -2;
return static_cast<int>(TransferResult::timeout);
}
} else {
return HandleErrors();
@@ -210,11 +214,13 @@ class Socket final : public fair::mq::Socket
if (nbytes >= 0) {
totalSize += nbytes;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) {
if (fCtx.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
repeat = true;
break;
} else {
return -2;
return static_cast<int>(TransferResult::timeout);
}
} else {
return HandleErrors();
@@ -234,7 +240,7 @@ class Socket final : public fair::mq::Socket
return Send(msgVec.back(), timeout);
} else { // if the vector is empty, something might be wrong
LOG(warn) << "Will not send empty vector";
return -1;
return static_cast<int>(TransferResult::error);
}
}
@@ -259,11 +265,13 @@ class Socket final : public fair::mq::Socket
msgVec.push_back(move(part));
totalSize += nbytes;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) {
if (fCtx.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
repeat = true;
break;
} else {
return -2;
return static_cast<int>(TransferResult::timeout);
}
} else {
return HandleErrors();
@@ -446,7 +454,7 @@ class Socket final : public fair::mq::Socket
if (constant == "pollout")
return ZMQ_POLLOUT;
return -1;
throw SocketError(tools::ToString("GetConstant called with an invalid argument: ", constant));
}
~Socket() override { Close(); }

View File

@@ -12,6 +12,18 @@ include(GTestHelper)
# FairMQ Testsuites/helpers #
#############################
if(FairLogger_VERSION VERSION_LESS 1.9.0 AND FairLogger_VERSION VERSION_GREATER_EQUAL 1.7.0)
LIST(APPEND definitions FAIR_MIN_SEVERITY=trace)
endif()
if(BUILD_OFI_TRANSPORT)
LIST(APPEND definitions BUILD_OFI_TRANSPORT)
endif()
if(definitions)
set(definitions DEFINITIONS ${definitions})
endif()
add_testhelper(runTestDevice
SOURCES
helper/runTestDevice.cxx
@@ -30,16 +42,9 @@ add_testhelper(runTestDevice
helper/devices/TestExceptions.h
LINKS FairMQ
${definitions}
)
if(BUILD_OFI_TRANSPORT)
LIST(APPEND definitions BUILD_OFI_TRANSPORT)
endif()
if(definitions)
set(definitions DEFINITIONS ${definitions})
endif()
set(MQ_CONFIG "${CMAKE_BINARY_DIR}/test/testsuite_FairMQ.IOPatterns_config.json")
set(RUN_TEST_DEVICE "${CMAKE_BINARY_DIR}/test/testhelper_runTestDevice")
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2015-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2015static_cast<int>(TransferResult::timeout017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH ) *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -24,20 +24,20 @@ class TransferTimeout : public FairMQDevice
protected:
auto Run() -> void override
{
bool sendMsgCancelingAfter100ms = false;
bool receiveMsgCancelingAfter100ms = false;
bool sendMsgCancelingAfter200ms = false;
bool receiveMsgCancelingAfter200ms = false;
bool sendMsgCancelingAfter0ms = false;
bool receiveMsgCancelingAfter0ms = false;
bool send1PartCancelingAfter100ms = false;
bool receive1PartCancelingAfter100ms = false;
bool send1PartCancelingAfter200ms = false;
bool receive1PartCancelingAfter200ms = false;
bool send1PartCancelingAfter0ms = false;
bool receive1PartCancelingAfter0ms = false;
bool send2PartsCancelingAfter100ms = false;
bool receive2PartsCancelingAfter100ms = false;
bool send2PartsCancelingAfter200ms = false;
bool receive2PartsCancelingAfter200ms = false;
bool send2PartsCancelingAfter0ms = false;
bool receive2PartsCancelingAfter0ms = false;
@@ -45,28 +45,28 @@ class TransferTimeout : public FairMQDevice
FairMQMessagePtr msg1(NewMessage());
FairMQMessagePtr msg2(NewMessage());
if (Send(msg1, "data-out", 0, 100) == -2) {
LOG(info) << "send msg canceled (100ms)";
sendMsgCancelingAfter100ms = true;
if (Send(msg1, "data-out", 0, 200) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "send msg canceled (200ms)";
sendMsgCancelingAfter200ms = true;
} else {
LOG(error) << "send msg did not cancel (100ms)";
LOG(error) << "send msg did not cancel (200ms)";
}
if (Receive(msg2, "data-in", 0, 100) == -2) {
LOG(info) << "receive msg canceled (100ms)";
receiveMsgCancelingAfter100ms = true;
if (Receive(msg2, "data-in", 0, 200) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "receive msg canceled (200ms)";
receiveMsgCancelingAfter200ms = true;
} else {
LOG(error) << "receive msg did not cancel (100ms)";
LOG(error) << "receive msg did not cancel (200ms)";
}
if (Send(msg1, "data-out", 0, 0) == -2) {
if (Send(msg1, "data-out", 0, 0) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "send msg canceled (0ms)";
sendMsgCancelingAfter0ms = true;
} else {
LOG(error) << "send msg did not cancel (0ms)";
}
if (Receive(msg2, "data-in", 0, 0) == -2) {
if (Receive(msg2, "data-in", 0, 0) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "receive msg canceled (0ms)";
receiveMsgCancelingAfter0ms = true;
} else {
@@ -77,28 +77,28 @@ class TransferTimeout : public FairMQDevice
parts1.AddPart(NewMessage(10));
FairMQParts parts2;
if (Send(parts1, "data-out", 0, 100) == -2) {
LOG(info) << "send 1 part canceled (100ms)";
send1PartCancelingAfter100ms = true;
if (Send(parts1, "data-out", 0, 200) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "send 1 part canceled (200ms)";
send1PartCancelingAfter200ms = true;
} else {
LOG(error) << "send 1 part did not cancel (100ms)";
LOG(error) << "send 1 part did not cancel (200ms)";
}
if (Receive(parts2, "data-in", 0, 100) == -2) {
LOG(info) << "receive 1 part canceled (100ms)";
receive1PartCancelingAfter100ms = true;
if (Receive(parts2, "data-in", 0, 200) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "receive 1 part canceled (200ms)";
receive1PartCancelingAfter200ms = true;
} else {
LOG(error) << "receive 1 part did not cancel (100ms)";
LOG(error) << "receive 1 part did not cancel (200ms)";
}
if (Send(parts1, "data-out", 0, 0) == -2) {
if (Send(parts1, "data-out", 0, 0) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "send 1 part canceled (0ms)";
send1PartCancelingAfter0ms = true;
} else {
LOG(error) << "send 1 part did not cancel (0ms)";
}
if (Receive(parts2, "data-in", 0, 0) == -2) {
if (Receive(parts2, "data-in", 0, 0) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "receive 1 part canceled (0ms)";
receive1PartCancelingAfter0ms = true;
} else {
@@ -110,44 +110,44 @@ class TransferTimeout : public FairMQDevice
parts3.AddPart(NewMessage(10));
FairMQParts parts4;
if (Send(parts3, "data-out", 0, 100) == -2) {
LOG(info) << "send 2 parts canceled (100ms)";
send2PartsCancelingAfter100ms = true;
if (Send(parts3, "data-out", 0, 200) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "send 2 parts canceled (200ms)";
send2PartsCancelingAfter200ms = true;
} else {
LOG(error) << "send 2 parts did not cancel (100ms)";
LOG(error) << "send 2 parts did not cancel (200ms)";
}
if (Receive(parts4, "data-in", 0, 100) == -2) {
LOG(info) << "receive 2 parts canceled (100ms)";
receive2PartsCancelingAfter100ms = true;
if (Receive(parts4, "data-in", 0, 200) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "receive 2 parts canceled (200ms)";
receive2PartsCancelingAfter200ms = true;
} else {
LOG(error) << "receive 2 parts did not cancel (100ms)";
LOG(error) << "receive 2 parts did not cancel (200ms)";
}
if (Send(parts3, "data-out", 0, 0) == -2) {
if (Send(parts3, "data-out", 0, 0) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "send 2 parts canceled (0ms)";
send2PartsCancelingAfter0ms = true;
} else {
LOG(error) << "send 2 parts did not cancel (0ms)";
}
if (Receive(parts4, "data-in", 0, 0) == -2) {
if (Receive(parts4, "data-in", 0, 0) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "receive 2 parts canceled (0ms)";
receive2PartsCancelingAfter0ms = true;
} else {
LOG(error) << "receive 2 parts did not cancel (0ms)";
}
if (sendMsgCancelingAfter100ms &&
receiveMsgCancelingAfter100ms &&
if (sendMsgCancelingAfter200ms &&
receiveMsgCancelingAfter200ms &&
sendMsgCancelingAfter0ms &&
receiveMsgCancelingAfter0ms &&
send1PartCancelingAfter100ms &&
receive1PartCancelingAfter100ms &&
send1PartCancelingAfter200ms &&
receive1PartCancelingAfter200ms &&
send1PartCancelingAfter0ms &&
receive1PartCancelingAfter0ms &&
send2PartsCancelingAfter100ms &&
receive2PartsCancelingAfter100ms &&
send2PartsCancelingAfter200ms &&
receive2PartsCancelingAfter200ms &&
send2PartsCancelingAfter0ms &&
receive2PartsCancelingAfter0ms)
{

View File

@@ -6,67 +6,83 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <cstring>
#include <fairmq/FairMQTransportFactory.h>
#include <fairmq/MemoryResourceTools.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/Tools.h>
#include <boost/container/pmr/polymorphic_allocator.hpp>
#include <gtest/gtest.h>
#include <cstring>
#include <vector>
namespace {
namespace
{
using namespace std;
using namespace fair::mq;
using factoryType = std::shared_ptr<FairMQTransportFactory>;
factoryType factoryZMQ = FairMQTransportFactory::CreateTransportFactory("zeromq");
factoryType factorySHM = FairMQTransportFactory::CreateTransportFactory("shmem");
using FactoryType = shared_ptr<FairMQTransportFactory>;
struct testData
struct TestData
{
int i{1};
static int nallocated;
static int nallocations;
static int ndeallocations;
testData()
TestData()
{
++nallocated;
++nallocations;
}
testData(const testData& in)
TestData(const TestData& in)
: i{in.i}
{
++nallocated;
++nallocations;
}
testData(const testData&& in)
TestData(const TestData&& in)
: i{in.i}
{
++nallocated;
++nallocations;
}
testData(int in)
TestData(int in)
: i{in}
{
++nallocated;
++nallocations;
}
~testData()
~TestData()
{
--nallocated;
++ndeallocations;
}
};
int testData::nallocated = 0;
int testData::nallocations = 0;
int testData::ndeallocations = 0;
int TestData::nallocated = 0;
int TestData::nallocations = 0;
int TestData::ndeallocations = 0;
auto allocZMQ = factoryZMQ -> GetMemoryResource();
auto allocSHM = factorySHM -> GetMemoryResource();
TEST(MemoryResources, transportallocatormap)
TEST(MemoryResources, transportAllocatorMap)
{
size_t session{tools::UuidHash()};
ProgOptions config;
config.SetProperty<string>("session", to_string(session));
FactoryType factoryZMQ = FairMQTransportFactory::CreateTransportFactory("zeromq", fair::mq::tools::Uuid(), &config);
FactoryType factorySHM = FairMQTransportFactory::CreateTransportFactory("shmem", fair::mq::tools::Uuid(), &config);
auto allocZMQ = factoryZMQ->GetMemoryResource();
auto allocSHM = factorySHM->GetMemoryResource();
EXPECT_TRUE(allocZMQ != nullptr && allocSHM != allocZMQ);
auto _tmp = factoryZMQ->GetMemoryResource();
EXPECT_TRUE(_tmp == allocZMQ);
@@ -76,28 +92,45 @@ using namespace fair::mq::pmr;
TEST(MemoryResources, allocator)
{
testData::nallocations = 0;
testData::ndeallocations = 0;
TestData::nallocations = 0;
TestData::ndeallocations = 0;
size_t session{tools::UuidHash()};
ProgOptions config;
config.SetProperty<string>("session", to_string(session));
FactoryType factoryZMQ = FairMQTransportFactory::CreateTransportFactory("zeromq", fair::mq::tools::Uuid(), &config);
auto allocZMQ = factoryZMQ->GetMemoryResource();
{
std::vector<testData, polymorphic_allocator<testData>> v(polymorphic_allocator<testData>{allocZMQ});
std::vector<TestData, polymorphic_allocator<TestData>> v(polymorphic_allocator<TestData>{allocZMQ});
v.reserve(3);
EXPECT_TRUE(v.capacity() == 3);
EXPECT_TRUE(allocZMQ->getNumberOfMessages() == 1);
v.emplace_back(1);
v.emplace_back(2);
v.emplace_back(3);
EXPECT_TRUE((fair::mq::byte*)&(*v.end()) - (fair::mq::byte*)&(*v.begin()) == 3 * sizeof(testData));
EXPECT_TRUE(testData::nallocated == 3);
EXPECT_TRUE((fair::mq::byte*)&(*v.end()) - (fair::mq::byte*)&(*v.begin()) == 3 * sizeof(TestData));
EXPECT_TRUE(TestData::nallocated == 3);
}
EXPECT_TRUE(testData::nallocated == 0);
EXPECT_TRUE(testData::nallocations == testData::ndeallocations);
EXPECT_TRUE(TestData::nallocated == 0);
EXPECT_TRUE(TestData::nallocations == TestData::ndeallocations);
}
TEST(MemoryResources, getMessage)
{
testData::nallocations = 0;
testData::ndeallocations = 0;
TestData::nallocations = 0;
TestData::ndeallocations = 0;
size_t session{tools::UuidHash()};
ProgOptions config;
config.SetProperty<string>("session", to_string(session));
FactoryType factoryZMQ = FairMQTransportFactory::CreateTransportFactory("zeromq", fair::mq::tools::Uuid(), &config);
FactoryType factorySHM = FairMQTransportFactory::CreateTransportFactory("shmem", fair::mq::tools::Uuid(), &config);
auto allocZMQ = factoryZMQ->GetMemoryResource();
FairMQMessagePtr message{nullptr};
@@ -105,7 +138,7 @@ TEST(MemoryResources, getMessage)
// test message creation on the same channel it was allocated with
{
std::vector<testData, polymorphic_allocator<testData>> v(polymorphic_allocator<testData>{allocZMQ});
std::vector<TestData, polymorphic_allocator<TestData>> v(polymorphic_allocator<TestData>{allocZMQ});
v.emplace_back(1);
v.emplace_back(2);
v.emplace_back(3);
@@ -114,13 +147,13 @@ TEST(MemoryResources, getMessage)
EXPECT_TRUE(message != nullptr);
EXPECT_TRUE(message->GetData() == vectorBeginPtr);
}
EXPECT_TRUE(message->GetSize() == 3 * sizeof(testData));
EXPECT_TRUE(message->GetSize() == 3 * sizeof(TestData));
messageArray = static_cast<int*>(message->GetData());
EXPECT_TRUE(messageArray[0] == 1 && messageArray[1] == 2 && messageArray[2] == 3);
// test message creation on a different channel than it was allocated with
{
std::vector<testData, polymorphic_allocator<testData>> v(polymorphic_allocator<testData>{allocZMQ});
std::vector<TestData, polymorphic_allocator<TestData>> v(polymorphic_allocator<TestData>{allocZMQ});
v.emplace_back(4);
v.emplace_back(5);
v.emplace_back(6);
@@ -130,7 +163,7 @@ TEST(MemoryResources, getMessage)
EXPECT_TRUE(message->GetData() != vectorBeginPtr);
}
EXPECT_TRUE(message->GetSize() == 3 * sizeof(testData));
EXPECT_TRUE(message->GetSize() == 3 * sizeof(TestData));
messageArray = static_cast<int*>(message->GetData());
EXPECT_TRUE(messageArray[0] == 4 && messageArray[1] == 5 && messageArray[2] == 6);
}

View File

@@ -361,7 +361,7 @@ TEST_F(Topology, AsyncSetPropertiesTimeout)
topo.AsyncSetProperties({{"key1", "val1"}},
"",
std::chrono::milliseconds(1),
std::chrono::microseconds(1),
[=](std::error_code ec, sdk::FailedDevices) mutable {
LOG(info) << ec;
EXPECT_EQ(ec, MakeErrorCode(ErrorCode::OperationTimeout));

View File

@@ -7,9 +7,16 @@
********************************************************************************/
#include "runner.h"
#include <FairMQChannel.h>
#include <FairMQLogger.h>
#include <FairMQTransportFactory.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/Tools.h>
#include <gtest/gtest.h>
#include <chrono>
#include <sstream> // std::stringstream
#include <thread>
namespace
{
@@ -18,6 +25,12 @@ using namespace std;
using namespace fair::mq::test;
using namespace fair::mq::tools;
void delayedInterruptor(FairMQTransportFactory& transport)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
transport.Interrupt();
}
auto RunTransferTimeout(string transport) -> void
{
size_t session{fair::mq::tools::UuidHash()};
@@ -31,6 +44,28 @@ auto RunTransferTimeout(string transport) -> void
exit(res.exit_code);
}
void InterruptTransfer(const string& transport, const string& _address)
{
size_t session{fair::mq::tools::UuidHash()};
std::string address(fair::mq::tools::ToString(_address, "_", transport));
fair::mq::ProgOptions config;
config.SetProperty<string>("session", to_string(session));
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
FairMQChannel pull{"Pull", "pull", factory};
pull.Bind(address);
FairMQMessagePtr msg(pull.NewMessage());
auto t = thread(delayedInterruptor, ref(*factory));
auto result = pull.Receive(msg);
t.join();
ASSERT_EQ(result, static_cast<int>(fair::mq::TransferResult::interrupted));
}
TEST(TransferTimeout, zeromq)
{
EXPECT_EXIT(RunTransferTimeout("zeromq"), ::testing::ExitedWithCode(0), "Transfer timeout test successfull");
@@ -41,4 +76,14 @@ TEST(TransferTimeout, shmem)
EXPECT_EXIT(RunTransferTimeout("shmem"), ::testing::ExitedWithCode(0), "Transfer timeout test successfull");
}
TEST(InterruptTransfer, zeromq)
{
InterruptTransfer("zeromq", "ipc://test_interrupt_transfer");
}
TEST(InterruptTransfer, shmem)
{
InterruptTransfer("shmem", "ipc://test_interrupt_transfer");
}
} // namespace