From c5fe37277b42381d6bb509c472d1afa399f421c7 Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Tue, 15 Jan 2019 16:34:02 +0000 Subject: [PATCH] msg/async: msgr2: fix client address learning Signed-off-by: Ricardo Dias --- src/msg/async/AsyncConnection.cc | 33 +---------- src/msg/async/AsyncConnection.h | 2 - src/msg/async/ProtocolV2.cc | 98 ++++++++++++++++++++++---------- 3 files changed, 69 insertions(+), 64 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index ebdfa3fd942..13711c98bc9 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -406,11 +406,6 @@ void AsyncConnection::process() { } center->delete_file_event(cs.fd(), EVENT_WRITABLE); - - update_socket_addr(target_addr.get_type()); - lock.unlock(); - async_msgr->learned_addr(socket_addr); - lock.lock(); ldout(async_msgr->cct, 10) << __func__ << " connect successfully, ready to send banner" << dendl; state = STATE_CONNECTION_ESTABLISHED; @@ -449,28 +444,6 @@ bool AsyncConnection::is_connected() { return protocol->is_connected(); } -bool AsyncConnection::update_socket_addr(uint32_t type) { - sockaddr_storage ss; - socklen_t slen = sizeof(ss); - int r = getsockname(cs.fd(), (sockaddr *)&ss, &slen); - if (r == 0) { - entity_addr_t bind_addr; - bind_addr.set_type(type); - bind_addr.set_sockaddr((sockaddr*)&ss); - for (auto a : async_msgr->get_myaddrs().v) { - if (a.is_same_host(bind_addr) && a.get_port() == bind_addr.get_port()) { - socket_addr = a; - break; - } - } - if (socket_addr.is_blank_ip()) { - socket_addr = bind_addr; - } - return true; - } - return false; -} - void AsyncConnection::connect(const entity_addrvec_t &addrs, int type, entity_addr_t &target) { @@ -503,13 +476,9 @@ void AsyncConnection::accept(ConnectedSocket socket, ceph_assert(socket.fd() >= 0); std::lock_guard l(lock); - cs = std::move(socket); - if (!update_socket_addr(listen_addr.get_type())) { - socket_addr = listen_addr; - } + socket_addr = listen_addr; target_addr = peer_addr; // until we know better - state = STATE_ACCEPTING; protocol->accept(); // rescheduler connection in order to avoid lock dep diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 519ebb739ee..c30b6d44328 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -136,8 +136,6 @@ class AsyncConnection : public Connection { return target_addr; } - bool update_socket_addr(uint32_t type); - private: enum { STATE_NONE, diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 4689943f9eb..4b203c10836 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -298,19 +298,20 @@ struct ClientIdentFrame }; struct ServerIdentFrame - : public SignedEncryptedFrame { + : public SignedEncryptedFrame { const ProtocolV2::Tag tag = ProtocolV2::Tag::IDENT; using SignedEncryptedFrame::SignedEncryptedFrame; inline entity_addrvec_t &addrs() { return get_val<0>(); } - inline int64_t &gid() { return get_val<1>(); } - inline uint64_t &global_seq() { return get_val<2>(); } - inline uint64_t &supported_features() { return get_val<3>(); } - inline uint64_t &required_features() { return get_val<4>(); } - inline uint64_t &flags() { return get_val<5>(); } - inline uint64_t &cookie() { return get_val<6>(); } + inline entity_addr_t &peer_addr() { return get_val<1>(); } + inline int64_t &gid() { return get_val<2>(); } + inline uint64_t &global_seq() { return get_val<3>(); } + inline uint64_t &supported_features() { return get_val<4>(); } + inline uint64_t &required_features() { return get_val<5>(); } + inline uint64_t &flags() { return get_val<6>(); } + inline uint64_t &cookie() { return get_val<7>(); } }; struct ReconnectFrame @@ -2158,17 +2159,8 @@ CtPtr ProtocolV2::send_client_ident() { flags |= CEPH_MSG_CONNECT_LOSSY; } - entity_addrvec_t maddrs = messenger->get_myaddrs(); - if (!messenger->get_myaddrs().front().is_msgr2()) { - entity_addr_t a = messenger->get_myaddrs().front(); - a.set_type(entity_addr_t::TYPE_MSGR2); - ldout(cct, 20) << "encoding addr " << a << " instead of non-v2 myaddrs " - << messenger->get_myaddrs() << dendl; - maddrs.v.push_back(a); - } - - ClientIdentFrame client_ident(this, maddrs, messenger->get_myname().num(), - global_seq, + ClientIdentFrame client_ident(this, messenger->get_myaddrs(), + messenger->get_myname().num(), global_seq, connection->policy.features_supported, connection->policy.features_required, flags); @@ -2281,8 +2273,9 @@ CtPtr ProtocolV2::handle_server_ident(char *payload, uint32_t length) { ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; ServerIdentFrame server_ident(this, payload, length); - ldout(cct, 5) << __func__ << " received server identification: " - << "addrs=" << server_ident.addrs() + ldout(cct, 5) << __func__ << " received server identification:" + << " addrs=" << server_ident.addrs() + << " my_addr=" << server_ident.peer_addr() << " gid=" << server_ident.gid() << " global_seq=" << server_ident.global_seq() << " features_supported=" << std::hex @@ -2291,6 +2284,26 @@ CtPtr ProtocolV2::handle_server_ident(char *payload, uint32_t length) { << " flags=" << server_ident.flags() << " cookie=" << std::dec << server_ident.cookie() << dendl; + connection->lock.unlock(); + messenger->learned_addr(server_ident.peer_addr()); + if (cct->_conf->ms_inject_internal_delays && + cct->_conf->ms_inject_socket_failures) { + if (rand() % cct->_conf->ms_inject_socket_failures == 0) { + ldout(cct, 10) << __func__ << " sleep for " + << cct->_conf->ms_inject_internal_delays << dendl; + utime_t t; + t.set_from_double(cct->_conf->ms_inject_internal_delays); + t.sleep(); + } + } + connection->lock.lock(); + if (state != CONNECTING) { + ldout(cct, 1) << __func__ + << " state changed while learned_addr, mark_down or " + << " replacing must be happened just now" << dendl; + return nullptr; + } + cookie = server_ident.cookie(); connection->set_peer_addrs(server_ident.addrs()); @@ -2491,13 +2504,37 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) { << " features_required=" << client_ident.required_features() << " flags=" << client_ident.flags() << std::dec << dendl; + connection->target_addr.set_type(entity_addr_t::TYPE_MSGR2); + if (client_ident.addrs().empty()) { connection->set_peer_addr(connection->target_addr); } else { - // Should we check if one of the ident.addrs match connection->target_addr - // as we do in ProtocolV1? - connection->set_peer_addrs(client_ident.addrs()); - connection->target_addr = client_ident.addrs().msgr2_addr(); + entity_addr_t peer_addr = client_ident.addrs().msgr2_addr(); + + ldout(cct, 10) << __func__ << " peer addr is " << peer_addr << dendl; + if (peer_addr.type == entity_addr_t::TYPE_NONE) { + // no address is known + peer_addr = client_ident.addrs().legacy_addr(); + peer_addr.set_type(entity_addr_t::TYPE_MSGR2); + } else if (peer_addr.is_blank_ip()) { + // peer apparently doesn't know what ip they have; figure it out for them. + int port = peer_addr.get_port(); + peer_addr.u = connection->target_addr.u; + peer_addr.set_port(port); + peer_addr.set_type(entity_addr_t::TYPE_MSGR2); + } + ldout(cct, 0) << __func__ << " peer addr is really " << peer_addr + << " (socket is " << connection->target_addr << ")" + << dendl; + connection->target_addr = peer_addr; + entity_addrvec_t addrs; + addrs.v.push_back(peer_addr); + for (const auto &addr : client_ident.addrs().v) { + if (addr.type != entity_addr_t::TYPE_MSGR2) { + addrs.v.push_back(addr); + } + } + connection->set_peer_addrs(addrs); } peer_name = entity_name_t(connection->get_peer_type(), client_ident.gid()); @@ -2857,12 +2894,13 @@ CtPtr ProtocolV2::send_server_ident() { uint64_t gs = messenger->get_global_seq(); ServerIdentFrame server_ident( - this, messenger->get_myaddrs(), messenger->get_myname().num(), gs, - connection->policy.features_supported, + this, messenger->get_myaddrs(), connection->target_addr, + messenger->get_myname().num(), gs, connection->policy.features_supported, connection->policy.features_required, flags, cookie); - ldout(cct, 5) << __func__ << " sending identification: " - << "addrs=" << messenger->get_myaddrs() + ldout(cct, 5) << __func__ << " sending identification:" + << " addrs=" << messenger->get_myaddrs() + << " peer_addr=" << connection->target_addr << " gid=" << messenger->get_myname().num() << " global_seq=" << gs << " features_supported=" << std::hex << connection->policy.features_supported -- 2.39.5