]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: REPLACING state to resolve racing and retain session
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 8 Aug 2019 09:09:34 +0000 (17:09 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Mon, 12 Aug 2019 09:22:45 +0000 (17:22 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Connection.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h

index e4408c19bc10daf50b5c0e921ffde2a8c7c4d7bb..0f8fb12204dfe8843cd32a6bbcf8301bf9371d81 100644 (file)
@@ -35,6 +35,7 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
 
   void set_peer_type(entity_type_t peer_type) { peer_name._type = peer_type; }
   void set_peer_id(int64_t peer_id) { peer_name._num = peer_id; }
+  void set_peer_name(entity_name_t name) { peer_name = name; }
 
  public:
   uint64_t peer_global_id = 0;
index 892f6a2afbdd33b72e2b55b08ce2699dcf2b7764..2cb7b1352c3d5a7298a2a5771d03d87cad0f3b92 100644 (file)
@@ -935,8 +935,6 @@ void ProtocolV2::execute_connecting()
             return client_connect();
           } else {
             ceph_assert(connect_seq > 0);
-            // TODO: lossless policy
-            ceph_assert(false);
             return client_reconnect();
           }
         }).then([this] (next_step_t next) {
@@ -1084,15 +1082,46 @@ ProtocolV2::send_wait()
   });
 }
 
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::reuse_connection(
+    ProtocolV2* existing_proto, bool do_reset,
+    bool reconnect, uint64_t conn_seq, uint64_t msg_seq)
+{
+  existing_proto->trigger_replacing(reconnect,
+                                    do_reset,
+                                    std::move(socket),
+                                    std::move(auth_meta),
+                                    std::move(session_stream_handlers),
+                                    peer_global_seq,
+                                    client_cookie,
+                                    conn.get_peer_name(),
+                                    connection_features,
+                                    conn_seq,
+                                    msg_seq);
+  // close this connection because all the necessary information is delivered
+  // to the exisiting connection, and jump to error handling code to abort the
+  // current state.
+  abort_in_close(*this);
+  return seastar::make_ready_future<next_step_t>(next_step_t::none);
+}
+
 seastar::future<ProtocolV2::next_step_t>
 ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
 {
   // handle_existing_connection() logic
-  logger().trace("{} {}: {}", conn, __func__, *existing_conn);
-
   ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
       existing_conn->protocol.get());
   ceph_assert(existing_proto);
+  logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting,"
+                 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
+                 conn, global_seq, peer_global_seq, connect_seq,
+                 client_cookie, server_cookie,
+                 existing_conn, get_state_name(existing_proto->state),
+                 existing_proto->global_seq,
+                 existing_proto->peer_global_seq,
+                 existing_proto->connect_seq,
+                 existing_proto->client_cookie,
+                 existing_proto->server_cookie);
 
   if (existing_proto->state == state_t::CLOSING) {
     logger().warn("{} existing connection {} already closed.", conn, *existing_conn);
@@ -1111,8 +1140,7 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
                   " in favor of existing connection {}",
                   conn, peer_global_seq,
                   existing_proto->peer_global_seq, *existing_conn);
-    dispatch_reset();
-    abort_in_close(*this);
+    abort_in_fault();
   }
 
   if (existing_conn->policy.lossy) {
@@ -1124,8 +1152,42 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
     return send_server_ident();
   }
 
-  // TODO: lossless policy
-  ceph_assert(false);
+  if (existing_proto->server_cookie != 0) {
+    if (existing_proto->client_cookie != client_cookie) {
+      // Found previous session
+      // peer has reset and we're going to reuse the existing connection
+      // by replacing the socket
+      logger().warn("{} found previous session with existing {}, peer must have reset",
+                    conn, *existing_conn);
+      return reuse_connection(existing_proto, conn.policy.resetcheck);
+    } else {
+      // session establishment interrupted between client_ident and server_ident,
+      // continuing...
+      logger().warn("{} found previous session with existing {}, continuing session establishment",
+                    conn, *existing_conn);
+      return reuse_connection(existing_proto);
+    }
+  } else {
+    // Looks like a connection race: server and client are both connecting to
+    // each other at the same time.
+    if (existing_proto->client_cookie != client_cookie) {
+      if (conn.peer_addr < messenger.get_myaddr() || existing_conn->policy.server) {
+        // this connection wins
+        logger().warn("{} connection race detected and win, reusing existing {}",
+                      conn, *existing_conn);
+        return reuse_connection(existing_proto);
+      } else {
+        // the existing connection wins
+        logger().warn("{} connection race detected and lose to existing {}",
+                      conn, *existing_conn);
+        existing_conn->keepalive();
+        return send_wait();
+      }
+    } else {
+      logger().warn("{} found previous client session with existing {}, continuing session establishment");
+      return reuse_connection(existing_proto);
+    }
+  }
 }
 
 seastar::future<ProtocolV2::next_step_t>
