Compare commits

..

7 Commits

Author SHA1 Message Date
Alexey Rybalchenko
35c7959c53 Workaround Cpp17MoveInsertable issue on xcode 12 2020-09-22 05:33:29 +02:00
Alexey Rybalchenko
5ea8ffeb34 Update command format in PMIx plugin 2020-09-17 14:22:03 +02:00
Alexey Rybalchenko
04ee1db8e5 Avoid default session id in shmem tests 2020-09-17 14:22:03 +02:00
Dennis Klein
4a15a38dd4 Tests.Device: Set correct log level for FairLogger 1.[7-8] 2020-09-16 15:43:58 +02:00
Dennis Klein
0f5e1b6815 Tests.SDK: Reduce timeout by factor 1000 because new machines can be fast enough to complete within 1ms 2020-09-16 15:43:58 +02:00
Dennis Klein
5e6ad47223 CI: Run macOS checks on newer environment 2020-09-16 15:43:58 +02:00
Alexey Rybalchenko
6932f88c84 Adjust transfer methods behaviour when interrupted
A transer is attempted even if the transport has been interrupted
(with a timeout). When the timeout is reached, transfer methods will
return TransferResult::interrupted (-3).
2020-09-16 15:43:58 +02:00
14 changed files with 259 additions and 140 deletions

11
Jenkinsfile vendored
View File

