From 91025cbc88635b10e147014aa8b2d52f54c7f651 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Tue, 30 Oct 2018 10:33:46 +0100 Subject: [PATCH] Deactivate control band monitor socket --- fairmq/ofi/Context.cxx | 4 +- fairmq/ofi/Context.h | 8 --- fairmq/ofi/Socket.cxx | 110 ++++++++++++++++++++--------------------- fairmq/ofi/Socket.h | 7 ++- 4 files changed, 60 insertions(+), 69 deletions(-) diff --git a/fairmq/ofi/Context.cxx b/fairmq/ofi/Context.cxx index 1025e32c..15db7819 100644 --- a/fairmq/ofi/Context.cxx +++ b/fairmq/ofi/Context.cxx @@ -97,7 +97,7 @@ auto Context::InitOfi(ConnectionType type, Address addr) -> void } else { fOfiInfo = tools::make_unique(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), 0, hints); } - LOG(debug) << "OFI transport: " << *fOfiInfo; + // LOG(debug) << "OFI transport: " << *fOfiInfo; fOfiFabric = tools::make_unique(*fOfiInfo); @@ -113,6 +113,8 @@ auto Context::MakeOfiPassiveEndpoint(Address addr) -> unique_ptr std::unique_ptr { + assert(fOfiDomain); + return tools::make_unique(fIoContext, *fOfiDomain, info); } diff --git a/fairmq/ofi/Context.h b/fairmq/ofi/Context.h index 7d8fa4de..98edb141 100644 --- a/fairmq/ofi/Context.h +++ b/fairmq/ofi/Context.h @@ -79,14 +79,6 @@ class Context struct ContextError : std::runtime_error { using std::runtime_error::runtime_error; }; -template -std::unique_ptr -static_unique_ptr_downcast( std::unique_ptr&& p ) -{ - auto d = static_cast(p.release()); - return std::unique_ptr(d, std::move(p.get_deleter())); -} - } /* namespace ofi */ } /* namespace mq */ } /* namespace fair */ diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index f33f9365..1496e642 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -32,14 +32,9 @@ namespace ofi using namespace std; -Socket::Socket(Context& context, const string& type, const string& name, const string& id /*= ""*/, FairMQTransportFactory* fac) - : FairMQSocket{fac} - , fDataEndpoint(nullptr) - , fDataCompletionQueueTx(nullptr) - , fDataCompletionQueueRx(nullptr) - , fId(id + "." + name + "." + type) - , fControlSocket(nullptr) - , fMonitorSocket(nullptr) +Socket::Socket(Context& context, const string& type, const string& name, const string& id /*= ""*/) + : fControlSocket(nullptr) + // , fMonitorSocket(nullptr) , fPassiveDataEndpoint(nullptr) , fDataEndpoint(nullptr) , fId(id + "." + name + "." + type) @@ -77,17 +72,17 @@ Socket::Socket(Context& context, const string& type, const string& name, const s // if (zmq_setsockopt(fControlSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) // throw SocketError{tools::ToString("Failed setting ZMQ_RCVTIMEO socket option, reason: ", zmq_strerror(errno))}; - fMonitorSocket = zmq_socket(fContext.GetZmqContext(), ZMQ_PAIR); - - if (fMonitorSocket == nullptr) - throw SocketError{tools::ToString("Failed creating zmq monitor socket ", fId, ", reason: ", zmq_strerror(errno))}; - - auto mon_addr = tools::ToString("inproc://", fId); - if (zmq_socket_monitor(fControlSocket, mon_addr.c_str(), ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_CONNECTED) < 0) - throw SocketError{tools::ToString("Failed setting up monitor on meta socket, reason: ", zmq_strerror(errno))}; - - if (zmq_connect(fMonitorSocket, mon_addr.c_str()) != 0) - throw SocketError{tools::ToString("Failed connecting monitor socket to meta socket, reason: ", zmq_strerror(errno))}; + // fMonitorSocket = zmq_socket(fContext.GetZmqContext(), ZMQ_PAIR); + // + // if (fMonitorSocket == nullptr) + // throw SocketError{tools::ToString("Failed creating zmq monitor socket ", fId, ", reason: ", zmq_strerror(errno))}; + // + // auto mon_addr = tools::ToString("inproc://", fId); + // if (zmq_socket_monitor(fControlSocket, mon_addr.c_str(), ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_CONNECTED) < 0) + // throw SocketError{tools::ToString("Failed setting up monitor on meta socket, reason: ", zmq_strerror(errno))}; + // + // if (zmq_connect(fMonitorSocket, mon_addr.c_str()) != 0) + // throw SocketError{tools::ToString("Failed connecting monitor socket to meta socket, reason: ", zmq_strerror(errno))}; } } @@ -98,7 +93,7 @@ try { BindControlSocket(addr); // TODO make data port choice more robust - addr.Port += 500; + addr.Port += 555; fLocalDataAddr = addr; BindDataEndpoint(); @@ -174,6 +169,7 @@ auto Socket::ConnectDataEndpoint() -> void fDataEndpoint = fContext.MakeOfiConnectedEndpoint(fRemoteDataAddr); fDataEndpoint->enable(); + LOG(debug) << "OFI transport (" << fId << "): local data band address: " << Context::ConvertAddress(fDataEndpoint->get_local_address()); fDataEndpoint->connect([&]() { LOG(debug) << "OFI transport (" << fId << "): data band connected."; @@ -283,43 +279,43 @@ auto Socket::ReceiveControlMessage() -> CtrlMsgPtr } } -auto Socket::WaitForControlPeer() -> void -{ - assert(fWaitingForControlPeer); - +// auto Socket::WaitForControlPeer() -> void +// { + // assert(fWaitingForControlPeer); +// // First frame in message contains event number and value - zmq_msg_t msg; - zmq_msg_init(&msg); - if (zmq_msg_recv(&msg, fMonitorSocket, 0) == -1) - throw SocketError(tools::ToString("Failed to get monitor event, reason: ", zmq_strerror(errno))); - - uint8_t* data = (uint8_t*) zmq_msg_data(&msg); - uint16_t event = *(uint16_t*)(data); - int value = *(uint32_t *)(data + 2); - + // zmq_msg_t msg; + // zmq_msg_init(&msg); + // if (zmq_msg_recv(&msg, fMonitorSocket, 0) == -1) + // throw SocketError(tools::ToString("Failed to get monitor event, reason: ", zmq_strerror(errno))); +// + // uint8_t* data = (uint8_t*) zmq_msg_data(&msg); + // uint16_t event = *(uint16_t*)(data); + // int value = *(uint32_t *)(data + 2); +// // Second frame in message contains event address - zmq_msg_init(&msg); - if (zmq_msg_recv(&msg, fMonitorSocket, 0) == -1) - throw SocketError(tools::ToString("Failed to get monitor event, reason: ", zmq_strerror(errno))); - - if (event == ZMQ_EVENT_ACCEPTED) { + // zmq_msg_init(&msg); + // if (zmq_msg_recv(&msg, fMonitorSocket, 0) == -1) + // throw SocketError(tools::ToString("Failed to get monitor event, reason: ", zmq_strerror(errno))); +// + // if (event == ZMQ_EVENT_ACCEPTED) { // string localAddress = string(static_cast(zmq_msg_data(&msg)), zmq_msg_size(&msg)); - sockaddr_in remoteAddr; - socklen_t addrSize = sizeof(sockaddr_in); - int ret = getpeername(value, (sockaddr*)&remoteAddr, &addrSize); - if (ret != 0) - throw SocketError(tools::ToString("Failed retrieving remote address, reason: ", strerror(errno))); - string remoteIp(inet_ntoa(remoteAddr.sin_addr)); - int remotePort = ntohs(remoteAddr.sin_port); - LOG(debug) << "Accepted control peer connection from " << remoteIp << ":" << remotePort; - } else if (event == ZMQ_EVENT_CONNECTED) { - LOG(debug) << "Connected successfully to control peer"; - } else { - LOG(debug) << "Unknown monitor event received: " << event << ". Ignoring."; - } - - fWaitingForControlPeer = false; -} + // sockaddr_in remoteAddr; + // socklen_t addrSize = sizeof(sockaddr_in); + // int ret = getpeername(value, (sockaddr*)&remoteAddr, &addrSize); + // if (ret != 0) + // throw SocketError(tools::ToString("Failed retrieving remote address, reason: ", strerror(errno))); + // string remoteIp(inet_ntoa(remoteAddr.sin_addr)); + // int remotePort = ntohs(remoteAddr.sin_port); + // LOG(debug) << "Accepted control peer connection from " << remoteIp << ":" << remotePort; + // } else if (event == ZMQ_EVENT_CONNECTED) { + // LOG(debug) << "Connected successfully to control peer"; + // } else { + // LOG(debug) << "Unknown monitor event received: " << event << ". Ignoring."; + // } +// + // fWaitingForControlPeer = false; +// } auto Socket::Send(MessagePtr& msg, const int timeout) -> int { return SendImpl(msg, 0, timeout); } auto Socket::Receive(MessagePtr& msg, const int timeout) -> int { return ReceiveImpl(msg, 0, timeout); } @@ -335,6 +331,7 @@ auto Socket::SendImpl(FairMQMessagePtr& msg, const int /*flags*/, const int /*ti try { auto size = msg->GetSize(); + this_thread::sleep_for(std::chrono::seconds(10)); // Create and send control message // auto ctrl = tools::make_unique(); // auto buf = tools::make_unique(); @@ -381,6 +378,7 @@ catch (const std::exception& e) auto Socket::ReceiveImpl(FairMQMessagePtr& /*msg*/, const int /*flags*/, const int /*timeout*/) -> int try { + this_thread::sleep_for(std::chrono::seconds(10)); if (fWaitingForControlPeer) { WaitForControlPeer(); // AnnounceDataAddress(); @@ -611,8 +609,8 @@ auto Socket::Close() -> void if (zmq_close(fControlSocket) != 0) throw SocketError(tools::ToString("Failed closing zmq meta socket, reason: ", zmq_strerror(errno))); - if (zmq_close(fMonitorSocket) != 0) - throw SocketError(tools::ToString("Failed closing zmq monitor socket, reason: ", zmq_strerror(errno))); + // if (zmq_close(fMonitorSocket) != 0) + // throw SocketError(tools::ToString("Failed closing zmq monitor socket, reason: ", zmq_strerror(errno))); } auto Socket::SetOption(const string& option, const void* value, size_t valueSize) -> void diff --git a/fairmq/ofi/Socket.h b/fairmq/ofi/Socket.h index 71b63583..affab53e 100644 --- a/fairmq/ofi/Socket.h +++ b/fairmq/ofi/Socket.h @@ -19,7 +19,6 @@ #include #include // unique_ptr #include -#include class FairMQTransportFactory; namespace fair @@ -86,7 +85,7 @@ class Socket final : public fair::mq::Socket private: void* fControlSocket; - void* fMonitorSocket; + // void* fMonitorSocket; std::unique_ptr fPassiveDataEndpoint; std::unique_ptr fDataEndpoint; std::string fId; @@ -97,7 +96,7 @@ class Socket final : public fair::mq::Socket Context& fContext; Context::Address fRemoteDataAddr; Context::Address fLocalDataAddr; - bool fWaitingForControlPeer; + // bool fWaitingForControlPeer; boost::asio::io_service::strand fIoStrand; boost::container::pmr::unsynchronized_pool_resource fCtrlMemPool; @@ -109,7 +108,7 @@ class Socket final : public fair::mq::Socket auto SendImpl(std::vector& msgVec, const int flags, const int timeout) -> int64_t; auto ReceiveImpl(std::vector& msgVec, const int flags, const int timeout) -> int64_t; - auto WaitForControlPeer() -> void; + // auto WaitForControlPeer() -> void; auto AnnounceDataAddress() -> void; auto SendControlMessage(CtrlMsgPtr ctrl) -> void; auto ReceiveControlMessage() -> CtrlMsgPtr;