git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: prevent racing in protocol to switch core and to call io-handler interfaces
author: Yingxin Cheng <yingxin.cheng@intel.com>
Mon, 5 Jun 2023 02:40:23 +0000 (10:40 +0800)
committer: Yingxin Cheng <yingxin.cheng@intel.com>
Sun, 25 Jun 2023 03:57:19 +0000 (11:57 +0800)
Otherwise, io-handler interfaces may be called on the wrong core or in the wrong order.

Special care is needed to handle preemptive cases such as
closing and replacing.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h

index 6c64db82fd558b018d291d47bf87bbc4c960bb37..f7f2bccfe41e4acf6376e3b1adee3f68b89c45d2 100644 (file)
@@ -187,7 +187,7 @@ void ProtocolV2::start_accept(SocketFRef&& new_socket,
   execute_accepting();
 }
 
-void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state)
+void ProtocolV2::trigger_state_phase1(state_t new_state)
 {
   ceph_assert_always(!gate.is_closed());
   if (new_state == state) {
@@ -205,19 +205,27 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state)
 
   if (state == state_t::READY) {
     // from READY
+    ceph_assert_always(!need_exit_io);
     ceph_assert_always(!pr_exit_io.has_value());
+    need_exit_io = true;
     pr_exit_io = seastar::shared_promise<>();
   }
 
-  bool need_notify_out;
   if (new_state == state_t::STANDBY && !conn.policy.server) {
     need_notify_out = true;
   } else {
     need_notify_out = false;
   }
 
-  auto pre_state = state;
   state = new_state;
+}
+
+void ProtocolV2::trigger_state_phase2(
+    state_t new_state, io_state_t new_io_state)
+{
+  ceph_assert_always(new_state == state);
+  ceph_assert_always(!gate.is_closed());
+  ceph_assert_always(!pr_switch_io_shard.has_value());
 
   FrameAssemblerV2Ref fa;
   if (new_state == state_t::READY) {
@@ -231,9 +239,12 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state)
   }
   io_handler.set_io_state(new_io_state, std::move(fa), need_notify_out);
 
