Add pmix::lookup binding and cleanup

This commit is contained in:
Dennis Klein
2019-02-09 05:58:11 +01:00
committed by Dennis Klein
parent dfc6b5c4a3
commit 62781389d4
3 changed files with 107 additions and 84 deletions

View File

@@ -30,24 +30,10 @@ PMIxPlugin::PMIxPlugin(const std::string& name,
SubscribeToDeviceStateChange([&](DeviceState newState) {
switch (newState) {
case DeviceState::InitializingDevice:
if (!pmix::initialized()) {
fProc = pmix::init();
LOG(debug) << PMIxClient() << " pmix::init() OK: " << fProc
<< ",version=" << pmix::get_version();
}
FillChannelContainers();
PublishBoundChannels();
{
pmix::proc all(fProc);
all.rank = pmix::rank::wildcard;
pmix::fence({all});
}
// lookup
Init();
Publish();
Fence();
Lookup();
break;
case DeviceState::Exiting:
UnsubscribeFromDeviceStateChange();
@@ -60,8 +46,6 @@ PMIxPlugin::PMIxPlugin(const std::string& name,
PMIxPlugin::~PMIxPlugin()
{
LOG(debug) << PMIxClient() << " Finalizing PMIx session... (On success, logs seen by the RTE will stop here.)";
while (pmix::initialized()) {
try {
pmix::finalize();
@@ -72,52 +56,83 @@ PMIxPlugin::~PMIxPlugin()
}
}
auto PMIxPlugin::FillChannelContainers() -> void
auto PMIxPlugin::PMIxClient() const -> std::string
{
try {
std::unordered_map<std::string, int> channelInfo(GetChannelInfo());
std::stringstream ss;
ss << "PMIx client(pid=" << fPid << ")";
return ss.str();
}
// fill binding and connecting chans
for (const auto& c : channelInfo) {
std::string methodKey{"chans." + c.first + "." + std::to_string(c.second - 1)
+ ".method"};
if (GetProperty<std::string>(methodKey) == "bind") {
fBindingChannels.insert(std::make_pair(c.first, std::vector<std::string>()));
for (int i = 0; i < c.second; ++i) {
fBindingChannels.at(c.first).push_back(GetProperty<std::string>(
std::string{"chans." + c.first + "." + std::to_string(i) + ".address"}));
}
} else if (GetProperty<std::string>(methodKey) == "connect") {
fConnectingChannels.insert(std::make_pair(c.first, ConnectingChannel()));
LOG(debug) << "preparing to connect: " << c.first << " with " << c.second
<< " sub-channels.";
for (int i = 0; i < c.second; ++i) {
fConnectingChannels.at(c.first).fSubChannelAddresses.push_back(std::string());
}
} else {
LOG(error) << "Cannot update address configuration. Channel method (bind/connect) "
"not specified.";
return;
}
}
} catch (const std::exception& e) {
LOG(error) << "Error filling channel containers: " << e.what();
auto PMIxPlugin::Init() -> void
{
if (!pmix::initialized()) {
fProc = pmix::init();
LOG(debug) << PMIxClient() << " pmix::init() OK: " << fProc
<< ",version=" << pmix::get_version();
}
}
auto PMIxPlugin::PublishBoundChannels() -> void
auto PMIxPlugin::Publish() -> void
{
std::vector<pmix::info> infos;
infos.reserve(fBindingChannels.size());
auto channels(GetChannelInfo());
std::vector<pmix::info> info;
for (const auto& channel : fBindingChannels) {
std::string joined = boost::algorithm::join(channel.second, ",");
infos.emplace_back(channel.first, joined);
for (const auto& c : channels) {
std::string methodKey{"chans." + c.first + "." + std::to_string(c.second - 1) + ".method"};
if (GetProperty<std::string>(methodKey) == "bind") {
for (int i = 0; i < c.second; ++i) {
std::string addressKey{"chans." + c.first + "." + std::to_string(i) + ".address"};
info.emplace_back(addressKey, GetProperty<std::string>(addressKey));
}
}
}
pmix::publish(infos);
LOG(debug) << PMIxClient() << " pmix::publish() OK: published "
<< fBindingChannels.size() << " binding channels.";
if (info.size() > 0) {
pmix::publish(info);
LOG(debug) << PMIxClient() << " pmix::publish() OK: published "
<< info.size() << " binding channels.";
}
}
auto PMIxPlugin::Fence() -> void
{
pmix::proc all(fProc);
all.rank = pmix::rank::wildcard;
pmix::fence({all});
}
auto PMIxPlugin::Lookup() -> void
{
auto channels(GetChannelInfo());
std::vector<pmix::pdata> pdata;
for (const auto& c : channels) {
std::string methodKey{"chans." + c.first + "." + std::to_string(c.second - 1) + ".method"};
if (GetProperty<std::string>(methodKey) == "connect") {
for (int i = 0; i < c.second; ++i) {
std::string addressKey{"chans." + c.first + "." + std::to_string(i) + ".address"};
pdata.emplace_back();
pdata.back().set_key(addressKey);
}
}
}
if (pdata.size() > 0) {
pmix::lookup(pdata);
LOG(debug) << PMIxClient() << " pmix::lookup() OK";
}
for (const auto& p : pdata) {
if (p.value.type == PMIX_UNDEF) {
LOG(debug) << PMIxClient() << " pmix::lookup() not found: key=" << p.key;
} else if (p.value.type == PMIX_STRING) {
SetProperty<std::string>(p.key, p.value.data.string);
LOG(debug) << PMIxClient() << " pmix::lookup() found: key=" << p.key << ",value=" << p.value.data.string;
} else {
LOG(debug) << PMIxClient() << " pmix::lookup() wrong type returned: key=" << p.key << ",type=" << p.value.type;
}
}
}
} /* namespace plugins */