From: Yingxin Cheng Date: Thu, 12 Mar 2020 06:28:56 +0000 (+0800) Subject: crimson/net: fix incorrect reset events according to async-msgr X-Git-Tag: v16.0.0~8^2~8 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0bae2d3d89d5316a2c037c92cd1e5a2d05264f1e;p=ceph-ci.git crimson/net: fix incorrect reset events according to async-msgr Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc index becb7d68125..db18a550445 100644 --- a/src/crimson/net/ProtocolV1.cc +++ b/src/crimson/net/ProtocolV1.cc @@ -377,7 +377,7 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr, }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the connecting state logger().warn("{} connecting fault: {}", conn, eptr); - close(false); + close(true); }); }); } @@ -466,7 +466,7 @@ seastar::future ProtocolV1::replace_existing( // will all be performed using v2 protocol. ceph_abort("lossless policy not supported for v1"); } - (void) existing->close(); + existing->protocol->close(true); return send_connect_reply_ready(reply_tag, std::move(authorizer_reply)); } @@ -583,6 +583,7 @@ seastar::future ProtocolV1::repeat_handle_connect() logger().warn("{} existing {} proto version is {} not 1, close existing", conn, *existing, static_cast(existing->protocol->proto_type)); + // NOTE: this is following async messenger logic, but we may miss the reset event. (void) existing->close(); } else { return handle_connect_with_existing(existing, std::move(authorizer_reply)); @@ -900,22 +901,17 @@ void ProtocolV1::execute_open() .handle_exception_type([this] (const std::system_error& e) { logger().warn("{} open fault: {}", conn, e); if (e.code() == error::protocol_aborted || - e.code() == std::errc::connection_reset) { + e.code() == std::errc::connection_reset || + e.code() == error::read_eof) { close(true); return seastar::now(); - } else if (e.code() == error::read_eof) { - return dispatcher.ms_handle_remote_reset( - seastar::static_pointer_cast(conn.shared_from_this())) - .then([this] { - close(false); - }); } else { throw e; } }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the open state logger().warn("{} open fault: {}", conn, eptr); - close(false); + close(true); }); }); } diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 8a42e89d01b..5169f78b50c 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -450,7 +450,7 @@ void ProtocolV2::reset_session(bool full) } } -seastar::future ProtocolV2::banner_exchange() +seastar::future ProtocolV2::banner_exchange(bool is_connect) { // 1. prepare and send banner bufferlist banner_payload; @@ -503,7 +503,7 @@ seastar::future ProtocolV2::banner_exchange() logger().debug("{} GOT banner: payload_len={}", conn, payload_len); INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ); return read(payload_len); - }).then([this] (bufferlist bl) { + }).then([this, is_connect] (bufferlist bl) { // 4. process peer banner_payload and send HelloFrame auto p = bl.cbegin(); uint64_t peer_supported_features; @@ -526,13 +526,13 @@ seastar::future ProtocolV2::banner_exchange() logger().error("{} peer does not support all required features" " required={} peer_supported={}", conn, required_features, peer_supported_features); - abort_in_close(*this, false); + abort_in_close(*this, is_connect); } if ((supported_features & peer_required_features) != peer_required_features) { logger().error("{} we do not support all peer required features" " peer_required={} supported={}", conn, peer_required_features, supported_features); - abort_in_close(*this, false); + abort_in_close(*this, is_connect); } this->peer_required_features = peer_required_features; if (this->peer_required_features == 0) { @@ -895,7 +895,7 @@ void ProtocolV2::execute_connecting() auth_meta = seastar::make_lw_shared(); session_stream_handlers = { nullptr, nullptr }; enable_recording(); - return banner_exchange(); + return banner_exchange(true); }).then([this] (entity_type_t _peer_type, entity_addr_t _my_addr_from_peer) { if (conn.get_peer_type() != _peer_type) { @@ -1295,6 +1295,7 @@ ProtocolV2::server_connect() conn, *existing_conn, static_cast(existing_conn->protocol->proto_type)); // should unregister the existing from msgr atomically + // NOTE: this is following async messenger logic, but we may miss the reset event. (void) existing_conn->close(); } else { return handle_existing_connection(existing_conn); @@ -1404,6 +1405,7 @@ ProtocolV2::server_reconnect() "close existing and reset client.", conn, *existing_conn, static_cast(existing_conn->protocol->proto_type)); + // NOTE: this is following async messenger logic, but we may miss the reset event. (void) existing_conn->close(); return send_reset(true); } @@ -1503,7 +1505,7 @@ void ProtocolV2::execute_accepting() auth_meta = seastar::make_lw_shared(); session_stream_handlers = { nullptr, nullptr }; enable_recording(); - return banner_exchange(); + return banner_exchange(false); }).then([this] (entity_type_t _peer_type, entity_addr_t _my_addr_from_peer) { ceph_assert(conn.get_peer_type() == 0); diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 54db2722d75..7a64d410942 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -125,7 +125,7 @@ class ProtocolV2 final : public Protocol { private: void fault(bool backoff, const char* func_name, std::exception_ptr eptr); void reset_session(bool full); - seastar::future banner_exchange(); + seastar::future banner_exchange(bool is_connect); enum class next_step_t { ready,