mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
Try to reconnect on refused connection
This commit is contained in:
parent
fb42b1e2f0
commit
a8f1a4dfdb
|
@ -24,7 +24,6 @@
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
|
|
||||||
|
@ -196,6 +195,10 @@ try {
|
||||||
InitOfi(fRemoteAddr);
|
InitOfi(fRemoteAddr);
|
||||||
|
|
||||||
ConnectEndpoint(fControlEndpoint, Band::Control);
|
ConnectEndpoint(fControlEndpoint, Band::Control);
|
||||||
|
ConnectEndpoint(fDataEndpoint, Band::Data);
|
||||||
|
|
||||||
|
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
|
||||||
|
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this));
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -221,20 +224,38 @@ auto Socket::ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoi
|
||||||
|
|
||||||
LOG(debug) << "OFI transport (" << fId << "): Sending " << band << " band connection request to " << fRemoteAddr;
|
LOG(debug) << "OFI transport (" << fId << "): Sending " << band << " band connection request to " << fRemoteAddr;
|
||||||
|
|
||||||
endpoint->connect(Context::ConvertAddress(fRemoteAddr), [&, band, type](asiofi::eq::event event) {
|
std::mutex mtx;
|
||||||
LOG(debug) << "OFI transport (" << fId << "): " << band << " band conn event happened";
|
std::condition_variable cv;
|
||||||
|
bool notified(false), connected(false);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
endpoint->connect(Context::ConvertAddress(fRemoteAddr), [&, band](asiofi::eq::event event) {
|
||||||
|
// LOG(debug) << "OFI transport (" << fId << "): " << band << " band conn event happened";
|
||||||
|
std::unique_lock<std::mutex> lk2(mtx);
|
||||||
|
notified = true;
|
||||||
if (event == asiofi::eq::event::connected) {
|
if (event == asiofi::eq::event::connected) {
|
||||||
LOG(debug) << "OFI transport (" << fId << "): " << band << " band connected.";
|
LOG(debug) << "OFI transport (" << fId << "): " << band << " band connected.";
|
||||||
if (type == Band::Control) {
|
connected = true;
|
||||||
ConnectEndpoint(fDataEndpoint, Band::Data);
|
|
||||||
} else {
|
} else {
|
||||||
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
|
// LOG(debug) << "OFI transport (" << fId << "): " << band << " band connection refused. Trying again.";
|
||||||
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
LOG(error) << "asdf";
|
|
||||||
}
|
}
|
||||||
|
lk2.unlock();
|
||||||
|
cv.notify_one();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lk(mtx);
|
||||||
|
cv.wait(lk, [&] { return notified; });
|
||||||
|
|
||||||
|
if (connected) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
notified = false;
|
||||||
|
lk.unlock();
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// auto Socket::ReceiveDataAddressAnnouncement() -> void
|
// auto Socket::ReceiveDataAddressAnnouncement() -> void
|
||||||
|
|
Loading…
Reference in New Issue
Block a user