(WIP) SDK: Implement Topology::ChangeState

This commit is contained in:
Dennis Klein 2019-07-19 21:06:10 +02:00 committed by Dennis Klein
parent 499ffcd300
commit 18dc536f3d
16 changed files with 619 additions and 181 deletions

View File

@ -46,6 +46,8 @@ bool Sampler::ConditionalRun()
return false;
}
this_thread::sleep_for(chrono::seconds(5));
if (fIterations > 0) {
++fCounter;
if (fCounter >= fIterations) {

View File

@ -413,6 +413,7 @@ if(BUILD_FAIRMQ)
target_link_libraries(fairmq-splitter FairMQ)
add_executable(fairmq-shmmonitor shmem/Monitor.cxx shmem/Monitor.h shmem/runMonitor.cxx)
target_compile_definitions(fairmq-shmmonitor PUBLIC BOOST_ERROR_CODE_HEADER_ONLY)
target_link_libraries(fairmq-shmmonitor PUBLIC
Threads::Threads
$<$<PLATFORM_ID:Linux>:rt>

View File

@ -17,6 +17,7 @@ set(SDK_PUBLIC_HEADER_FILES
../SDK.h
DDSEnvironment.h
DDSSession.h
DDSTopology.h
Topology.h
)
@ -27,6 +28,7 @@ set(SDK_PRIVATE_HEADER_FILES
set(SDK_SOURCE_FILES
DDSEnvironment.cxx
DDSSession.cxx
DDSTopology.cxx
Topology.cxx
)

View File

@ -8,35 +8,56 @@
#include "DDSEnvironment.h"
#include <DDS/Tools.h>
#include <cstdlib>
#include <fairmq/Tools.h>
#include <fairmq/sdk/DDSInfo.h>
#include <fairlogger/Logger.h>
#include <DDS/Tools.h>
#include <DDS/dds_intercom.h>
#include <cstdlib>
#include <sstream>
#include <stdlib.h>
#include <utility>
namespace fair {
namespace mq {
namespace sdk {
// TODO https://github.com/FairRootGroup/DDS/issues/224
auto LoadDDSEnv(const boost::filesystem::path& config_home)
-> void
struct DDSEnvironment::Impl
{
setenv("DDS_LOCATION", DDSInstallPrefix.c_str(), 1);
if (!config_home.empty()) {
setenv("HOME", config_home.c_str(), 1);
explicit Impl(Path configHome)
: fLocation(DDSInstallPrefix)
, fConfigHome(std::move(configHome))
{
SetupLocation();
SetupDynamicLoader();
SetupPath();
SetupConfigHome();
}
std::string path(std::getenv("PATH"));
path = DDSExecutableDir + std::string(":") + path;
setenv("PATH", path.c_str(), 1);
#ifndef __APPLE__
std::string ldVar("LD_LIBRARY_PATH");
std::string ld(std::getenv(ldVar.c_str()));
ld = DDSLibraryDir + std::string(":") + ld;
setenv(ldVar.c_str(), ld.c_str(), 1);
#endif
auto SetupLocation() -> void
{
std::string location(GetEnv("DDS_LOCATION"));
if (location != DDSInstallPrefix) {
if (location.empty()) {
setenv("DDS_LOCATION", DDSInstallPrefix.c_str(), 1);
} else {
LOG(debug) << "$DDS_LOCATION appears to point to a different installation than this"
<< "program was linked against. Things might still work out, so not"
<< "touching it.";
fLocation = location;
}
}
}
auto SetupConfigHome() -> void
{
if (fConfigHome.empty()) {
fConfigHome = GetEnv("HOME");
} else {
setenv("HOME", fConfigHome.c_str(), 1);
}
std::istringstream cmd;
cmd.str("DDS_CFG=`dds-user-defaults --ignore-default-sid -p`\n"
@ -45,36 +66,61 @@ auto LoadDDSEnv(const boost::filesystem::path& config_home)
" dds-user-defaults --ignore-default-sid -d -c \"$HOME/.DDS/DDS.cfg\"\n"
"fi");
std::system(cmd.str().c_str());
}
struct DDSEnvironment::Impl
{
explicit Impl(Path config_home)
: fCount()
, fConfigHome(std::move(config_home))
{
LoadDDSEnv(fConfigHome);
if (fConfigHome.empty()) {
fConfigHome = std::getenv("HOME");
}
auto SetupPath() -> void
{
std::string path(GetEnv("PATH"));
Path ddsExecDir = (fLocation == DDSInstallPrefix) ? DDSExecutableDir : fLocation / Path("bin");
path = ddsExecDir.string() + std::string(":") + path;
setenv("PATH", path.c_str(), 1);
}
auto SetupDynamicLoader() -> void
{
#ifdef __APPLE__
std::string ldVar("DYLD_LIBRARY_PATH");
#else
std::string ldVar("LD_LIBRARY_PATH");
#endif
std::string ld(GetEnv(ldVar));
Path ddsLibDir = (fLocation == DDSInstallPrefix) ? DDSLibraryDir : fLocation / Path("lib");
ld = ddsLibDir.string() + std::string(":") + ld;
setenv(ldVar.c_str(), ld.c_str(), 1);
}
auto GetEnv(const std::string& key) -> std::string
{
auto value = std::getenv(key.c_str());
if (value) {
return {value};
}
return {};
}
struct Tag {};
friend auto operator<<(std::ostream& os, Tag) -> std::ostream& { return os << "DDSEnvironment"; }
tools::InstanceLimiter<Tag, 1> fCount;
Path fLocation;
Path fConfigHome;
};
DDSEnvironment::DDSEnvironment(Path config_home)
: fImpl(std::make_shared<Impl>(std::move(config_home)))
DDSEnvironment::DDSEnvironment()
: DDSEnvironment(Path())
{}
DDSEnvironment::DDSEnvironment(Path configHome)
: fImpl(std::make_shared<Impl>(std::move(configHome)))
{}
auto DDSEnvironment::GetLocation() const -> Path { return fImpl->fLocation; }
auto DDSEnvironment::GetConfigHome() const -> Path { return fImpl->fConfigHome; }
auto operator<<(std::ostream& os, DDSEnvironment env) -> std::ostream&
{
return os << "$DDS_LOCATION: " << DDSInstallPrefix << ", "
return os << "$DDS_LOCATION: " << env.GetLocation() << ", "
<< "$DDS_CONFIG_HOME: " << env.GetConfigHome() / DDSEnvironment::Path(".DDS");
}

View File

@ -10,7 +10,6 @@
#define FAIR_MQ_SDK_DDSENVIRONMENT_H
#include <boost/filesystem.hpp>
#include <fairmq/sdk/DDSInfo.h>
#include <memory>
#include <ostream>
@ -27,9 +26,10 @@ class DDSEnvironment
public:
using Path = boost::filesystem::path;
/// @brief See fair::mq::sdk::LoadDDSEnv
explicit DDSEnvironment(Path config_home = "");
DDSEnvironment();
explicit DDSEnvironment(Path);
auto GetLocation() const -> Path;
auto GetConfigHome() const -> Path;
friend auto operator<<(std::ostream& os, DDSEnvironment env) -> std::ostream&;
@ -38,6 +38,8 @@ class DDSEnvironment
std::shared_ptr<Impl> fImpl;
};
using DDSEnv = DDSEnvironment;
} // namespace sdk
} // namespace mq
} // namespace fair

View File

@ -11,6 +11,20 @@
#include <string>
namespace dds {
namespace tools_api {
class CSession;
} // namespace tools_api
namespace topology_api {
class CTopology;
} // namespace topology_api
} // namespace dds
namespace fair {
namespace mq {
namespace sdk {

View File

@ -7,16 +7,19 @@
********************************************************************************/
#include "DDSSession.h"
#include "DDSEnvironment.h"
#include <fairmq/sdk/DDSEnvironment.h>
#include <fairmq/Tools.h>
#include <fairlogger/Logger.h>
#include <DDS/Tools.h>
#include <boost/uuid/uuid_io.hpp>
#include <cassert>
#include <cstdlib>
#include <fairlogger/Logger.h>
#include <fairmq/Tools.h>
#include <sstream>
#include <stdlib.h>
#include <utility>
namespace fair {
@ -52,48 +55,76 @@ auto operator>>(std::istream& is, DDSRMSPlugin& plugin) -> std::istream&
struct DDSSession::Impl
{
Impl(DDSEnvironment env, DDSRMSPlugin plugin)
: fCount()
, fEnv(std::move(env))
, fDefaultPlugin(std::move(plugin))
, fSession()
explicit Impl(DDSEnvironment env)
: fEnv(std::move(env))
, fRMSPlugin(DDSRMSPlugin::localhost)
, fDDSService()
, fDDSCustomCmd(fDDSService)
, fId(to_string(fSession.create()))
, fStopOnDestruction(false)
{
setenv("DDS_SESSION_ID", fId.c_str(), 1);
fDDSService.subscribeOnError([](const dds::intercom_api::EErrorCode errorCode, const std::string& msg) {
std::cerr << "DDS error, error code: " << errorCode << ", error message: " << msg << std::endl;
});
}
Impl(DDSEnvironment env, DDSRMSPlugin plugin, Id existing_id)
: fCount()
, fEnv(std::move(env))
, fDefaultPlugin(std::move(plugin))
, fSession()
, fId(std::move(existing_id))
explicit Impl(Id existing, DDSEnvironment env)
: fEnv(std::move(env))
, fRMSPlugin(DDSRMSPlugin::localhost)
, fDDSService()
, fDDSCustomCmd(fDDSService)
, fId(std::move(existing))
, fStopOnDestruction(false)
{
fSession.attach(fId);
std::string envId(std::getenv("DDS_SESSION_ID"));
if (envId != fId) {
setenv("DDS_SESSION_ID", fId.c_str(), 1);
}
fDDSService.subscribeOnError([](const dds::intercom_api::EErrorCode errorCode, const std::string& msg) {
std::cerr << "DDS error, error code: " << errorCode << ", error message: " << msg << std::endl;
});
}
~Impl()
{
if (fStopOnDestruction) {
fSession.shutdown();
}
}
Impl() = delete;
Impl(const Impl&) = delete;
Impl& operator=(const Impl&) = delete;
Impl(Impl&&) = delete;
Impl& operator=(Impl&&) = delete;
struct Tag {};
friend auto operator<<(std::ostream& os, Tag) -> std::ostream& { return os << "DDSSession"; }
tools::InstanceLimiter<Tag, 1> fCount;
const DDSEnvironment fEnv;
const DDSRMSPlugin fDefaultPlugin;
DDSEnvironment fEnv;
DDSRMSPlugin fRMSPlugin;
Path fRMSConfig;
dds::tools_api::CSession fSession;
const Id fId;
dds::intercom_api::CIntercomService fDDSService;
dds::intercom_api::CCustomCmd fDDSCustomCmd;
Id fId;
bool fStopOnDestruction;
};
DDSSession::DDSSession(DDSEnvironment env, DDSRMSPlugin default_plugin)
: fImpl(std::make_shared<Impl>(std::move(env), std::move(default_plugin))) {}
DDSSession::DDSSession(DDSEnvironment env)
: fImpl(std::make_shared<Impl>(env))
{}
DDSSession::DDSSession(DDSEnvironment env, Id existing_id)
: fImpl(std::make_shared<Impl>(std::move(env), DDSRMSPlugin::localhost, std::move(existing_id))) {}
DDSSession::DDSSession(Id existing, DDSEnvironment env)
: fImpl(std::make_shared<Impl>(std::move(existing), env))
{}
DDSSession::DDSSession(DDSEnvironment env, DDSRMSPlugin default_plugin, Id existing_id)
: fImpl(std::make_shared<Impl>(std::move(env), std::move(default_plugin), std::move(existing_id))) {}
auto DDSSession::GetEnv() const -> DDSEnvironment { return fImpl->fEnv; }
auto DDSSession::IsRunning() const -> bool { return fImpl->fSession.IsRunning(); }
@ -101,36 +132,30 @@ auto DDSSession::GetId() const -> Id { return fImpl->fId; }
auto DDSSession::Stop() -> void { return fImpl->fSession.shutdown(); }
auto DDSSession::GetDefaultPlugin() const -> DDSRMSPlugin { return fImpl->fDefaultPlugin; }
auto DDSSession::GetRMSPlugin() const -> DDSRMSPlugin { return fImpl->fRMSPlugin; }
auto DDSSession::SetRMSPlugin(DDSRMSPlugin plugin) -> void { fImpl->fRMSPlugin = plugin; }
auto DDSSession::GetRMSConfig() const -> Path { return fImpl->fRMSConfig; }
auto DDSSession::SetRMSConfig(Path configFile) const -> void
{
fImpl->fRMSConfig = std::move(configFile);
}
auto DDSSession::IsStoppedOnDestruction() const -> bool { return fImpl->fStopOnDestruction; }
auto DDSSession::StopOnDestruction(bool stop) -> void { fImpl->fStopOnDestruction = stop; }
auto DDSSession::SubmitAgents(Quantity agents) -> void
{
SubmitAgents(agents, GetDefaultPlugin(), Path());
}
auto DDSSession::SubmitAgents(Quantity agents, DDSRMSPlugin plugin) -> void
{
SubmitAgents(agents, plugin, Path());
}
auto DDSSession::SubmitAgents(Quantity agents, const Path& config) -> void
{
SubmitAgents(agents, GetDefaultPlugin(), std::move(config));
}
auto DDSSession::SubmitAgents(Quantity agents, DDSRMSPlugin plugin, const Path& config) -> void
{
// Requesting to submit 0 agents is not meaningful
assert(agents > 0);
// The config argument is required with all plugins except localhost
if (plugin != DDSRMSPlugin::localhost) {
assert(exists(config));
}
dds::tools_api::SSubmitRequestData submitInfo;
submitInfo.m_rms = tools::ToString(plugin);
submitInfo.m_rms = tools::ToString(GetRMSPlugin());
submitInfo.m_instances = agents;
submitInfo.m_config = config.string();
submitInfo.m_config = GetRMSConfig().string();
tools::Semaphore blocker;
auto submitRequest = dds::tools_api::SSubmitRequest::makeRequest(submitInfo);
@ -162,7 +187,28 @@ auto DDSSession::RequestAgentInfo() -> void
blocker.Wait();
}
auto DDSSession::ActivateTopology(Path topologyFile) -> void
auto DDSSession::RequestCommanderInfo() -> void
{
dds::tools_api::SCommanderInfoRequestData commanderInfoInfo;
tools::Semaphore blocker;
auto commanderInfoRequest =
dds::tools_api::SCommanderInfoRequest::makeRequest(commanderInfoInfo);
commanderInfoRequest->setResponseCallback(
[&](const dds::tools_api::SCommanderInfoResponseData& _response) {
LOG(debug) << "pid: " << _response.m_pid;
LOG(debug) << "idle agents: " << _response.m_idleAgentsCount;
// LOG(debug) << "active topology: "
// << ((_response.m_hasActiveTopology) ? _response.m_activeTopologyName
// : "<none>");
});
commanderInfoRequest->setMessageCallback(
[](const dds::tools_api::SMessageResponseData& _message) { LOG(debug) << _message; });
commanderInfoRequest->setDoneCallback([&]() { blocker.Signal(); });
fImpl->fSession.sendRequest<dds::tools_api::SCommanderInfoRequest>(commanderInfoRequest);
blocker.Wait();
}
auto DDSSession::ActivateTopology(const Path& topologyFile) -> void
{
dds::tools_api::STopologyRequestData topologyInfo;
topologyInfo.m_updateType = dds::tools_api::STopologyRequestData::EUpdateType::ACTIVATE;
@ -177,7 +223,16 @@ auto DDSSession::ActivateTopology(Path topologyFile) -> void
blocker.Wait();
}
auto operator<<(std::ostream& os, DDSSession session) -> std::ostream&
void DDSSession::StartDDSService() { fImpl->fDDSService.start(fImpl->fId); }
void DDSSession::SubscribeToCommands(std::function<void(const std::string& msg, const std::string& condition, uint64_t senderId)> cb)
{
fImpl->fDDSCustomCmd.subscribe(cb);
}
void DDSSession::SendCommand(const std::string& cmd) { fImpl->fDDSCustomCmd.send(cmd, ""); }
auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&
{
return os << "$DDS_SESSION_ID: " << session.GetId();
}

View File

@ -9,14 +9,18 @@
#ifndef FAIR_MQ_SDK_DDSSESSION_H
#define FAIR_MQ_SDK_DDSSESSION_H
#include <boost/filesystem.hpp>
#include <cstdint>
#include <fairmq/sdk/DDSEnvironment.h>
#include <fairmq/sdk/DDSInfo.h>
#include <boost/filesystem.hpp>
#include <cstdint>
#include <istream>
#include <memory>
#include <ostream>
#include <stdexcept>
#include <string>
#include <functional>
namespace fair {
namespace mq {
@ -47,23 +51,29 @@ class DDSSession
using Quantity = std::uint32_t;
using Path = boost::filesystem::path;
DDSSession() = delete;
explicit DDSSession(DDSEnvironment env, DDSRMSPlugin default_plugin = DDSRMSPlugin::localhost);
explicit DDSSession(DDSEnvironment env, Id existing_id);
explicit DDSSession(DDSEnvironment env, DDSRMSPlugin default_plugin, Id existing_id);
explicit DDSSession(DDSEnvironment env = DDSEnvironment());
explicit DDSSession(Id existing, DDSEnvironment env = DDSEnvironment());
auto GetEnv() const -> DDSEnvironment;
auto GetId() const -> Id;
auto GetDefaultPlugin() const -> DDSRMSPlugin;
auto GetRMSPlugin() const -> DDSRMSPlugin;
auto SetRMSPlugin(DDSRMSPlugin) -> void;
auto GetRMSConfig() const -> Path;
auto SetRMSConfig(Path) const -> void;
auto IsStoppedOnDestruction() const -> bool;
auto StopOnDestruction(bool stop = true) -> void;
auto IsRunning() const -> bool;
auto SubmitAgents(Quantity agents) -> void;
auto SubmitAgents(Quantity agents, DDSRMSPlugin plugin) -> void;
auto SubmitAgents(Quantity agents, DDSRMSPlugin plugin, const Path& config) -> void;
auto SubmitAgents(Quantity agents, const Path& config) -> void;
auto RequestAgentInfo() -> void;
auto ActivateTopology(Path topologyFile) -> void;
auto RequestCommanderInfo() -> void;
auto ActivateTopology(const Path& topologyFile) -> void;
auto Stop() -> void;
friend auto operator<<(std::ostream& os, DDSSession session) -> std::ostream&;
void StartDDSService();
void SubscribeToCommands(std::function<void(const std::string& msg, const std::string& condition, uint64_t senderId)>);
void SendCommand(const std::string&);
friend auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&;
private:
struct Impl;
std::shared_ptr<Impl> fImpl;

View File

@ -0,0 +1,96 @@
/********************************************************************************
* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "DDSTopology.h"
#include <fairmq/sdk/DDSEnvironment.h>
#include <fairmq/Tools.h>
#include <fairlogger/Logger.h>
#include <DDS/Topology.h>
#include <sstream>
#include <stdexcept>
#include <utility>
#include <memory>
namespace fair {
namespace mq {
namespace sdk {
struct DDSTopology::Impl
{
explicit Impl(Path topoFile, DDSEnvironment env)
: fEnv(std::move(env))
, fTopoFile(std::move(topoFile))
, fTopo(nullptr)
{
LOG(warn) << topoFile;
}
DDSEnvironment fEnv;
Path fTopoFile;
std::unique_ptr<dds::topology_api::CTopology> fTopo;
};
DDSTopology::DDSTopology(Path topoFile, DDSEnvironment env)
: fImpl(std::make_shared<Impl>(std::move(topoFile), std::move(env)))
{}
auto DDSTopology::GetEnv() const -> DDSEnvironment { return fImpl->fEnv; }
auto DDSTopology::GetTopoFile() const -> Path
{
auto file(fImpl->fTopoFile);
if (file.string().empty()) {
throw std::runtime_error("DDS topology xml spec file unknown");
}
return file;
}
void DDSTopology::CreateTopology(Path topoFile)
{
fImpl->fTopo = tools::make_unique<dds::topology_api::CTopology>(fImpl->fTopoFile.c_str());
}
int DDSTopology::GetNumRequiredAgents()
{
return fImpl->fTopo->getRequiredNofAgents();
}
std::vector<uint64_t> DDSTopology::GetDeviceList()
{
std::vector<uint64_t> taskIDs;
taskIDs.reserve(fImpl->fTopo->getRequiredNofAgents());
// TODO make sure returned tasks are actually devices
dds::topology_api::STopoRuntimeTask::FilterIteratorPair_t taskIt = fImpl->fTopo->getRuntimeTaskIterator([](const dds::topology_api::STopoRuntimeTask::FilterIterator_t::value_type& value) -> bool {
return true;
});
for (auto& it = taskIt.first; it != taskIt.second; ++it) {
LOG(debug) << "Found task " << it->first << " : " << it->second.m_task->getPath();
taskIDs.push_back(it->first);
}
return taskIDs;
}
// auto DDSTopology::GetName() const -> std::string { return fImpl->fTopo.getName(); }
auto operator<<(std::ostream& os, const DDSTopology& t) -> std::ostream&
try {
return os << "DDS topology: " /*<< t.GetName() <<*/ " (loaded from " << t.GetTopoFile() << ")";
} catch (std::runtime_error&) {
return os << "DDS topology: " /*<< t.GetName()*/;
}
} // namespace sdk
} // namespace mq
} // namespace fair

View File

@ -9,18 +9,11 @@
#ifndef FAIR_MQ_SDK_DDSTOPOLOGY_H
#define FAIR_MQ_SDK_DDSTOPOLOGY_H
#include <fairmq/sdk/DDSInfo.h>
#include <boost/filesystem.hpp>
#include <fairmq/sdk/DDSEnvironment.h>
#include <memory>
#include <string>
namespace dds {
namespace topology_api {
class CTopology;
} // namespace topology_api
} // namespace dds
namespace fair {
namespace mq {
namespace sdk {
@ -29,23 +22,44 @@ namespace sdk {
* @class DDSTopology DDSTopology.h <fairmq/sdk/DDSTopology.h>
* @brief Represents a DDS topology
*/
class DDSSession
class DDSTopology
{
public:
using CSessionPtr = std::shared_ptr<dds::tools_api::CSession>;
using Path = boost::filesystem::path;
explicit DDSSession();
explicit DDSSession(std::string existing_session_id);
DDSTopology() = delete;
/// @brief Construct from file
/// @param topoFile DDS topology xml file
/// @param env DDS environment
explicit DDSTopology(Path topoFile, DDSEnvironment env = DDSEnvironment());
/// @brief Get associated DDS environment
auto GetEnv() const -> DDSEnvironment;
/// @brief Get path to DDS topology xml, if it is known
/// @throw std::runtime_error
auto GetTopoFile() const -> Path;
void CreateTopology(Path);
/// @brief Get number of required agents for this topology
int GetNumRequiredAgents();
/// @brief Get list of devices
std::vector<uint64_t> GetDeviceList();
/// @brief Get the name of the topology
// auto GetName() const -> std::string;
friend auto operator<<(std::ostream&, const DDSTopology&) -> std::ostream&;
auto GetId() const -> const std::string&;
auto IsRunning() const -> bool;
private:
CSessionPtr fSession;
const std::string fId;
struct Impl;
std::shared_ptr<Impl> fImpl;
};
auto LoadDDSEnv(const std::string& config_home = "", const std::string& prefix = DDSInstallPrefix)
-> void;
using DDSTopo = DDSTopology;
} // namespace sdk
} // namespace mq

View File

@ -8,25 +8,166 @@
#include "Topology.h"
#include <DDS/Topology.h>
#include <fairlogger/Logger.h>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <utility>
#include <thread>
#include <future>
#include <mutex>
#include <condition_variable>
namespace fair {
namespace mq {
auto operator<<(std::ostream& os, AsyncOpResult v) -> std::ostream&
{
switch (v) {
case AsyncOpResult::Aborted:
return os << "Aborted";
case AsyncOpResult::Timeout:
return os << "Timeout";
case AsyncOpResult::Error:
return os << "Error";
case AsyncOpResult::Ok:
default:
return os << "Ok";
}
}
namespace sdk {
struct Topology::Impl
const std::unordered_map<DeviceTransition, DeviceState, tools::HashEnum<DeviceTransition>> Topology::fkExpectedState =
{
Impl(dds::topology_api::CTopology topo)
: fDDSTopology(std::move(topo))
{}
dds::topology_api::CTopology fDDSTopology;
{ Transition::InitDevice, DeviceState::InitializingDevice },
{ Transition::CompleteInit, DeviceState::Initialized },
{ Transition::Bind, DeviceState::Bound },
{ Transition::Connect, DeviceState::DeviceReady },
{ Transition::InitTask, DeviceState::InitializingTask },
{ Transition::Run, DeviceState::Running },
{ Transition::Stop, DeviceState::Ready },
{ Transition::ResetTask, DeviceState::DeviceReady },
{ Transition::ResetDevice, DeviceState::Idle },
{ Transition::End, DeviceState::Exiting }
};
Topology::Topology(dds::topology_api::CTopology topo)
: fImpl(std::make_shared<Impl>(std::move(topo)))
{}
Topology::Topology(DDSTopology topo, DDSSession session)
: fDDSSession(std::move(session))
, fDDSTopo(std::move(topo))
, fTopologyState()
, fStateChangeOngoing(false)
, fExecutionThread()
, fShutdown(false)
{
fDDSTopo.CreateTopology(fDDSTopo.GetTopoFile());
std::vector<uint64_t> deviceList = fDDSTopo.GetDeviceList();
for (const auto& d : deviceList) {
fTopologyState.emplace(d, DeviceStatus{ false, DeviceState::Ok });
}
fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& condition, uint64_t senderId) {
LOG(info) << "Received from " << senderId << ": " << msg;
std::vector<std::string> parts;
boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,"));
if (parts[0] == "state-change") {
boost::trim(parts[2]);
AddNewStateEntry(senderId, parts[2]);
} else if (parts[0] == "state-changes-subscription") {
if (parts[2] != "OK") {
LOG(error) << "state-changes-subscription failed with return code: " << parts[2];
}
} else if (parts[0] == "state-changes-unsubscription") {
if (parts[2] != "OK") {
LOG(error) << "state-changes-unsubscription failed with return code: " << parts[2];
}
}
});
fDDSSession.StartDDSService();
fDDSSession.SendCommand("subscribe-to-state-changes");
fExecutionThread = std::thread(&Topology::WaitForState, this);
}
auto Topology::ChangeState(fair::mq::Transition transition, ChangeStateCallback cb, const std::chrono::milliseconds& timeout) -> void
{
{
std::lock_guard<std::mutex> guard(fMtx);
if (fStateChangeOngoing) {
LOG(error) << "State change already in progress, concurrent requested not yet supported";
return;
}
fStateChangeOngoing = true;
fChangeStateCallback = cb;
fStateChangeTimeout = timeout;
fDDSSession.SendCommand(GetTransitionName(transition));
fTargetState = fkExpectedState.at(transition);
}
fExecutionCV.notify_one();
}
void Topology::WaitForState()
{
while (!fShutdown) {
if (fStateChangeOngoing) {
auto condition = [&] { return fShutdown || std::all_of(fTopologyState.cbegin(),
fTopologyState.cend(),
[&](TopologyState::value_type i) {
return i.second.state == fTargetState;
});
};
std::unique_lock<std::mutex> lock(fMtx);
if (fStateChangeTimeout > std::chrono::milliseconds(0)) {
if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) {
LOG(debug) << "timeout";
// TODO: catch this from another thread...
throw std::runtime_error("timeout");
}
} else {
fCV.wait(lock, condition);
}
if (fShutdown) {
break;
}
fStateChangeOngoing = false;
fChangeStateCallback(ChangeStateResult{AsyncOpResult::Ok, fTopologyState});
} else {
std::unique_lock<std::mutex> lock(fExecutionMtx);
fExecutionCV.wait(lock);
}
}
};
void Topology::AddNewStateEntry(uint64_t senderId, const std::string& state)
{
{
std::unique_lock<std::mutex> lock(fMtx);
fTopologyState[senderId] = DeviceStatus{ true, fair::mq::GetState(state) };
}
fCV.notify_one();
}
Topology::~Topology()
{
{
std::lock_guard<std::mutex> guard(fMtx);
fShutdown = true;
}
fExecutionCV.notify_one();
fExecutionThread.join();
}
auto operator<<(std::ostream& os, Topology::ChangeStateResult v) -> std::ostream&
{
return os << v.rc;
}
} // namespace sdk
} // namespace mq

View File

@ -9,25 +9,45 @@
#ifndef FAIR_MQ_SDK_TOPOLOGY_H
#define FAIR_MQ_SDK_TOPOLOGY_H
#include <fairmq/sdk/DDSInfo.h>
#include <fairmq/sdk/DDSSession.h>
#include <fairmq/sdk/DDSTopology.h>
#include <fairmq/States.h>
#include <fairmq/Tools.h>
#include <functional>
#include <unordered_map>
#include <memory>
#include <chrono>
#include <ostream>
#include <string>
#include <vector>
namespace dds {
namespace topology_api {
class CTopology;
} // namespace topology_api
} // namespace dds
namespace fair {
namespace mq {
enum class AsyncOpResult {
Ok,
Timeout,
Error,
Aborted
};
auto operator<<(std::ostream& os, AsyncOpResult v) -> std::ostream&;
namespace sdk {
using DeviceState = fair::mq::State;
using DeviceTransition = fair::mq::Transition;
struct DeviceStatus
{
bool initialized;
DeviceState state;
};
using TopologyState = std::unordered_map<uint64_t, DeviceStatus>;
using TopologyTransition = fair::mq::Transition;
/**
* @class Topology Topology.h <fairmq/sdk/Topology.h>
* @brief Represents a FairMQ topology
@ -35,16 +55,46 @@ namespace sdk {
class Topology
{
public:
/// @brief (Re)Construct a FairMQ topology from an existing DDS topology
/// @param topo Initialized DDS CTopology
explicit Topology(DDSTopology topo, DDSSession session = DDSSession());
~Topology();
/// @brief Construct a FairMQ topology from an existing DDS session via the dds::topology_api
/// @param topo An initialized CTopology object
explicit Topology(dds::topology_api::CTopology topo);
struct ChangeStateResult {
AsyncOpResult rc;
TopologyState state;
friend auto operator<<(std::ostream& os, ChangeStateResult v) -> std::ostream&;
};
using ChangeStateCallback = std::function<void(ChangeStateResult)>;
/// @brief Initiate state transition on all FairMQ devices in this topology
/// @param t FairMQ device state machine transition
/// @param cb Completion callback
auto ChangeState(TopologyTransition t, ChangeStateCallback cb, const std::chrono::milliseconds& timeout = std::chrono::milliseconds(0)) -> void;
static const std::unordered_map<DeviceTransition, DeviceState, tools::HashEnum<DeviceTransition>> fkExpectedState;
private:
struct Impl;
std::shared_ptr<Impl> fImpl;
DDSSession fDDSSession;
DDSTopology fDDSTopo;
TopologyState fTopologyState;
bool fStateChangeOngoing;
DeviceState fTargetState;
std::mutex fMtx;
std::mutex fExecutionMtx;
std::condition_variable fCV;
std::condition_variable fExecutionCV;
std::thread fExecutionThread;
ChangeStateCallback fChangeStateCallback;
std::chrono::milliseconds fStateChangeTimeout;
bool fShutdown;
void WaitForState();
void AddNewStateEntry(uint64_t senderId, const std::string& state);
};
using Topo = Topology;
} // namespace sdk
} // namespace mq
} // namespace fair

View File

@ -287,7 +287,7 @@ if(BUILD_SDK)
add_testsuite(SDK
SOURCES
${CMAKE_CURRENT_BINARY_DIR}/runner.cxx
# sdk/_dds.cxx
sdk/_dds.cxx
sdk/_topology.cxx
sdk/TopologyFixture.h

View File

@ -13,12 +13,10 @@
#include <fairmq/SDK.h>
#include <fairmq/Tools.h>
#include <DDS/Topology.h>
#include <chrono>
#include <cstdlib>
#include <fairlogger/Logger.h>
#include <gtest/gtest.h>
#include <stdlib.h>
#include <thread>
namespace fair {
@ -45,14 +43,14 @@ struct LoggerConfig
struct TopologyFixture : ::testing::Test
{
TopologyFixture()
: mLoggerConfig()
, mDDSTopologyFile(std::string(SDK_TESTSUITE_SOURCE_DIR) + "/test_topo.xml")
: mDDSTopoFile(tools::ToString(SDK_TESTSUITE_SOURCE_DIR, "/test_topo.xml"))
, mDDSEnv(CMAKE_CURRENT_BINARY_DIR)
, mDDSSession(mDDSEnv)
, mDDSTopology(mDDSTopologyFile)
{}
//
//
, mDDSTopo(mDDSTopoFile, mDDSEnv)
{
mDDSSession.StopOnDestruction();
}
// auto ActivateDDSTopology(const std::string& topology_file) -> void {
// LOG(debug) << "ActivateDDSTopology(\"" << topology_file << "\")";
// }
@ -60,20 +58,22 @@ struct TopologyFixture : ::testing::Test
auto SetUp() -> void override {
LOG(info) << mDDSEnv;
LOG(info) << mDDSSession;
mDDSSession.RequestCommanderInfo();
mDDSSession.SubmitAgents(2);
mDDSSession.RequestCommanderInfo();
std::this_thread::sleep_for(std::chrono::seconds(1)); // TODO implement WaitForIdleAgents
mDDSSession.ActivateTopology(mDDSTopologyFile);
mDDSSession.ActivateTopology(mDDSTopoFile);
mDDSSession.RequestCommanderInfo();
}
auto TearDown() -> void override {
mDDSSession.Stop();
}
LoggerConfig mLoggerConfig;
std::string mDDSTopologyFile;
std::string mDDSTopoFile;
sdk::DDSEnvironment mDDSEnv;
sdk::DDSSession mDDSSession;
dds::topology_api::CTopology mDDSTopology;
sdk::DDSTopology mDDSTopo;
};
} /* namespace test */

View File

@ -15,7 +15,7 @@
namespace {
auto session_test() -> void
auto setup() -> void
{
fair::Logger::SetConsoleSeverity("debug");
fair::Logger::DefineVerbosity("user1",
@ -23,35 +23,24 @@ auto session_test() -> void
fair::VerbositySpec::Info::severity));
fair::Logger::SetVerbosity("user1");
fair::Logger::SetConsoleColor();
}
TEST(DDS, Environment)
{
setup();
fair::mq::sdk::DDSEnvironment env(CMAKE_CURRENT_BINARY_DIR);
LOG(debug) << env;
{
fair::mq::sdk::DDSSession session(env);
LOG(debug) << session;
session.SubmitAgents(5);
session.SubmitAgents(5);
}
{
fair::mq::sdk::DDSSession session(env);
LOG(debug) << session;
session.SubmitAgents(5);
}
{
fair::mq::sdk::DDSSession session(env);
LOG(debug) << session;
session.SubmitAgents(5);
}
}
TEST(DDS, Session)
{
session_test();
}
setup();
TEST(DDS, Session2)
{
session_test();
fair::mq::sdk::DDSEnvironment env(CMAKE_CURRENT_BINARY_DIR);
fair::mq::sdk::DDSSession session(env);
session.StopOnDestruction();
LOG(debug) << session;
}
} // namespace

View File

@ -8,7 +8,9 @@
#include "TopologyFixture.h"
#include <DDS/Topology.h>
#include <fairmq/sdk/Topology.h>
#include <fairmq/Tools.h>
namespace {
@ -16,7 +18,21 @@ using Topology = fair::mq::test::TopologyFixture;
TEST_F(Topology, Construction)
{
fair::mq::sdk::Topology topo(mDDSTopology);
fair::mq::sdk::Topology topo(mDDSTopo, mDDSSession);
}
TEST_F(Topology, ChangeState)
{
using fair::mq::sdk::Topology;
using fair::mq::sdk::TopologyTransition;
Topology topo(mDDSTopo, mDDSSession);
fair::mq::tools::Semaphore blocker;
topo.ChangeState(TopologyTransition::Stop, [&](Topology::ChangeStateResult result) {
LOG(info) << result;
blocker.Signal();
});
blocker.Wait();
}
} // namespace