mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 09:31:45 +00:00
Implement connection mgmt
This commit is contained in:
committed by
Dennis Klein
parent
1c5d7ca46a
commit
ba4e6f72c9
@@ -6,6 +6,7 @@
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include <fairmq/ofi/ControlMessages.h>
|
||||
#include <fairmq/ofi/Socket.h>
|
||||
#include <fairmq/ofi/TransportFactory.h>
|
||||
#include <fairmq/Tools.h>
|
||||
@@ -39,6 +40,7 @@ Socket::Socket(Context& context, const string& type, const string& name, const s
|
||||
, fId(id + "." + name + "." + type)
|
||||
, fControlSocket(nullptr)
|
||||
, fMonitorSocket(nullptr)
|
||||
, fPassiveDataEndpoint(nullptr)
|
||||
, fDataEndpoint(nullptr)
|
||||
, fId(id + "." + name + "." + type)
|
||||
, fBytesTx(0)
|
||||
@@ -92,10 +94,16 @@ Socket::Socket(Context& context, const string& type, const string& name, const s
|
||||
auto Socket::Bind(const string& address) -> bool
|
||||
try {
|
||||
auto addr = Context::VerifyAddress(address);
|
||||
|
||||
BindControlSocket(addr);
|
||||
fContext.InitOfi(ConnectionType::Bind, addr);
|
||||
InitDataEndpoint();
|
||||
fWaitingForControlPeer = true;
|
||||
|
||||
// TODO make data port choice more robust
|
||||
addr.Port += 500;
|
||||
fLocalDataAddr = addr;
|
||||
BindDataEndpoint();
|
||||
|
||||
AnnounceDataAddress();
|
||||
|
||||
return true;
|
||||
}
|
||||
catch (const SilentSocketError& e)
|
||||
@@ -106,18 +114,20 @@ catch (const SilentSocketError& e)
|
||||
}
|
||||
catch (const SocketError& e)
|
||||
{
|
||||
LOG(error) << e.what();
|
||||
LOG(error) << "OFI transport: " << e.what();
|
||||
return false;
|
||||
}
|
||||
|
||||
// Connects this socket to the peer reachable at `address`.
//
// The address is checked with Context::VerifyAddress() and the control socket is
// connected to it. The rows below then call ReceiveControlMessage(), hand the
// result (downcast to DataAddressAnnouncement) to ProcessControlMessage(), and
// finally call ConnectDataEndpoint().
//
// NOTE(review): this span is taken from a rendered diff without +/- markers, so
// removed and added rows are interleaved. The InitOfi()/InitDataEndpoint()/
// fWaitingForControlPeer/`return true;` rows look like the pre-change body and
// the ProcessControlMessage()/ConnectDataEndpoint() rows like the post-change
// body - confirm against the repository.
// NOTE(review): if `return true;` is really a removed row, this function
// (declared to return bool) would reach its closing brace without returning a
// value - verify.
auto Socket::Connect(const string& address) -> bool
|
||||
{
|
||||
auto addr = Context::VerifyAddress(address);
|
||||
|
||||
ConnectControlSocket(addr);
|
||||
fContext.InitOfi(ConnectionType::Connect, addr);
|
||||
InitDataEndpoint();
|
||||
fWaitingForControlPeer = true;
|
||||
return true;
|
||||
|
||||
// Receive the peer's control message and process it as a data address announcement.
ProcessControlMessage(
|
||||
StaticUniquePtrDowncast<DataAddressAnnouncement>(ReceiveControlMessage()));
|
||||
|
||||
ConnectDataEndpoint();
|
||||
}
|
||||
|
||||
auto Socket::BindControlSocket(Context::Address address) -> void
|
||||
@@ -128,6 +138,26 @@ auto Socket::BindControlSocket(Context::Address address) -> void
|
||||
if (errno == EADDRINUSE) throw SilentSocketError("EADDRINUSE");
|
||||
throw SocketError(tools::ToString("Failed binding control socket ", fId, ", reason: ", zmq_strerror(errno)));
|
||||
}
|
||||
|
||||
LOG(debug) << "OFI transport (" << fId << "): control band bound to " << address;
|
||||
}
|
||||
|
||||
auto Socket::BindDataEndpoint() -> void
|
||||
{
|
||||
assert(!fPassiveDataEndpoint);
|
||||
assert(!fDataEndpoint);
|
||||
|
||||
fPassiveDataEndpoint = fContext.MakeOfiPassiveEndpoint(fLocalDataAddr);
|
||||
fPassiveDataEndpoint->listen([&](fid_t /*handle*/, asiofi::info&& info) {
|
||||
LOG(debug) << "OFI transport (" << fId << "): data band connection request received. Accepting ...";
|
||||
fDataEndpoint = fContext.MakeOfiConnectedEndpoint(info);
|
||||
fDataEndpoint->enable();
|
||||
fDataEndpoint->accept([&]() {
|
||||
LOG(debug) << "OFI transport (" << fId << "): data band connection accepted.";
|
||||
});
|
||||
});
|
||||
|
||||
LOG(debug) << "OFI transport (" << fId << "): data band bound to " << fLocalDataAddr;
|
||||
}
|
||||
|
||||
auto Socket::ConnectControlSocket(Context::Address address) -> void
|
||||
@@ -138,119 +168,120 @@ auto Socket::ConnectControlSocket(Context::Address address) -> void
|
||||
throw SocketError(tools::ToString("Failed connecting control socket ", fId, ", reason: ", zmq_strerror(errno)));
|
||||
}
|
||||
|
||||
// auto Socket::ProcessDataAddressAnnouncement(std::unique_ptr<ControlMessage> ctrl) -> void
|
||||
// {
|
||||
// assert(ctrl->has_data_address_announcement());
|
||||
// auto daa = ctrl->data_address_announcement();
|
||||
//
|
||||
// sockaddr_in remoteAddr;
|
||||
// remoteAddr.sin_family = AF_INET;
|
||||
// remoteAddr.sin_port = daa.port();
|
||||
// remoteAddr.sin_addr.s_addr = daa.ipv4();
|
||||
//
|
||||
// LOG(debug) << "Data address announcement of remote ofi endpoint received: " << Context::ConvertAddress(remoteAddr);
|
||||
// fRemoteDataAddr = fContext.InsertAddressVector(remoteAddr);
|
||||
// }
|
||||
|
||||
auto Socket::InitDataEndpoint() -> void
|
||||
auto Socket::ConnectDataEndpoint() -> void
|
||||
{
|
||||
assert(!fDataEndpoint);
|
||||
|
||||
// try {
|
||||
// fDataEndpoint = fContext.CreateOfiEndpoint();
|
||||
// } catch (ContextError& e) {
|
||||
// throw SocketError(tools::ToString("Failed creating ofi endpoint, reason: ", e.what()));
|
||||
// }
|
||||
//
|
||||
// if (!fDataCompletionQueueTx)
|
||||
// fDataCompletionQueueTx = fContext.CreateOfiCompletionQueue(Direction::Transmit);
|
||||
// auto ret = fi_ep_bind(fDataEndpoint, &fDataCompletionQueueTx->fid, FI_TRANSMIT);
|
||||
// if (ret != FI_SUCCESS)
|
||||
// throw SocketError(tools::ToString("Failed binding ofi transmit completion queue to endpoint, reason: ", fi_strerror(ret)));
|
||||
//
|
||||
// if (!fDataCompletionQueueRx)
|
||||
// fDataCompletionQueueRx = fContext.CreateOfiCompletionQueue(Direction::Receive);
|
||||
// ret = fi_ep_bind(fDataEndpoint, &fDataCompletionQueueRx->fid, FI_RECV);
|
||||
// if (ret != FI_SUCCESS)
|
||||
// throw SocketError(tools::ToString("Failed binding ofi receive completion queue to endpoint, reason: ", fi_strerror(ret)));
|
||||
//
|
||||
// ret = fi_enable(fDataEndpoint);
|
||||
// if (ret != FI_SUCCESS)
|
||||
// throw SocketError(tools::ToString("Failed enabling ofi endpoint, reason: ", fi_strerror(ret)));
|
||||
fDataEndpoint = fContext.MakeOfiConnectedEndpoint(fRemoteDataAddr);
|
||||
fDataEndpoint->enable();
|
||||
LOG(debug) << "OFI transport (" << fId << "): local data band address: " << Context::ConvertAddress(fDataEndpoint->get_local_address());
|
||||
fDataEndpoint->connect([&]() {
|
||||
LOG(debug) << "OFI transport (" << fId << "): data band connected.";
|
||||
});
|
||||
}
|
||||
|
||||
void free_string(void* /*data*/, void* hint)
|
||||
// Handles a received data address announcement.
//
// Copies the announced IPv4 address and port into a sockaddr_in, converts it
// with Context::ConvertAddress(), logs it and stores the result in
// fRemoteDataAddr.
//
// NOTE(review): this span is taken from a rendered diff without +/- markers.
// The `delete static_cast<string*>(hint);` row refers to `hint`, which is not a
// parameter of this function; it looks like a leftover row of the removed
// free_string() helper - confirm against the repository.
auto Socket::ProcessControlMessage(CtrlMsgPtr<DataAddressAnnouncement> daa) -> void
|
||||
{
|
||||
delete static_cast<string*>(hint);
|
||||
assert(daa->type == ControlMessageType::DataAddressAnnouncement);
|
||||
|
||||
// Only family, port and address are set; the remaining sockaddr_in bytes are left uninitialized.
sockaddr_in remoteAddr;
|
||||
remoteAddr.sin_family = AF_INET;
|
||||
// port and ipv4 are copied as-is, without any byte-order conversion here.
remoteAddr.sin_port = daa->port;
|
||||
remoteAddr.sin_addr.s_addr = daa->ipv4;
|
||||
|
||||
auto addr = Context::ConvertAddress(remoteAddr);
|
||||
LOG(debug) << "OFI transport (" << fId << "): Data address announcement of remote endpoint received: " << addr;
|
||||
fRemoteDataAddr = addr;
|
||||
}
|
||||
|
||||
auto Socket::AnnounceDataAddress() -> void
|
||||
try {
|
||||
// size_t addrlen = sizeof(sockaddr_in);
|
||||
// auto ret = fi_getname(&fDataEndpoint->fid, &fLocalDataAddr, &addrlen);
|
||||
// if (ret != FI_SUCCESS)
|
||||
// throw SocketError(tools::ToString("Failed retrieving native address from ofi endpoint, reason: ", fi_strerror(ret)));
|
||||
// assert(addrlen == sizeof(sockaddr_in));
|
||||
//
|
||||
// fLocalDataAddr = fDataEndpoint->get_local_address();
|
||||
// LOG(debug) << "Address of local ofi endpoint in socket " << fId << ": " << Context::ConvertAddress(fLocalDataAddr);
|
||||
|
||||
// Create new control message
|
||||
// auto ctrl = tools::make_unique<ControlMessage>();
|
||||
// auto daa = tools::make_unique<DataAddressAnnouncement>();
|
||||
// Create new data address announcement message
|
||||
auto daa = MakeControlMessage<DataAddressAnnouncement>(&fCtrlMemPool);
|
||||
auto addr = Context::ConvertAddress(fLocalDataAddr);
|
||||
daa->ipv4 = addr.sin_addr.s_addr;
|
||||
daa->port = addr.sin_port;
|
||||
|
||||
// Fill data address announcement
|
||||
// daa->set_ipv4(fLocalDataAddr.sin_addr.s_addr);
|
||||
// daa->set_port(fLocalDataAddr.sin_port);
|
||||
SendControlMessage(StaticUniquePtrUpcast<ControlMessage>(std::move(daa)));
|
||||
|
||||
// Fill control message
|
||||
// ctrl->set_allocated_data_address_announcement(daa.release());
|
||||
// assert(ctrl->IsInitialized());
|
||||
|
||||
// SendControlMessage(move(ctrl));
|
||||
LOG(debug) << "OFI transport (" << fId << "): data address announced.";
|
||||
} catch (const SocketError& e) {
|
||||
throw SocketError(tools::ToString("Failed to announce data address, reason: ", e.what()));
|
||||
}
|
||||
|
||||
// auto Socket::SendControlMessage(unique_ptr<ControlMessage> ctrl) -> void
|
||||
// {
|
||||
// assert(fControlSocket);
|
||||
// Serializes a control message and sends it over the ZeroMQ control socket.
//
// ctrl: the control message to send; its `type` field selects how many bytes
//       are copied onto the wire.
//
// Throws SocketError when the message type is not handled, when the ZeroMQ
// message cannot be allocated, or when zmq_msg_send() fails.
auto Socket::SendControlMessage(CtrlMsgPtr<ControlMessage> ctrl) -> void
{
    assert(fControlSocket);

    // Resolve the wire size BEFORE any zmq_msg_t exists, so that the
    // unknown-type error path never runs zmq_msg_close() on a message that was
    // never initialized.
    std::size_t wireSize = 0;
    switch (ctrl->type) {
        case ControlMessageType::DataAddressAnnouncement:
            wireSize = sizeof(DataAddressAnnouncement);
            break;
        default:
            throw SocketError(tools::ToString("Cannot send control message of unknown type."));
    }

    // Owns an initialized zmq_msg_t. Construction either succeeds or throws, so
    // the destructor only ever closes a message that was initialized.
    struct ZmqMsg
    {
        zmq_msg_t msg;

        explicit ZmqMsg(std::size_t bytes)
        {
            // Checked at runtime (not only via assert) so that an allocation
            // failure cannot lead to a memcpy into an invalid message.
            if (zmq_msg_init_size(&msg, bytes) != 0) {
                throw SocketError(
                    tools::ToString("Failed to allocate control message, reason: ", zmq_strerror(errno)));
            }
        }
        ZmqMsg(const ZmqMsg&) = delete;
        ZmqMsg& operator=(const ZmqMsg&) = delete;
        ~ZmqMsg() { zmq_msg_close(&msg); }

        operator zmq_msg_t*() { return &msg; }
    };
    ZmqMsg msg(wireSize);

    // Serialize
    std::memcpy(zmq_msg_data(msg), ctrl.get(), wireSize);

    // Send
    if (zmq_msg_send(msg, fControlSocket, 0) == -1) {
        throw SocketError(
            tools::ToString("Failed to send control message, reason: ", zmq_strerror(errno)));
    }
}
|
||||
|
||||
// Receives one control message from the ZeroMQ control socket and returns a
// copy of it allocated from fCtrlMemPool.
//
// Returns the received message, upcast to ControlMessage.
//
// Throws SocketError when zmq_msg_recv() fails, when the received payload has
// an unexpected size, or when its type is not handled.
auto Socket::ReceiveControlMessage() -> CtrlMsgPtr<ControlMessage>
{
    assert(fControlSocket);

    // Owns the zmq_msg_t: initialized on construction, closed on every exit path.
    struct ZmqMsg
    {
        zmq_msg_t msg;

        ZmqMsg()
        {
            auto ret = zmq_msg_init(&msg);
            (void)ret;
            assert(ret == 0);
        }
        ZmqMsg(const ZmqMsg&) = delete;
        ZmqMsg& operator=(const ZmqMsg&) = delete;
        ~ZmqMsg() { zmq_msg_close(&msg); }

        operator zmq_msg_t*() { return &msg; }
    };
    ZmqMsg msg;

    // Receive
    if (zmq_msg_recv(msg, fControlSocket, 0) == -1) {
        throw SocketError(
            tools::ToString("Failed to receive control message, reason: ", zmq_strerror(errno)));
    }

    // Deserialize and sanity check.
    // The payload comes from the peer, so its size is validated at runtime
    // rather than with assert(): with NDEBUG a short message would otherwise be
    // read past its end below.
    const void* msg_data = zmq_msg_data(msg);
    const size_t msg_size = zmq_msg_size(msg);
    if (msg_size < sizeof(ControlMessage)) {
        throw SocketError(
            tools::ToString("Received control message is too short (", msg_size, " bytes)."));
    }

    switch (static_cast<const ControlMessage*>(msg_data)->type) {
        case ControlMessageType::DataAddressAnnouncement: {
            if (msg_size != sizeof(DataAddressAnnouncement)) {
                throw SocketError(tools::ToString(
                    "Received data address announcement of unexpected size (", msg_size, " bytes)."));
            }
            auto daa = MakeControlMessage<DataAddressAnnouncement>(&fCtrlMemPool);
            std::memcpy(daa.get(), msg_data, sizeof(DataAddressAnnouncement));
            return StaticUniquePtrUpcast<ControlMessage>(std::move(daa));
        }
        default:
            throw SocketError(tools::ToString("Received control message of unknown type."));
    }
}
|
||||
|
||||
auto Socket::WaitForControlPeer() -> void
|
||||
{
|
||||
@@ -302,12 +333,6 @@ auto Socket::TryReceive(std::vector<MessagePtr>& msgVec) -> int64_t { return Rec
|
||||
|
||||
auto Socket::SendImpl(FairMQMessagePtr& msg, const int /*flags*/, const int /*timeout*/) -> int
|
||||
try {
|
||||
if (fWaitingForControlPeer) {
|
||||
WaitForControlPeer();
|
||||
AnnounceDataAddress();
|
||||
// ProcessDataAddressAnnouncement(ReceiveControlMessage());
|
||||
}
|
||||
|
||||
auto size = msg->GetSize();
|
||||
|
||||
// Create and send control message
|
||||
@@ -358,7 +383,7 @@ auto Socket::ReceiveImpl(FairMQMessagePtr& /*msg*/, const int /*flags*/, const i
|
||||
try {
|
||||
if (fWaitingForControlPeer) {
|
||||
WaitForControlPeer();
|
||||
AnnounceDataAddress();
|
||||
// AnnounceDataAddress();
|
||||
// ProcessDataAddressAnnouncement(ReceiveControlMessage());
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user