mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 17:41:45 +00:00
Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
42d27af20f | ||
|
25614e3e06 | ||
|
3decac58fc | ||
|
f278e7e312 | ||
|
491a943c63 | ||
|
c47fc6f9fe | ||
|
7b259afdb5 | ||
|
33ddcaad5e | ||
|
4d5dbedeab |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -4,3 +4,5 @@ install
|
||||
.vscode
|
||||
/compile_commands.json
|
||||
.cache
|
||||
.spack-env
|
||||
spack.lock
|
||||
|
@@ -5,6 +5,7 @@
|
||||
[](https://doi.org/10.5281/zenodo.1689985)
|
||||
[](https://bestpractices.coreinfrastructure.org/projects/6915)
|
||||
[](https://github.com/FairRootGroup/FairMQ/actions/workflows/fair-software.yml)
|
||||
[](https://repology.org/project/fairmq/versions)
|
||||
|
||||
C++ Message Queuing Library and Framework
|
||||
|
||||
@@ -44,10 +45,10 @@ Recommended:
|
||||
|
||||
```bash
|
||||
git clone https://github.com/FairRootGroup/FairMQ fairmq_source
|
||||
cmake -S fairmq_source -B fairmq_build -GNinja -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=fairmq_install
|
||||
cmake -S fairmq_source -B fairmq_build -GNinja -DCMAKE_BUILD_TYPE=Release
|
||||
cmake --build fairmq_build
|
||||
cmake --build fairmq_build --target test
|
||||
cmake --build fairmq_build --target install
|
||||
ctest --test-dir fairmq_build --output-on-failure --schedule-random -j<ncpus>
|
||||
cmake --install fairmq_build --prefix $(pwd)/fairmq_install
|
||||
```
|
||||
|
||||
Please consult the [manpages of your CMake version](https://cmake.org/cmake/help/latest/manual/cmake.1.html) for more options.
|
||||
@@ -55,6 +56,7 @@ Please consult the [manpages of your CMake version](https://cmake.org/cmake/help
|
||||
If dependencies are not installed in standard system directories, you can hint the installation location via
|
||||
`-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...` (`*_ROOT` variables can also be environment variables).
|
||||
|
||||
|
||||
## Usage
|
||||
|
||||
FairMQ ships as a CMake package, so in your `CMakeLists.txt` you can discover it like this:
|
||||
|
@@ -54,7 +54,7 @@ struct Receiver : fair::mq::Device
|
||||
fBuffer[h.id].start = chrono::steady_clock::now();
|
||||
}
|
||||
// if the received ID has not previously been discarded, store the data part in the buffer
|
||||
fBuffer[h.id].parts.AddPart(move(parts.At(1)));
|
||||
fBuffer[h.id].parts.AddPart(std::move(parts.At(1)));
|
||||
} else {
|
||||
// if received ID has been previously discarded.
|
||||
LOG(debug) << "Received part from an already discarded timeframe with id " << h.id;
|
||||
|
@@ -1,5 +1,5 @@
|
||||
################################################################################
|
||||
# Copyright (C) 2012-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# Copyright (C) 2012-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# #
|
||||
# This software is distributed under the terms of the #
|
||||
# GNU Lesser General Public Licence (LGPL) version 3, #
|
||||
@@ -108,6 +108,7 @@ if(BUILD_FAIRMQ)
|
||||
zeromq/UnmanagedRegion.h
|
||||
zeromq/Socket.h
|
||||
zeromq/TransportFactory.h
|
||||
zeromq/ZMsg.h
|
||||
)
|
||||
|
||||
##########################
|
||||
|
@@ -425,10 +425,10 @@ class Channel
|
||||
msg.get()
|
||||
));
|
||||
msg.release();
|
||||
msg = move(msgWrapper);
|
||||
msg = std::move(msgWrapper);
|
||||
} else {
|
||||
MessagePtr newMsg(NewMessage());
|
||||
msg = move(newMsg);
|
||||
msg = std::move(newMsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -446,10 +446,10 @@ class Channel
|
||||
msg.get()
|
||||
));
|
||||
msg.release();
|
||||
msg = move(msgWrapper);
|
||||
msg = std::move(msgWrapper);
|
||||
} else {
|
||||
MessagePtr newMsg(NewMessage());
|
||||
msg = move(newMsg);
|
||||
msg = std::move(newMsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -459,7 +459,7 @@ class Channel
|
||||
{
|
||||
if (fTransportType != msg->GetType()) {
|
||||
MessagePtr newMsg(NewMessage());
|
||||
msg = move(newMsg);
|
||||
msg = std::move(newMsg);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -470,7 +470,7 @@ class Channel
|
||||
if (fTransportType != msg->GetType()) {
|
||||
|
||||
MessagePtr newMsg(NewMessage());
|
||||
msg = move(newMsg);
|
||||
msg = std::move(newMsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -72,6 +72,7 @@ SAMPLER+=" --msg-size $msgSize"
|
||||
SAMPLER+=" --multipart $multipart"
|
||||
SAMPLER+=" --num-parts $numParts"
|
||||
SAMPLER+=" --shm-throw-bad-alloc false"
|
||||
# SAMPLER+=" --shm-metadata-msg-size 1024"
|
||||
# SAMPLER+=" --msg-rate 1000"
|
||||
SAMPLER+=" --max-iterations $maxIterations"
|
||||
SAMPLER+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:5555"
|
||||
|
@@ -11,6 +11,7 @@
|
||||
#include <fairmq/JSONParser.h>
|
||||
#include <fairmq/SuboptParser.h>
|
||||
|
||||
#include <cstddef> // for std::size_t
|
||||
#include <vector>
|
||||
|
||||
using namespace std;
|
||||
@@ -72,6 +73,7 @@ Plugin::ProgOptions ConfigPluginProgramOptions()
|
||||
("shm-zero-segment", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization (opened or created).")
|
||||
("shm-zero-segment-on-creation", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory only once when created.")
|
||||
("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Shared memory: throw fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).")
|
||||
("shm-metadata-msg-size", po::value<std::size_t >()->default_value(0), "Shared memory: size of the zmq metadata message (values smaller than minimum are clamped to the minimum).")
|
||||
("bad-alloc-max-attempts", po::value<int >(), "Maximum number of allocation attempts before throwing fair::mq::MessageBadAlloc. -1 is infinite. There is always at least one attempt, so 0 has safe effect as 1.")
|
||||
("bad-alloc-attempt-interval", po::value<int >()->default_value(50), "Interval between attempts if cannot allocate a message (in ms).")
|
||||
("shm-monitor", po::value<bool >()->default_value(false), "Shared memory: run monitor daemon.")
|
||||
|
@@ -29,7 +29,7 @@
|
||||
#include <algorithm> // max
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <cstddef> // max_align_t
|
||||
#include <cstddef> // max_align_t, std::size_t
|
||||
#include <cstdlib> // getenv
|
||||
#include <cstring> // memcpy
|
||||
#include <memory> // make_unique
|
||||
@@ -151,6 +151,7 @@ class Manager
|
||||
, fBadAllocMaxAttempts(1)
|
||||
, fBadAllocAttemptIntervalInMs(config ? config->GetProperty<int>("bad-alloc-attempt-interval", 50) : 50)
|
||||
, fNoCleanup(config ? config->GetProperty<bool>("shm-no-cleanup", false) : false)
|
||||
, fMetadataMsgSize(config ? config->GetProperty<std::size_t>("shm-metadata-msg-size", 0) : 0)
|
||||
{
|
||||
using namespace boost::interprocess;
|
||||
|
||||
@@ -828,6 +829,8 @@ class Manager
|
||||
}
|
||||
}
|
||||
|
||||
auto GetMetadataMsgSize() const noexcept { return fMetadataMsgSize; }
|
||||
|
||||
~Manager()
|
||||
{
|
||||
fRegionsGen += 1; // signal TL cache invalidation
|
||||
@@ -884,6 +887,8 @@ class Manager
|
||||
int fBadAllocMaxAttempts;
|
||||
int fBadAllocAttemptIntervalInMs;
|
||||
bool fNoCleanup;
|
||||
|
||||
std::size_t fMetadataMsgSize;
|
||||
};
|
||||
|
||||
} // namespace fair::mq::shmem
|
||||
|
@@ -1,5 +1,5 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
@@ -11,18 +11,23 @@
|
||||
#include "Common.h"
|
||||
#include "Manager.h"
|
||||
#include "Message.h"
|
||||
#include <fairmq/Error.h>
|
||||
#include <fairmq/Error.h> // for assertm
|
||||
#include <fairmq/Message.h>
|
||||
#include <fairmq/Socket.h>
|
||||
#include <fairmq/tools/Strings.h>
|
||||
#include <fairmq/zeromq/Common.h>
|
||||
#include <fairmq/zeromq/Common.h> // for zmq::HandleErrors, zmq::ShouldRetry
|
||||
#include <fairmq/zeromq/ZMsg.h> // for zmq::ZMsg
|
||||
|
||||
#include <fairlogger/Logger.h>
|
||||
|
||||
#include <zmq.h>
|
||||
|
||||
#include <algorithm> // for std::max
|
||||
#include <atomic>
|
||||
#include <memory> // make_unique
|
||||
#include <cstddef> // for std::size_t
|
||||
#include <cstring> // for std::memcpy
|
||||
#include <exception> // for std::terminate
|
||||
#include <memory> // for std::make_unique
|
||||
|
||||
namespace fair::mq {
|
||||
class TransportFactory;
|
||||
@@ -31,24 +36,6 @@ namespace fair::mq {
|
||||
namespace fair::mq::shmem
|
||||
{
|
||||
|
||||
struct ZMsg
|
||||
{
|
||||
ZMsg() { int rc __attribute__((unused)) = zmq_msg_init(&fMsg); assert(rc == 0); }
|
||||
explicit ZMsg(size_t size) { int rc __attribute__((unused)) = zmq_msg_init_size(&fMsg, size); assert(rc == 0); }
|
||||
~ZMsg() { int rc __attribute__((unused)) = zmq_msg_close(&fMsg); assert(rc == 0); }
|
||||
|
||||
ZMsg(const ZMsg&) = delete;
|
||||
ZMsg(ZMsg&&) = delete;
|
||||
ZMsg& operator=(const ZMsg&) = delete;
|
||||
ZMsg& operator=(ZMsg&&) = delete;
|
||||
|
||||
void* Data() { return zmq_msg_data(&fMsg); }
|
||||
size_t Size() { return zmq_msg_size(&fMsg); }
|
||||
zmq_msg_t* Msg() { return &fMsg; }
|
||||
|
||||
zmq_msg_t fMsg;
|
||||
};
|
||||
|
||||
class Socket final : public fair::mq::Socket
|
||||
{
|
||||
public:
|
||||
@@ -64,6 +51,7 @@ class Socket final : public fair::mq::Socket
|
||||
, fMessagesRx(0)
|
||||
, fTimeout(100)
|
||||
, fConnectedPeersCount(0)
|
||||
, fMetadataMsgSize(manager.GetMetadataMsgSize())
|
||||
{
|
||||
assert(context);
|
||||
|
||||
@@ -141,8 +129,12 @@ class Socket final : public fair::mq::Socket
|
||||
}
|
||||
int elapsed = 0;
|
||||
|
||||
// meta msg format: | MetaHeader | padded to fMetadataMsgSize |
|
||||
zmq::ZMsg zmqMsg(std::max(fMetadataMsgSize, sizeof(MetaHeader)));
|
||||
std::memcpy(zmqMsg.Data(), &(shmMsg->fMeta), sizeof(MetaHeader));
|
||||
|
||||
while (true) {
|
||||
int nbytes = zmq_send(fSocket, &(shmMsg->fMeta), sizeof(MetaHeader), flags);
|
||||
int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
|
||||
if (nbytes > 0) {
|
||||
shmMsg->fQueued = true;
|
||||
++fMessagesTx;
|
||||
@@ -178,11 +170,11 @@ class Socket final : public fair::mq::Socket
|
||||
int nbytes = zmq_recv(fSocket, &(shmMsg->fMeta), sizeof(MetaHeader), flags);
|
||||
if (nbytes > 0) {
|
||||
// check for number of received messages. must be 1
|
||||
if (nbytes != sizeof(MetaHeader)) {
|
||||
if (static_cast<std::size_t>(nbytes) < sizeof(MetaHeader)) {
|
||||
throw SocketError(
|
||||
tools::ToString("Received message is not a valid FairMQ shared memory message. ",
|
||||
"Possibly due to a misconfigured transport on the sender side. ",
|
||||
"Expected size of ", sizeof(MetaHeader), " bytes, received ", nbytes));
|
||||
"Expected minimum size of ", sizeof(MetaHeader), " bytes, received ", nbytes));
|
||||
}
|
||||
|
||||
size_t size = shmMsg->GetSize();
|
||||
@@ -211,13 +203,14 @@ class Socket final : public fair::mq::Socket
|
||||
}
|
||||
int elapsed = 0;
|
||||
|
||||
// put it into zmq message
|
||||
const unsigned int vecSize = msgVec.size();
|
||||
ZMsg zmqMsg(vecSize * sizeof(MetaHeader));
|
||||
|
||||
// prepare the message with shm metas
|
||||
MetaHeader* metas = static_cast<MetaHeader*>(zmqMsg.Data());
|
||||
// meta msg format: | n | MetaHeader 1 | ... | MetaHeader n | padded to fMetadataMsgSize |
|
||||
auto const n = msgVec.size();
|
||||
zmq::ZMsg zmqMsg(std::max(fMetadataMsgSize, sizeof(std::size_t) + n * sizeof(MetaHeader)));
|
||||
|
||||
auto meta_n = static_cast<std::size_t*>(zmqMsg.Data());
|
||||
*meta_n = n;
|
||||
++meta_n;
|
||||
auto metas = static_cast<MetaHeader*>(static_cast<void*>(meta_n));
|
||||
for (auto& msg : msgVec) {
|
||||
auto msgPtr = msg.get();
|
||||
if (!msgPtr) {
|
||||
@@ -232,7 +225,7 @@ class Socket final : public fair::mq::Socket
|
||||
int64_t totalSize = 0;
|
||||
int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
|
||||
if (nbytes > 0) {
|
||||
assert(static_cast<unsigned int>(nbytes) == (vecSize * sizeof(MetaHeader))); // all or nothing
|
||||
assert(static_cast<unsigned int>(nbytes) >= sizeof(std::size_t) + (n * sizeof(MetaHeader)));
|
||||
|
||||
for (auto& msg : msgVec) {
|
||||
Message* shmMsg = static_cast<Message*>(msg.get());
|
||||
@@ -269,29 +262,26 @@ class Socket final : public fair::mq::Socket
|
||||
}
|
||||
int elapsed = 0;
|
||||
|
||||
ZMsg zmqMsg;
|
||||
zmq::ZMsg zmqMsg;
|
||||
|
||||
while (true) {
|
||||
int64_t totalSize = 0;
|
||||
std::size_t totalSize = 0;
|
||||
int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags);
|
||||
if (nbytes > 0) {
|
||||
MetaHeader* hdrVec = static_cast<MetaHeader*>(zmqMsg.Data());
|
||||
const auto hdrVecSize = zmqMsg.Size();
|
||||
[[maybe_unused]] auto const size = zmqMsg.Size();
|
||||
assert(size > sizeof(std::size_t));
|
||||
auto meta_n = static_cast<std::size_t*>(zmqMsg.Data());
|
||||
auto const n = *meta_n;
|
||||
assert(size >= sizeof(std::size_t) + n * sizeof(MetaHeader));
|
||||
++meta_n;
|
||||
auto metas = static_cast<MetaHeader*>(static_cast<void*>(meta_n));
|
||||
msgVec.reserve(msgVec.size() + n);
|
||||
auto const transport = GetTransport();
|
||||
|
||||
assert(hdrVecSize > 0);
|
||||
if (hdrVecSize % sizeof(MetaHeader) != 0) {
|
||||
throw SocketError(
|
||||
tools::ToString("Received message is not a valid FairMQ shared memory message. ",
|
||||
"Possibly due to a misconfigured transport on the sender side. ",
|
||||
"Expected size of ", sizeof(MetaHeader), " bytes, received ", nbytes));
|
||||
}
|
||||
|
||||
const auto numMessages = hdrVecSize / sizeof(MetaHeader);
|
||||
msgVec.reserve(numMessages);
|
||||
|
||||
for (size_t m = 0; m < numMessages; m++) {
|
||||
// create new message (part)
|
||||
msgVec.emplace_back(std::make_unique<Message>(fManager, hdrVec[m], GetTransport()));
|
||||
for (std::size_t i = 0; i < n; ++i) {
|
||||
msgVec.push_back(std::make_unique<Message>(fManager, *metas, transport));
|
||||
++metas;
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-static-cast-downcast)
|
||||
Message* shmMsg = static_cast<Message*>(msgVec.back().get());
|
||||
totalSize += shmMsg->GetSize();
|
||||
}
|
||||
@@ -469,6 +459,7 @@ class Socket final : public fair::mq::Socket
|
||||
|
||||
int fTimeout;
|
||||
mutable unsigned long fConnectedPeersCount;
|
||||
std::size_t fMetadataMsgSize;
|
||||
};
|
||||
|
||||
} // namespace fair::mq::shmem
|
||||
|
92
fairmq/zeromq/ZMsg.h
Normal file
92
fairmq/zeromq/ZMsg.h
Normal file
@@ -0,0 +1,92 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#ifndef FAIR_MQ_ZMQ_ZMSG_H
|
||||
#define FAIR_MQ_ZMQ_ZMSG_H
|
||||
|
||||
#include <cstddef> // for std::size_t
|
||||
#include <fairmq/Error.h> // for assertm
|
||||
#include <new> // for std::bad_alloc
|
||||
#include <zmq.h> // for zmq_*
|
||||
|
||||
namespace fair::mq::zmq {
|
||||
|
||||
// Wraps a `zmq_msg_t` C object as a C++ type:
|
||||
// * `zmq_msg_init` -> C++ default ctor
|
||||
// * `zmq_msg_init_size` -> C++ ctor
|
||||
// * `zmq_msg_init_data` -> C++ ctor
|
||||
// * `zmq_msg_init_size` + `memcpy` -> C++ copy ctor
|
||||
// * `zmq_msg_close` + `zmq_msg_init_size` + `memcpy` -> C++ copy assignment
|
||||
// * `zmq_msg_init` + `zmq_msg_move`` -> C++ move ctor
|
||||
// * `zmq_msg_move` -> C++ move assignment
|
||||
// * `zmq_msg_close` -> C++ dtor
|
||||
// * access the underlying `zmq_msg_t` via `Msg() [const] -> zmq_msg_t*`
|
||||
// the const overload does a `const_cast<zmq_msg_t*>`, because the
|
||||
// C interfaces do not model constness
|
||||
// * `zmq_msg_data` -> `Data() -> void*`
|
||||
// * `zmq_msg_size` -> `Size() -> std::size_t`
|
||||
struct ZMsg
|
||||
{
|
||||
ZMsg() noexcept
|
||||
{
|
||||
[[maybe_unused]] auto const rc = zmq_msg_init(Msg());
|
||||
assertm(rc == 0, "msg init successful"); // NOLINT
|
||||
}
|
||||
explicit ZMsg(std::size_t size)
|
||||
{
|
||||
auto const rc = zmq_msg_init_size(Msg(), size);
|
||||
if (rc == -1) {
|
||||
throw std::bad_alloc{};
|
||||
}
|
||||
}
|
||||
explicit ZMsg(void* data,
|
||||
std::size_t size,
|
||||
zmq_free_fn* freefn = nullptr,
|
||||
void* hint = nullptr)
|
||||
{
|
||||
auto const rc = zmq_msg_init_data(Msg(), data, size, freefn, hint);
|
||||
if (rc == -1) {
|
||||
throw std::bad_alloc{};
|
||||
}
|
||||
}
|
||||
~ZMsg() noexcept
|
||||
{
|
||||
[[maybe_unused]] auto const rc = zmq_msg_close(Msg());
|
||||
assertm(rc == 0, "msg close successful"); // NOLINT
|
||||
}
|
||||
ZMsg(const ZMsg& other) = delete;
|
||||
ZMsg(ZMsg&& other) noexcept
|
||||
{
|
||||
[[maybe_unused]] auto rc = zmq_msg_init(Msg());
|
||||
assertm(rc == 0, "msg init successful"); // NOLINT
|
||||
rc = zmq_msg_move(Msg(), other.Msg());
|
||||
assertm(rc == 0, "msg move successful"); // NOLINT
|
||||
}
|
||||
ZMsg& operator=(const ZMsg& rhs) = delete;
|
||||
ZMsg& operator=(ZMsg&& rhs) noexcept
|
||||
{
|
||||
[[maybe_unused]] auto const rc = zmq_msg_move(Msg(), rhs.Msg());
|
||||
assertm(rc == 0, "msg move successful"); // NOLINT
|
||||
return *this;
|
||||
}
|
||||
|
||||
zmq_msg_t* Msg() noexcept { return &fMsg; }
|
||||
zmq_msg_t* Msg() const noexcept
|
||||
{
|
||||
return const_cast<zmq_msg_t*>(&fMsg); // NOLINT(cppcoreguidelines-pro-type-const-cast)
|
||||
}
|
||||
void* Data() const noexcept { return zmq_msg_data(Msg()); }
|
||||
std::size_t Size() const noexcept { return zmq_msg_size(Msg()); }
|
||||
|
||||
private:
|
||||
zmq_msg_t fMsg{};
|
||||
};
|
||||
|
||||
} // namespace fair::mq::zmq
|
||||
|
||||
#endif /* FAIR_MQ_ZMQ_ZMSG_H */
|
10
spack.yaml
Normal file
10
spack.yaml
Normal file
@@ -0,0 +1,10 @@
|
||||
spack:
|
||||
specs:
|
||||
- boost+container+program_options+filesystem+date_time+regex
|
||||
- faircmakemodules
|
||||
- fairlogger+pretty
|
||||
- fmt
|
||||
- libzmq
|
||||
view: true
|
||||
concretizer:
|
||||
unify: true
|
@@ -28,10 +28,26 @@ class Pull : public Device
|
||||
|
||||
auto Run() -> void override
|
||||
{
|
||||
auto msg = NewMessage();
|
||||
int counter = 0;
|
||||
|
||||
if (Receive(msg, "data") >= 0)
|
||||
{
|
||||
auto msg1 = NewMessageFor("data", 0);
|
||||
|
||||
if (Receive(msg1, "data") >= 0) {
|
||||
++counter;
|
||||
}
|
||||
|
||||
auto msg2 = NewMessageFor("data", 0);
|
||||
|
||||
auto ret = Receive(msg2, "data");
|
||||
if (ret >= 0) {
|
||||
auto content = std::string{static_cast<char*>(msg2->GetData()), msg2->GetSize()};
|
||||
LOG(info) << "Transferred " << static_cast<std::size_t>(ret) << " bytes, msg size: " << msg2->GetSize() << ", content: " << content;
|
||||
if (msg2->GetSize() == static_cast<std::size_t>(ret) && content == "testdata1234") {
|
||||
++counter;
|
||||
}
|
||||
}
|
||||
|
||||
if (counter == 2) {
|
||||
LOG(info) << "PUSH-PULL test successfull";
|
||||
}
|
||||
};
|
||||
|
@@ -25,8 +25,12 @@ class Push : public Device
|
||||
|
||||
auto Run() -> void override
|
||||
{
|
||||
auto msg = NewMessage();
|
||||
Send(msg, "data");
|
||||
// empty message
|
||||
auto msg1 = NewMessageFor("data", 0);
|
||||
Send(msg1, "data");
|
||||
// message with short text data
|
||||
auto msg2(NewSimpleMessageFor("data", 0, "testdata1234"));
|
||||
Send(msg2, "data");
|
||||
};
|
||||
};
|
||||
|
||||
|
@@ -1,5 +1,5 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2015-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2015-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
@@ -24,22 +24,40 @@ class Rep : public Device
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||
}
|
||||
|
||||
bool check(signed long ret, const fair::mq::MessagePtr& msg)
|
||||
{
|
||||
auto content = std::string{static_cast<char*>(msg->GetData()), msg->GetSize()};
|
||||
LOG(info) << "Transferred " << static_cast<std::size_t>(ret) << " bytes, msg size: " << msg->GetSize() << ", content: " << content;
|
||||
return msg->GetSize() == static_cast<std::size_t>(ret) && content == "request";
|
||||
}
|
||||
|
||||
auto Run() -> void override
|
||||
{
|
||||
auto request1 = NewMessage();
|
||||
if (Receive(request1, "data") >= 0) {
|
||||
int counter = 0;
|
||||
auto req1 = NewMessage();
|
||||
auto ret1 = Receive(req1, "data");
|
||||
if (ret1 >= 0) {
|
||||
LOG(info) << "Received request 1";
|
||||
auto reply = NewMessage();
|
||||
if (check(ret1, req1)) {
|
||||
++counter;
|
||||
}
|
||||
auto reply = NewSimpleMessageFor("data", 0, "reply");
|
||||
Send(reply, "data");
|
||||
}
|
||||
auto request2 = NewMessage();
|
||||
if (Receive(request2, "data") >= 0) {
|
||||
auto req2 = NewMessage();
|
||||
auto ret2 = Receive(req2, "data");
|
||||
if (ret2 >= 0) {
|
||||
LOG(info) << "Received request 2";
|
||||
auto reply = NewMessage();
|
||||
if (check(ret2, req2)) {
|
||||
++counter;
|
||||
}
|
||||
auto reply = NewSimpleMessageFor("data", 0, "reply");
|
||||
Send(reply, "data");
|
||||
}
|
||||
|
||||
LOG(info) << "REQ-REP test successfull";
|
||||
if (counter == 2) {
|
||||
LOG(info) << "REQ-REP test successfull";
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
|
@@ -1,5 +1,5 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2015-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2015-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
@@ -26,12 +26,17 @@ class Req : public Device
|
||||
|
||||
auto Run() -> void override
|
||||
{
|
||||
auto request = NewMessage();
|
||||
auto request = NewSimpleMessageFor("data", 0, "request");
|
||||
Send(request, "data");
|
||||
|
||||
auto reply = NewMessage();
|
||||
if (Receive(reply, "data") >= 0) {
|
||||
LOG(info) << "received reply";
|
||||
auto content = std::string{static_cast<char*>(reply->GetData()), reply->GetSize()};
|
||||
LOG(info) << "Transferred reply of size: " << reply->GetSize() << ", content: " << content;
|
||||
if (content != "reply") {
|
||||
ChangeStateOrThrow(Transition::ErrorFound);
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
|
@@ -37,12 +37,15 @@ auto AsStringView(Message const& msg) -> string_view
|
||||
return {static_cast<char const*>(msg.GetData()), msg.GetSize()};
|
||||
}
|
||||
|
||||
auto RunPushPullWithMsgResize(string const & transport, string const & _address) -> void
|
||||
auto RunPushPullWithMsgResize(string const & transport, string const & _address, bool expandedShmMetadata = false) -> void
|
||||
{
|
||||
ProgOptions config;
|
||||
config.SetProperty<string>("session", tools::Uuid());
|
||||
config.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||
config.SetProperty<bool>("shm-monitor", true);
|
||||
if (expandedShmMetadata) {
|
||||
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
|
||||
}
|
||||
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
|
||||
|
||||
Channel push{"Push", "push", factory};
|
||||
@@ -100,12 +103,15 @@ auto RunPushPullWithMsgResize(string const & transport, string const & _address)
|
||||
}
|
||||
}
|
||||
|
||||
auto RunMsgRebuild(const string& transport) -> void
|
||||
auto RunMsgRebuild(const string& transport, bool expandedShmMetadata = false) -> void
|
||||
{
|
||||
ProgOptions config;
|
||||
config.SetProperty<string>("session", tools::Uuid());
|
||||
config.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||
config.SetProperty<bool>("shm-monitor", true);
|
||||
if (expandedShmMetadata) {
|
||||
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
|
||||
}
|
||||
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
|
||||
|
||||
size_t const msgSize{100};
|
||||
@@ -134,12 +140,15 @@ auto CheckMsgAlignment(Message const& msg, fair::mq::Alignment alignment) -> boo
|
||||
return (reinterpret_cast<uintptr_t>(msg.GetData()) % static_cast<size_t>(alignment)) == 0; // NOLINT
|
||||
}
|
||||
|
||||
auto RunPushPullWithAlignment(string const& transport, string const& _address) -> void
|
||||
auto RunPushPullWithAlignment(string const& transport, string const& _address, bool expandedShmMetadata = false) -> void
|
||||
{
|
||||
ProgOptions config;
|
||||
config.SetProperty<string>("session", tools::Uuid());
|
||||
config.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||
config.SetProperty<bool>("shm-monitor", true);
|
||||
if (expandedShmMetadata) {
|
||||
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
|
||||
}
|
||||
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
|
||||
|
||||
Channel push{"Push", "push", factory};
|
||||
@@ -189,12 +198,15 @@ auto RunPushPullWithAlignment(string const& transport, string const& _address) -
|
||||
ASSERT_TRUE(CheckMsgAlignment(*msgCopy, align32));
|
||||
}
|
||||
|
||||
auto EmptyMessage(string const& transport, string const& _address) -> void
|
||||
auto EmptyMessage(string const& transport, string const& _address, bool expandedShmMetadata = false) -> void
|
||||
{
|
||||
ProgOptions config;
|
||||
config.SetProperty<string>("session", tools::Uuid());
|
||||
config.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||
config.SetProperty<bool>("shm-monitor", true);
|
||||
if (expandedShmMetadata) {
|
||||
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
|
||||
}
|
||||
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
|
||||
|
||||
Channel push{"Push", "push", factory};
|
||||
@@ -241,12 +253,15 @@ auto EmptyMessage(string const& transport, string const& _address) -> void
|
||||
|
||||
// The "zero copy" property of the Copy() method is an implementation detail and is not guaranteed.
|
||||
// Currently it holds true for the shmem (across devices) and for zeromq (within same device) transports.
|
||||
auto ZeroCopy() -> void
|
||||
auto ZeroCopy(bool expandedShmMetadata = false) -> void
|
||||
{
|
||||
ProgOptions config;
|
||||
config.SetProperty<string>("session", tools::Uuid());
|
||||
config.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||
config.SetProperty<bool>("shm-monitor", true);
|
||||
if (expandedShmMetadata) {
|
||||
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
|
||||
}
|
||||
auto factory(TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config));
|
||||
|
||||
unique_ptr<string> str(make_unique<string>("asdf"));
|
||||
@@ -272,7 +287,7 @@ auto ZeroCopy() -> void
|
||||
|
||||
// The "zero copy" property of the Copy() method is an implementation detail and is not guaranteed.
|
||||
// Currently it holds true for the shmem (across devices) and for zeromq (within same device) transports.
|
||||
auto ZeroCopyFromUnmanaged(string const& address) -> void
|
||||
auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata = false) -> void
|
||||
{
|
||||
ProgOptions config1;
|
||||
ProgOptions config2;
|
||||
@@ -285,6 +300,12 @@ auto ZeroCopyFromUnmanaged(string const& address) -> void
|
||||
config2.SetProperty<bool>("shm-monitor", true);
|
||||
// ref counts should be accessible accross different segments
|
||||
config2.SetProperty<uint16_t>("shm-segment-id", 2);
|
||||
if (expandedShmMetadata) {
|
||||
config1.SetProperty<size_t>("shm-metadata-msg-size", 2048);
|
||||
}
|
||||
if (expandedShmMetadata) {
|
||||
config2.SetProperty<size_t>("shm-metadata-msg-size", 2048);
|
||||
}
|
||||
auto factory1(TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config1));
|
||||
auto factory2(TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config2));
|
||||
|
||||
@@ -378,6 +399,11 @@ TEST(Resize, shmem) // NOLINT
|
||||
RunPushPullWithMsgResize("shmem", "ipc://test_message_resize");
|
||||
}
|
||||
|
||||
TEST(Resize, shmem_expanded_metadata) // NOLINT
|
||||
{
|
||||
RunPushPullWithMsgResize("shmem", "ipc://test_message_resize", true);
|
||||
}
|
||||
|
||||
TEST(Rebuild, zeromq) // NOLINT
|
||||
{
|
||||
RunMsgRebuild("zeromq");
|
||||
@@ -388,11 +414,21 @@ TEST(Rebuild, shmem) // NOLINT
|
||||
RunMsgRebuild("shmem");
|
||||
}
|
||||
|
||||
TEST(Rebuild, shmem_expanded_metadata) // NOLINT
|
||||
{
|
||||
RunMsgRebuild("shmem", true);
|
||||
}
|
||||
|
||||
TEST(Alignment, shmem) // NOLINT
|
||||
{
|
||||
RunPushPullWithAlignment("shmem", "ipc://test_message_alignment");
|
||||
}
|
||||
|
||||
TEST(Alignment, shmem_expanded_metadata) // NOLINT
|
||||
{
|
||||
RunPushPullWithAlignment("shmem", "ipc://test_message_alignment", true);
|
||||
}
|
||||
|
||||
TEST(Alignment, zeromq) // NOLINT
|
||||
{
|
||||
RunPushPullWithAlignment("zeromq", "ipc://test_message_alignment");
|
||||
@@ -408,14 +444,29 @@ TEST(EmptyMessage, shmem) // NOLINT
|
||||
EmptyMessage("shmem", "ipc://test_empty_message");
|
||||
}
|
||||
|
||||
TEST(EmptyMessage, shmem_expanded_metadata) // NOLINT
|
||||
{
|
||||
EmptyMessage("shmem", "ipc://test_empty_message", true);
|
||||
}
|
||||
|
||||
TEST(ZeroCopy, shmem) // NOLINT
|
||||
{
|
||||
ZeroCopy();
|
||||
}
|
||||
|
||||
TEST(ZeroCopy, shmem_expanded_metadata) // NOLINT
|
||||
{
|
||||
ZeroCopy(true);
|
||||
}
|
||||
|
||||
TEST(ZeroCopyFromUnmanaged, shmem) // NOLINT
|
||||
{
|
||||
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged");
|
||||
}
|
||||
|
||||
TEST(ZeroCopyFromUnmanaged, shmem_expanded_metadata) // NOLINT
|
||||
{
|
||||
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged", true);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
@@ -22,7 +22,7 @@ using namespace std;
|
||||
using namespace fair::mq::test;
|
||||
using namespace fair::mq::tools;
|
||||
|
||||
auto RunPair(string transport) -> void
|
||||
auto RunPair(const string& transport, const string& extraDeviceCmdArgs) -> void
|
||||
{
|
||||
size_t session{fair::mq::tools::UuidHash()};
|
||||
string ipcFile("/tmp/fmq_" + to_string(session) + "_data_" + transport);
|
||||
@@ -38,6 +38,7 @@ auto RunPair(string transport) -> void
|
||||
<< " --shm-segment-size 100000000"
|
||||
<< " --session " << session
|
||||
<< " --color false"
|
||||
<< extraDeviceCmdArgs
|
||||
<< " --channel-config name=data,type=pair,method=bind,address=" << address;
|
||||
pairleft = execute(cmd.str(), "[PAIR L]");
|
||||
});
|
||||
@@ -52,6 +53,7 @@ auto RunPair(string transport) -> void
|
||||
<< " --shm-segment-size 100000000"
|
||||
<< " --session " << session
|
||||
<< " --color false"
|
||||
<< extraDeviceCmdArgs
|
||||
<< " --channel-config name=data,type=pair,method=connect,address=" << address;
|
||||
pairright = execute(cmd.str(), "[PAIR R]");
|
||||
});
|
||||
@@ -65,14 +67,19 @@ auto RunPair(string transport) -> void
|
||||
exit(pairleft.exit_code + pairright.exit_code);
|
||||
}
|
||||
|
||||
TEST(Pair, SingleMsg_MP_tcp_zeromq)
|
||||
TEST(Pair, SingleMsg_MultiThreaded_tcp_zeromq)
|
||||
{
|
||||
EXPECT_EXIT(RunPair("zeromq"), ::testing::ExitedWithCode(0), "PAIR test successfull");
|
||||
EXPECT_EXIT(RunPair("zeromq", ""), ::testing::ExitedWithCode(0), "PAIR test successfull");
|
||||
}
|
||||
|
||||
TEST(Pair, SingleMsg_MP_tcp_shmem)
|
||||
TEST(Pair, SingleMsg_MultiThreaded_tcp_shmem)
|
||||
{
|
||||
EXPECT_EXIT(RunPair("shmem"), ::testing::ExitedWithCode(0), "PAIR test successfull");
|
||||
EXPECT_EXIT(RunPair("shmem", ""), ::testing::ExitedWithCode(0), "PAIR test successfull");
|
||||
}
|
||||
|
||||
TEST(Pair, SingleMsg_MultiThreaded_tcp_shmem_expanded_metadata)
|
||||
{
|
||||
EXPECT_EXIT(RunPair("shmem", " --shm-metadata-msg-size 2048"), ::testing::ExitedWithCode(0), "PAIR test successfull");
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
@@ -22,7 +22,7 @@ using namespace std;
|
||||
using namespace fair::mq::test;
|
||||
using namespace fair::mq::tools;
|
||||
|
||||
auto RunPushPull(string transport) -> void
|
||||
auto RunPushPull(string transport, const string& extraDeviceCmdArgs) -> void
|
||||
{
|
||||
size_t session(fair::mq::tools::UuidHash());
|
||||
string ipcFile("/tmp/fmq_" + to_string(session) + "_data_" + transport);
|
||||
@@ -38,6 +38,7 @@ auto RunPushPull(string transport) -> void
|
||||
<< " --shm-segment-size 100000000"
|
||||
<< " --session " << session
|
||||
<< " --color false"
|
||||
<< extraDeviceCmdArgs
|
||||
<< " --channel-config name=data,type=push,method=bind,address=" << address;
|
||||
push = execute(cmd.str(), "[PUSH]");
|
||||
});
|
||||
@@ -52,6 +53,7 @@ auto RunPushPull(string transport) -> void
|
||||
<< " --shm-segment-size 100000000"
|
||||
<< " --session " << session
|
||||
<< " --color false"
|
||||
<< extraDeviceCmdArgs
|
||||
<< " --channel-config name=data,type=pull,method=connect,address=" << address;
|
||||
pull = execute(cmd.str(), "[PULL]");
|
||||
});
|
||||
@@ -65,14 +67,19 @@ auto RunPushPull(string transport) -> void
|
||||
exit(push.exit_code + pull.exit_code);
|
||||
}
|
||||
|
||||
TEST(PushPull, SingleMsg_MP_ipc_zeromq)
|
||||
TEST(PushPull, SingleMsg_MultiThreaded_ipc_zeromq)
|
||||
{
|
||||
EXPECT_EXIT(RunPushPull("zeromq"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
|
||||
EXPECT_EXIT(RunPushPull("zeromq", ""), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
|
||||
}
|
||||
|
||||
TEST(PushPull, SingleMsg_MP_ipc_shmem)
|
||||
TEST(PushPull, SingleMsg_MultiThreaded_ipc_shmem)
|
||||
{
|
||||
EXPECT_EXIT(RunPushPull("shmem"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
|
||||
EXPECT_EXIT(RunPushPull("shmem", ""), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
|
||||
}
|
||||
|
||||
TEST(PushPull, SingleMsg_MultiThreaded_ipc_shmem_expanded_metadata)
|
||||
{
|
||||
EXPECT_EXIT(RunPushPull("shmem", " --shm-metadata-msg-size 2048"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
@@ -25,12 +25,15 @@ namespace
|
||||
using namespace std;
|
||||
using namespace fair::mq;
|
||||
|
||||
auto RunSingleThreadedMultipart(string transport, string address1, string address2) -> void {
|
||||
auto RunSingleThreadedMultipart(string transport, string address1, string address2, bool expandedShmMetadata) -> void {
|
||||
|
||||
fair::mq::ProgOptions config;
|
||||
config.SetProperty<string>("session", tools::Uuid());
|
||||
config.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||
config.SetProperty<bool>("shm-monitor", true);
|
||||
if (expandedShmMetadata) {
|
||||
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
|
||||
}
|
||||
|
||||
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
|
||||
|
||||
@@ -104,13 +107,16 @@ auto RunSingleThreadedMultipart(string transport, string address1, string addres
|
||||
}
|
||||
}
|
||||
|
||||
auto RunMultiThreadedMultipart(string transport, string address1) -> void
|
||||
auto RunMultiThreadedMultipart(string transport, string address1, bool expandedShmMetadata) -> void
|
||||
{
|
||||
ProgOptions config;
|
||||
config.SetProperty<string>("session", tools::Uuid());
|
||||
config.SetProperty<int>("io-threads", 1);
|
||||
config.SetProperty<size_t>("shm-segment-size", 20000000); // NOLINT
|
||||
config.SetProperty<bool>("shm-monitor", true);
|
||||
if (expandedShmMetadata) {
|
||||
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
|
||||
}
|
||||
|
||||
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
|
||||
|
||||
@@ -147,44 +153,64 @@ auto RunMultiThreadedMultipart(string transport, string address1) -> void
|
||||
puller.join();
|
||||
}
|
||||
|
||||
TEST(PushPull, Multipart_ST_inproc_zeromq) // NOLINT
|
||||
TEST(PushPull, Multipart_SingleThreaded_inproc_zeromq) // NOLINT
|
||||
{
|
||||
RunSingleThreadedMultipart("zeromq", "inproc://test1", "inproc://test2");
|
||||
RunSingleThreadedMultipart("zeromq", "inproc://test1", "inproc://test2", false);
|
||||
}
|
||||
|
||||
TEST(PushPull, Multipart_ST_inproc_shmem) // NOLINT
|
||||
TEST(PushPull, Multipart_SingleThreaded_inproc_shmem) // NOLINT
|
||||
{
|
||||
RunSingleThreadedMultipart("shmem", "inproc://test1", "inproc://test2");
|
||||
RunSingleThreadedMultipart("shmem", "inproc://test1", "inproc://test2", false);
|
||||
}
|
||||
|
||||
TEST(PushPull, Multipart_ST_ipc_zeromq) // NOLINT
|
||||
TEST(PushPull, Multipart_SingleThreaded_inproc_shmem_expanded_metadata) // NOLINT
|
||||
{
|
||||
RunSingleThreadedMultipart("zeromq", "ipc://test_Multipart_ST_ipc_zeromq_1", "ipc://test_Multipart_ST_ipc_zeromq_2");
|
||||
RunSingleThreadedMultipart("shmem", "inproc://test1", "inproc://test2", true);
|
||||
}
|
||||
|
||||
TEST(PushPull, Multipart_ST_ipc_shmem) // NOLINT
|
||||
TEST(PushPull, Multipart_SingleThreaded_ipc_zeromq) // NOLINT
|
||||
{
|
||||
RunSingleThreadedMultipart("shmem", "ipc://test_Multipart_ST_ipc_shmem_1", "ipc://test_Multipart_ST_ipc_shmem_2");
|
||||
RunSingleThreadedMultipart("zeromq", "ipc://test_Multipart_SingleThreaded_ipc_zeromq_1", "ipc://test_Multipart_SingleThreaded_ipc_zeromq_2", false);
|
||||
}
|
||||
|
||||
TEST(PushPull, Multipart_MT_inproc_zeromq) // NOLINT
|
||||
TEST(PushPull, Multipart_SingleThreaded_ipc_shmem) // NOLINT
|
||||
{
|
||||
RunMultiThreadedMultipart("zeromq", "inproc://test_1");
|
||||
RunSingleThreadedMultipart("shmem", "ipc://test_Multipart_SingleThreaded_ipc_shmem_1", "ipc://test_Multipart_SingleThreaded_ipc_shmem_2", false);
|
||||
}
|
||||
|
||||
TEST(PushPull, Multipart_MT_inproc_shmem) // NOLINT
|
||||
TEST(PushPull, Multipart_SingleThreaded_ipc_shmem_expanded_metadata) // NOLINT
|
||||
{
|
||||
RunMultiThreadedMultipart("shmem", "inproc://test_1");
|
||||
RunSingleThreadedMultipart("shmem", "ipc://test_Multipart_SingleThreaded_ipc_shmem_1", "ipc://test_Multipart_SingleThreaded_ipc_shmem_2", true);
|
||||
}
|
||||
|
||||
TEST(PushPull, Multipart_MT_ipc_zeromq) // NOLINT
|
||||
TEST(PushPull, Multipart_MultiThreaded_inproc_zeromq) // NOLINT
|
||||
{
|
||||
RunMultiThreadedMultipart("zeromq", "ipc://test_Multipart_MT_ipc_zeromq_1");
|
||||
RunMultiThreadedMultipart("zeromq", "inproc://test_1", false);
|
||||
}
|
||||
|
||||
TEST(PushPull, Multipart_MT_ipc_shmem) // NOLINT
|
||||
TEST(PushPull, Multipart_MultiThreaded_inproc_shmem) // NOLINT
|
||||
{
|
||||
RunMultiThreadedMultipart("shmem", "ipc://test_Multipart_MT_ipc_shmem_1");
|
||||
RunMultiThreadedMultipart("shmem", "inproc://test_1", false);
|
||||
}
|
||||
|
||||
TEST(PushPull, Multipart_MultiThreaded_inproc_shmem_expanded_metadata) // NOLINT
|
||||
{
|
||||
RunMultiThreadedMultipart("shmem", "inproc://test_1", true);
|
||||
}
|
||||
|
||||
TEST(PushPull, Multipart_MultiThreaded_ipc_zeromq) // NOLINT
|
||||
{
|
||||
RunMultiThreadedMultipart("zeromq", "ipc://test_Multipart_MultiThreaded_ipc_zeromq_1", false);
|
||||
}
|
||||
|
||||
TEST(PushPull, Multipart_MultiThreaded_ipc_shmem) // NOLINT
|
||||
{
|
||||
RunMultiThreadedMultipart("shmem", "ipc://test_Multipart_MultiThreaded_ipc_shmem_1", false);
|
||||
}
|
||||
|
||||
TEST(PushPull, Multipart_MultiThreaded_ipc_shmem_expanded_metadata) // NOLINT
|
||||
{
|
||||
RunMultiThreadedMultipart("shmem", "ipc://test_Multipart_MultiThreaded_ipc_shmem_1", true);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
@@ -22,7 +22,7 @@ using namespace std;
|
||||
using namespace fair::mq::test;
|
||||
using namespace fair::mq::tools;
|
||||
|
||||
auto RunReqRep(string transport) -> void
|
||||
auto RunReqRep(string transport, const string& extraDeviceCmdArgs) -> void
|
||||
{
|
||||
size_t session{fair::mq::tools::UuidHash()};
|
||||
string ipcFile("/tmp/fmq_" + to_string(session) + "_data_" + transport);
|
||||
@@ -38,6 +38,7 @@ auto RunReqRep(string transport) -> void
|
||||
<< " --shm-segment-size 100000000"
|
||||
<< " --session " << session
|
||||
<< " --color false"
|
||||
<< extraDeviceCmdArgs
|
||||
<< " --channel-config name=data,type=rep,method=bind,address=" << address;
|
||||
rep = execute(cmd.str(), "[REP]");
|
||||
});
|
||||
@@ -52,6 +53,7 @@ auto RunReqRep(string transport) -> void
|
||||
<< " --shm-segment-size 100000000"
|
||||
<< " --session " << session
|
||||
<< " --color false"
|
||||
<< extraDeviceCmdArgs
|
||||
<< " --channel-config name=data,type=req,method=connect,address=" << address;
|
||||
req1 = execute(cmd.str(), "[REQ1]");
|
||||
});
|
||||
@@ -66,6 +68,7 @@ auto RunReqRep(string transport) -> void
|
||||
<< " --shm-segment-size 100000000"
|
||||
<< " --session " << session
|
||||
<< " --color false"
|
||||
<< extraDeviceCmdArgs
|
||||
<< " --channel-config name=data,type=req,method=connect,address=" << address;
|
||||
req2 = execute(cmd.str(), "[REQ2]");
|
||||
});
|
||||
@@ -82,12 +85,17 @@ auto RunReqRep(string transport) -> void
|
||||
|
||||
TEST(ReqRep, zeromq)
|
||||
{
|
||||
EXPECT_EXIT(RunReqRep("zeromq"), ::testing::ExitedWithCode(0), "REQ-REP test successfull");
|
||||
EXPECT_EXIT(RunReqRep("zeromq", ""), ::testing::ExitedWithCode(0), "REQ-REP test successfull");
|
||||
}
|
||||
|
||||
TEST(ReqRep, shmem)
|
||||
{
|
||||
EXPECT_EXIT(RunReqRep("shmem"), ::testing::ExitedWithCode(0), "REQ-REP test successfull");
|
||||
EXPECT_EXIT(RunReqRep("shmem", ""), ::testing::ExitedWithCode(0), "REQ-REP test successfull");
|
||||
}
|
||||
|
||||
TEST(ReqRep, shmem_expanded_metadata)
|
||||
{
|
||||
EXPECT_EXIT(RunReqRep("shmem", " --shm-metadata-msg-size 2048"), ::testing::ExitedWithCode(0), "REQ-REP test successfull");
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
Reference in New Issue
Block a user