From a8f1a4dfdbd567636c43e174feb23db4bc593da4 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Tue, 12 Mar 2019 17:48:06 +0100 Subject: [PATCH] Try to reconnect on refused connection --- fairmq/ofi/Socket.cxx | 45 +++++++++++++++++++++++++++++++------------ 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index ab93d095..5474dd86 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -24,7 +24,6 @@ #include #include #include - #include #include @@ -196,6 +195,10 @@ try { InitOfi(fRemoteAddr); ConnectEndpoint(fControlEndpoint, Band::Control); + ConnectEndpoint(fDataEndpoint, Band::Data); + + boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); + boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); return true; } @@ -221,20 +224,38 @@ auto Socket::ConnectEndpoint(std::unique_ptr& endpoi LOG(debug) << "OFI transport (" << fId << "): Sending " << band << " band connection request to " << fRemoteAddr; - endpoint->connect(Context::ConvertAddress(fRemoteAddr), [&, band, type](asiofi::eq::event event) { - LOG(debug) << "OFI transport (" << fId << "): " << band << " band conn event happened"; - if (event == asiofi::eq::event::connected) { - LOG(debug) << "OFI transport (" << fId << "): " << band << " band connected."; - if (type == Band::Control) { - ConnectEndpoint(fDataEndpoint, Band::Data); + std::mutex mtx; + std::condition_variable cv; + bool notified(false), connected(false); + + while (true) { + endpoint->connect(Context::ConvertAddress(fRemoteAddr), [&, band](asiofi::eq::event event) { + // LOG(debug) << "OFI transport (" << fId << "): " << band << " band conn event happened"; + std::unique_lock lk2(mtx); + notified = true; + if (event == asiofi::eq::event::connected) { + LOG(debug) << "OFI transport (" << fId << "): " << band << " band connected."; + connected = true; } else { - boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); - boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); + // LOG(debug) << "OFI transport (" << fId << "): " << band << " band connection refused. Trying again."; + } + lk2.unlock(); + cv.notify_one(); + }); + + { + std::unique_lock lk(mtx); + cv.wait(lk, [&] { return notified; }); + + if (connected) { + break; + } else { + notified = false; + lk.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } - } else { - LOG(error) << "asdf"; } - }); + } } // auto Socket::ReceiveDataAddressAnnouncement() -> void