Remove asio strand

This commit is contained in:
Dennis Klein 2019-02-27 18:06:53 +01:00 committed by Dennis Klein
parent 9ffaa55181
commit ad198edd59
2 changed files with 42 additions and 46 deletions

View File

@ -14,7 +14,6 @@
#include <asiofi.hpp> #include <asiofi.hpp>
#include <azmq/message.hpp> #include <azmq/message.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/buffer.hpp> #include <boost/asio/buffer.hpp>
#include <boost/asio/post.hpp> #include <boost/asio/post.hpp>
#include <chrono> #include <chrono>
@ -51,15 +50,14 @@ Socket::Socket(Context& context, const string& type, const string& name, const s
, fBytesRx(0) , fBytesRx(0)
, fMessagesTx(0) , fMessagesTx(0)
, fMessagesRx(0) , fMessagesRx(0)
, fIoStrand(fContext.GetIoContext())
, fSndTimeout(100) , fSndTimeout(100)
, fRcvTimeout(100) , fRcvTimeout(100)
, fSendQueueWrite(fIoStrand.context(), ZMQ_PUSH) , fSendQueueWrite(fContext.GetIoContext(), ZMQ_PUSH)
, fSendQueueRead(fIoStrand.context(), ZMQ_PULL) , fSendQueueRead(fContext.GetIoContext(), ZMQ_PULL)
, fRecvQueueWrite(fIoStrand.context(), ZMQ_PUSH) , fRecvQueueWrite(fContext.GetIoContext(), ZMQ_PUSH)
, fRecvQueueRead(fIoStrand.context(), ZMQ_PULL) , fRecvQueueRead(fContext.GetIoContext(), ZMQ_PULL)
, fSendSem(fIoStrand.context(), 300) , fSendSem(fContext.GetIoContext(), 300)
, fRecvSem(fIoStrand.context(), 300) , fRecvSem(fContext.GetIoContext(), 300)
, fNeedOfiMemoryRegistration(false) , fNeedOfiMemoryRegistration(false)
{ {
if (type != "pair") { if (type != "pair") {
@ -110,7 +108,7 @@ auto Socket::InitOfi(Address addr) -> void
fOfiInfo = tools::make_unique<asiofi::info>(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), FI_SOURCE, hints); fOfiInfo = tools::make_unique<asiofi::info>(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), FI_SOURCE, hints);
} }
// LOG(debug) << "OFI transport: " << *fOfiInfo; LOG(debug) << "OFI transport: " << *fOfiInfo;
fOfiFabric = tools::make_unique<asiofi::fabric>(*fOfiInfo); fOfiFabric = tools::make_unique<asiofi::fabric>(*fOfiInfo);
@ -127,7 +125,7 @@ try {
InitOfi(fLocalAddr); InitOfi(fLocalAddr);
fPassiveEndpoint = tools::make_unique<asiofi::passive_endpoint>(fIoStrand.context(), *fOfiFabric); fPassiveEndpoint = tools::make_unique<asiofi::passive_endpoint>(fContext.GetIoContext(), *fOfiFabric);
fPassiveEndpoint->set_local_address(Context::ConvertAddress(fLocalAddr)); fPassiveEndpoint->set_local_address(Context::ConvertAddress(fLocalAddr));
BindControlEndpoint(); BindControlEndpoint();
@ -155,7 +153,7 @@ auto Socket::BindControlEndpoint() -> void
LOG(debug) << "OFI transport (" << fId LOG(debug) << "OFI transport (" << fId
<< "): control band connection request received. Accepting ..."; << "): control band connection request received. Accepting ...";
fControlEndpoint = tools::make_unique<asiofi::connected_endpoint>( fControlEndpoint = tools::make_unique<asiofi::connected_endpoint>(
fIoStrand.context(), *fOfiDomain, info); fContext.GetIoContext(), *fOfiDomain, info);
fControlEndpoint->enable(); fControlEndpoint->enable();
fControlEndpoint->accept([&]() { fControlEndpoint->accept([&]() {
LOG(debug) << "OFI transport (" << fId << "): control band connection accepted."; LOG(debug) << "OFI transport (" << fId << "): control band connection accepted.";
@ -175,13 +173,13 @@ auto Socket::BindDataEndpoint() -> void
LOG(debug) << "OFI transport (" << fId LOG(debug) << "OFI transport (" << fId
<< "): data band connection request received. Accepting ..."; << "): data band connection request received. Accepting ...";
fDataEndpoint = tools::make_unique<asiofi::connected_endpoint>( fDataEndpoint = tools::make_unique<asiofi::connected_endpoint>(
fIoStrand.context(), *fOfiDomain, info); fContext.GetIoContext(), *fOfiDomain, info);
fDataEndpoint->enable(); fDataEndpoint->enable();
fDataEndpoint->accept([&]() { fDataEndpoint->accept([&]() {
LOG(debug) << "OFI transport (" << fId << "): data band connection accepted."; LOG(debug) << "OFI transport (" << fId << "): data band connection accepted.";
boost::asio::post(fIoStrand, std::bind(&Socket::SendQueueReader, this)); boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
boost::asio::post(fIoStrand, std::bind(&Socket::RecvControlQueueReader, this)); boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this));
}); });
}); });
@ -191,9 +189,7 @@ auto Socket::BindDataEndpoint() -> void
auto Socket::Connect(const string& address) -> bool auto Socket::Connect(const string& address) -> bool
try { try {
fRemoteAddr = Context::VerifyAddress(address); fRemoteAddr = Context::VerifyAddress(address);
if (fRemoteAddr.Protocol == "verbs") { fNeedOfiMemoryRegistration = (fRemoteAddr.Protocol == "verbs");
fNeedOfiMemoryRegistration = true;
}
InitOfi(fRemoteAddr); InitOfi(fRemoteAddr);
@ -201,15 +197,14 @@ try {
ConnectEndpoint(fDataEndpoint, Band::Data); ConnectEndpoint(fDataEndpoint, Band::Data);
boost::asio::post(fIoStrand, std::bind(&Socket::SendQueueReader, this)); boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
boost::asio::post(fIoStrand, std::bind(&Socket::RecvControlQueueReader, this)); boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this));
return true; return true;
} }
catch (const SilentSocketError& e) catch (const SilentSocketError& e)
{ {
// do not print error in this case, this is handled by FairMQDevice // do not print error in this case, this is handled by FairMQDevice
// in case no connection could be established after trying a number of random ports from a range.
return false; return false;
} }
catch (const std::exception& e) catch (const std::exception& e)
@ -227,7 +222,7 @@ auto Socket::ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoi
asiofi::eq::event status(asiofi::eq::event::connrefused); asiofi::eq::event status(asiofi::eq::event::connrefused);
std::string band(type == Band::Control ? "control" : "data"); std::string band(type == Band::Control ? "control" : "data");
endpoint = tools::make_unique<asiofi::connected_endpoint>(fIoStrand.context(), *fOfiDomain); endpoint = tools::make_unique<asiofi::connected_endpoint>(fContext.GetIoContext(), *fOfiDomain);
endpoint->enable(); endpoint->enable();
LOG(debug) << "OFI transport (" << fId << "): Sending " << band << " band connection request to " LOG(debug) << "OFI transport (" << fId << "): Sending " << band << " band connection request to "
@ -235,6 +230,7 @@ auto Socket::ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoi
while (true) { while (true) {
endpoint->connect(Context::ConvertAddress(fRemoteAddr), [&](asiofi::eq::event event) { endpoint->connect(Context::ConvertAddress(fRemoteAddr), [&](asiofi::eq::event event) {
LOG(debug) << "OFI transport (" << fId << "): conn event happened";
{ {
std::unique_lock<std::mutex> lk(m); std::unique_lock<std::mutex> lk(m);
status = event; status = event;
@ -242,6 +238,7 @@ auto Socket::ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoi
cv.notify_one(); cv.notify_one();
}); });
LOG(debug) << "OFI transport (" << fId << "): endpoint->connect called.";
{ {
std::unique_lock<std::mutex> lk(m); std::unique_lock<std::mutex> lk(m);
cv.wait(lk); cv.wait(lk);
@ -347,6 +344,7 @@ auto Socket::Send(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t
{ {
return SendImpl(msgVec, 0, timeout); return SendImpl(msgVec, 0, timeout);
} }
auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t
{ {
return ReceiveImpl(msgVec, 0, timeout); return ReceiveImpl(msgVec, 0, timeout);
@ -354,20 +352,19 @@ auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int timeout) -> int6
auto Socket::SendQueueReader() -> void auto Socket::SendQueueReader() -> void
{ {
fSendSem.async_wait( fSendSem.async_wait([&](const boost::system::error_code& ec) {
boost::asio::bind_executor(fIoStrand, [&](const boost::system::error_code& ec) { if (!ec) {
if (!ec) { // LOG(debug) << "OFI transport (" << fId << "): < Wait fSendSem=" <<
// LOG(debug) << "OFI transport (" << fId << "): < Wait fSendSem=" << // fSendSem.get_value();
// fSendSem.get_value(); fSendQueueRead.async_receive([&](const boost::system::error_code& ec2,
fSendQueueRead.async_receive([&](const boost::system::error_code& ec2, azmq::message& zmsg,
azmq::message& zmsg, size_t bytes_transferred) {
size_t bytes_transferred) { if (!ec2) {
if (!ec2) { OnSend(zmsg, bytes_transferred);
OnSend(zmsg, bytes_transferred); }
} });
}); }
} });
}));
} }
auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void
@ -444,23 +441,23 @@ auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void
}); });
} }
boost::asio::post(fIoStrand, std::bind(&Socket::SendQueueReader, this)); boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
// LOG(debug) << "OFI transport (" << fId << "): LEAVE OnSend"; // LOG(debug) << "OFI transport (" << fId << "): LEAVE OnSend";
} }
auto Socket::RecvControlQueueReader() -> void auto Socket::RecvControlQueueReader() -> void
{ {
fRecvSem.async_wait( fRecvSem.async_wait([&](const boost::system::error_code& ec) {
boost::asio::bind_executor(fIoStrand, [&](const boost::system::error_code& ec) { if (!ec) {
if (!ec) { auto ctrl = MakeControlMessageWithPmr<PostBuffer>(&fControlMemPool);
auto ctrl = MakeControlMessageWithPmr<PostBuffer>(&fControlMemPool); auto ctrl_msg = boost::asio::mutable_buffer(ctrl.get(), sizeof(PostBuffer));
auto ctrl_msg = boost::asio::mutable_buffer(ctrl.get(), sizeof(PostBuffer));
fControlEndpoint->recv(ctrl_msg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable { fControlEndpoint->recv(
ctrl_msg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable {
OnRecvControl(std::move(ctrl2)); OnRecvControl(std::move(ctrl2));
}); });
} }
})); });
} }
auto Socket::OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void auto Socket::OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void
@ -542,7 +539,7 @@ auto Socket::OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void
}); });
} }
boost::asio::post(fIoStrand, std::bind(&Socket::RecvControlQueueReader, this)); boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this));
// LOG(debug) << "OFI transport (" << fId << "): LEAVE OnRecvControl"; // LOG(debug) << "OFI transport (" << fId << "): LEAVE OnRecvControl";
} }

View File

@ -95,7 +95,6 @@ class Socket final : public fair::mq::Socket
std::atomic<unsigned long> fMessagesRx; std::atomic<unsigned long> fMessagesRx;
Address fRemoteAddr; Address fRemoteAddr;
Address fLocalAddr; Address fLocalAddr;
boost::asio::io_service::strand fIoStrand;
int fSndTimeout; int fSndTimeout;
int fRcvTimeout; int fRcvTimeout;
azmq::socket fSendQueueWrite, fSendQueueRead; azmq::socket fSendQueueWrite, fSendQueueRead;