]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: misc cleanups to protocol v2 implementations
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 1 Jun 2023 09:05:42 +0000 (17:05 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Sun, 25 Jun 2023 03:57:19 +0000 (11:57 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h

index 99a48536de5abdebc9b37b07cb6be6e91f6977a8..b3a25955483e9f6a9f04c4327fd53caf7630350a 100644 (file)
@@ -187,9 +187,10 @@ void ProtocolV2::start_accept(SocketFRef&& new_socket,
   execute_accepting();
 }
 
-void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool reentrant)
+void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state)
 {
-  if (!reentrant && new_state == state) {
+  ceph_assert_always(!gate.is_closed());
+  if (new_state == state) {
     logger().error("{} is not allowed to re-trigger state {}",
                    conn, get_state_name(state));
     ceph_abort();
@@ -201,11 +202,11 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool
   }
   logger().debug("{} TRIGGER {}, was {}",
                  conn, get_state_name(new_state), get_state_name(state));
-  auto pre_state = state;
-  if (pre_state == state_t::READY) {
-    assert(!gate.is_closed());
-    ceph_assert_always(!exit_io.has_value());
-    exit_io = seastar::shared_promise<>();
+
+  if (state == state_t::READY) {
+    // from READY
+    ceph_assert_always(!pr_exit_io.has_value());
+    pr_exit_io = seastar::shared_promise<>();
   }
 
   bool need_notify_out;
@@ -215,28 +216,28 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool
     need_notify_out = false;
   }
 
+  auto pre_state = state;
   state = new_state;
+
+  FrameAssemblerV2Ref fa;
   if (new_state == state_t::READY) {
-    // I'm not responsible to shutdown the socket at READY
-    is_socket_valid = false;
-    io_handler.set_io_state(new_io_state, std::move(frame_assembler), need_notify_out);
+    assert(new_io_state == io_state_t::open);
+    fa = std::move(frame_assembler);
   } else {
-    io_handler.set_io_state(new_io_state, nullptr, need_notify_out);
+    assert(new_io_state != io_state_t::open);
   }
-
-  /*
-   * not atomic below
-   */
+  io_handler.set_io_state(new_io_state, std::move(fa), need_notify_out);
 
   if (pre_state == state_t::READY) {
+    assert(new_io_state != io_state_t::open);
     gate.dispatch_in_background("exit_io", conn, [this] {
       return io_handler.wait_io_exit_dispatching(
       ).then([this](auto ret) {
         frame_assembler = std::move(ret.frame_assembler);
         ceph_assert_always(!frame_assembler->is_socket_valid());
         io_states = ret.io_states;
-        exit_io->set_value();
-        exit_io = std::nullopt;
+        pr_exit_io->set_value();
+        pr_exit_io = std::nullopt;
       });
     });
   }
@@ -780,7 +781,7 @@ ProtocolV2::client_reconnect()
 void ProtocolV2::execute_connecting()
 {
   ceph_assert_always(!is_socket_valid);
-  trigger_state(state_t::CONNECTING, io_state_t::delay, false);
+  trigger_state(state_t::CONNECTING, io_state_t::delay);
   gated_execute("execute_connecting", conn, [this] {
       global_seq = messenger.get_global_seq();
       assert(client_cookie != 0);
@@ -1497,7 +1498,7 @@ ProtocolV2::server_reconnect()
 void ProtocolV2::execute_accepting()
 {
   assert(is_socket_valid);
-  trigger_state(state_t::ACCEPTING, io_state_t::none, false);
+  trigger_state(state_t::ACCEPTING, io_state_t::none);
   gate.dispatch_in_background("execute_accepting", conn, [this] {
       return seastar::futurize_invoke([this] {
 #ifdef UNIT_TESTS_BUILT
@@ -1631,10 +1632,13 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
   };
 
   ceph_assert_always(is_socket_valid);
-  trigger_state(state_t::ESTABLISHING, io_state_t::delay, false);
+  trigger_state(state_t::ESTABLISHING, io_state_t::delay);
   if (existing_conn) {
-    static_cast<ProtocolV2*>(existing_conn->protocol.get())->do_close(
-        true /* is_dispatch_reset */, std::move(accept_me));
+    ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
+        existing_conn->protocol.get());
+    existing_proto->do_close(
+        true, // is_dispatch_reset
+        std::move(accept_me));
     if (unlikely(state != state_t::ESTABLISHING)) {
       logger().warn("{} triggered {} during execute_establishing(), "
                     "the accept event will not be delivered!",
@@ -1653,6 +1657,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
   }
 
   gated_execute("execute_establishing", conn, [this] {
+    ceph_assert_always(state == state_t::ESTABLISHING);
     return seastar::futurize_invoke([this] {
       return send_server_ident();
     }).then([this] {
@@ -1677,6 +1682,8 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
 seastar::future<>
 ProtocolV2::send_server_ident()
 {
+  ceph_assert_always(state == state_t::ESTABLISHING ||
+                     state == state_t::REPLACING);
   // send_server_ident() logic
 
   // refered to async-conn v2: not assign gs to global_seq
@@ -1730,9 +1737,11 @@ void ProtocolV2::trigger_replacing(bool reconnect,
                                    uint64_t new_connect_seq,
                                    uint64_t new_msg_seq)
 {
+  ceph_assert_always(state >= state_t::ESTABLISHING);
+  ceph_assert_always(state <= state_t::WAIT);
   ceph_assert_always(has_socket || state == state_t::CONNECTING);
   ceph_assert_always(!mover.socket->is_shutdown());
-  trigger_state(state_t::REPLACING, io_state_t::delay, false);
+  trigger_state(state_t::REPLACING, io_state_t::delay);
   if (is_socket_valid) {
     frame_assembler->shutdown_socket<true>(&gate);
     is_socket_valid = false;
@@ -1773,6 +1782,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
       }
 
       if (unlikely(state != state_t::REPLACING)) {
+        ceph_assert_always(state == state_t::CLOSING);
         return mover.socket->close(
         ).then([sock = std::move(mover.socket)] {
           abort_protocol();
@@ -1813,11 +1823,13 @@ void ProtocolV2::trigger_replacing(bool reconnect,
       }
     }).then([this, reconnect] {
       if (unlikely(state != state_t::REPLACING)) {
-        logger().debug("{} triggered {} at the end of trigger_replacing()",
+        logger().debug("{} triggered {} at the end of trigger_replacing(), abort",
                        conn, get_state_name(state));
+        ceph_assert_always(state == state_t::CLOSING);
         abort_protocol();
       }
-      logger().info("{} replaced ({}): gs={}, pgs={}, cs={}, "
+      logger().info("{} replaced ({}), going to ready: "
+                    "gs={}, pgs={}, cs={}, "
                     "client_cookie={}, server_cookie={}, {}",
                     conn, reconnect ? "reconnected" : "connected",
                     global_seq, peer_global_seq, connect_seq,
@@ -1845,7 +1857,9 @@ void ProtocolV2::execute_ready()
   assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
   protocol_timer.cancel();
   ceph_assert_always(is_socket_valid);
-  trigger_state(state_t::READY, io_state_t::open, false);
+  // I'm not responsible to shutdown the socket at READY
+  is_socket_valid = false;
+  trigger_state(state_t::READY, io_state_t::open);
 }
 
 // STANDBY state
@@ -1853,7 +1867,7 @@ void ProtocolV2::execute_ready()
 void ProtocolV2::execute_standby()
 {
   ceph_assert_always(!is_socket_valid);
-  trigger_state(state_t::STANDBY, io_state_t::delay, false);
+  trigger_state(state_t::STANDBY, io_state_t::delay);
 }
 
 void ProtocolV2::notify_out()
@@ -1871,7 +1885,7 @@ void ProtocolV2::notify_out()
 void ProtocolV2::execute_wait(bool max_backoff)
 {
   ceph_assert_always(!is_socket_valid);
-  trigger_state(state_t::WAIT, io_state_t::delay, false);
+  trigger_state(state_t::WAIT, io_state_t::delay);
   gated_execute("execute_wait", conn, [this, max_backoff] {
     double backoff = protocol_timer.last_dur();
     if (max_backoff) {
@@ -1909,7 +1923,7 @@ void ProtocolV2::execute_wait(bool max_backoff)
 void ProtocolV2::execute_server_wait()
 {
   ceph_assert_always(is_socket_valid);
-  trigger_state(state_t::SERVER_WAIT, io_state_t::none, false);
+  trigger_state(state_t::SERVER_WAIT, io_state_t::none);
   gated_execute("execute_server_wait", conn, [this] {
     return frame_assembler->read_exactly(1
     ).then([this](auto bptr) {
@@ -1944,22 +1958,21 @@ seastar::future<> ProtocolV2::close_clean_yielded()
   // the container when seastar::parallel_for_each() is still iterating in it.
   // that'd lead to a segfault.
   return seastar::yield(
-  ).then([this, conn_ref = conn.shared_from_this()] {
+  ).then([this] {
     do_close(false);
-    // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset()
-    // which will otherwise result in deadlock
-    assert(closed_clean_fut.valid());
-    return closed_clean_fut.get_future();
-  });
+    return pr_closed_clean.get_shared_future();
+
+  // connection may be unreferenced from the messenger,
+  // so need to hold the additional reference.
+  }).finally([conn_ref = conn.shared_from_this()] {});;
 }
 
 void ProtocolV2::do_close(
     bool is_dispatch_reset,
     std::optional<std::function<void()>> f_accept_new)
 {
-  if (closed) {
+  if (state == state_t::CLOSING) {
     // already closing
-    assert(state == state_t::CLOSING);
     return;
   }
 
@@ -1972,9 +1985,9 @@ void ProtocolV2::do_close(
    * atomic operations
    */
 
-  closed = true;
+  ceph_assert_always(!gate.is_closed());
 
-  // trigger close
+  // messenger registrations, must before user events
   messenger.closing_conn(
       seastar::static_pointer_cast<SocketConnection>(
         conn.shared_from_this()));
@@ -1990,27 +2003,27 @@ void ProtocolV2::do_close(
     // cannot happen
     ceph_assert(false);
   }
-  protocol_timer.cancel();
-  trigger_state(state_t::CLOSING, io_state_t::drop, false);
-
   if (f_accept_new) {
+    // the replacing connection must be registerred after the replaced
+    // connection is unreigsterred.
     (*f_accept_new)();
   }
+
+  protocol_timer.cancel();
   if (is_socket_valid) {
     frame_assembler->shutdown_socket<true>(&gate);
     is_socket_valid = false;
   }
-  assert(!gate.is_closed());
-  auto handshake_closed = gate.close();
-  auto io_closed = io_handler.close_io(
-      is_dispatch_reset, is_replace);
-
-  // asynchronous operations
-  assert(!closed_clean_fut.valid());
-  closed_clean_fut = seastar::when_all(
-      std::move(handshake_closed), std::move(io_closed)
-  ).discard_result().then([this] {
-    ceph_assert_always(!exit_io.has_value());
+
+  trigger_state(state_t::CLOSING, io_state_t::drop);
+  gate.dispatch_in_background(
+      "close_io", conn, [this, is_dispatch_reset, is_replace] {
+    return io_handler.close_io(is_dispatch_reset, is_replace);
+  });
+
+  std::ignore = gate.close(
+  ).then([this] {
+    ceph_assert_always(!pr_exit_io.has_value());
     if (has_socket) {
       ceph_assert_always(frame_assembler);
       return frame_assembler->close_shutdown_socket();
@@ -2022,6 +2035,7 @@ void ProtocolV2::do_close(
     messenger.closed_conn(
         seastar::static_pointer_cast<SocketConnection>(
           conn.shared_from_this()));
+    pr_closed_clean.set_value();
 #ifdef UNIT_TESTS_BUILT
     closed_clean = true;
     if (conn.interceptor) {
@@ -2030,7 +2044,7 @@ void ProtocolV2::do_close(
     }
 #endif
   }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
-    logger().error("{} closing: closed_clean_fut got unexpected exception {}",
+    logger().error("{} closing got unexpected exception {}",
                    conn, eptr);
     ceph_abort();
   });
index 2aa9496ef83e529da026235f38aa61976f00cb38..b1767f7da0108271226a9318cab2c5d5e1ae45ee 100644 (file)
@@ -52,7 +52,7 @@ public:
   }
 
   bool is_closed() const {
-    return closed;
+    return state == state_t::CLOSING;
   }
 
 #endif
@@ -60,8 +60,8 @@ private:
   using io_state_t = IOHandler::io_state_t;
 
   seastar::future<> wait_exit_io() {
-    if (exit_io.has_value()) {
-      return exit_io->get_shared_future();
+    if (pr_exit_io.has_value()) {
+      return pr_exit_io->get_shared_future();
     } else {
       return seastar::now();
     }
@@ -94,7 +94,7 @@ private:
     return statenames[static_cast<int>(state)];
   }
 
-  void trigger_state(state_t state, io_state_t io_state, bool reentrant);
+  void trigger_state(state_t new_state, io_state_t new_io_state);
 
   template <typename Func, typename T>
   void gated_execute(const char *what, T &who, Func &&func) {
@@ -227,16 +227,13 @@ private:
 
   FrameAssemblerV2Ref frame_assembler;
 
-  std::optional<seastar::shared_promise<>> exit_io;
+  std::optional<seastar::shared_promise<>> pr_exit_io;
 
   AuthConnectionMetaRef auth_meta;
 
   crimson::common::Gated gate;
 
-  bool closed = false;
-
-  // become valid only after closed == true
-  seastar::shared_future<> closed_clean_fut;
+  seastar::shared_promise<> pr_closed_clean;
 
 #ifdef UNIT_TESTS_BUILT
   bool closed_clean = false;