]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/net: add ESTABLISHING state
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 6 Sep 2019 03:31:06 +0000 (11:31 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 18 Sep 2019 04:30:36 +0000 (12:30 +0800)
With the new ESTABLISHING state, connection lookup and acceptance can be
atomic, solving the issues related to racing connect.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h

index 4c62f8182761f936c2c8a9647d54dff3a819deb6..537c0af0b99113feffc00570f27e1392f51a24be 100644 (file)
@@ -1214,7 +1214,14 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
                   " this connection", conn, *existing_conn);
     existing_proto->dispatch_reset();
     existing_proto->close();
-    return send_server_ident();
+
+    if (unlikely(state != state_t::ACCEPTING)) {
+      logger().debug("{} triggered {} in execute_accepting()",
+                     conn, get_state_name(state));
+      abort_protocol();
+    }
+    execute_establishing();
+    return seastar::make_ready_future<next_step_t>(next_step_t::ready);
   }
 
   if (existing_proto->server_cookie != 0) {
@@ -1348,10 +1355,13 @@ ProtocolV2::server_connect()
       }
     }
 
-    // TODO: atomically register & unaccept the connecton with lookup_conn()
-
-    // if everything is OK reply with server identification
-    return send_server_ident();
+    if (unlikely(state != state_t::ACCEPTING)) {
+      logger().debug("{} triggered {} in execute_accepting()",
+                     conn, get_state_name(state));
+      abort_protocol();
+    }
+    execute_establishing();
+    return seastar::make_ready_future<next_step_t>(next_step_t::ready);
   });
 }
 
@@ -1540,8 +1550,7 @@ ProtocolV2::server_reconnect()
 
 void ProtocolV2::execute_accepting()
 {
-  // TODO: change to write_state_t::none
-  trigger_state(state_t::ACCEPTING, write_state_t::delay, false);
+  trigger_state(state_t::ACCEPTING, write_state_t::none, false);
   seastar::with_gate(pending_dispatch, [this] {
       return seastar::futurize_apply([this] {
           INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
@@ -1584,41 +1593,21 @@ void ProtocolV2::execute_accepting()
             }
           }
         }).then([this] (next_step_t next) {
-          if (unlikely(state != state_t::ACCEPTING)) {
-            logger().debug("{} triggered {} at the end of execute_accepting()",
-                           conn, get_state_name(state));
-            abort_protocol();
-          }
           switch (next) {
-           case next_step_t::ready: {
-            seastar::with_gate(pending_dispatch, [this] {
-              return dispatcher.ms_handle_accept(
-                  seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-            }).handle_exception([this] (std::exception_ptr eptr) {
-              logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
-              ceph_abort("unexpected exception from ms_handle_accept()");
-            });
-            messenger.register_conn(
-              seastar::static_pointer_cast<SocketConnection>(
-                conn.shared_from_this()));
-            messenger.unaccept_conn(
-              seastar::static_pointer_cast<SocketConnection>(
-                conn.shared_from_this()));
-            logger().info("{} accepted: gs={}, pgs={}, cs={},"
-                          " client_cookie={}, server_cookie={}, in_seq={}, out_seq={}",
-                          conn, global_seq, peer_global_seq, connect_seq,
-                          client_cookie, server_cookie, conn.in_seq, conn.out_seq);
-            execute_ready();
+           case next_step_t::ready:
+            assert(state != state_t::ACCEPTING);
             break;
-           }
-           case next_step_t::wait: {
+           case next_step_t::wait:
+            if (unlikely(state != state_t::ACCEPTING)) {
+              logger().debug("{} triggered {} at the end of execute_accepting()",
+                             conn, get_state_name(state));
+              abort_protocol();
+            }
             logger().info("{} execute_accepting(): going to SERVER_WAIT", conn);
             execute_server_wait();
             break;
-           }
-           default: {
+           default:
             ceph_abort("impossible next step");
-           }
           }
         }).handle_exception([this] (std::exception_ptr eptr) {
           logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
@@ -1663,9 +1652,53 @@ seastar::future<> ProtocolV2::finish_auth()
   });
 }
 
-// ACCEPTING or REPLACING state
+// ESTABLISHING
 
-seastar::future<ProtocolV2::next_step_t>
+void ProtocolV2::execute_establishing() {
+  trigger_state(state_t::ESTABLISHING, write_state_t::delay, false);
+  seastar::with_gate(pending_dispatch, [this] {
+    return dispatcher.ms_handle_accept(
+        seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+  }).handle_exception([this] (std::exception_ptr eptr) {
+    logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
+    ceph_abort("unexpected exception from ms_handle_accept()");
+  });
+  messenger.register_conn(
+    seastar::static_pointer_cast<SocketConnection>(
+      conn.shared_from_this()));
+  messenger.unaccept_conn(
+    seastar::static_pointer_cast<SocketConnection>(
+      conn.shared_from_this()));
+  logger().info("{} accepted: gs={}, pgs={}, cs={},"
+                " client_cookie={}, server_cookie={}, in_seq={}, out_seq={}",
+                conn, global_seq, peer_global_seq, connect_seq,
+                client_cookie, server_cookie, conn.in_seq, conn.out_seq);
+  execution_done = seastar::with_gate(pending_dispatch, [this] {
+    return seastar::futurize_apply([this] {
+      return send_server_ident();
+    }).then([this] {
+      if (unlikely(state != state_t::ESTABLISHING)) {
+        logger().debug("{} triggered {} at the end of execute_establishing()",
+                       conn, get_state_name(state));
+        abort_protocol();
+      }
+      execute_ready();
+    }).handle_exception([this] (std::exception_ptr eptr) {
+      if (state != state_t::ESTABLISHING) {
+        logger().info("{} execute_establishing() protocol aborted at {} -- {}",
+                      conn, get_state_name(state), eptr);
+        assert(state == state_t::CLOSING ||
+               state == state_t::REPLACING);
+        return;
+      }
+      fault(false, "execute_establishing()", eptr);
+    });
+  });
+}
+
+// ESTABLISHING or REPLACING state
+
+seastar::future<>
 ProtocolV2::send_server_ident()
 {
   // send_server_ident() logic
@@ -1707,8 +1740,6 @@ ProtocolV2::send_server_ident()
     conn.set_features(connection_features);
 
     return write_frame(server_ident);
-  }).then([] {
-    return next_step_t::ready;
   });
 }
 
