Retry ofi connection

This commit is contained in:
Dennis Klein 2019-02-26 16:18:18 +01:00 committed by Dennis Klein
parent 02e1511667
commit 241bf08337
2 changed files with 44 additions and 56 deletions

View File

@ -17,12 +17,14 @@
#include <boost/asio/bind_executor.hpp> #include <boost/asio/bind_executor.hpp>
#include <boost/asio/buffer.hpp> #include <boost/asio/buffer.hpp>
#include <boost/asio/post.hpp> #include <boost/asio/post.hpp>
#include <chrono>
#include <cstring> #include <cstring>
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <sstream> #include <sstream>
#include <string.h> #include <string.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <thread>
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
@ -149,7 +151,7 @@ auto Socket::BindControlEndpoint() -> void
{ {
assert(!fControlEndpoint); assert(!fControlEndpoint);
fPassiveEndpoint->listen([&](fid_t /*handle*/, asiofi::info&& info) { fPassiveEndpoint->listen([&](asiofi::info&& info) {
LOG(debug) << "OFI transport (" << fId LOG(debug) << "OFI transport (" << fId
<< "): control band connection request received. Accepting ..."; << "): control band connection request received. Accepting ...";
fControlEndpoint = tools::make_unique<asiofi::connected_endpoint>( fControlEndpoint = tools::make_unique<asiofi::connected_endpoint>(
@ -169,7 +171,7 @@ auto Socket::BindDataEndpoint() -> void
{ {
assert(!fDataEndpoint); assert(!fDataEndpoint);
fPassiveEndpoint->listen([&](fid_t /*handle*/, asiofi::info&& info) { fPassiveEndpoint->listen([&](asiofi::info&& info) {
LOG(debug) << "OFI transport (" << fId LOG(debug) << "OFI transport (" << fId
<< "): data band connection request received. Accepting ..."; << "): data band connection request received. Accepting ...";
fDataEndpoint = tools::make_unique<asiofi::connected_endpoint>( fDataEndpoint = tools::make_unique<asiofi::connected_endpoint>(
@ -195,86 +197,65 @@ try {
InitOfi(fRemoteAddr); InitOfi(fRemoteAddr);
ConnectControlEndpoint(); ConnectEndpoint(fControlEndpoint, Band::Control);
ConnectDataEndpoint(); ConnectEndpoint(fDataEndpoint, Band::Data);
boost::asio::post(fIoStrand, std::bind(&Socket::SendQueueReader, this)); boost::asio::post(fIoStrand, std::bind(&Socket::SendQueueReader, this));
boost::asio::post(fIoStrand, std::bind(&Socket::RecvControlQueueReader, this)); boost::asio::post(fIoStrand, std::bind(&Socket::RecvControlQueueReader, this));
return true; return true;
} }
// TODO catch the correct ofi error
catch (const SilentSocketError& e) catch (const SilentSocketError& e)
{ {
// do not print error in this case, this is handled by FairMQDevice // do not print error in this case, this is handled by FairMQDevice
// in case no connection could be established after trying a number of random ports from a range. // in case no connection could be established after trying a number of random ports from a range.
return false; return false;
} }
catch (const SocketError& e) catch (const std::exception& e)
{ {
LOG(error) << "OFI transport: " << e.what(); LOG(error) << "OFI transport: " << e.what();
return false; return false;
} }
auto Socket::ConnectControlEndpoint() -> void auto Socket::ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoint, Band type) -> void
{ {
assert(!fControlEndpoint); assert(!endpoint);
std::mutex m; std::mutex m;
std::condition_variable cv; std::condition_variable cv;
bool completed(false); asiofi::eq::event status(asiofi::eq::event::connrefused);
std::string band(type == Band::Control ? "control" : "data");
fControlEndpoint = endpoint = tools::make_unique<asiofi::connected_endpoint>(fIoStrand.context(), *fOfiDomain);
tools::make_unique<asiofi::connected_endpoint>(fIoStrand.context(), *fOfiDomain); endpoint->enable();
fControlEndpoint->enable();
fControlEndpoint->connect(Context::ConvertAddress(fRemoteAddr), [&]() { LOG(debug) << "OFI transport (" << fId << "): Sending " << band << " band connection request to "
<< fRemoteAddr;
while (true) {
endpoint->connect(Context::ConvertAddress(fRemoteAddr), [&](asiofi::eq::event event) {
{ {
std::unique_lock<std::mutex> lk(m); std::unique_lock<std::mutex> lk(m);
completed = true; status = event;
} }
cv.notify_one(); cv.notify_one();
}); });
LOG(debug) << "OFI transport (" << fId << "): control band connection request sent to "
<< fRemoteAddr;
{ {
std::unique_lock<std::mutex> lk(m); std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [&]() { return completed; }); cv.wait(lk);
if (status == asiofi::eq::event::connected) {
break;
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
continue;
}
} }
LOG(debug) << "OFI transport (" << fId << "): control band connected.";
} }
auto Socket::ConnectDataEndpoint() -> void LOG(debug) << "OFI transport (" << fId << "): " << band << " band connected.";
{
assert(!fDataEndpoint);
std::mutex m;
std::condition_variable cv;
bool completed(false);
fDataEndpoint =
tools::make_unique<asiofi::connected_endpoint>(fIoStrand.context(), *fOfiDomain);
fDataEndpoint->enable();
fDataEndpoint->connect(Context::ConvertAddress(fRemoteAddr), [&]() {
{
std::unique_lock<std::mutex> lk(m);
completed = true;
}
cv.notify_one();
});
LOG(debug) << "OFI transport (" << fId << "): data band connection request sent to "
<< fRemoteAddr;
{
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [&]() { return completed; });
}
LOG(debug) << "OFI transport (" << fId << "): data band connected.";
} }
// auto Socket::ReceiveDataAddressAnnouncement() -> void // auto Socket::ReceiveDataAddressAnnouncement() -> void
@ -362,15 +343,22 @@ auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int
} }
} }
auto Socket::Send(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t { return SendImpl(msgVec, 0, timeout); } auto Socket::Send(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t
auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t { return ReceiveImpl(msgVec, 0, timeout); } {
return SendImpl(msgVec, 0, timeout);
}
auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t
{
return ReceiveImpl(msgVec, 0, timeout);
}
auto Socket::SendQueueReader() -> void auto Socket::SendQueueReader() -> void
{ {
fSendSem.async_wait( fSendSem.async_wait(
boost::asio::bind_executor(fIoStrand, [&](const boost::system::error_code& ec) { boost::asio::bind_executor(fIoStrand, [&](const boost::system::error_code& ec) {
if (!ec) { if (!ec) {
// LOG(debug) << "OFI transport (" << fId << "): < Wait fSendSem=" << fSendSem.get_value(); // LOG(debug) << "OFI transport (" << fId << "): < Wait fSendSem=" <<
// fSendSem.get_value();
fSendQueueRead.async_receive([&](const boost::system::error_code& ec2, fSendQueueRead.async_receive([&](const boost::system::error_code& ec2,
azmq::message& zmsg, azmq::message& zmsg,
size_t bytes_transferred) { size_t bytes_transferred) {

View File

@ -117,8 +117,8 @@ class Socket final : public fair::mq::Socket
auto InitOfi(Address addr) -> void; auto InitOfi(Address addr) -> void;
auto BindControlEndpoint() -> void; auto BindControlEndpoint() -> void;
auto BindDataEndpoint() -> void; auto BindDataEndpoint() -> void;
auto ConnectControlEndpoint() -> void; enum class Band { Control, Data };
auto ConnectDataEndpoint() -> void; auto ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoint, Band type) -> void;
// auto ReceiveDataAddressAnnouncement() -> void; // auto ReceiveDataAddressAnnouncement() -> void;
}; /* class Socket */ }; /* class Socket */