From 62781389d4d00ae339a80135e38720681b744758 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Sat, 9 Feb 2019 05:58:11 +0100 Subject: [PATCH] Add pmix::lookup binding and cleanup --- fairmq/plugins/PMIx/PMIx.hpp | 35 ++++++-- fairmq/plugins/PMIx/PMIxPlugin.cxx | 129 ++++++++++++++++------------- fairmq/plugins/PMIx/PMIxPlugin.h | 27 ++---- 3 files changed, 107 insertions(+), 84 deletions(-) diff --git a/fairmq/plugins/PMIx/PMIx.hpp b/fairmq/plugins/PMIx/PMIx.hpp index 60417e58..e42d0d49 100644 --- a/fairmq/plugins/PMIx/PMIx.hpp +++ b/fairmq/plugins/PMIx/PMIx.hpp @@ -131,11 +131,27 @@ struct info : pmix_info_t PMIX_VALUE_XFER(rc, lhs, static_cast(&rhs)); if (rc != PMIX_SUCCESS) { - throw runtime_error("pmix::info ctor failed: rc=" + rc); + throw runtime_error("pmix::info ctor failed: rc=" + std::to_string(rc)); } } }; +struct pdata : pmix_pdata_t +{ + pdata() { PMIX_PDATA_CONSTRUCT(static_cast(this)); } + ~pdata() { PMIX_PDATA_DESTRUCT(static_cast(this)); } + pdata(const pdata& rhs) + { + PMIX_PDATA_XFER(static_cast(this), + static_cast(const_cast(&rhs))); + } + + auto set_key(const std::string& new_key) -> void + { + (void)strncpy(key, new_key.c_str(), PMIX_MAX_KEYLEN); + } +}; + auto init(const std::vector& info = {}) -> proc { proc res; @@ -143,7 +159,7 @@ auto init(const std::vector& info = {}) -> proc rc = PMIx_Init(&res, const_cast(info.data()), info.size()); if (rc != PMIX_SUCCESS) { - throw runtime_error("pmix::init() failed: rc=" + rc); + throw runtime_error("pmix::init() failed: rc=" + std::to_string(rc)); } return res; @@ -159,7 +175,7 @@ auto finalize(const std::vector& info = {}) -> void rc = PMIx_Finalize(info.data(), info.size()); if (rc != PMIX_SUCCESS) { - throw runtime_error("pmix::finalize() failed: rc=" + rc); + throw runtime_error("pmix::finalize() failed: rc=" + std::to_string(rc)); } } @@ -169,7 +185,7 @@ auto publish(const std::vector& info) -> void rc = PMIx_Publish(info.data(), info.size()); if (rc != PMIX_SUCCESS) { - throw runtime_error("pmix::publish() failed: rc=" + rc); + throw runtime_error("pmix::publish() failed: rc=" + std::to_string(rc)); } } @@ -179,10 +195,19 @@ auto fence(const std::vector& procs = {}, const std::vector& info = rc = PMIx_Fence(procs.data(), procs.size(), info.data(), info.size()); if (rc != PMIX_SUCCESS) { - throw runtime_error("pmix::fence() failed: rc=" + rc); + throw runtime_error("pmix::fence() failed: rc=" + std::to_string(rc)); } } +auto lookup(std::vector& pdata, const std::vector& info = {}) -> void +{ + status rc; + + rc = PMIx_Lookup(pdata.data(), pdata.size(), info.data(), info.size()); + if (rc != PMIX_SUCCESS) { + throw runtime_error("pmix::lookup() failed: rc=" + std::to_string(rc)); + } +} } /* namespace pmix */ diff --git a/fairmq/plugins/PMIx/PMIxPlugin.cxx b/fairmq/plugins/PMIx/PMIxPlugin.cxx index d68b0037..77796210 100644 --- a/fairmq/plugins/PMIx/PMIxPlugin.cxx +++ b/fairmq/plugins/PMIx/PMIxPlugin.cxx @@ -30,24 +30,10 @@ PMIxPlugin::PMIxPlugin(const std::string& name, SubscribeToDeviceStateChange([&](DeviceState newState) { switch (newState) { case DeviceState::InitializingDevice: - if (!pmix::initialized()) { - fProc = pmix::init(); - LOG(debug) << PMIxClient() << " pmix::init() OK: " << fProc - << ",version=" << pmix::get_version(); - } - - FillChannelContainers(); - - PublishBoundChannels(); - - { - pmix::proc all(fProc); - all.rank = pmix::rank::wildcard; - - pmix::fence({all}); - } - - // lookup + Init(); + Publish(); + Fence(); + Lookup(); break; case DeviceState::Exiting: UnsubscribeFromDeviceStateChange(); @@ -60,8 +46,6 @@ PMIxPlugin::PMIxPlugin(const std::string& name, PMIxPlugin::~PMIxPlugin() { - LOG(debug) << PMIxClient() << " Finalizing PMIx session... (On success, logs seen by the RTE will stop here.)"; - while (pmix::initialized()) { try { pmix::finalize(); @@ -72,52 +56,83 @@ PMIxPlugin::~PMIxPlugin() } } -auto PMIxPlugin::FillChannelContainers() -> void +auto PMIxPlugin::PMIxClient() const -> std::string { - try { - std::unordered_map channelInfo(GetChannelInfo()); + std::stringstream ss; + ss << "PMIx client(pid=" << fPid << ")"; + return ss.str(); +} - // fill binding and connecting chans - for (const auto& c : channelInfo) { - std::string methodKey{"chans." + c.first + "." + std::to_string(c.second - 1) - + ".method"}; - if (GetProperty(methodKey) == "bind") { - fBindingChannels.insert(std::make_pair(c.first, std::vector())); - for (int i = 0; i < c.second; ++i) { - fBindingChannels.at(c.first).push_back(GetProperty( - std::string{"chans." + c.first + "." + std::to_string(i) + ".address"})); - } - } else if (GetProperty(methodKey) == "connect") { - fConnectingChannels.insert(std::make_pair(c.first, ConnectingChannel())); - LOG(debug) << "preparing to connect: " << c.first << " with " << c.second - << " sub-channels."; - for (int i = 0; i < c.second; ++i) { - fConnectingChannels.at(c.first).fSubChannelAddresses.push_back(std::string()); - } - } else { - LOG(error) << "Cannot update address configuration. Channel method (bind/connect) " - "not specified."; - return; - } - } - } catch (const std::exception& e) { - LOG(error) << "Error filling channel containers: " << e.what(); +auto PMIxPlugin::Init() -> void +{ + if (!pmix::initialized()) { + fProc = pmix::init(); + LOG(debug) << PMIxClient() << " pmix::init() OK: " << fProc + << ",version=" << pmix::get_version(); } } -auto PMIxPlugin::PublishBoundChannels() -> void +auto PMIxPlugin::Publish() -> void { - std::vector infos; - infos.reserve(fBindingChannels.size()); + auto channels(GetChannelInfo()); + std::vector info; - for (const auto& channel : fBindingChannels) { - std::string joined = boost::algorithm::join(channel.second, ","); - infos.emplace_back(channel.first, joined); + for (const auto& c : channels) { + std::string methodKey{"chans." + c.first + "." + std::to_string(c.second - 1) + ".method"}; + if (GetProperty(methodKey) == "bind") { + for (int i = 0; i < c.second; ++i) { + std::string addressKey{"chans." + c.first + "." + std::to_string(i) + ".address"}; + info.emplace_back(addressKey, GetProperty(addressKey)); + } + } } - pmix::publish(infos); - LOG(debug) << PMIxClient() << " pmix::publish() OK: published " - << fBindingChannels.size() << " binding channels."; + if (info.size() > 0) { + pmix::publish(info); + LOG(debug) << PMIxClient() << " pmix::publish() OK: published " + << info.size() << " binding channels."; + } +} + +auto PMIxPlugin::Fence() -> void +{ + pmix::proc all(fProc); + all.rank = pmix::rank::wildcard; + + pmix::fence({all}); +} + +auto PMIxPlugin::Lookup() -> void +{ + auto channels(GetChannelInfo()); + std::vector pdata; + + for (const auto& c : channels) { + std::string methodKey{"chans." + c.first + "." + std::to_string(c.second - 1) + ".method"}; + if (GetProperty(methodKey) == "connect") { + for (int i = 0; i < c.second; ++i) { + std::string addressKey{"chans." + c.first + "." + std::to_string(i) + ".address"}; + pdata.emplace_back(); + pdata.back().set_key(addressKey); + } + } + } + + if (pdata.size() > 0) { + pmix::lookup(pdata); + LOG(debug) << PMIxClient() << " pmix::lookup() OK"; + } + + for (const auto& p : pdata) { + if (p.value.type == PMIX_UNDEF) { + LOG(debug) << PMIxClient() << " pmix::lookup() not found: key=" << p.key; + } else if (p.value.type == PMIX_STRING) { + SetProperty(p.key, p.value.data.string); + LOG(debug) << PMIxClient() << " pmix::lookup() found: key=" << p.key << ",value=" << p.value.data.string; + } else { + LOG(debug) << PMIxClient() << " pmix::lookup() wrong type returned: key=" << p.key << ",type=" << p.value.type; + } + } } } /* namespace plugins */ diff --git a/fairmq/plugins/PMIx/PMIxPlugin.h b/fairmq/plugins/PMIx/PMIxPlugin.h index 3e731182..9432e637 100644 --- a/fairmq/plugins/PMIx/PMIxPlugin.h +++ b/fairmq/plugins/PMIx/PMIxPlugin.h @@ -21,7 +21,6 @@ #include #include #include -#include #include namespace fair @@ -31,17 +30,6 @@ namespace mq namespace plugins { -struct ConnectingChannel -{ - ConnectingChannel() - : fSubChannelAddresses() - , fValues() - {} - - std::vector fSubChannelAddresses; - std::unordered_map fValues; -}; - class PMIxPlugin : public Plugin { public: @@ -51,21 +39,16 @@ class PMIxPlugin : public Plugin const std::string& homepage, PluginServices* pluginServices); ~PMIxPlugin(); - auto PMIxClient() const -> std::string - { - std::stringstream ss; - ss << "PMIx client(pid=" << fPid << ")"; - return ss.str(); - } + auto PMIxClient() const -> std::string; private: pmix::proc fProc; pid_t fPid; - std::unordered_map> fBindingChannels; - std::unordered_map fConnectingChannels; - auto FillChannelContainers() -> void; - auto PublishBoundChannels() -> void; + auto Init() -> void; + auto Publish() -> void; + auto Fence() -> void; + auto Lookup() -> void; }; Plugin::ProgOptions PMIxProgramOptions()