@@ -1306,6 +1368,17 @@ ProtocolV2::server_reconnect()
     ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
         existing_conn->protocol.get());
     ceph_assert(existing_proto);
+    logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting,"
+                   " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
+                   conn, global_seq, peer_global_seq, reconnect.connect_seq(),
+                   reconnect.client_cookie(), reconnect.server_cookie(),
+                   existing_conn,
+                   get_state_name(existing_proto->state),
+                   existing_proto->global_seq,
+                   existing_proto->peer_global_seq,
+                   existing_proto->connect_seq,
+                   existing_proto->client_cookie,
+                   existing_proto->server_cookie);
 
     if (existing_proto->state == state_t::REPLACING) {
       logger().warn("{} server_reconnect: racing replace happened while "
@@ -1317,7 +1390,7 @@ ProtocolV2::server_reconnect()
     if (existing_proto->client_cookie != reconnect.client_cookie()) {
       logger().warn("{} server_reconnect:"
                     " client_cookie mismatch with existing connection {},"
-                    " cc={} rcc={}. I must have reseted, reseting client.",
+                    " cc={} rcc={}. I must have reset, reseting client.",
                     conn, *existing_conn,
                     existing_proto->client_cookie, reconnect.client_cookie());
       return send_reset(conn.policy.resetcheck);
@@ -1352,9 +1425,7 @@ ProtocolV2::server_reconnect()
                     conn, existing_proto->connect_seq, reconnect.connect_seq(),
                     *existing_conn);
       return send_retry(existing_proto->connect_seq);
-    }
-
-    if (existing_proto->connect_seq == reconnect.connect_seq()) {
+    } else if (existing_proto->connect_seq == reconnect.connect_seq()) {
       // reconnect race: both peers are sending reconnect messages
       if (existing_conn->peer_addr > messenger.get_myaddrs().msgr2_addr() &&
           !existing_conn->policy.server) {
@@ -1369,19 +1440,17 @@ ProtocolV2::server_reconnect()
                       " replacing existing connection {}"
                       " socket by this connection's socket",
                       conn, *existing_conn);
+        return reuse_connection(
+            existing_proto, false,
+            true, reconnect.connect_seq(), reconnect.msg_seq());
       }
+    } else { // existing_proto->connect_seq < reconnect.connect_seq()
+      logger().warn("{} server_reconnect: stale exsiting connection {},"
+                    " replacing", conn, *existing_conn);
+      return reuse_connection(
+          existing_proto, false,
+          true, reconnect.connect_seq(), reconnect.msg_seq());
     }
-
-    logger().warn("{} server_reconnect: reconnect to exsiting connection {}",
-                  conn, *existing_conn);
-
-    // everything looks good
-    existing_proto->connect_seq = reconnect.connect_seq();
-    //exproto->message_seq = reconnect.msg_seq();
-
-    // TODO: lossless policy
-    // return reuse_connection(existing, exproto);
-    ceph_assert(false);
   });
 }
 
@@ -1421,8 +1490,6 @@ void ProtocolV2::execute_accepting()
             case Tag::CLIENT_IDENT:
               return server_connect();
             case Tag::SESSION_RECONNECT:
