mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
SDK: Remove instance limit from DDSSession
This commit is contained in:
parent
8dd0b25c06
commit
f14f507584
|
@ -65,8 +65,6 @@ struct DDSSession::Impl
|
||||||
, fId(to_string(fSession->create()))
|
, fId(to_string(fSession->create()))
|
||||||
, fStopOnDestruction(false)
|
, fStopOnDestruction(false)
|
||||||
{
|
{
|
||||||
setenv("DDS_SESSION_ID", fId.c_str(), 1);
|
|
||||||
|
|
||||||
fDDSService.subscribeOnError([](const dds::intercom_api::EErrorCode errorCode, const std::string& msg) {
|
fDDSService.subscribeOnError([](const dds::intercom_api::EErrorCode errorCode, const std::string& msg) {
|
||||||
std::cerr << "DDS error, error code: " << errorCode << ", error message: " << msg << std::endl;
|
std::cerr << "DDS error, error code: " << errorCode << ", error message: " << msg << std::endl;
|
||||||
});
|
});
|
||||||
|
@ -81,10 +79,6 @@ struct DDSSession::Impl
|
||||||
, fStopOnDestruction(false)
|
, fStopOnDestruction(false)
|
||||||
{
|
{
|
||||||
fSession->attach(fId);
|
fSession->attach(fId);
|
||||||
auto envId(std::getenv("DDS_SESSION_ID"));
|
|
||||||
if (envId != nullptr && std::string(envId) != fId) {
|
|
||||||
setenv("DDS_SESSION_ID", fId.c_str(), 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
fDDSService.subscribeOnError([](const dds::intercom_api::EErrorCode errorCode, const std::string& msg) {
|
fDDSService.subscribeOnError([](const dds::intercom_api::EErrorCode errorCode, const std::string& msg) {
|
||||||
std::cerr << "DDS error, error code: " << errorCode << ", error message: " << msg << std::endl;
|
std::cerr << "DDS error, error code: " << errorCode << ", error message: " << msg << std::endl;
|
||||||
|
@ -99,11 +93,6 @@ struct DDSSession::Impl
|
||||||
, fId(to_string(fSession->getSessionID()))
|
, fId(to_string(fSession->getSessionID()))
|
||||||
, fStopOnDestruction(false)
|
, fStopOnDestruction(false)
|
||||||
{
|
{
|
||||||
auto envId(std::getenv("DDS_SESSION_ID"));
|
|
||||||
if (envId != nullptr && std::string(envId) != fId) {
|
|
||||||
setenv("DDS_SESSION_ID", fId.c_str(), 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sanity check
|
// Sanity check
|
||||||
if (!fSession->IsRunning()) {
|
if (!fSession->IsRunning()) {
|
||||||
throw std::runtime_error("Given CSession must be running");
|
throw std::runtime_error("Given CSession must be running");
|
||||||
|
@ -123,10 +112,6 @@ struct DDSSession::Impl
|
||||||
Impl(Impl&&) = delete;
|
Impl(Impl&&) = delete;
|
||||||
Impl& operator=(Impl&&) = delete;
|
Impl& operator=(Impl&&) = delete;
|
||||||
|
|
||||||
struct Tag {};
|
|
||||||
friend auto operator<<(std::ostream& os, Tag) -> std::ostream& { return os << "DDSSession"; }
|
|
||||||
tools::InstanceLimiter<Tag, 1> fCount;
|
|
||||||
|
|
||||||
DDSEnvironment fEnv;
|
DDSEnvironment fEnv;
|
||||||
DDSRMSPlugin fRMSPlugin;
|
DDSRMSPlugin fRMSPlugin;
|
||||||
Path fRMSConfig;
|
Path fRMSConfig;
|
||||||
|
|
|
@ -11,13 +11,18 @@
|
||||||
|
|
||||||
#include "TestEnvironment.h"
|
#include "TestEnvironment.h"
|
||||||
|
|
||||||
#include <asio/io_context.hpp>
|
|
||||||
#include <chrono>
|
|
||||||
#include <cstdlib>
|
|
||||||
#include <fairlogger/Logger.h>
|
|
||||||
#include <fairmq/SDK.h>
|
#include <fairmq/SDK.h>
|
||||||
#include <fairmq/tools/Strings.h>
|
#include <fairmq/tools/Strings.h>
|
||||||
|
|
||||||
|
#include <fairlogger/Logger.h>
|
||||||
|
|
||||||
|
#include <asio/io_context.hpp>
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
#include <algorithm> // for_each
|
||||||
|
#include <array>
|
||||||
|
#include <chrono>
|
||||||
|
#include <cstdlib>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
|
||||||
namespace fair {
|
namespace fair {
|
||||||
|
@ -82,8 +87,7 @@ struct TopologyFixture : ::testing::Test
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto TearDown() -> void override {
|
auto TearDown() -> void override {}
|
||||||
}
|
|
||||||
|
|
||||||
LoggerConfig mLoggerConfig;
|
LoggerConfig mLoggerConfig;
|
||||||
std::string mDDSTopoFile;
|
std::string mDDSTopoFile;
|
||||||
|
@ -93,14 +97,70 @@ struct TopologyFixture : ::testing::Test
|
||||||
asio::io_context mIoContext;
|
asio::io_context mIoContext;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct AsyncOpFixture : ::testing::Test
|
struct MultipleTopologiesFixture : ::testing::Test
|
||||||
{
|
{
|
||||||
auto SetUp() -> void override {
|
MultipleTopologiesFixture()
|
||||||
|
: mDDSTopoFile(tools::ToString(SDK_TESTSUITE_SOURCE_DIR, "/test_topo.xml"))
|
||||||
|
, mDDSEnv(CMAKE_CURRENT_BINARY_DIR)
|
||||||
|
, mDDSSessions{ sdk::DDSSession(mDDSEnv),
|
||||||
|
sdk::DDSSession(mDDSEnv) }
|
||||||
|
, mDDSTopologies{ sdk::DDSTopology(sdk::DDSTopology::Path(mDDSTopoFile), mDDSEnv),
|
||||||
|
sdk::DDSTopology(sdk::DDSTopology::Path(mDDSTopoFile), mDDSEnv) }
|
||||||
|
{
|
||||||
|
std::for_each(mDDSSessions.begin(), mDDSSessions.end(), [](sdk::DDSSession& s) {
|
||||||
|
s.StopOnDestruction();
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
auto TearDown() -> void override {
|
auto SetUp() -> void override
|
||||||
|
{
|
||||||
|
LOG(info) << mDDSEnv;
|
||||||
|
for (int i = 0; i < mNumSessions; ++i) {
|
||||||
|
LOG(info) << "##### SESSION " << i << " #####";
|
||||||
|
LOG(info) << mDDSSessions[i];
|
||||||
|
LOG(info) << mDDSTopologies[i];
|
||||||
|
auto n(mDDSTopologies[i].GetNumRequiredAgents());
|
||||||
|
mDDSSessions[i].SubmitAgents(n);
|
||||||
|
mDDSSessions[i].ActivateTopology(mDDSTopologies[i]);
|
||||||
|
|
||||||
|
std::vector<sdk::DDSAgent> agents = mDDSSessions[i].RequestAgentInfo();
|
||||||
|
LOG(info) << "##### AgentInfo:";
|
||||||
|
LOG(info) << "size: " << agents.size();
|
||||||
|
for (const auto& a : agents) {
|
||||||
|
LOG(info) << a;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<sdk::DDSTask> tasks = mDDSSessions[i].RequestTaskInfo();
|
||||||
|
LOG(info) << "##### TaskInfo:";
|
||||||
|
LOG(info) << "size: " << tasks.size();
|
||||||
|
for (const auto& t : tasks) {
|
||||||
|
LOG(info) << t;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<sdk::DDSCollection> collections = mDDSTopologies[i].GetCollections();
|
||||||
|
LOG(info) << "##### CollectionInfo:";
|
||||||
|
LOG(info) << "size: " << collections.size();
|
||||||
|
for (const auto& c : collections) {
|
||||||
|
LOG(info) << c;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto TearDown() -> void override {}
|
||||||
|
|
||||||
|
static constexpr int mNumSessions = 2;
|
||||||
|
LoggerConfig mLoggerConfig;
|
||||||
|
std::string mDDSTopoFile;
|
||||||
|
sdk::DDSEnvironment mDDSEnv;
|
||||||
|
std::array<sdk::DDSSession, mNumSessions> mDDSSessions;
|
||||||
|
std::array<sdk::DDSTopology, mNumSessions> mDDSTopologies;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct AsyncOpFixture : ::testing::Test
|
||||||
|
{
|
||||||
|
auto SetUp() -> void override {}
|
||||||
|
auto TearDown() -> void override {}
|
||||||
|
|
||||||
LoggerConfig mLoggerConfig;
|
LoggerConfig mLoggerConfig;
|
||||||
asio::io_context mIoContext;
|
asio::io_context mIoContext;
|
||||||
};
|
};
|
||||||
|
|
|
@ -13,9 +13,30 @@
|
||||||
#include <fairmq/sdk/Topology.h>
|
#include <fairmq/sdk/Topology.h>
|
||||||
#include <fairmq/Tools.h>
|
#include <fairmq/Tools.h>
|
||||||
|
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
using Topology = fair::mq::test::TopologyFixture;
|
using Topology = fair::mq::test::TopologyFixture;
|
||||||
|
using MultipleTopologies = fair::mq::test::MultipleTopologiesFixture;
|
||||||
|
|
||||||
|
void control(fair::mq::sdk::Topology& topo)
|
||||||
|
{
|
||||||
|
using fair::mq::sdk::TopologyTransition;
|
||||||
|
|
||||||
|
for (auto transition : {TopologyTransition::InitDevice,
|
||||||
|
TopologyTransition::CompleteInit,
|
||||||
|
TopologyTransition::Bind,
|
||||||
|
TopologyTransition::Connect,
|
||||||
|
TopologyTransition::InitTask,
|
||||||
|
TopologyTransition::Run,
|
||||||
|
TopologyTransition::Stop,
|
||||||
|
TopologyTransition::ResetTask,
|
||||||
|
TopologyTransition::ResetDevice,
|
||||||
|
TopologyTransition::End}) {
|
||||||
|
ASSERT_EQ(topo.ChangeState(transition).first, std::error_code());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
TEST(TopologyHelper, MakeTopology)
|
TEST(TopologyHelper, MakeTopology)
|
||||||
{
|
{
|
||||||
|
@ -24,7 +45,6 @@ TEST(TopologyHelper, MakeTopology)
|
||||||
// This is only needed for this unit test
|
// This is only needed for this unit test
|
||||||
test::LoggerConfig cfg;
|
test::LoggerConfig cfg;
|
||||||
sdk::DDSEnv env(CMAKE_CURRENT_BINARY_DIR);
|
sdk::DDSEnv env(CMAKE_CURRENT_BINARY_DIR);
|
||||||
/////////////////////////////////////
|
|
||||||
|
|
||||||
std::string topoFile(tools::ToString(SDK_TESTSUITE_SOURCE_DIR, "/test_topo.xml"));
|
std::string topoFile(tools::ToString(SDK_TESTSUITE_SOURCE_DIR, "/test_topo.xml"));
|
||||||
dds::topology_api::CTopology nativeTopo(topoFile);
|
dds::topology_api::CTopology nativeTopo(topoFile);
|
||||||
|
@ -34,6 +54,59 @@ TEST(TopologyHelper, MakeTopology)
|
||||||
nativeSession->shutdown();
|
nativeSession->shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(MultipleTopologies, Construction)
|
||||||
|
{
|
||||||
|
using namespace fair::mq;
|
||||||
|
|
||||||
|
std::array<sdk::Topology, mNumSessions> topos{
|
||||||
|
sdk::Topology(mDDSTopologies[0], mDDSSessions[0]),
|
||||||
|
sdk::Topology(mDDSTopologies[1], mDDSSessions[1])
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(MultipleTopologies, ChangeStateFullDeviceLifecycle)
|
||||||
|
{
|
||||||
|
using namespace fair::mq;
|
||||||
|
|
||||||
|
std::array<sdk::Topology, mNumSessions> topos{
|
||||||
|
sdk::Topology(mDDSTopologies[0], mDDSSessions[0]),
|
||||||
|
sdk::Topology(mDDSTopologies[1], mDDSSessions[1])
|
||||||
|
};
|
||||||
|
|
||||||
|
for (int i = 0; i < mNumSessions; ++i) {
|
||||||
|
using fair::mq::sdk::TopologyTransition;
|
||||||
|
|
||||||
|
for (auto transition : {TopologyTransition::InitDevice,
|
||||||
|
TopologyTransition::CompleteInit,
|
||||||
|
TopologyTransition::Bind,
|
||||||
|
TopologyTransition::Connect,
|
||||||
|
TopologyTransition::InitTask,
|
||||||
|
TopologyTransition::Run,
|
||||||
|
TopologyTransition::Stop,
|
||||||
|
TopologyTransition::ResetTask,
|
||||||
|
TopologyTransition::ResetDevice,
|
||||||
|
TopologyTransition::End}) {
|
||||||
|
ASSERT_EQ(topos[i].ChangeState(transition).first, std::error_code());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(MultipleTopologies, ChangeStateFullDeviceLifecycleConcurrent)
|
||||||
|
{
|
||||||
|
using namespace fair::mq;
|
||||||
|
|
||||||
|
std::array<sdk::Topology, mNumSessions> topos{
|
||||||
|
sdk::Topology(mDDSTopologies[0], mDDSSessions[0]),
|
||||||
|
sdk::Topology(mDDSTopologies[1], mDDSSessions[1])
|
||||||
|
};
|
||||||
|
|
||||||
|
std::thread t0(control, std::ref(topos[0]));
|
||||||
|
std::thread t1(control, std::ref(topos[1]));
|
||||||
|
t0.join();
|
||||||
|
t1.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
TEST_F(Topology, Construction)
|
TEST_F(Topology, Construction)
|
||||||
{
|
{
|
||||||
fair::mq::sdk::Topology topo(mDDSTopo, mDDSSession);
|
fair::mq::sdk::Topology topo(mDDSTopo, mDDSSession);
|
||||||
|
|
Loading…
Reference in New Issue
Block a user