mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
Fix connection logic
This commit is contained in:
parent
7df278818c
commit
5eae5ccd31
|
@ -126,7 +126,7 @@ try {
|
||||||
InitOfi(fLocalAddr);
|
InitOfi(fLocalAddr);
|
||||||
|
|
||||||
fPassiveEndpoint = tools::make_unique<asiofi::passive_endpoint>(fContext.GetIoContext(), *fOfiFabric);
|
fPassiveEndpoint = tools::make_unique<asiofi::passive_endpoint>(fContext.GetIoContext(), *fOfiFabric);
|
||||||
fPassiveEndpoint->set_local_address(Context::ConvertAddress(fLocalAddr));
|
//fPassiveEndpoint->set_local_address(Context::ConvertAddress(fLocalAddr));
|
||||||
|
|
||||||
BindControlEndpoint();
|
BindControlEndpoint();
|
||||||
|
|
||||||
|
@ -195,11 +195,6 @@ try {
|
||||||
|
|
||||||
ConnectEndpoint(fControlEndpoint, Band::Control);
|
ConnectEndpoint(fControlEndpoint, Band::Control);
|
||||||
|
|
||||||
ConnectEndpoint(fDataEndpoint, Band::Data);
|
|
||||||
|
|
||||||
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
|
|
||||||
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this));
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
catch (const SilentSocketError& e)
|
catch (const SilentSocketError& e)
|
||||||
|
@ -217,42 +212,27 @@ auto Socket::ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoi
|
||||||
{
|
{
|
||||||
assert(!endpoint);
|
assert(!endpoint);
|
||||||
|
|
||||||
std::mutex m;
|
|
||||||
std::condition_variable cv;
|
|
||||||
asiofi::eq::event status(asiofi::eq::event::connrefused);
|
|
||||||
std::string band(type == Band::Control ? "control" : "data");
|
std::string band(type == Band::Control ? "control" : "data");
|
||||||
|
|
||||||
endpoint = tools::make_unique<asiofi::connected_endpoint>(fContext.GetIoContext(), *fOfiDomain);
|
endpoint = tools::make_unique<asiofi::connected_endpoint>(fContext.GetIoContext(), *fOfiDomain);
|
||||||
endpoint->enable();
|
endpoint->enable();
|
||||||
|
|
||||||
LOG(debug) << "OFI transport (" << fId << "): Sending " << band << " band connection request to "
|
LOG(debug) << "OFI transport (" << fId << "): Sending " << band << " band connection request to " << fRemoteAddr;
|
||||||
<< fRemoteAddr;
|
|
||||||
|
|
||||||
while (true) {
|
endpoint->connect(Context::ConvertAddress(fRemoteAddr), [&, band, type](asiofi::eq::event event) {
|
||||||
endpoint->connect(Context::ConvertAddress(fRemoteAddr), [&](asiofi::eq::event event) {
|
LOG(debug) << "OFI transport (" << fId << "): " << band << " band conn event happened";
|
||||||
LOG(debug) << "OFI transport (" << fId << "): conn event happened";
|
if (event == asiofi::eq::event::connected) {
|
||||||
{
|
LOG(debug) << "OFI transport (" << fId << "): " << band << " band connected.";
|
||||||
std::unique_lock<std::mutex> lk(m);
|
if (type == Band::Control) {
|
||||||
status = event;
|
ConnectEndpoint(fDataEndpoint, Band::Data);
|
||||||
}
|
|
||||||
cv.notify_one();
|
|
||||||
});
|
|
||||||
|
|
||||||
LOG(debug) << "OFI transport (" << fId << "): endpoint->connect called.";
|
|
||||||
{
|
|
||||||
std::unique_lock<std::mutex> lk(m);
|
|
||||||
cv.wait(lk);
|
|
||||||
|
|
||||||
if (status == asiofi::eq::event::connected) {
|
|
||||||
break;
|
|
||||||
} else {
|
} else {
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
|
||||||
continue;
|
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this));
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
LOG(error) << "asdf";
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
|
|
||||||
LOG(debug) << "OFI transport (" << fId << "): " << band << " band connected.";
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// auto Socket::ReceiveDataAddressAnnouncement() -> void
|
// auto Socket::ReceiveDataAddressAnnouncement() -> void
|
||||||
|
|
Loading…
Reference in New Issue
Block a user