@@ -8,11 +8,15 @@ def jobMatrix(String prefix, List specs, Closure callback) {
def nodes = [:] def nodes = [:]
for (spec in specs) { for (spec in specs) {
def label = specToLabel(spec) def label = specToLabel(spec)
def node_tag = label
if (spec.os =~ /macOS/) {
node_tag = spec.os
}
def fairsoft = spec.fairsoft def fairsoft = spec.fairsoft
def os = spec.os def os = spec.os
def compiler = spec.compiler def compiler = spec.compiler
nodes["${prefix}/${label}"] = { nodes["${prefix}/${label}"] = {
node(label) { node(node_tag) {
githubNotify(context: "${prefix}/${label}", description: 'Building ...', status: 'PENDING') githubNotify(context: "${prefix}/${label}", description: 'Building ...', status: 'PENDING')
try { try {
deleteDir() deleteDir()
@@ -29,7 +33,7 @@ def jobMatrix(String prefix, List specs, Closure callback) {
echo "module load compiler/gcc/9.1.0" >> Dart.cfg echo "module load compiler/gcc/9.1.0" >> Dart.cfg
''' '''
} }
if (os =~ /MacOS/) { if (os =~ /[Mm]acOS/) {
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=clang++'\" >> Dart.cfg" sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=clang++'\" >> Dart.cfg"
} else { } else {
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=g++'\" >> Dart.cfg" sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=g++'\" >> Dart.cfg"
@@ -71,8 +75,7 @@ pipeline{
script { script {
def build_jobs = jobMatrix('build', [ def build_jobs = jobMatrix('build', [
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'], [os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'],
[os: 'MacOS10.13', arch: 'x86_64', compiler: 'AppleLLVM10.0.0', fairsoft: 'fairmq_dev'], [os: 'macOS10.15', arch: 'x86_64', compiler: 'AppleLLVM11.0.3', fairsoft: 'fairmq_dev'],
[os: 'MacOS10.14', arch: 'x86_64', compiler: 'AppleLLVM10.0.0', fairsoft: 'fairmq_dev'],
]) { spec, label -> ]) { spec, label ->
sh './Dart.sh alfa_ci Dart.cfg' sh './Dart.sh alfa_ci Dart.cfg'
} }

View File

@@ -66,13 +66,13 @@ class FairMQChannel
FairMQChannel(const FairMQChannel&, const std::string& name); FairMQChannel(const FairMQChannel&, const std::string& name);
/// Move constructor /// Move constructor
FairMQChannel(FairMQChannel&&) = delete; // FairMQChannel(FairMQChannel&&) = delete;
/// Assignment operator /// Assignment operator
FairMQChannel& operator=(const FairMQChannel&); FairMQChannel& operator=(const FairMQChannel&);
/// Move assignment operator /// Move assignment operator
FairMQChannel& operator=(FairMQChannel&&) = delete; // FairMQChannel& operator=(FairMQChannel&&) = delete;
/// Destructor /// Destructor
virtual ~FairMQChannel() virtual ~FairMQChannel()
@@ -250,7 +250,7 @@ class FairMQChannel
/// Sends a message to the socket queue. /// Sends a message to the socket queue.
/// @param msg Constant reference of unique_ptr to a FairMQMessage /// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. /// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1) int Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1)
{ {
CheckSendCompatibility(msg); CheckSendCompatibility(msg);
@@ -260,7 +260,7 @@ class FairMQChannel
/// Receives a message from the socket queue. /// Receives a message from the socket queue.
/// @param msg Constant reference of unique_ptr to a FairMQMessage /// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. /// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1) int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1)
{ {
CheckReceiveCompatibility(msg); CheckReceiveCompatibility(msg);
@@ -270,7 +270,7 @@ class FairMQChannel
/// Send a vector of messages /// Send a vector of messages
/// @param msgVec message vector reference /// @param msgVec message vector reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. /// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Send(std::vector<FairMQMessagePtr>& msgVec, int sndTimeoutInMs = -1) int64_t Send(std::vector<FairMQMessagePtr>& msgVec, int sndTimeoutInMs = -1)
{ {
CheckSendCompatibility(msgVec); CheckSendCompatibility(msgVec);
@@ -280,7 +280,7 @@ class FairMQChannel
/// Receive a vector of messages /// Receive a vector of messages
/// @param msgVec message vector reference /// @param msgVec message vector reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. /// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(std::vector<FairMQMessagePtr>& msgVec, int rcvTimeoutInMs = -1) int64_t Receive(std::vector<FairMQMessagePtr>& msgVec, int rcvTimeoutInMs = -1)
{ {
CheckReceiveCompatibility(msgVec); CheckReceiveCompatibility(msgVec);
@@ -290,7 +290,7 @@ class FairMQChannel
/// Send FairMQParts /// Send FairMQParts
/// @param parts FairMQParts reference /// @param parts FairMQParts reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. /// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQParts& parts, int sndTimeoutInMs = -1) int64_t Send(FairMQParts& parts, int sndTimeoutInMs = -1)
{ {
return Send(parts.fParts, sndTimeoutInMs); return Send(parts.fParts, sndTimeoutInMs);
@@ -299,7 +299,7 @@ class FairMQChannel
/// Receive FairMQParts /// Receive FairMQParts
/// @param parts FairMQParts reference /// @param parts FairMQParts reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. /// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs = -1) int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs = -1)
{ {
return Receive(parts.fParts, rcvTimeoutInMs); return Receive(parts.fParts, rcvTimeoutInMs);

View File

@@ -129,7 +129,7 @@ class FairMQDevice
/// @param chan channel name /// @param chan channel name
/// @param i channel index /// @param i channel index
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) /// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. /// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int Send(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1) int Send(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
{ {
return GetChannel(channel, index).Send(msg, sndTimeoutInMs); return GetChannel(channel, index).Send(msg, sndTimeoutInMs);
@@ -140,7 +140,7 @@ class FairMQDevice
/// @param chan channel name /// @param chan channel name
/// @param i channel index /// @param i channel index
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) /// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. /// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int Receive(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1) int Receive(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
{ {
return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs); return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs);
@@ -151,7 +151,7 @@ class FairMQDevice
/// @param chan channel name /// @param chan channel name
/// @param i channel index /// @param i channel index
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) /// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. /// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQParts& parts, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1) int64_t Send(FairMQParts& parts, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
{ {
return GetChannel(channel, index).Send(parts.fParts, sndTimeoutInMs); return GetChannel(channel, index).Send(parts.fParts, sndTimeoutInMs);
@@ -162,7 +162,7 @@ class FairMQDevice
/// @param chan channel name /// @param chan channel name
/// @param i channel index /// @param i channel index
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) /// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. /// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQParts& parts, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1) int64_t Receive(FairMQParts& parts, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
{ {
return GetChannel(channel, index).Receive(parts.fParts, rcvTimeoutInMs); return GetChannel(channel, index).Receive(parts.fParts, rcvTimeoutInMs);

View File

@@ -9,14 +9,31 @@
#ifndef FAIRMQSOCKET_H_ #ifndef FAIRMQSOCKET_H_
#define FAIRMQSOCKET_H_ #define FAIRMQSOCKET_H_
#include "FairMQMessage.h"
#include <memory> #include <memory>
#include <ostream>
#include <stdexcept> #include <stdexcept>
#include <string> #include <string>
#include <vector> #include <vector>
#include "FairMQMessage.h"
class FairMQTransportFactory; class FairMQTransportFactory;
namespace fair
{
namespace mq
{
enum class TransferResult : int
{
error = -1,
timeout = -2,
interrupted = -3
};
} // namespace mq
} // namespace fair
class FairMQSocket class FairMQSocket
{ {
public: public:

View File

@@ -284,7 +284,7 @@ try {
return size; return size;
} catch (const std::exception& e) { } catch (const std::exception& e) {
LOG(error) << e.what(); LOG(error) << e.what();
return -1; return TransferResult::error;
} }
auto Socket::SendQueueReader() -> void auto Socket::SendQueueReader() -> void
@@ -431,7 +431,7 @@ try {
return size; return size;
} catch (const std::exception& e) { } catch (const std::exception& e) {
LOG(error) << e.what(); LOG(error) << e.what();
return -1; return TransferResult::error;
} }
auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int /*timeout*/) -> int64_t auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int /*timeout*/) -> int64_t
@@ -456,7 +456,7 @@ try {
return size; return size;
} catch (const std::exception& e) { } catch (const std::exception& e) {
LOG(error) << e.what(); LOG(error) << e.what();
return -1; return TransferResult::error;
} }
auto Socket::RecvControlQueueReader() -> void auto Socket::RecvControlQueueReader() -> void

View File

@@ -148,12 +148,12 @@ auto PMIxPlugin::SubscribeForCommands() -> void
Transition transition = static_cast<ChangeState&>(*cmd).GetTransition(); Transition transition = static_cast<ChangeState&>(*cmd).GetTransition();
if (ChangeDeviceState(transition)) { if (ChangeDeviceState(transition)) {
fCommands.Send( fCommands.Send(
Cmds(make<TransitionStatus>(fDeviceId, 0, Result::Ok, transition)) Cmds(make<TransitionStatus>(fDeviceId, 0, Result::Ok, transition, GetCurrentDeviceState()))
.Serialize(Format::JSON), .Serialize(Format::JSON),
{sender}); {sender});
} else { } else {
fCommands.Send( fCommands.Send(
Cmds(make<TransitionStatus>(fDeviceId, 0, Result::Failure, transition)) Cmds(make<TransitionStatus>(fDeviceId, 0, Result::Failure, transition, GetCurrentDeviceState()))
.Serialize(Format::JSON), .Serialize(Format::JSON),
{sender}); {sender});
} }

View File

@@ -464,7 +464,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
} }
} }
using Duration = std::chrono::milliseconds; using Duration = std::chrono::microseconds;
using ChangeStateCompletionSignature = void(std::error_code, TopologyState); using ChangeStateCompletionSignature = void(std::error_code, TopologyState);
private: private:

View File

@@ -130,7 +130,7 @@ class Socket final : public fair::mq::Socket
bool ShouldRetry(int flags, int timeout, int& elapsed) const bool ShouldRetry(int flags, int timeout, int& elapsed) const
{ {
if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { if ((flags & ZMQ_DONTWAIT) == 0) {
if (timeout > 0) { if (timeout > 0) {
elapsed += fTimeout; elapsed += fTimeout;
if (elapsed >= timeout) { if (elapsed >= timeout) {
@@ -147,10 +147,10 @@ class Socket final : public fair::mq::Socket
{ {
if (zmq_errno() == ETERM) { if (zmq_errno() == ETERM) {
LOG(debug) << "Terminating socket " << fId; LOG(debug) << "Terminating socket " << fId;
return -1; return static_cast<int>(TransferResult::error);
} else { } else {
LOG(error) << "Failed transfer on socket " << fId << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed transfer on socket " << fId << ", reason: " << zmq_strerror(errno);
return -1; return static_cast<int>(TransferResult::error);
} }
} }
@@ -166,7 +166,7 @@ class Socket final : public fair::mq::Socket
ZMsg zmqMsg(sizeof(MetaHeader)); ZMsg zmqMsg(sizeof(MetaHeader));
std::memcpy(zmqMsg.Data(), &(shmMsg->fMeta), sizeof(MetaHeader)); std::memcpy(zmqMsg.Data(), &(shmMsg->fMeta), sizeof(MetaHeader));
while (true && !fManager.Interrupted()) { while (true) {
int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags); int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) { if (nbytes > 0) {
shmMsg->fQueued = true; shmMsg->fQueued = true;
@@ -175,17 +175,19 @@ class Socket final : public fair::mq::Socket
fBytesTx += size; fBytesTx += size;
return size; return size;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (fManager.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue; continue;
} else { } else {
return -2; return static_cast<int>(TransferResult::timeout);
} }
} else { } else {
return HandleErrors(); return HandleErrors();
} }
} }
return -1; return static_cast<int>(TransferResult::error);
} }
int Receive(MessagePtr& msg, const int timeout = -1) override int Receive(MessagePtr& msg, const int timeout = -1) override
@@ -218,10 +220,12 @@ class Socket final : public fair::mq::Socket
++fMessagesRx; ++fMessagesRx;
return size; return size;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (fManager.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue; continue;
} else { } else {
return -2; return static_cast<int>(TransferResult::timeout);
} }
} else { } else {
return HandleErrors(); return HandleErrors();
@@ -249,7 +253,7 @@ class Socket final : public fair::mq::Socket
std::memcpy(metas++, &(shmMsg->fMeta), sizeof(MetaHeader)); std::memcpy(metas++, &(shmMsg->fMeta), sizeof(MetaHeader));
} }
while (!fManager.Interrupted()) { while (true) {
int64_t totalSize = 0; int64_t totalSize = 0;
int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags); int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) { if (nbytes > 0) {
@@ -267,17 +271,19 @@ class Socket final : public fair::mq::Socket
return totalSize; return totalSize;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (fManager.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue; continue;
} else { } else {
return -2; return static_cast<int>(TransferResult::timeout);
} }
} else { } else {
return HandleErrors(); return HandleErrors();
} }
} }
return -1; return static_cast<int>(TransferResult::error);
} }
int64_t Receive(std::vector<MessagePtr>& msgVec, const int timeout = -1) override int64_t Receive(std::vector<MessagePtr>& msgVec, const int timeout = -1) override
@@ -290,7 +296,7 @@ class Socket final : public fair::mq::Socket
ZMsg zmqMsg; ZMsg zmqMsg;
while (!fManager.Interrupted()) { while (true) {
int64_t totalSize = 0; int64_t totalSize = 0;
int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags); int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) { if (nbytes > 0) {
@@ -321,17 +327,19 @@ class Socket final : public fair::mq::Socket
return totalSize; return totalSize;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (fManager.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue; continue;
} else { } else {
return -2; return static_cast<int>(TransferResult::timeout);
} }
} else { } else {
return HandleErrors(); return HandleErrors();
} }
} }
return -1; return static_cast<int>(TransferResult::error);
} }
void* GetSocket() const { return fSocket; } void* GetSocket() const { return fSocket; }
@@ -498,7 +506,7 @@ class Socket final : public fair::mq::Socket
if (constant == "pollout") if (constant == "pollout")
return ZMQ_POLLOUT; return ZMQ_POLLOUT;
return -1; throw SocketError(tools::ToString("GetConstant called with an invalid argument: ", constant));
} }
~Socket() override { Close(); } ~Socket() override { Close(); }

View File

@@ -108,7 +108,7 @@ class Socket final : public fair::mq::Socket
bool ShouldRetry(int flags, int timeout, int& elapsed) const bool ShouldRetry(int flags, int timeout, int& elapsed) const
{ {
if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { if ((flags & ZMQ_DONTWAIT) == 0) {
if (timeout > 0) { if (timeout > 0) {
elapsed += fTimeout; elapsed += fTimeout;
if (elapsed >= timeout) { if (elapsed >= timeout) {
@@ -125,10 +125,10 @@ class Socket final : public fair::mq::Socket
{ {
if (zmq_errno() == ETERM) { if (zmq_errno() == ETERM) {
LOG(debug) << "Terminating socket " << fId; LOG(debug) << "Terminating socket " << fId;
return -1; return static_cast<int>(TransferResult::error);
} else { } else {
LOG(error) << "Failed transfer on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed transfer on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno);
return -1; return static_cast<int>(TransferResult::error);
} }
} }
@@ -149,10 +149,12 @@ class Socket final : public fair::mq::Socket
++fMessagesTx; ++fMessagesTx;
return nbytes; return nbytes;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (fCtx.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue; continue;
} else { } else {
return -2; return static_cast<int>(TransferResult::timeout);
} }
} else { } else {
return HandleErrors(); return HandleErrors();
@@ -175,10 +177,12 @@ class Socket final : public fair::mq::Socket
++fMessagesRx; ++fMessagesRx;
return nbytes; return nbytes;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (fCtx.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue; continue;
} else { } else {
return -2; return static_cast<int>(TransferResult::timeout);
} }
} else { } else {
return HandleErrors(); return HandleErrors();
@@ -210,11 +214,13 @@ class Socket final : public fair::mq::Socket
if (nbytes >= 0) { if (nbytes >= 0) {
totalSize += nbytes; totalSize += nbytes;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (fCtx.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
repeat = true; repeat = true;
break; break;
} else { } else {
return -2; return static_cast<int>(TransferResult::timeout);
} }
} else { } else {
return HandleErrors(); return HandleErrors();
@@ -234,7 +240,7 @@ class Socket final : public fair::mq::Socket
return Send(msgVec.back(), timeout); return Send(msgVec.back(), timeout);
} else { // if the vector is empty, something might be wrong } else { // if the vector is empty, something might be wrong
LOG(warn) << "Will not send empty vector"; LOG(warn) << "Will not send empty vector";
return -1; return static_cast<int>(TransferResult::error);
} }
} }
@@ -259,11 +265,13 @@ class Socket final : public fair::mq::Socket
msgVec.push_back(move(part)); msgVec.push_back(move(part));
totalSize += nbytes; totalSize += nbytes;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (fCtx.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
repeat = true; repeat = true;
break; break;
} else { } else {
return -2; return static_cast<int>(TransferResult::timeout);
} }
} else { } else {
return HandleErrors(); return HandleErrors();
@@ -446,7 +454,7 @@ class Socket final : public fair::mq::Socket
if (constant == "pollout") if (constant == "pollout")
return ZMQ_POLLOUT; return ZMQ_POLLOUT;
return -1; throw SocketError(tools::ToString("GetConstant called with an invalid argument: ", constant));
} }
~Socket() override { Close(); } ~Socket() override { Close(); }

View File

@@ -12,6 +12,18 @@ include(GTestHelper)
# FairMQ Testsuites/helpers # # FairMQ Testsuites/helpers #
############################# #############################
if(FairLogger_VERSION VERSION_LESS 1.9.0 AND FairLogger_VERSION VERSION_GREATER_EQUAL 1.7.0)
LIST(APPEND definitions FAIR_MIN_SEVERITY=trace)
endif()
if(BUILD_OFI_TRANSPORT)
LIST(APPEND definitions BUILD_OFI_TRANSPORT)
endif()
if(definitions)
set(definitions DEFINITIONS ${definitions})
endif()
add_testhelper(runTestDevice add_testhelper(runTestDevice
SOURCES SOURCES
helper/runTestDevice.cxx helper/runTestDevice.cxx
@@ -30,16 +42,9 @@ add_testhelper(runTestDevice
helper/devices/TestExceptions.h helper/devices/TestExceptions.h
LINKS FairMQ LINKS FairMQ
${definitions}
) )
if(BUILD_OFI_TRANSPORT)
LIST(APPEND definitions BUILD_OFI_TRANSPORT)
endif()
if(definitions)
set(definitions DEFINITIONS ${definitions})
endif()
set(MQ_CONFIG "${CMAKE_BINARY_DIR}/test/testsuite_FairMQ.IOPatterns_config.json") set(MQ_CONFIG "${CMAKE_BINARY_DIR}/test/testsuite_FairMQ.IOPatterns_config.json")
set(RUN_TEST_DEVICE "${CMAKE_BINARY_DIR}/test/testhelper_runTestDevice") set(RUN_TEST_DEVICE "${CMAKE_BINARY_DIR}/test/testhelper_runTestDevice")
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq) set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2015-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2015static_cast<int>(TransferResult::timeout017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH ) *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -24,20 +24,20 @@ class TransferTimeout : public FairMQDevice
protected: protected:
auto Run() -> void override auto Run() -> void override
{ {
bool sendMsgCancelingAfter100ms = false; bool sendMsgCancelingAfter200ms = false;
bool receiveMsgCancelingAfter100ms = false; bool receiveMsgCancelingAfter200ms = false;
bool sendMsgCancelingAfter0ms = false; bool sendMsgCancelingAfter0ms = false;
bool receiveMsgCancelingAfter0ms = false; bool receiveMsgCancelingAfter0ms = false;
bool send1PartCancelingAfter100ms = false; bool send1PartCancelingAfter200ms = false;
bool receive1PartCancelingAfter100ms = false; bool receive1PartCancelingAfter200ms = false;
bool send1PartCancelingAfter0ms = false; bool send1PartCancelingAfter0ms = false;
bool receive1PartCancelingAfter0ms = false; bool receive1PartCancelingAfter0ms = false;
bool send2PartsCancelingAfter100ms = false; bool send2PartsCancelingAfter200ms = false;
bool receive2PartsCancelingAfter100ms = false; bool receive2PartsCancelingAfter200ms = false;
bool send2PartsCancelingAfter0ms = false; bool send2PartsCancelingAfter0ms = false;
bool receive2PartsCancelingAfter0ms = false; bool receive2PartsCancelingAfter0ms = false;
@@ -45,28 +45,28 @@ class TransferTimeout : public FairMQDevice
FairMQMessagePtr msg1(NewMessage()); FairMQMessagePtr msg1(NewMessage());
FairMQMessagePtr msg2(NewMessage()); FairMQMessagePtr msg2(NewMessage());
if (Send(msg1, "data-out", 0, 100) == -2) { if (Send(msg1, "data-out", 0, 200) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "send msg canceled (100ms)"; LOG(info) << "send msg canceled (200ms)";
sendMsgCancelingAfter100ms = true; sendMsgCancelingAfter200ms = true;
} else { } else {
LOG(error) << "send msg did not cancel (100ms)"; LOG(error) << "send msg did not cancel (200ms)";
} }
if (Receive(msg2, "data-in", 0, 100) == -2) { if (Receive(msg2, "data-in", 0, 200) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "receive msg canceled (100ms)"; LOG(info) << "receive msg canceled (200ms)";
receiveMsgCancelingAfter100ms = true; receiveMsgCancelingAfter200ms = true;
} else { } else {
LOG(error) << "receive msg did not cancel (100ms)"; LOG(error) << "receive msg did not cancel (200ms)";
} }
if (Send(msg1, "data-out", 0, 0) == -2) { if (Send(msg1, "data-out", 0, 0) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "send msg canceled (0ms)"; LOG(info) << "send msg canceled (0ms)";
sendMsgCancelingAfter0ms = true; sendMsgCancelingAfter0ms = true;
} else { } else {
LOG(error) << "send msg did not cancel (0ms)"; LOG(error) << "send msg did not cancel (0ms)";
} }
if (Receive(msg2, "data-in", 0, 0) == -2) { if (Receive(msg2, "data-in", 0, 0) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "receive msg canceled (0ms)"; LOG(info) << "receive msg canceled (0ms)";
receiveMsgCancelingAfter0ms = true; receiveMsgCancelingAfter0ms = true;
} else { } else {
@@ -77,28 +77,28 @@ class TransferTimeout : public FairMQDevice
parts1.AddPart(NewMessage(10)); parts1.AddPart(NewMessage(10));
FairMQParts parts2; FairMQParts parts2;
if (Send(parts1, "data-out", 0, 100) == -2) { if (Send(parts1, "data-out", 0, 200) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "send 1 part canceled (100ms)"; LOG(info) << "send 1 part canceled (200ms)";
send1PartCancelingAfter100ms = true; send1PartCancelingAfter200ms = true;
} else { } else {
LOG(error) << "send 1 part did not cancel (100ms)"; LOG(error) << "send 1 part did not cancel (200ms)";
} }
if (Receive(parts2, "data-in", 0, 100) == -2) { if (Receive(parts2, "data-in", 0, 200) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "receive 1 part canceled (100ms)"; LOG(info) << "receive 1 part canceled (200ms)";
receive1PartCancelingAfter100ms = true; receive1PartCancelingAfter200ms = true;
} else { } else {
LOG(error) << "receive 1 part did not cancel (100ms)"; LOG(error) << "receive 1 part did not cancel (200ms)";
} }
if (Send(parts1, "data-out", 0, 0) == -2) { if (Send(parts1, "data-out", 0, 0) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "send 1 part canceled (0ms)"; LOG(info) << "send 1 part canceled (0ms)";
send1PartCancelingAfter0ms = true; send1PartCancelingAfter0ms = true;
} else { } else {
LOG(error) << "send 1 part did not cancel (0ms)"; LOG(error) << "send 1 part did not cancel (0ms)";
} }
if (Receive(parts2, "data-in", 0, 0) == -2) { if (Receive(parts2, "data-in", 0, 0) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "receive 1 part canceled (0ms)"; LOG(info) << "receive 1 part canceled (0ms)";
receive1PartCancelingAfter0ms = true; receive1PartCancelingAfter0ms = true;
} else { } else {
@@ -110,44 +110,44 @@ class TransferTimeout : public FairMQDevice
parts3.AddPart(NewMessage(10)); parts3.AddPart(NewMessage(10));
FairMQParts parts4; FairMQParts parts4;
if (Send(parts3, "data-out", 0, 100) == -2) { if (Send(parts3, "data-out", 0, 200) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "send 2 parts canceled (100ms)"; LOG(info) << "send 2 parts canceled (200ms)";
send2PartsCancelingAfter100ms = true; send2PartsCancelingAfter200ms = true;
} else { } else {
LOG(error) << "send 2 parts did not cancel (100ms)"; LOG(error) << "send 2 parts did not cancel (200ms)";
} }
if (Receive(parts4, "data-in", 0, 100) == -2) { if (Receive(parts4, "data-in", 0, 200) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "receive 2 parts canceled (100ms)"; LOG(info) << "receive 2 parts canceled (200ms)";
receive2PartsCancelingAfter100ms = true; receive2PartsCancelingAfter200ms = true;
} else { } else {
LOG(error) << "receive 2 parts did not cancel (100ms)"; LOG(error) << "receive 2 parts did not cancel (200ms)";
} }
if (Send(parts3, "data-out", 0, 0) == -2) { if (Send(parts3, "data-out", 0, 0) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "send 2 parts canceled (0ms)"; LOG(info) << "send 2 parts canceled (0ms)";
send2PartsCancelingAfter0ms = true; send2PartsCancelingAfter0ms = true;
} else { } else {
LOG(error) << "send 2 parts did not cancel (0ms)"; LOG(error) << "send 2 parts did not cancel (0ms)";
} }
if (Receive(parts4, "data-in", 0, 0) == -2) { if (Receive(parts4, "data-in", 0, 0) == static_cast<int>(TransferResult::timeout)) {
LOG(info) << "receive 2 parts canceled (0ms)"; LOG(info) << "receive 2 parts canceled (0ms)";
receive2PartsCancelingAfter0ms = true; receive2PartsCancelingAfter0ms = true;
} else { } else {
LOG(error) << "receive 2 parts did not cancel (0ms)"; LOG(error) << "receive 2 parts did not cancel (0ms)";
} }
if (sendMsgCancelingAfter100ms && if (sendMsgCancelingAfter200ms &&
receiveMsgCancelingAfter100ms && receiveMsgCancelingAfter200ms &&
sendMsgCancelingAfter0ms && sendMsgCancelingAfter0ms &&
receiveMsgCancelingAfter0ms && receiveMsgCancelingAfter0ms &&
send1PartCancelingAfter100ms && send1PartCancelingAfter200ms &&
receive1PartCancelingAfter100ms && receive1PartCancelingAfter200ms &&
send1PartCancelingAfter0ms && send1PartCancelingAfter0ms &&
receive1PartCancelingAfter0ms && receive1PartCancelingAfter0ms &&
send2PartsCancelingAfter100ms && send2PartsCancelingAfter200ms &&
receive2PartsCancelingAfter100ms && receive2PartsCancelingAfter200ms &&
send2PartsCancelingAfter0ms && send2PartsCancelingAfter0ms &&
receive2PartsCancelingAfter0ms) receive2PartsCancelingAfter0ms)
{ {

View File

@@ -6,67 +6,83 @@
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
#include <cstring>
#include <fairmq/FairMQTransportFactory.h> #include <fairmq/FairMQTransportFactory.h>
#include <fairmq/MemoryResourceTools.h> #include <fairmq/MemoryResourceTools.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/Tools.h>
#include <boost/container/pmr/polymorphic_allocator.hpp> #include <boost/container/pmr/polymorphic_allocator.hpp>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <cstring>
#include <vector> #include <vector>
namespace { namespace
{
using namespace std; using namespace std;
using namespace fair::mq; using namespace fair::mq;
using factoryType = std::shared_ptr<FairMQTransportFactory>;
factoryType factoryZMQ = FairMQTransportFactory::CreateTransportFactory("zeromq"); using FactoryType = shared_ptr<FairMQTransportFactory>;
factoryType factorySHM = FairMQTransportFactory::CreateTransportFactory("shmem");
struct testData struct TestData
{ {
int i{1}; int i{1};
static int nallocated; static int nallocated;
static int nallocations; static int nallocations;
static int ndeallocations; static int ndeallocations;
testData()
TestData()
{ {
++nallocated; ++nallocated;
++nallocations; ++nallocations;
} }
testData(const testData& in)
TestData(const TestData& in)
: i{in.i} : i{in.i}
{ {
++nallocated; ++nallocated;
++nallocations; ++nallocations;
} }
testData(const testData&& in)
TestData(const TestData&& in)
: i{in.i} : i{in.i}
{ {
++nallocated; ++nallocated;
++nallocations; ++nallocations;
} }
testData(int in)
TestData(int in)
: i{in} : i{in}
{ {
++nallocated; ++nallocated;
++nallocations; ++nallocations;
} }
~testData()
~TestData()
{ {
--nallocated; --nallocated;
++ndeallocations; ++ndeallocations;
} }
}; };
int testData::nallocated = 0; int TestData::nallocated = 0;
int testData::nallocations = 0; int TestData::nallocations = 0;
int testData::ndeallocations = 0; int TestData::ndeallocations = 0;
auto allocZMQ = factoryZMQ -> GetMemoryResource(); TEST(MemoryResources, transportAllocatorMap)
auto allocSHM = factorySHM -> GetMemoryResource();
TEST(MemoryResources, transportallocatormap)
{ {
size_t session{tools::UuidHash()};
ProgOptions config;
config.SetProperty<string>("session", to_string(session));
FactoryType factoryZMQ = FairMQTransportFactory::CreateTransportFactory("zeromq", fair::mq::tools::Uuid(), &config);
FactoryType factorySHM = FairMQTransportFactory::CreateTransportFactory("shmem", fair::mq::tools::Uuid(), &config);
auto allocZMQ = factoryZMQ->GetMemoryResource();
auto allocSHM = factorySHM->GetMemoryResource();
EXPECT_TRUE(allocZMQ != nullptr && allocSHM != allocZMQ); EXPECT_TRUE(allocZMQ != nullptr && allocSHM != allocZMQ);
auto _tmp = factoryZMQ->GetMemoryResource(); auto _tmp = factoryZMQ->GetMemoryResource();
EXPECT_TRUE(_tmp == allocZMQ); EXPECT_TRUE(_tmp == allocZMQ);
@@ -76,28 +92,45 @@ using namespace fair::mq::pmr;
TEST(MemoryResources, allocator) TEST(MemoryResources, allocator)
{ {
testData::nallocations = 0; TestData::nallocations = 0;
testData::ndeallocations = 0; TestData::ndeallocations = 0;
size_t session{tools::UuidHash()};
ProgOptions config;
config.SetProperty<string>("session", to_string(session));
FactoryType factoryZMQ = FairMQTransportFactory::CreateTransportFactory("zeromq", fair::mq::tools::Uuid(), &config);
auto allocZMQ = factoryZMQ->GetMemoryResource();
{ {
std::vector<testData, polymorphic_allocator<testData>> v(polymorphic_allocator<testData>{allocZMQ}); std::vector<TestData, polymorphic_allocator<TestData>> v(polymorphic_allocator<TestData>{allocZMQ});
v.reserve(3); v.reserve(3);
EXPECT_TRUE(v.capacity() == 3); EXPECT_TRUE(v.capacity() == 3);
EXPECT_TRUE(allocZMQ->getNumberOfMessages() == 1); EXPECT_TRUE(allocZMQ->getNumberOfMessages() == 1);
v.emplace_back(1); v.emplace_back(1);
v.emplace_back(2); v.emplace_back(2);
v.emplace_back(3); v.emplace_back(3);
EXPECT_TRUE((fair::mq::byte*)&(*v.end()) - (fair::mq::byte*)&(*v.begin()) == 3 * sizeof(testData)); EXPECT_TRUE((fair::mq::byte*)&(*v.end()) - (fair::mq::byte*)&(*v.begin()) == 3 * sizeof(TestData));
EXPECT_TRUE(testData::nallocated == 3); EXPECT_TRUE(TestData::nallocated == 3);
} }
EXPECT_TRUE(testData::nallocated == 0); EXPECT_TRUE(TestData::nallocated == 0);
EXPECT_TRUE(testData::nallocations == testData::ndeallocations); EXPECT_TRUE(TestData::nallocations == TestData::ndeallocations);
} }
TEST(MemoryResources, getMessage) TEST(MemoryResources, getMessage)
{ {
testData::nallocations = 0; TestData::nallocations = 0;
testData::ndeallocations = 0; TestData::ndeallocations = 0;
size_t session{tools::UuidHash()};
ProgOptions config;
config.SetProperty<string>("session", to_string(session));
FactoryType factoryZMQ = FairMQTransportFactory::CreateTransportFactory("zeromq", fair::mq::tools::Uuid(), &config);
FactoryType factorySHM = FairMQTransportFactory::CreateTransportFactory("shmem", fair::mq::tools::Uuid(), &config);
auto allocZMQ = factoryZMQ->GetMemoryResource();
FairMQMessagePtr message{nullptr}; FairMQMessagePtr message{nullptr};
@@ -105,7 +138,7 @@ TEST(MemoryResources, getMessage)
// test message creation on the same channel it was allocated with // test message creation on the same channel it was allocated with
{ {
std::vector<testData, polymorphic_allocator<testData>> v(polymorphic_allocator<testData>{allocZMQ}); std::vector<TestData, polymorphic_allocator<TestData>> v(polymorphic_allocator<TestData>{allocZMQ});
v.emplace_back(1); v.emplace_back(1);
v.emplace_back(2); v.emplace_back(2);
v.emplace_back(3); v.emplace_back(3);
@@ -114,13 +147,13 @@ TEST(MemoryResources, getMessage)
EXPECT_TRUE(message != nullptr); EXPECT_TRUE(message != nullptr);
EXPECT_TRUE(message->GetData() == vectorBeginPtr); EXPECT_TRUE(message->GetData() == vectorBeginPtr);
} }
EXPECT_TRUE(message->GetSize() == 3 * sizeof(testData)); EXPECT_TRUE(message->GetSize() == 3 * sizeof(TestData));
messageArray = static_cast<int*>(message->GetData()); messageArray = static_cast<int*>(message->GetData());
EXPECT_TRUE(messageArray[0] == 1 && messageArray[1] == 2 && messageArray[2] == 3); EXPECT_TRUE(messageArray[0] == 1 && messageArray[1] == 2 && messageArray[2] == 3);
// test message creation on a different channel than it was allocated with // test message creation on a different channel than it was allocated with
{ {
std::vector<testData, polymorphic_allocator<testData>> v(polymorphic_allocator<testData>{allocZMQ}); std::vector<TestData, polymorphic_allocator<TestData>> v(polymorphic_allocator<TestData>{allocZMQ});
v.emplace_back(4); v.emplace_back(4);
v.emplace_back(5); v.emplace_back(5);
v.emplace_back(6); v.emplace_back(6);
@@ -130,7 +163,7 @@ TEST(MemoryResources, getMessage)
EXPECT_TRUE(message->GetData() != vectorBeginPtr); EXPECT_TRUE(message->GetData() != vectorBeginPtr);
} }
EXPECT_TRUE(message->GetSize() == 3 * sizeof(testData)); EXPECT_TRUE(message->GetSize() == 3 * sizeof(TestData));
messageArray = static_cast<int*>(message->GetData()); messageArray = static_cast<int*>(message->GetData());
EXPECT_TRUE(messageArray[0] == 4 && messageArray[1] == 5 && messageArray[2] == 6); EXPECT_TRUE(messageArray[0] == 4 && messageArray[1] == 5 && messageArray[2] == 6);
} }

View File

@@ -361,7 +361,7 @@ TEST_F(Topology, AsyncSetPropertiesTimeout)
topo.AsyncSetProperties({{"key1", "val1"}}, topo.AsyncSetProperties({{"key1", "val1"}},
"", "",
std::chrono::milliseconds(1), std::chrono::microseconds(1),
[=](std::error_code ec, sdk::FailedDevices) mutable { [=](std::error_code ec, sdk::FailedDevices) mutable {
LOG(info) << ec; LOG(info) << ec;
EXPECT_EQ(ec, MakeErrorCode(ErrorCode::OperationTimeout)); EXPECT_EQ(ec, MakeErrorCode(ErrorCode::OperationTimeout));

View File

@@ -7,9 +7,16 @@
********************************************************************************/ ********************************************************************************/
#include "runner.h" #include "runner.h"
#include <FairMQChannel.h>
#include <FairMQLogger.h>
#include <FairMQTransportFactory.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/Tools.h> #include <fairmq/Tools.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <chrono>
#include <sstream> // std::stringstream #include <sstream> // std::stringstream
#include <thread>
namespace namespace
{ {
@@ -18,6 +25,12 @@ using namespace std;
using namespace fair::mq::test; using namespace fair::mq::test;
using namespace fair::mq::tools; using namespace fair::mq::tools;
void delayedInterruptor(FairMQTransportFactory& transport)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
transport.Interrupt();
}
auto RunTransferTimeout(string transport) -> void auto RunTransferTimeout(string transport) -> void
{ {
size_t session{fair::mq::tools::UuidHash()}; size_t session{fair::mq::tools::UuidHash()};
@@ -31,6 +44,28 @@ auto RunTransferTimeout(string transport) -> void
exit(res.exit_code); exit(res.exit_code);
} }
void InterruptTransfer(const string& transport, const string& _address)
{
size_t session{fair::mq::tools::UuidHash()};
std::string address(fair::mq::tools::ToString(_address, "_", transport));
fair::mq::ProgOptions config;
config.SetProperty<string>("session", to_string(session));
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
FairMQChannel pull{"Pull", "pull", factory};
pull.Bind(address);
FairMQMessagePtr msg(pull.NewMessage());
auto t = thread(delayedInterruptor, ref(*factory));
auto result = pull.Receive(msg);
t.join();
ASSERT_EQ(result, static_cast<int>(fair::mq::TransferResult::interrupted));
}
TEST(TransferTimeout, zeromq) TEST(TransferTimeout, zeromq)
{ {
EXPECT_EXIT(RunTransferTimeout("zeromq"), ::testing::ExitedWithCode(0), "Transfer timeout test successfull"); EXPECT_EXIT(RunTransferTimeout("zeromq"), ::testing::ExitedWithCode(0), "Transfer timeout test successfull");
@@ -41,4 +76,14 @@ TEST(TransferTimeout, shmem)
EXPECT_EXIT(RunTransferTimeout("shmem"), ::testing::ExitedWithCode(0), "Transfer timeout test successfull"); EXPECT_EXIT(RunTransferTimeout("shmem"), ::testing::ExitedWithCode(0), "Transfer timeout test successfull");
} }
TEST(InterruptTransfer, zeromq)
{
InterruptTransfer("zeromq", "ipc://test_interrupt_transfer");
}
TEST(InterruptTransfer, shmem)
{
InterruptTransfer("shmem", "ipc://test_interrupt_transfer");
}
} // namespace } // namespace