@@ -1730,6 +1761,15 @@ void ProtocolV2::trigger_replacing(bool reconnect,
   if (socket) {
     socket->shutdown();
   }
+  if (!reconnect && new_client_cookie != client_cookie) {
+    seastar::with_gate(pending_dispatch, [this] {
+      return dispatcher.ms_handle_accept(
+          seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+    }).handle_exception([this] (std::exception_ptr eptr) {
+      logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
+      ceph_abort("unexpected exception from ms_handle_accept()");
+    });
+  }
   seastar::with_gate(pending_dispatch,
                      [this,
                       reconnect,
@@ -1782,9 +1822,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
         client_cookie = new_client_cookie;
         conn.set_peer_name(new_peer_name);
         connection_features = new_conn_features;
-        return send_server_ident().then([] (next_step_t next) {
-          assert(next == next_step_t::ready);
-        });
+        return send_server_ident();
       }
     }).then([this] {
       if (unlikely(state != state_t::REPLACING)) {
@@ -2126,7 +2164,7 @@ void ProtocolV2::trigger_close()
     messenger.unaccept_conn(
       seastar::static_pointer_cast<SocketConnection>(
         conn.shared_from_this()));
-  } else if (state >= state_t::CONNECTING && state < state_t::CLOSING) {
+  } else if (state >= state_t::ESTABLISHING && state < state_t::CLOSING) {
     messenger.unregister_conn(
       seastar::static_pointer_cast<SocketConnection>(
         conn.shared_from_this()));
index 677008acde3a98c02282c2788a88f7c3929d1768..7bba5d79a68fd78a87736b1e614d6edb119a9db3 100644 (file)
@@ -43,6 +43,7 @@ class ProtocolV2 final : public Protocol {
     NONE = 0,
     ACCEPTING,
     SERVER_WAIT,
+    ESTABLISHING,
     CONNECTING,
     READY,
     STANDBY,
@@ -56,6 +57,7 @@ class ProtocolV2 final : public Protocol {
     const char *const statenames[] = {"NONE",
                                       "ACCEPTING",
                                       "SERVER_WAIT",
+                                      "ESTABLISHING",
                                       "CONNECTING",
                                       "READY",
                                       "STANDBY",
@@ -171,8 +173,11 @@ class ProtocolV2 final : public Protocol {
   // CONNECTING/ACCEPTING
   seastar::future<> finish_auth();
 
-  // ACCEPTING/REPLACING (server)
-  seastar::future<next_step_t> send_server_ident();
+  // ESTABLISHING
+  void execute_establishing();
+
+  // ESTABLISHING/REPLACING (server)
+  seastar::future<> send_server_ident();
 
   // REPLACING (server)
   void trigger_replacing(bool reconnect,