-              // TODO: lossless policy
-              ceph_assert(false);
               return server_reconnect();
             default: {
               unexpected_tag(tag, conn, "post_server_auth");
@@ -1557,12 +1624,95 @@ ProtocolV2::send_server_ident()
 
 // REPLACING state
 
-seastar::future<> ProtocolV2::send_reconnect_ok()
+void ProtocolV2::trigger_replacing(bool reconnect,
+                                   bool do_reset,
+                                   SocketFRef&& new_socket,
+                                   AuthConnectionMetaRef&& new_auth_meta,
+                                   ceph::crypto::onwire::rxtx_t new_rxtx,
+                                   uint64_t new_peer_global_seq,
+                                   uint64_t new_client_cookie,
+                                   entity_name_t new_peer_name,
+                                   uint64_t new_conn_features,
+                                   uint64_t new_connect_seq,
+                                   uint64_t new_msg_seq)
 {
-  // send_reconnect_ok() logic
-  // <prepare and send ReconnectOKFrame>
+  trigger_state(state_t::REPLACING, write_state_t::delay, false);
+  if (socket) {
+    socket->shutdown();
+  }
+  seastar::with_gate(pending_dispatch,
+                     [this,
+                      reconnect,
+                      do_reset,
+                      new_socket = std::move(new_socket),
+                      new_auth_meta = std::move(new_auth_meta),
+                      new_rxtx = std::move(new_rxtx),
+                      new_client_cookie, new_peer_name,
+                      new_conn_features, new_peer_global_seq,
+                      new_connect_seq, new_msg_seq] () mutable {
+    return wait_write_exit().then([this, do_reset] {
+      if (do_reset) {
+        reset_session(true);
+      }
+      protocol_timer.cancel();
+      return std::move(execution_done);
+    }).then([this,
+             reconnect,
+             new_socket = std::move(new_socket),
+             new_auth_meta = std::move(new_auth_meta),
+             new_rxtx = std::move(new_rxtx),
+             new_client_cookie, new_peer_name,
+             new_conn_features, new_peer_global_seq,
+             new_connect_seq, new_msg_seq] () mutable {
+      if (state != state_t::REPLACING) {
+        return new_socket->close().then([sock = std::move(new_socket)] {
+          abort_protocol();
+        });
+      }
 
-  return seastar::now();
+      if (socket) {
+        with_gate(pending_dispatch, [this, sock = std::move(socket)] () mutable {
+          return sock->close().then([sock = std::move(sock)] {});
+        });
+      }
+      socket = std::move(new_socket);
+      auth_meta = std::move(new_auth_meta);
+      session_stream_handlers = std::move(new_rxtx);
+      record_io = false;
+      peer_global_seq = new_peer_global_seq;
+
+      if (reconnect) {
+        connect_seq = new_connect_seq;
+        // send_reconnect_ok() logic
+        requeue_up_to(new_msg_seq);
+        auto reconnect_ok = ReconnectOkFrame::Encode(conn.in_seq);
+        logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, conn.in_seq);
+        return write_frame(reconnect_ok);
+      } else {
+        client_cookie = new_client_cookie;
+        conn.set_peer_name(new_peer_name);
+        connection_features = new_conn_features;
+        return send_server_ident().then([] (next_step_t next) {
+          assert(next == next_step_t::ready);
+        });
+      }
+    }).then([this] {
+      logger().info("{} reconnected(replaced): gs={}, pgs={}, cs={},"
+                    " client_cookie={}, server_cookie={}, in_seq={}, out_seq={}",
+                    conn, global_seq, peer_global_seq, connect_seq,
+                    client_cookie, server_cookie, conn.in_seq, conn.out_seq);
+      execute_ready();
+    }).handle_exception([this] (std::exception_ptr eptr) {
+      logger().debug("{} trigger_replacing(): got exception {} at state {}",
+                     conn, eptr);
+      if (state != state_t::REPLACING) {
+        assert(state == state_t::CLOSING);
+        logger().debug("{} execute_replacing() protocol aborted", conn);
+        return;
+      }
+      fault(true);
+    });
+  });
 }
 
 // READY state
@@ -1876,10 +2026,6 @@ void ProtocolV2::trigger_close()
 
   protocol_timer.cancel();
 
-  if (!socket) {
-    ceph_assert(state == state_t::CONNECTING);
-  }
-
   trigger_state(state_t::CLOSING, write_state_t::drop, false);
 }
 
index b8e97ba80bfdbdfbc9109c4bd5c9b342fd0f374b..ff8973fa968893b0abd782c6906920fb4dc5da50 100644 (file)
@@ -47,7 +47,7 @@ class ProtocolV2 final : public Protocol {
     READY,
     STANDBY,
     WAIT,
-    REPLACING,      // ?
+    REPLACING,
     CLOSING
   };
   state_t state = state_t::NONE;
@@ -60,7 +60,7 @@ class ProtocolV2 final : public Protocol {
                                       "READY",
                                       "STANDBY",
                                       "WAIT",
-                                      "REPLACING",      // ?
+                                      "REPLACING",
                                       "CLOSING"};
     return statenames[static_cast<int>(state)];
   }
@@ -151,6 +151,11 @@ class ProtocolV2 final : public Protocol {
   seastar::future<> server_auth();
 
   seastar::future<next_step_t> send_wait();
+  seastar::future<next_step_t> reuse_connection(ProtocolV2* existing_proto,
+                                                bool do_reset=false,
+                                                bool reconnect=false,
+                                                uint64_t conn_seq=0,
+                                                uint64_t msg_seq=0);
 
   seastar::future<next_step_t> handle_existing_connection(SocketConnectionRef existing_conn);
   seastar::future<next_step_t> server_connect();
@@ -170,7 +175,19 @@ class ProtocolV2 final : public Protocol {
   seastar::future<next_step_t> send_server_ident();
 
   // REPLACING (server)
-  seastar::future<> send_reconnect_ok();
+  void trigger_replacing(bool reconnect,
+                         bool do_reset,
+                         SocketFRef&& new_socket,
+                         AuthConnectionMetaRef&& new_auth_meta,
+                         ceph::crypto::onwire::rxtx_t new_rxtx,
+                         uint64_t new_peer_global_seq,
+                         // !reconnect
+                         uint64_t new_client_cookie,
+                         entity_name_t new_peer_name,
+                         uint64_t new_conn_features,
+                         // reconnect
+                         uint64_t new_connect_seq,
+                         uint64_t new_msg_seq);
 
   // READY
   seastar::future<> read_message(utime_t throttle_stamp);