]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: fix incorrect reset events according to async-msgr
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 12 Mar 2020 06:28:56 +0000 (14:28 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Fri, 20 Mar 2020 08:07:48 +0000 (16:07 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/ProtocolV1.cc
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h

index becb7d6812563b83076b90e0190c3e4dddea316f..db18a55044520a45911e49b165d14c57de85343c 100644 (file)
@@ -377,7 +377,7 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
         }).handle_exception([this] (std::exception_ptr eptr) {
           // TODO: handle fault in the connecting state
           logger().warn("{} connecting fault: {}", conn, eptr);
-          close(false);
+          close(true);
         });
     });
 }
@@ -466,7 +466,7 @@ seastar::future<stop_t> ProtocolV1::replace_existing(
     // will all be performed using v2 protocol.
     ceph_abort("lossless policy not supported for v1");
   }
-  (void) existing->close();
+  existing->protocol->close(true);
   return send_connect_reply_ready(reply_tag, std::move(authorizer_reply));
 }
 
@@ -583,6 +583,7 @@ seastar::future<stop_t> ProtocolV1::repeat_handle_connect()
           logger().warn("{} existing {} proto version is {} not 1, close existing",
                         conn, *existing,
                         static_cast<int>(existing->protocol->proto_type));
+          // NOTE: this is following async messenger logic, but we may miss the reset event.
           (void) existing->close();
         } else {
           return handle_connect_with_existing(existing, std::move(authorizer_reply));
@@ -900,22 +901,17 @@ void ProtocolV1::execute_open()
         .handle_exception_type([this] (const std::system_error& e) {
           logger().warn("{} open fault: {}", conn, e);
           if (e.code() == error::protocol_aborted ||
-              e.code() == std::errc::connection_reset) {
+              e.code() == std::errc::connection_reset ||
+              e.code() == error::read_eof) {
             close(true);
             return seastar::now();
-          } else if (e.code() == error::read_eof) {
-            return dispatcher.ms_handle_remote_reset(
-                seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
-              .then([this] {
-                close(false);
-              });
           } else {
             throw e;
           }
         }).handle_exception([this] (std::exception_ptr eptr) {
           // TODO: handle fault in the open state
           logger().warn("{} open fault: {}", conn, eptr);
-          close(false);
+          close(true);
         });
     });
 }
index 8a42e89d01b2595885b9692a9fb50e226c85f3fc..5169f78b50c5cbe37b830073fa990878b270fccd 100644 (file)
@@ -450,7 +450,7 @@ void ProtocolV2::reset_session(bool full)
   }
 }
 
-seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
+seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange(bool is_connect)
 {
   // 1. prepare and send banner
   bufferlist banner_payload;
@@ -503,7 +503,7 @@ seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
       logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
       INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
       return read(payload_len);
-    }).then([this] (bufferlist bl) {
+    }).then([this, is_connect] (bufferlist bl) {
       // 4. process peer banner_payload and send HelloFrame
       auto p = bl.cbegin();
       uint64_t peer_supported_features;
@@ -526,13 +526,13 @@ seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
         logger().error("{} peer does not support all required features"
                        " required={} peer_supported={}",
                        conn, required_features, peer_supported_features);
-        abort_in_close(*this, false);
+        abort_in_close(*this, is_connect);
       }
       if ((supported_features & peer_required_features) != peer_required_features) {
         logger().error("{} we do not support all peer required features"
                        " peer_required={} supported={}",
                        conn, peer_required_features, supported_features);
-        abort_in_close(*this, false);
+        abort_in_close(*this, is_connect);
       }
       this->peer_required_features = peer_required_features;
       if (this->peer_required_features == 0) {
@@ -895,7 +895,7 @@ void ProtocolV2::execute_connecting()
           auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
           session_stream_handlers = { nullptr, nullptr };
           enable_recording();
-          return banner_exchange();
+          return banner_exchange(true);
         }).then([this] (entity_type_t _peer_type,
                         entity_addr_t _my_addr_from_peer) {
           if (conn.get_peer_type() != _peer_type) {
@@ -1295,6 +1295,7 @@ ProtocolV2::server_connect()
                       conn, *existing_conn,
                       static_cast<int>(existing_conn->protocol->proto_type));
         // should unregister the existing from msgr atomically
+        // NOTE: this is following async messenger logic, but we may miss the reset event.
         (void) existing_conn->close();
       } else {
         return handle_existing_connection(existing_conn);
@@ -1404,6 +1405,7 @@ ProtocolV2::server_reconnect()
                     "close existing and reset client.",
                     conn, *existing_conn,
                     static_cast<int>(existing_conn->protocol->proto_type));
+      // NOTE: this is following async messenger logic, but we may miss the reset event.
       (void) existing_conn->close();
       return send_reset(true);
     }
@@ -1503,7 +1505,7 @@ void ProtocolV2::execute_accepting()
           auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
           session_stream_handlers = { nullptr, nullptr };
           enable_recording();
-          return banner_exchange();
+          return banner_exchange(false);
         }).then([this] (entity_type_t _peer_type,
                         entity_addr_t _my_addr_from_peer) {
           ceph_assert(conn.get_peer_type() == 0);
index 54db2722d752b032ab2c3615f403e2bcbbdedc4d..7a64d410942ea9690c5bf2c879e70b094c286b19 100644 (file)
@@ -125,7 +125,7 @@ class ProtocolV2 final : public Protocol {
  private:
   void fault(bool backoff, const char* func_name, std::exception_ptr eptr);
   void reset_session(bool full);
-  seastar::future<entity_type_t, entity_addr_t> banner_exchange();
+  seastar::future<entity_type_t, entity_addr_t> banner_exchange(bool is_connect);
 
   enum class next_step_t {
     ready,