SDK: Require DDS 2.5.36 and support new Tools API

This commit is contained in:
Alexey Rybalchenko 2019-08-13 19:07:48 +02:00 committed by Dennis Klein
parent 7f0237d97d
commit eaa8f5cbdd
10 changed files with 229 additions and 107 deletions

View File

@ -75,7 +75,7 @@ if(BUILD_NANOMSG_TRANSPORT)
endif() endif()
if(BUILD_SDK) if(BUILD_SDK)
set(required_dds_version 2.5.22) set(required_dds_version 2.5.36)
else() else()
set(required_dds_version 2.4) set(required_dds_version 2.4)
endif() endif()

View File

@ -10,6 +10,7 @@
#define FAIR_MQ_SDK_H #define FAIR_MQ_SDK_H
// IWYU pragma: begin_exports // IWYU pragma: begin_exports
#include <fairmq/sdk/DDSAgent.h>
#include <fairmq/sdk/DDSInfo.h> #include <fairmq/sdk/DDSInfo.h>
#include <fairmq/sdk/DDSEnvironment.h> #include <fairmq/sdk/DDSEnvironment.h>
#include <fairmq/sdk/DDSSession.h> #include <fairmq/sdk/DDSSession.h>

View File

@ -15,6 +15,7 @@ set(target SDK)
set(SDK_PUBLIC_HEADER_FILES set(SDK_PUBLIC_HEADER_FILES
../SDK.h ../SDK.h
DDSAgent.h
DDSEnvironment.h DDSEnvironment.h
DDSSession.h DDSSession.h
DDSTopology.h DDSTopology.h

94
fairmq/sdk/DDSAgent.h Normal file
View File

@ -0,0 +1,94 @@
/********************************************************************************
* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_SDK_DDSSAGENT_H
#define FAIR_MQ_SDK_DDSSAGENT_H
#include <fairmq/sdk/DDSSession.h>
#include <ostream>
#include <string>
#include <chrono>
#include <cstdint>
namespace fair {
namespace mq {
namespace sdk {
/**
* @class DDSAgent <fairmq/sdk/DDSAgent.h>
* @brief Represents a DDS agent
*/
class DDSAgent
{
public:
using Id = uint64_t;
using Pid = uint32_t;
explicit DDSAgent(DDSSession session,
Id id,
Pid pid,
std::string state,
std::string path,
std::string host,
bool lobbyLeader,
std::chrono::milliseconds startupTime,
Id taskId,
std::string username)
: fSession(std::move(session))
, fId(id)
, fPid(pid)
, fState(std::move(state))
, fDDSPath(std::move(path))
, fHost(std::move(host))
, fLobbyLeader(lobbyLeader)
, fStartupTime(startupTime)
, fTaskId(taskId)
, fUsername(std::move(username))
{}
DDSSession GetSession() const { return fSession; }
Id GetId() const { return fId; }
Pid GetPid() const { return fPid; }
std::string GetState() const { return fState; }
std::string GetHost() const { return fHost; }
bool IsLobbyLeader() const { return fLobbyLeader; }
std::chrono::milliseconds GetStartupTime() const { return fStartupTime; }
std::string GetUsername() const { return fUsername; }
friend auto operator<<(std::ostream& os, const DDSAgent& agent) -> std::ostream&
{
return os << "DDSAgent id: " << agent.fId
<< ", pid: " << agent.fPid
<< ", state: " << agent.fState
<< ", path: " << agent.fDDSPath
<< ", host: " << agent.fHost
<< ", lobbyLeader: " << agent.fLobbyLeader
<< ", startupTime: " << agent.fStartupTime.count()
<< ", taskId: " << agent.fTaskId
<< ", username: " << agent.fUsername;
}
private:
DDSSession fSession;
Id fId;
Pid fPid;
std::string fState;
std::string fDDSPath;
std::string fHost;
bool fLobbyLeader;
std::chrono::milliseconds fStartupTime;
Id fTaskId;
std::string fUsername;
};
} // namespace sdk
} // namespace mq
} // namespace fair
#endif /* FAIR_MQ_SDK_DDSSAGENT_H */

View File