-  if (pre_state == state_t::READY) {
+  if (need_exit_io) {
+    // from READY
     logger().debug("{} IOHandler::wait_io_exit_dispatching() start ...", conn);
+    assert(pr_exit_io.has_value());
     assert(new_io_state != io_state_t::open);
+    need_exit_io = false;
     gate.dispatch_in_background("exit_io", conn, [this] {
       return seastar::smp::submit_to(
           io_handler.get_shard_id(), [this] {
@@ -246,6 +257,7 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state)
         ceph_assert_always(
             seastar::this_shard_id() == frame_assembler->get_shard_id());
         ceph_assert_always(!frame_assembler->is_socket_valid());
+        assert(!need_exit_io);
         io_states = ret.io_states;
         pr_exit_io->set_value();
         pr_exit_io = std::nullopt;
@@ -1810,11 +1822,11 @@ void ProtocolV2::trigger_replacing(bool reconnect,
                 conn, reconnect ? "reconnected" : "connected",
                 peer_global_seq, connect_seq, client_cookie,
                 io_states, mover.socket->get_shard_id());
-  trigger_state(state_t::REPLACING, io_state_t::delay);
   if (is_socket_valid) {
     frame_assembler->shutdown_socket<true>(&gate);
     is_socket_valid = false;
   }
+  trigger_state_phase1(state_t::REPLACING);
   gate.dispatch_in_background(
       "trigger_replacing",
       conn,
@@ -1832,13 +1844,22 @@ void ProtocolV2::trigger_replacing(bool reconnect,
     // state may become CLOSING below, but we cannot abort the chain until
     // mover.socket is correctly handled (closed or replaced).
 
-    return wait_exit_io(
+    // this is preemptive
+    return wait_switch_io_shard(
     ).then([this] {
       if (unlikely(state != state_t::REPLACING)) {
         ceph_assert_always(state == state_t::CLOSING);
         return seastar::now();
       }
 
+      trigger_state_phase2(state_t::REPLACING, io_state_t::delay);
+      return wait_exit_io();
+    }).then([this] {
+      if (unlikely(state != state_t::REPLACING)) {
+        ceph_assert_always(state == state_t::CLOSING);
+        return seastar::now();
+      }
+
       ceph_assert_always(frame_assembler);
       protocol_timer.cancel();
       auto done = std::move(execution_done);
@@ -2115,38 +2136,54 @@ void ProtocolV2::do_close(
     is_socket_valid = false;
   }
 
-  trigger_state(state_t::CLOSING, io_state_t::drop);
+  trigger_state_phase1(state_t::CLOSING);
   gate.dispatch_in_background(
       "close_io", conn, [this, is_dispatch_reset, is_replace] {
-    return io_handler.close_io(is_dispatch_reset, is_replace);
-  });
-
-  std::ignore = gate.close(
-  ).then([this] {
-    ceph_assert_always(!pr_exit_io.has_value());
-    if (has_socket) {
-      ceph_assert_always(frame_assembler);
-      return frame_assembler->close_shutdown_socket();
-    } else {
-      return seastar::now();
-    }
-  }).then([this] {
-    logger().debug("{} closed!", conn);
-    messenger.closed_conn(
-        seastar::static_pointer_cast<SocketConnection>(
-          conn.shared_from_this()));
-    pr_closed_clean.set_value();
+    // this is preemptive
+    return wait_switch_io_shard(
+    ).then([this, is_dispatch_reset, is_replace] {
+      trigger_state_phase2(state_t::CLOSING, io_state_t::drop);
+      logger().debug("{} IOHandler::close_io(reset={}, replace={})",
+                     conn, is_dispatch_reset, is_replace);
+
+      std::ignore = gate.close(
+      ).then([this] {
+        ceph_assert_always(!need_exit_io);
+        ceph_assert_always(!pr_exit_io.has_value());
+        if (has_socket) {
+          ceph_assert_always(frame_assembler);
+          return frame_assembler->close_shutdown_socket();
+        } else {
+          return seastar::now();
+        }
+      }).then([this] {
+        logger().debug("{} closed!", conn);
+        messenger.closed_conn(
+            seastar::static_pointer_cast<SocketConnection>(
+              conn.shared_from_this()));
+        pr_closed_clean.set_value();
 #ifdef UNIT_TESTS_BUILT
-    closed_clean = true;
-    if (conn.interceptor) {
-      conn.interceptor->register_conn_closed(
-          conn.get_local_shared_foreign_from_this());
-    }
+        closed_clean = true;
+        if (conn.interceptor) {
+          conn.interceptor->register_conn_closed(
+              conn.get_local_shared_foreign_from_this());
+        }
 #endif
-  }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
-    logger().error("{} closing got unexpected exception {}",
-                   conn, eptr);
-    ceph_abort();
+      // connection is unreferenced from the messenger,
+      // so need to hold the additional reference.
+      }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
+        logger().error("{} closing got unexpected exception {}",
+                       conn, eptr);
+        ceph_abort();
+      });
+
+      return seastar::smp::submit_to(
+          io_handler.get_shard_id(),
+          [this, is_dispatch_reset, is_replace] {
+        return io_handler.close_io(is_dispatch_reset, is_replace);
+      });
+      // user can make changes
+    });
   });
 }
 
index d74425344b4c6ddf2ae3ceec13bbcc271939149e..f083d1721ac61eb5080c65362324d2390517c352 100644 (file)
@@ -59,10 +59,19 @@ public:
 private:
   using io_state_t = IOHandler::io_state_t;
 
+  seastar::future<> wait_switch_io_shard() {
+    if (pr_switch_io_shard.has_value()) {
+      return pr_switch_io_shard->get_shared_future();
+    } else {
+      return seastar::now();
+    }
+  }
+
   seastar::future<> wait_exit_io() {
     if (pr_exit_io.has_value()) {
       return pr_exit_io->get_shared_future();
     } else {
+      assert(!need_exit_io);
       return seastar::now();
     }
   }
@@ -94,7 +103,15 @@ private:
     return statenames[static_cast<int>(state)];
   }
 
-  void trigger_state(state_t new_state, io_state_t new_io_state);
+  void trigger_state_phase1(state_t new_state);
+
+  void trigger_state_phase2(state_t new_state, io_state_t new_io_state);
+
+  void trigger_state(state_t new_state, io_state_t new_io_state) {
+    ceph_assert_always(!pr_switch_io_shard.has_value());
+    trigger_state_phase1(new_state);
+    trigger_state_phase2(new_state, new_io_state);
+  }
 
   template <typename Func, typename T>
   void gated_execute(const char *what, T &who, Func &&func) {
@@ -227,8 +244,12 @@ private:
 
   FrameAssemblerV2Ref frame_assembler;
 
+  bool need_notify_out = false;
+
   std::optional<seastar::shared_promise<>> pr_switch_io_shard;
 
+  bool need_exit_io = false;
+
   std::optional<seastar::shared_promise<>> pr_exit_io;
 
   AuthConnectionMetaRef auth_meta;