Deactivate control band monitor socket

This commit is contained in:
Dennis Klein
2018-10-30 10:33:46 +01:00
committed by Dennis Klein
parent ba4e6f72c9
commit 91025cbc88
4 changed files with 60 additions and 69 deletions

View File

@@ -32,14 +32,9 @@ namespace ofi
using namespace std;
Socket::Socket(Context& context, const string& type, const string& name, const string& id /*= ""*/, FairMQTransportFactory* fac)
: FairMQSocket{fac}
, fDataEndpoint(nullptr)
, fDataCompletionQueueTx(nullptr)
, fDataCompletionQueueRx(nullptr)
, fId(id + "." + name + "." + type)
, fControlSocket(nullptr)
, fMonitorSocket(nullptr)
Socket::Socket(Context& context, const string& type, const string& name, const string& id /*= ""*/)
: fControlSocket(nullptr)
// , fMonitorSocket(nullptr)
, fPassiveDataEndpoint(nullptr)
, fDataEndpoint(nullptr)
, fId(id + "." + name + "." + type)
@@ -77,17 +72,17 @@ Socket::Socket(Context& context, const string& type, const string& name, const s
// if (zmq_setsockopt(fControlSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0)
// throw SocketError{tools::ToString("Failed setting ZMQ_RCVTIMEO socket option, reason: ", zmq_strerror(errno))};
fMonitorSocket = zmq_socket(fContext.GetZmqContext(), ZMQ_PAIR);
if (fMonitorSocket == nullptr)
throw SocketError{tools::ToString("Failed creating zmq monitor socket ", fId, ", reason: ", zmq_strerror(errno))};
auto mon_addr = tools::ToString("inproc://", fId);
if (zmq_socket_monitor(fControlSocket, mon_addr.c_str(), ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_CONNECTED) < 0)
throw SocketError{tools::ToString("Failed setting up monitor on meta socket, reason: ", zmq_strerror(errno))};
if (zmq_connect(fMonitorSocket, mon_addr.c_str()) != 0)
throw SocketError{tools::ToString("Failed connecting monitor socket to meta socket, reason: ", zmq_strerror(errno))};
// fMonitorSocket = zmq_socket(fContext.GetZmqContext(), ZMQ_PAIR);
//
// if (fMonitorSocket == nullptr)
// throw SocketError{tools::ToString("Failed creating zmq monitor socket ", fId, ", reason: ", zmq_strerror(errno))};
//
// auto mon_addr = tools::ToString("inproc://", fId);
// if (zmq_socket_monitor(fControlSocket, mon_addr.c_str(), ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_CONNECTED) < 0)
// throw SocketError{tools::ToString("Failed setting up monitor on meta socket, reason: ", zmq_strerror(errno))};
//
// if (zmq_connect(fMonitorSocket, mon_addr.c_str()) != 0)
// throw SocketError{tools::ToString("Failed connecting monitor socket to meta socket, reason: ", zmq_strerror(errno))};
}
}
@@ -98,7 +93,7 @@ try {
BindControlSocket(addr);
// TODO make data port choice more robust
addr.Port += 500;
addr.Port += 555;
fLocalDataAddr = addr;
BindDataEndpoint();
@@ -174,6 +169,7 @@ auto Socket::ConnectDataEndpoint() -> void
fDataEndpoint = fContext.MakeOfiConnectedEndpoint(fRemoteDataAddr);
fDataEndpoint->enable();
LOG(debug) << "OFI transport (" << fId << "): local data band address: " << Context::ConvertAddress(fDataEndpoint->get_local_address());
fDataEndpoint->connect([&]() {
LOG(debug) << "OFI transport (" << fId << "): data band connected.";
@@ -283,43 +279,43 @@ auto Socket::ReceiveControlMessage() -> CtrlMsgPtr<ControlMessage>
}
}
auto Socket::WaitForControlPeer() -> void
{
assert(fWaitingForControlPeer);
// auto Socket::WaitForControlPeer() -> void
// {
// assert(fWaitingForControlPeer);
//
// First frame in message contains event number and value
zmq_msg_t msg;
zmq_msg_init(&msg);
if (zmq_msg_recv(&msg, fMonitorSocket, 0) == -1)
throw SocketError(tools::ToString("Failed to get monitor event, reason: ", zmq_strerror(errno)));
uint8_t* data = (uint8_t*) zmq_msg_data(&msg);
uint16_t event = *(uint16_t*)(data);
int value = *(uint32_t *)(data + 2);
// zmq_msg_t msg;
// zmq_msg_init(&msg);
// if (zmq_msg_recv(&msg, fMonitorSocket, 0) == -1)
// throw SocketError(tools::ToString("Failed to get monitor event, reason: ", zmq_strerror(errno)));
//
// uint8_t* data = (uint8_t*) zmq_msg_data(&msg);
// uint16_t event = *(uint16_t*)(data);
// int value = *(uint32_t *)(data + 2);
//
// Second frame in message contains event address
zmq_msg_init(&msg);
if (zmq_msg_recv(&msg, fMonitorSocket, 0) == -1)
throw SocketError(tools::ToString("Failed to get monitor event, reason: ", zmq_strerror(errno)));
if (event == ZMQ_EVENT_ACCEPTED) {
// zmq_msg_init(&msg);
// if (zmq_msg_recv(&msg, fMonitorSocket, 0) == -1)
// throw SocketError(tools::ToString("Failed to get monitor event, reason: ", zmq_strerror(errno)));
//
// if (event == ZMQ_EVENT_ACCEPTED) {
// string localAddress = string(static_cast<char*>(zmq_msg_data(&msg)), zmq_msg_size(&msg));
sockaddr_in remoteAddr;
socklen_t addrSize = sizeof(sockaddr_in);
int ret = getpeername(value, (sockaddr*)&remoteAddr, &addrSize);
if (ret != 0)
throw SocketError(tools::ToString("Failed retrieving remote address, reason: ", strerror(errno)));
string remoteIp(inet_ntoa(remoteAddr.sin_addr));
int remotePort = ntohs(remoteAddr.sin_port);
LOG(debug) << "Accepted control peer connection from " << remoteIp << ":" << remotePort;
} else if (event == ZMQ_EVENT_CONNECTED) {
LOG(debug) << "Connected successfully to control peer";
} else {
LOG(debug) << "Unknown monitor event received: " << event << ". Ignoring.";
}
fWaitingForControlPeer = false;
}
// sockaddr_in remoteAddr;
// socklen_t addrSize = sizeof(sockaddr_in);
// int ret = getpeername(value, (sockaddr*)&remoteAddr, &addrSize);
// if (ret != 0)
// throw SocketError(tools::ToString("Failed retrieving remote address, reason: ", strerror(errno)));
// string remoteIp(inet_ntoa(remoteAddr.sin_addr));
// int remotePort = ntohs(remoteAddr.sin_port);
// LOG(debug) << "Accepted control peer connection from " << remoteIp << ":" << remotePort;
// } else if (event == ZMQ_EVENT_CONNECTED) {
// LOG(debug) << "Connected successfully to control peer";
// } else {
// LOG(debug) << "Unknown monitor event received: " << event << ". Ignoring.";
// }
//
// fWaitingForControlPeer = false;
// }
auto Socket::Send(MessagePtr& msg, const int timeout) -> int { return SendImpl(msg, 0, timeout); }
auto Socket::Receive(MessagePtr& msg, const int timeout) -> int { return ReceiveImpl(msg, 0, timeout); }
@@ -335,6 +331,7 @@ auto Socket::SendImpl(FairMQMessagePtr& msg, const int /*flags*/, const int /*ti
try {
auto size = msg->GetSize();
this_thread::sleep_for(std::chrono::seconds(10));
// Create and send control message
// auto ctrl = tools::make_unique<ControlMessage>();
// auto buf = tools::make_unique<PostBuffer>();
@@ -381,6 +378,7 @@ catch (const std::exception& e)
auto Socket::ReceiveImpl(FairMQMessagePtr& /*msg*/, const int /*flags*/, const int /*timeout*/) -> int
try {
this_thread::sleep_for(std::chrono::seconds(10));
if (fWaitingForControlPeer) {
WaitForControlPeer();
// AnnounceDataAddress();
@@ -611,8 +609,8 @@ auto Socket::Close() -> void
if (zmq_close(fControlSocket) != 0)
throw SocketError(tools::ToString("Failed closing zmq meta socket, reason: ", zmq_strerror(errno)));
if (zmq_close(fMonitorSocket) != 0)
throw SocketError(tools::ToString("Failed closing zmq monitor socket, reason: ", zmq_strerror(errno)));
// if (zmq_close(fMonitorSocket) != 0)
// throw SocketError(tools::ToString("Failed closing zmq monitor socket, reason: ", zmq_strerror(errno)));
}
auto Socket::SetOption(const string& option, const void* value, size_t valueSize) -> void