From 363576496d76dfd2413d5d1892d38e769c7a3c4c Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Thu, 25 Jul 2019 14:40:24 +0200 Subject: [PATCH] SDK: Pass CSession as shared ptr Even though it is copyable the copy does not work. --- fairmq/sdk/DDSSession.cxx | 41 ++++++++++++-------- fairmq/sdk/DDSSession.h | 2 +- fairmq/sdk/Topology.cxx | 12 +++--- fairmq/sdk/Topology.h | 17 ++++---- test/CMakeLists.txt | 2 +- test/sdk/{TopologyFixture.h => Fixtures.h} | 10 ++--- test/sdk/_dds.cxx | 45 ++++++++++++---------- test/sdk/_topology.cxx | 10 +++-- 8 files changed, 75 insertions(+), 64 deletions(-) rename test/sdk/{TopologyFixture.h => Fixtures.h} (88%) diff --git a/fairmq/sdk/DDSSession.cxx b/fairmq/sdk/DDSSession.cxx index 04c42ab8..6691cd44 100644 --- a/fairmq/sdk/DDSSession.cxx +++ b/fairmq/sdk/DDSSession.cxx @@ -59,8 +59,9 @@ struct DDSSession::Impl explicit Impl(DDSEnvironment env) : fEnv(std::move(env)) , fRMSPlugin(DDSRMSPlugin::localhost) + , fSession(std::make_shared()) , fDDSCustomCmd(fDDSService) - , fId(to_string(fSession.create())) + , fId(to_string(fSession->create())) , fStopOnDestruction(false) { setenv("DDS_SESSION_ID", fId.c_str(), 1); @@ -73,13 +74,14 @@ struct DDSSession::Impl explicit Impl(Id existing, DDSEnvironment env) : fEnv(std::move(env)) , fRMSPlugin(DDSRMSPlugin::localhost) + , fSession(std::make_shared()) , fDDSCustomCmd(fDDSService) , fId(std::move(existing)) , fStopOnDestruction(false) { - fSession.attach(fId); - std::string envId(std::getenv("DDS_SESSION_ID")); - if (envId != fId) { + fSession->attach(fId); + auto envId(std::getenv("DDS_SESSION_ID")); + if (envId != nullptr && std::string(envId) != fId) { setenv("DDS_SESSION_ID", fId.c_str(), 1); } @@ -88,16 +90,21 @@ struct DDSSession::Impl }); } - explicit Impl(dds::tools_api::CSession nativeSession, DDSEnv env) + explicit Impl(std::shared_ptr nativeSession, DDSEnv env) : fEnv(std::move(env)) , fRMSPlugin(DDSRMSPlugin::localhost) , fSession(std::move(nativeSession)) , fDDSCustomCmd(fDDSService) - , fId(to_string(fSession.getSessionID())) + , fId(to_string(fSession->getSessionID())) , fStopOnDestruction(false) { + auto envId(std::getenv("DDS_SESSION_ID")); + if (envId != nullptr && std::string(envId) != fId) { + setenv("DDS_SESSION_ID", fId.c_str(), 1); + } + // Sanity check - if (!fSession.IsRunning()) { + if (!fSession->IsRunning()) { throw std::runtime_error("Given CSession must be running"); } } @@ -105,7 +112,7 @@ struct DDSSession::Impl ~Impl() { if (fStopOnDestruction) { - fSession.shutdown(); + fSession->shutdown(); } } @@ -122,7 +129,7 @@ struct DDSSession::Impl DDSEnvironment fEnv; DDSRMSPlugin fRMSPlugin; Path fRMSConfig; - dds::tools_api::CSession fSession; + std::shared_ptr fSession; dds::intercom_api::CIntercomService fDDSService; dds::intercom_api::CCustomCmd fDDSCustomCmd; Id fId; @@ -137,17 +144,17 @@ DDSSession::DDSSession(Id existing, DDSEnvironment env) : fImpl(std::make_shared(std::move(existing), std::move(env))) {} -DDSSession::DDSSession(dds::tools_api::CSession nativeSession, DDSEnv env) +DDSSession::DDSSession(std::shared_ptr nativeSession, DDSEnv env) : fImpl(std::make_shared(std::move(nativeSession), std::move(env))) {} auto DDSSession::GetEnv() const -> DDSEnvironment { return fImpl->fEnv; } -auto DDSSession::IsRunning() const -> bool { return fImpl->fSession.IsRunning(); } +auto DDSSession::IsRunning() const -> bool { return fImpl->fSession->IsRunning(); } auto DDSSession::GetId() const -> Id { return fImpl->fId; } -auto DDSSession::Stop() -> void { return fImpl->fSession.shutdown(); } +auto DDSSession::Stop() -> void { return fImpl->fSession->shutdown(); } auto DDSSession::GetRMSPlugin() const -> DDSRMSPlugin { return fImpl->fRMSPlugin; } @@ -183,7 +190,7 @@ auto DDSSession::SubmitAgents(Quantity agents) -> void blocker.Signal(); }); - fImpl->fSession.sendRequest(submitRequest); + fImpl->fSession->sendRequest(submitRequest); blocker.Wait(); // perfect @@ -204,12 +211,12 @@ auto DDSSession::RequestAgentInfo() -> AgentInfo info.executingAgentsCount = _response.m_executingAgentsCount; info.agents.reserve(_response.m_activeAgentsCount); } - info.agents.emplace_back(*this, std::move(_response.m_agentInfo)); + info.agents.emplace_back(*this, _response.m_agentInfo); }); agentInfoRequest->setMessageCallback( [](const dds::tools_api::SMessageResponseData& _message) { LOG(debug) << _message; }); agentInfoRequest->setDoneCallback([&]() { blocker.Signal(); }); - fImpl->fSession.sendRequest(agentInfoRequest); + fImpl->fSession->sendRequest(agentInfoRequest); blocker.Wait(); return info; @@ -230,7 +237,7 @@ auto DDSSession::RequestCommanderInfo() -> CommanderInfo commanderInfoRequest->setMessageCallback( [](const dds::tools_api::SMessageResponseData& _message) { LOG(debug) << _message; }); commanderInfoRequest->setDoneCallback([&]() { blocker.Signal(); }); - fImpl->fSession.sendRequest(commanderInfoRequest); + fImpl->fSession->sendRequest(commanderInfoRequest); blocker.Wait(); return info; @@ -269,7 +276,7 @@ auto DDSSession::ActivateTopology(DDSTopology topo) -> void topologyRequest->setMessageCallback( [](const dds::tools_api::SMessageResponseData& _message) { LOG(debug) << _message; }); topologyRequest->setDoneCallback([&]() { blocker.Signal(); }); - fImpl->fSession.sendRequest(topologyRequest); + fImpl->fSession->sendRequest(topologyRequest); blocker.Wait(); WaitForExecutingAgents(topo.GetNumRequiredAgents()); diff --git a/fairmq/sdk/DDSSession.h b/fairmq/sdk/DDSSession.h index 44c846d2..9dc7ccbb 100644 --- a/fairmq/sdk/DDSSession.h +++ b/fairmq/sdk/DDSSession.h @@ -60,7 +60,7 @@ class DDSSession /// @brief Construct with already existing native DDS API objects /// @param nativeSession Existing and initialized CSession (either via create() or attach()) /// @param env Optional DDSEnv - explicit DDSSession(dds::tools_api::CSession nativeSession, DDSEnv env = {}); + explicit DDSSession(std::shared_ptr nativeSession, DDSEnv env = {}); auto GetEnv() const -> DDSEnvironment; auto GetId() const -> Id; diff --git a/fairmq/sdk/Topology.cxx b/fairmq/sdk/Topology.cxx index 418aede0..2bf69774 100644 --- a/fairmq/sdk/Topology.cxx +++ b/fairmq/sdk/Topology.cxx @@ -105,20 +105,22 @@ Topology::Topology(DDSTopology topo, DDSSession session) } Topology::Topology(dds::topology_api::CTopology nativeTopo, - dds::tools_api::CSession nativeSession, + std::shared_ptr nativeSession, DDSEnv env) : Topology(DDSTopo(std::move(nativeTopo), env), DDSSession(std::move(nativeSession), env)) -{} +{ + if (fDDSSession.RequestCommanderInfo().activeTopologyName != fDDSTopo.GetName()) { + throw std::runtime_error("Given topology must be activated"); + } +} auto Topology::ChangeState(TopologyTransition transition, ChangeStateCallback cb, Duration timeout) -> void { { std::unique_lock lock(fMtx); if (fStateChangeOngoing) { - LOG(error) << "A state change request is already in progress, concurrent requests are currently not supported"; + throw std::runtime_error("A state change request is already in progress, concurrent requests are currently not supported"); lock.unlock(); - cb({{AsyncOpResultCode::Error, "A state change request is already in progress, concurrent requests are currently not supported"}, fState}); - return; } LOG(info) << "Initiating ChangeState with " << transition << " to " << expectedState.at(transition); fStateChangeOngoing = true; diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index 8813e7e9..e875a698 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -61,20 +61,21 @@ using TopologyTransition = fair::mq::Transition; struct MixedState : std::runtime_error { using std::runtime_error::runtime_error; }; -DeviceState AggregateState(const TopologyState& topologyState) +inline DeviceState AggregateState(const TopologyState& topologyState) { DeviceState first = topologyState.begin()->second.state; if (std::all_of(topologyState.cbegin(), topologyState.cend(), [&](TopologyState::value_type i) { - return i.second.state == first; - })) { + return i.second.state == first; + })) { return first; - } else { - throw MixedState("State is not uniform"); } + + throw MixedState("State is not uniform"); + } -bool StateEqualsTo(const TopologyState& topologyState, DeviceState state) +inline bool StateEqualsTo(const TopologyState& topologyState, DeviceState state) { return AggregateState(topologyState) == state; } @@ -92,11 +93,11 @@ class Topology explicit Topology(DDSTopology topo, DDSSession session = DDSSession()); /// @brief (Re)Construct a FairMQ topology based on already existing native DDS API objects - /// @param nativeTopo Existing CTopology /// @param nativeSession Existing and initialized CSession (either via create() or attach()) + /// @param nativeTopo Existing CTopology that is activated on the given nativeSession /// @param env Optional DDSEnv (needed primarily for unit testing) explicit Topology(dds::topology_api::CTopology nativeTopo, - dds::tools_api::CSession nativeSession, + std::shared_ptr nativeSession, DDSEnv env = {}); explicit Topology(const Topology&) = delete; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 74395e69..ee229223 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -289,7 +289,7 @@ if(BUILD_SDK) ${CMAKE_CURRENT_BINARY_DIR}/runner.cxx sdk/_dds.cxx sdk/_topology.cxx - sdk/TopologyFixture.h + sdk/Fixtures.h LINKS SDK diff --git a/test/sdk/TopologyFixture.h b/test/sdk/Fixtures.h similarity index 88% rename from test/sdk/TopologyFixture.h rename to test/sdk/Fixtures.h index 7a00e1d9..a9443b52 100644 --- a/test/sdk/TopologyFixture.h +++ b/test/sdk/Fixtures.h @@ -6,8 +6,8 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#ifndef FAIR_MQ_TEST_TOPOLOGYFIXTURE -#define FAIR_MQ_TEST_TOPOLOGYFIXTURE +#ifndef FAIR_MQ_TEST_FIXTURES +#define FAIR_MQ_TEST_FIXTURES #include "TestEnvironment.h" #include @@ -51,10 +51,6 @@ struct TopologyFixture : ::testing::Test mDDSSession.StopOnDestruction(); } - // auto ActivateDDSTopology(const std::string& topology_file) -> void { - // LOG(debug) << "ActivateDDSTopology(\"" << topology_file << "\")"; - // } - auto SetUp() -> void override { LOG(info) << mDDSEnv; LOG(info) << mDDSSession; @@ -78,5 +74,5 @@ struct TopologyFixture : ::testing::Test } /* namespace mq */ } /* namespace fair */ -#endif /* FAIR_MQ_TEST_TOPOLOGYFIXTURE */ +#endif /* FAIR_MQ_TEST_FIXTURES */ diff --git a/test/sdk/_dds.cxx b/test/sdk/_dds.cxx index b22ac0b2..583c3a9e 100644 --- a/test/sdk/_dds.cxx +++ b/test/sdk/_dds.cxx @@ -6,41 +6,44 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include -#include -#include -#include -#include -#include +#include "Fixtures.h" + +#include +#include namespace { -auto setup() -> void +TEST(DDSEnvironment, Construction) { - fair::Logger::SetConsoleSeverity("debug"); - fair::Logger::DefineVerbosity("user1", - fair::VerbositySpec::Make(fair::VerbositySpec::Info::timestamp_us, - fair::VerbositySpec::Info::severity)); - fair::Logger::SetVerbosity("user1"); - fair::Logger::SetConsoleColor(); -} - -TEST(DDS, Environment) -{ - setup(); + fair::mq::test::LoggerConfig cfg; fair::mq::sdk::DDSEnvironment env(CMAKE_CURRENT_BINARY_DIR); LOG(debug) << env; } -TEST(DDS, Session) +TEST(DDSSession, Construction) { - setup(); - + fair::mq::test::LoggerConfig cfg; fair::mq::sdk::DDSEnvironment env(CMAKE_CURRENT_BINARY_DIR); + fair::mq::sdk::DDSSession session(env); session.StopOnDestruction(); LOG(debug) << session; } +TEST(DDSSession, Construction2) +{ + fair::mq::test::LoggerConfig cfg; + fair::mq::sdk::DDSEnvironment env(CMAKE_CURRENT_BINARY_DIR); + + auto nativeSession(std::make_shared()); + nativeSession->create(); + + fair::mq::sdk::DDSSession session(nativeSession, env); + session.StopOnDestruction(); + LOG(debug) << session; + + session.RequestCommanderInfo(); +} + } // namespace diff --git a/test/sdk/_topology.cxx b/test/sdk/_topology.cxx index e280a7d7..f65f18fd 100644 --- a/test/sdk/_topology.cxx +++ b/test/sdk/_topology.cxx @@ -6,7 +6,7 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include "TopologyFixture.h" +#include "Fixtures.h" #include #include @@ -20,13 +20,15 @@ using Topology = fair::mq::test::TopologyFixture; TEST(Topology2, ConstructionWithNativeDdsApiObjects) { // This is only needed for this unit test + fair::mq::test::LoggerConfig cfg; fair::mq::sdk::DDSEnv env(CMAKE_CURRENT_BINARY_DIR); + ///////////////////////////////////////// // Example usage: dds::topology_api::CTopology nativeTopo(fair::mq::tools::ToString(SDK_TESTSUITE_SOURCE_DIR, "/test_topo.xml")); - dds::tools_api::CSession nativeSession; - nativeSession.create(); - fair::mq::sdk::Topology topo(nativeTopo, nativeSession, env); + auto nativeSession(std::make_shared()); + nativeSession->create(); + EXPECT_THROW(fair::mq::sdk::Topology topo(nativeTopo, nativeSession, env), std::runtime_error); } TEST_F(Topology, Construction)