PMIx plugin: Add barriers and fix lookup of multiple keys

This commit is contained in:
Dennis Klein 2019-10-07 12:31:22 +02:00 committed by Dennis Klein
parent e1134321dd
commit caeee626a3

View File

@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@ -8,7 +8,6 @@
#include "PMIxPlugin.h" #include "PMIxPlugin.h"
#include <boost/algorithm/string/join.hpp>
#include <fairmq/Tools.h> #include <fairmq/Tools.h>
#include <stdexcept> #include <stdexcept>
@ -27,14 +26,28 @@ PMIxPlugin::PMIxPlugin(const std::string& name,
: Plugin(name, version, maintainer, homepage, pluginServices) : Plugin(name, version, maintainer, homepage, pluginServices)
, fPid(getpid()) , fPid(getpid())
{ {
Init();
SetProperty<std::string>("id", std::string(fProc.nspace) + "_" + std::to_string(fProc.rank));
Fence();
SubscribeToDeviceStateChange([&](DeviceState newState) { SubscribeToDeviceStateChange([&](DeviceState newState) {
switch (newState) { switch (newState) {
case DeviceState::Connecting: case DeviceState::Idle:
Init(); Fence();
Publish(); break;
Fence(); case DeviceState::Bound:
Lookup(); Publish();
break; Fence();
break;
case DeviceState::Connecting:
Lookup();
break;
case DeviceState::DeviceReady:
Fence();
break;
case DeviceState::Ready:
Fence();
break;
case DeviceState::Exiting: case DeviceState::Exiting:
UnsubscribeFromDeviceStateChange(); UnsubscribeFromDeviceStateChange();
break; break;
@ -49,9 +62,9 @@ PMIxPlugin::~PMIxPlugin()
while (pmix::initialized()) { while (pmix::initialized()) {
try { try {
pmix::finalize(); pmix::finalize();
LOG(debug) << PMIxClient() << " pmix::finalize() OK"; LOG(debug) << PMIxClient() << "pmix::finalize() OK";
} catch (const pmix::runtime_error& e) { } catch (const pmix::runtime_error& e) {
LOG(debug) << PMIxClient() << " pmix::finalize() failed: " << e.what(); LOG(debug) << PMIxClient() << "pmix::finalize() failed: " << e.what();
} }
} }
} }
@ -59,7 +72,7 @@ PMIxPlugin::~PMIxPlugin()
auto PMIxPlugin::PMIxClient() const -> std::string auto PMIxPlugin::PMIxClient() const -> std::string
{ {
std::stringstream ss; std::stringstream ss;
ss << "PMIx client(pid=" << fPid << ")"; ss << "PMIx client(pid=" << fPid << ") ";
return ss.str(); return ss.str();
} }
@ -67,7 +80,7 @@ auto PMIxPlugin::Init() -> void
{ {
if (!pmix::initialized()) { if (!pmix::initialized()) {
fProc = pmix::init(); fProc = pmix::init();
LOG(debug) << PMIxClient() << " pmix::init() OK: " << fProc LOG(debug) << PMIxClient() << "pmix::init() OK: " << fProc
<< ",version=" << pmix::get_version(); << ",version=" << pmix::get_version();
} }
} }
@ -90,7 +103,7 @@ auto PMIxPlugin::Publish() -> void
if (info.size() > 0) { if (info.size() > 0) {
pmix::publish(info); pmix::publish(info);
LOG(debug) << PMIxClient() << " pmix::publish() OK: published " LOG(debug) << PMIxClient() << "pmix::publish() OK: published "
<< info.size() << " binding channels."; << info.size() << " binding channels.";
} }
} }
@ -101,44 +114,43 @@ auto PMIxPlugin::Fence() -> void
all.rank = pmix::rank::wildcard; all.rank = pmix::rank::wildcard;
pmix::fence({all}); pmix::fence({all});
LOG(debug) << PMIxClient() << "pmix::fence() OK";
} }
auto PMIxPlugin::Lookup() -> void auto PMIxPlugin::Lookup() -> void
{ {
auto channels(GetChannelInfo()); auto channels(GetChannelInfo());
std::vector<pmix::pdata> pdata;
for (const auto& c : channels) { for (const auto& c : channels) {
std::string methodKey{"chans." + c.first + "." + std::to_string(c.second - 1) + ".method"}; std::string methodKey{"chans." + c.first + "." + std::to_string(c.second - 1) + ".method"};
if (GetProperty<std::string>(methodKey) == "connect") { if (GetProperty<std::string>(methodKey) == "connect") {
for (int i = 0; i < c.second; ++i) { for (int i = 0; i < c.second; ++i) {
std::vector<pmix::pdata> pdata;
std::string addressKey{"chans." + c.first + "." + std::to_string(i) + ".address"}; std::string addressKey{"chans." + c.first + "." + std::to_string(i) + ".address"};
pdata.emplace_back(); pdata.emplace_back();
pdata.back().set_key(addressKey); pdata.back().set_key(addressKey);
std::vector<pmix::info> info;
info.emplace_back(PMIX_WAIT, static_cast<int>(pdata.size()));
if (pdata.size() > 0) {
pmix::lookup(pdata, info);
LOG(debug) << PMIxClient() << "pmix::lookup() OK";
}
for (const auto& p : pdata) {
if (p.value.type == PMIX_UNDEF) {
LOG(debug) << PMIxClient() << "pmix::lookup() not found: key=" << p.key;
} else if (p.value.type == PMIX_STRING) {
LOG(debug) << PMIxClient() << "pmix::lookup() found:"
<< " key=" << p.key << ",value=" << p.value.data.string;
SetProperty<std::string>(p.key, p.value.data.string);
} else {
LOG(debug) << PMIxClient() << "pmix::lookup() wrong type returned: "
<< "key=" << p.key << ",type=" << p.value.type;
}
}
} }
} }
} }
if (pdata.size() > 0) {
pmix::lookup(pdata);
LOG(debug) << PMIxClient() << " pmix::lookup() OK";
}
LOG(info) << pdata.size();
for (const auto& p : pdata) {
if (p.value.type == PMIX_UNDEF) {
LOG(debug) << PMIxClient() << " pmix::lookup() not found: key=" << p.key;
} else if (p.value.type == PMIX_STRING) {
LOG(debug) << PMIxClient() << " pmix::lookup() found: key=" << p.key << ",value=" << p.value.data.string;
SetProperty<std::string>(p.key, p.value.data.string);
LOG(info) << GetProperty<std::string>(p.key);
} else {
LOG(debug) << PMIxClient() << " pmix::lookup() wrong type returned: key=" << p.key << ",type=" << p.value.type;
}
}
LOG(info) << pdata.size();
} }
} /* namespace plugins */ } /* namespace plugins */