@ -8,6 +8,7 @@
#include "DDSSession.h" #include "DDSSession.h"
#include <fairmq/sdk/DDSAgent.h>
#include <fairmq/sdk/DDSEnvironment.h> #include <fairmq/sdk/DDSEnvironment.h>
#include <fairmq/sdk/DDSTopology.h> #include <fairmq/sdk/DDSTopology.h>
#include <fairmq/Tools.h> #include <fairmq/Tools.h>
@ -180,76 +181,110 @@ auto DDSSession::SubmitAgents(Quantity agents) -> void
// Requesting to submit 0 agents is not meaningful // Requesting to submit 0 agents is not meaningful
assert(agents > 0); assert(agents > 0);
dds::tools_api::SSubmitRequestData submitInfo; using namespace dds::tools_api;
SSubmitRequestData submitInfo;
submitInfo.m_rms = tools::ToString(GetRMSPlugin()); submitInfo.m_rms = tools::ToString(GetRMSPlugin());
submitInfo.m_instances = agents; submitInfo.m_instances = agents;
submitInfo.m_config = GetRMSConfig().string(); submitInfo.m_config = GetRMSConfig().string();
tools::Semaphore blocker; tools::Semaphore blocker;
auto submitRequest = dds::tools_api::SSubmitRequest::makeRequest(submitInfo); auto submitRequest = SSubmitRequest::makeRequest(submitInfo);
submitRequest->setMessageCallback( submitRequest->setMessageCallback([](const SMessageResponseData& message){
[](const dds::tools_api::SMessageResponseData& message) { LOG(debug) << message; }); LOG(debug) << message.m_msg;
});
submitRequest->setDoneCallback([&]() { submitRequest->setDoneCallback([&]() {
LOG(debug) << agents << " Agents submitted"; LOG(debug) << agents << " Agents submitted";
blocker.Signal(); blocker.Signal();
}); });
fImpl->fSession->sendRequest<dds::tools_api::SSubmitRequest>(submitRequest); fImpl->fSession->sendRequest<SSubmitRequest>(submitRequest);
blocker.Wait(); blocker.Wait();
// perfect
WaitForIdleAgents(agents); WaitForIdleAgents(agents);
} }
auto DDSSession::RequestAgentInfo() -> AgentInfo auto DDSSession::RequestAgentCount() -> AgentCount
{ {
dds::tools_api::SAgentInfoRequestData agentInfoInfo; using namespace dds::tools_api;
tools::Semaphore blocker;
AgentInfo info;
auto agentInfoRequest = dds::tools_api::SAgentInfoRequest::makeRequest(agentInfoInfo);
agentInfoRequest->setResponseCallback(
[this, &info](const dds::tools_api::SAgentInfoResponseData& _response) {
if (_response.m_index == 0) {
info.activeAgentsCount = _response.m_activeAgentsCount;
info.idleAgentsCount = _response.m_idleAgentsCount;
info.executingAgentsCount = _response.m_executingAgentsCount;
info.agents.reserve(_response.m_activeAgentsCount);
}
info.agents.emplace_back(*this, _response.m_agentInfo);
});
agentInfoRequest->setMessageCallback(
[](const dds::tools_api::SMessageResponseData& _message) { LOG(debug) << _message; });
agentInfoRequest->setDoneCallback([&]() { blocker.Signal(); });
fImpl->fSession->sendRequest<dds::tools_api::SAgentInfoRequest>(agentInfoRequest);
blocker.Wait();
return info; SAgentCountRequest::response_t res;
fImpl->fSession->syncSendRequest<SAgentCountRequest>(SAgentCountRequest::request_t(), res);
AgentCount count;
count.active = res.m_activeAgentsCount;
count.idle = res.m_idleAgentsCount;
count.executing = res.m_executingAgentsCount;
return count;
}
auto DDSSession::RequestAgentInfo() -> std::vector<DDSAgent>
{
using namespace dds::tools_api;
SAgentInfoRequest::responseVector_t res;
fImpl->fSession->syncSendRequest<SAgentInfoRequest>(SAgentInfoRequest::request_t(), res);
std::vector<DDSAgent> agentInfo;
agentInfo.reserve(res.size());
for (const auto& a : res) {
agentInfo.emplace_back(
*this,
a.m_agentID,
a.m_agentPid,
a.m_agentState,
a.m_DDSPath,
a.m_host,
a.m_lobbyLeader,
a.m_startUpTime,
a.m_taskID,
a.m_username
);
}
return agentInfo;
}
auto DDSSession::RequestTaskInfo() -> std::vector<DDSTask>
{
using namespace dds::tools_api;
SAgentInfoRequest::responseVector_t res;
fImpl->fSession->syncSendRequest<SAgentInfoRequest>(SAgentInfoRequest::request_t(), res);
std::vector<DDSTask> taskInfo;
taskInfo.reserve(res.size());
for (auto& a : res) {
taskInfo.emplace_back(a.m_taskID);
}
return taskInfo;
} }
auto DDSSession::RequestCommanderInfo() -> CommanderInfo auto DDSSession::RequestCommanderInfo() -> CommanderInfo
{ {
dds::tools_api::SCommanderInfoRequestData commanderInfoInfo; using namespace dds::tools_api;
SCommanderInfoRequestData commanderInfo;
tools::Semaphore blocker; tools::Semaphore blocker;
std::string error; std::string error;
auto commanderInfoRequest = auto commanderInfoRequest = SCommanderInfoRequest::makeRequest(commanderInfo);
dds::tools_api::SCommanderInfoRequest::makeRequest(commanderInfoInfo);
CommanderInfo info; CommanderInfo info;
commanderInfoRequest->setResponseCallback( commanderInfoRequest->setResponseCallback([&info](const SCommanderInfoResponseData& _response) {
[&info](const dds::tools_api::SCommanderInfoResponseData& _response) { info.pid = _response.m_pid;
info.pid = _response.m_pid; info.activeTopologyName = _response.m_activeTopologyName;
info.activeTopologyName = _response.m_activeTopologyName; });
}); commanderInfoRequest->setMessageCallback([&](const SMessageResponseData& _message) {
commanderInfoRequest->setMessageCallback( if (_message.m_severity == dds::intercom_api::EMsgSeverity::error) {
[&](const dds::tools_api::SMessageResponseData& _message) { error = _message.m_msg;
if (_message.m_severity == dds::intercom_api::EMsgSeverity::error) { blocker.Signal();
error = _message.m_msg; } else {
blocker.Signal(); LOG(debug) << _message.m_msg;
} else { }
LOG(debug) << _message; });
}
});
commanderInfoRequest->setDoneCallback([&]() { blocker.Signal(); }); commanderInfoRequest->setDoneCallback([&]() { blocker.Signal(); });
fImpl->fSession->sendRequest<dds::tools_api::SCommanderInfoRequest>(commanderInfoRequest); fImpl->fSession->sendRequest<SCommanderInfoRequest>(commanderInfoRequest);
blocker.Wait(); blocker.Wait();
if (!error.empty()) { if (!error.empty()) {
@ -261,38 +296,41 @@ auto DDSSession::RequestCommanderInfo() -> CommanderInfo
auto DDSSession::WaitForExecutingAgents(Quantity minCount) -> void auto DDSSession::WaitForExecutingAgents(Quantity minCount) -> void
{ {
auto info(RequestAgentInfo()); auto count(RequestAgentCount());
int interval(8); int interval(8);
while (info.executingAgentsCount < minCount) { while (count.executing < minCount) {
std::this_thread::sleep_for(std::chrono::milliseconds(interval)); std::this_thread::sleep_for(std::chrono::milliseconds(interval));
interval = std::min(256, interval * 2); interval = std::min(256, interval * 2);
info = RequestAgentInfo(); count = RequestAgentCount();
} }
} }
auto DDSSession::WaitForIdleAgents(Quantity minCount) -> void auto DDSSession::WaitForIdleAgents(Quantity minCount) -> void
{ {
auto info(RequestAgentInfo()); auto count(RequestAgentCount());
int interval(8); int interval(8);
while (info.idleAgentsCount < minCount) { while (count.idle < minCount) {
std::this_thread::sleep_for(std::chrono::milliseconds(interval)); std::this_thread::sleep_for(std::chrono::milliseconds(interval));
interval = std::min(256, interval * 2); interval = std::min(256, interval * 2);
info = RequestAgentInfo(); count = RequestAgentCount();
} }
} }
auto DDSSession::ActivateTopology(DDSTopology topo) -> void auto DDSSession::ActivateTopology(DDSTopology topo) -> void
{ {
dds::tools_api::STopologyRequestData topologyInfo; using namespace dds::tools_api;
topologyInfo.m_updateType = dds::tools_api::STopologyRequestData::EUpdateType::ACTIVATE;
STopologyRequestData topologyInfo;
topologyInfo.m_updateType = STopologyRequestData::EUpdateType::ACTIVATE;
topologyInfo.m_topologyFile = topo.GetTopoFile().string(); topologyInfo.m_topologyFile = topo.GetTopoFile().string();
tools::Semaphore blocker; tools::Semaphore blocker;
auto topologyRequest = dds::tools_api::STopologyRequest::makeRequest(topologyInfo); auto topologyRequest = STopologyRequest::makeRequest(topologyInfo);
topologyRequest->setMessageCallback( topologyRequest->setMessageCallback([](const SMessageResponseData& _message) {
[](const dds::tools_api::SMessageResponseData& _message) { LOG(debug) << _message; }); LOG(debug) << _message.m_msg;
});
topologyRequest->setDoneCallback([&]() { blocker.Signal(); }); topologyRequest->setDoneCallback([&]() { blocker.Signal(); });
fImpl->fSession->sendRequest<dds::tools_api::STopologyRequest>(topologyRequest); fImpl->fSession->sendRequest<STopologyRequest>(topologyRequest);
blocker.Wait(); blocker.Wait();
WaitForExecutingAgents(topo.GetNumRequiredAgents()); WaitForExecutingAgents(topo.GetNumRequiredAgents());
@ -337,21 +375,6 @@ auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&
return os << "$DDS_SESSION_ID: " << session.GetId(); return os << "$DDS_SESSION_ID: " << session.GetId();
} }
auto DDSAgent::GetSession() const -> DDSSession
{
return fSession;
}
auto DDSAgent::GetInfoStr() const -> std::string
{
return fInfoStr;
}
auto operator<<(std::ostream& os, const DDSAgent& agent) -> std::ostream&
{
return os << agent.GetInfoStr();
}
} // namespace sdk } // namespace sdk
} // namespace mq } // namespace mq
} // namespace fair } // namespace fair

View File

@ -21,6 +21,7 @@
#include <stdexcept> #include <stdexcept>
#include <string> #include <string>
#include <functional> #include <functional>
#include <vector>
namespace fair { namespace fair {
namespace mq { namespace mq {
@ -47,6 +48,20 @@ class DDSTask
{ {
public: public:
using Id = std::uint64_t; using Id = std::uint64_t;
explicit DDSTask(Id id)
: fId(id)
{}
Id GetId() const { return fId; }
friend auto operator<<(std::ostream& os, const DDSTask& task) -> std::ostream&
{
return os << "DDSTask id: " << task.fId;
}
private:
Id fId;
}; };
class DDSChannel class DDSChannel
@ -84,13 +99,14 @@ class DDSSession
auto StopOnDestruction(bool stop = true) -> void; auto StopOnDestruction(bool stop = true) -> void;
auto IsRunning() const -> bool; auto IsRunning() const -> bool;
auto SubmitAgents(Quantity agents) -> void; auto SubmitAgents(Quantity agents) -> void;
struct AgentInfo { struct AgentCount {
Quantity idleAgentsCount = 0; Quantity idle = 0;
Quantity activeAgentsCount = 0; Quantity active = 0;
Quantity executingAgentsCount = 0; Quantity executing = 0;
std::vector<DDSAgent> agents;
}; };
auto RequestAgentInfo() -> AgentInfo; auto RequestAgentCount() -> AgentCount;
auto RequestAgentInfo() -> std::vector<DDSAgent>;
auto RequestTaskInfo() -> std::vector<DDSTask>;
struct CommanderInfo { struct CommanderInfo {
int pid = -1; int pid = -1;
std::string activeTopologyName; std::string activeTopologyName;
@ -117,27 +133,6 @@ class DDSSession
std::shared_ptr<Impl> fImpl; std::shared_ptr<Impl> fImpl;
}; };
/**
* @class DDSAgent DDSSession.h <fairmq/sdk/DDSSession.h>
* @brief Represents a DDS agent
*/
class DDSAgent
{
public:
explicit DDSAgent(DDSSession session, std::string infostr)
: fInfoStr(std::move(infostr))
, fSession(std::move(session))
{}
auto GetSession() const -> DDSSession;
auto GetInfoStr() const -> std::string;
friend auto operator<<(std::ostream& os, const DDSAgent& plugin) -> std::ostream&;
private:
std::string fInfoStr;
DDSSession fSession;
};
} // namespace sdk } // namespace sdk
} // namespace mq } // namespace mq
} // namespace fair } // namespace fair

View File

@ -231,18 +231,18 @@ void Topology::WaitForState()
fExecutionCV.wait(lock); fExecutionCV.wait(lock);
} }
} }
LOG(debug) << "WaitForState shutting down"; LOG(debug) << "Topology::WaitForState shutting down";
}; };
void Topology::AddNewStateEntry(DDSTask::Id senderId, const std::string& state) void Topology::AddNewStateEntry(DDSTask::Id taskId, const std::string& state)
{ {
std::size_t pos = state.find("->"); std::size_t pos = state.find("->");
std::string endState = state.substr(pos + 2); std::string endState = state.substr(pos + 2);
// LOG(debug) << "Adding new state entry: " << senderId << ", " << state << ", end state: " << endState; // LOG(debug) << "Adding new state entry: " << taskId << ", " << state << ", end state: " << endState;
{ {
try { try {
std::unique_lock<std::mutex> lock(fMtx); std::unique_lock<std::mutex> lock(fMtx);
fState[senderId] = DeviceStatus{ true, fair::mq::GetState(endState) }; fState[taskId] = DeviceStatus{ true, fair::mq::GetState(endState) };
} catch (const std::exception& e) { } catch (const std::exception& e) {
LOG(error) << "Exception in AddNewStateEntry: " << e.what(); LOG(error) << "Exception in AddNewStateEntry: " << e.what();
} }

View File

@ -152,7 +152,7 @@ class Topology
std::string fStateChangeError; std::string fStateChangeError;
void WaitForState(); void WaitForState();
void AddNewStateEntry(DDSTask::Id senderId, const std::string& state); void AddNewStateEntry(DDSTask::Id taskId, const std::string& state);
}; };
using Topo = Topology; using Topo = Topology;

View File

@ -46,7 +46,7 @@ struct TopologyFixture : ::testing::Test
: mDDSTopoFile(tools::ToString(SDK_TESTSUITE_SOURCE_DIR, "/test_topo.xml")) : mDDSTopoFile(tools::ToString(SDK_TESTSUITE_SOURCE_DIR, "/test_topo.xml"))
, mDDSEnv(CMAKE_CURRENT_BINARY_DIR) , mDDSEnv(CMAKE_CURRENT_BINARY_DIR)
, mDDSSession(mDDSEnv) , mDDSSession(mDDSEnv)
, mDDSTopo(mDDSTopoFile, mDDSEnv) , mDDSTopo(sdk::DDSTopology::Path(mDDSTopoFile), mDDSEnv)
{ {
mDDSSession.StopOnDestruction(); mDDSSession.StopOnDestruction();
} }
@ -58,6 +58,14 @@ struct TopologyFixture : ::testing::Test
auto n(mDDSTopo.GetNumRequiredAgents()); auto n(mDDSTopo.GetNumRequiredAgents());
mDDSSession.SubmitAgents(n); mDDSSession.SubmitAgents(n);
mDDSSession.ActivateTopology(mDDSTopo); mDDSSession.ActivateTopology(mDDSTopo);
std::vector<sdk::DDSAgent> agents = mDDSSession.RequestAgentInfo();
for (const auto& a : agents) {
LOG(debug) << a;
}
std::vector<sdk::DDSTask> tasks = mDDSSession.RequestTaskInfo();
for (const auto& t : tasks) {
LOG(debug) << t;
}
} }
auto TearDown() -> void override { auto TearDown() -> void override {

View File

@ -16,8 +16,8 @@ namespace {
TEST(DDSEnvironment, Construction) TEST(DDSEnvironment, Construction)
{ {
fair::mq::test::LoggerConfig cfg; fair::mq::test::LoggerConfig cfg;
fair::mq::sdk::DDSEnvironment env(CMAKE_CURRENT_BINARY_DIR); fair::mq::sdk::DDSEnvironment env(CMAKE_CURRENT_BINARY_DIR);
LOG(debug) << env; LOG(debug) << env;
} }