]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: convert all interactions between protocol and io-handler to be cross...
authorYingxin Cheng <yingxin.cheng@intel.com>
Mon, 5 Jun 2023 02:56:27 +0000 (10:56 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Sun, 25 Jun 2023 03:57:19 +0000 (11:57 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/ProtocolV2.cc
src/crimson/net/io_handler.cc
src/crimson/net/io_handler.h

index f7f2bccfe41e4acf6376e3b1adee3f68b89c45d2..e1ffef29088942c95d72edbad8cbbaf906da406c 100644 (file)
@@ -237,7 +237,20 @@ void ProtocolV2::trigger_state_phase2(
   } else {
     assert(new_io_state != io_state_t::open);
   }
-  io_handler.set_io_state(new_io_state, std::move(fa), need_notify_out);
+  logger().debug("{} IOHandler::set_io_state(): new_state={}, new_io_state={}, "
+                 "fa={}, set_notify_out={}",
+                 conn, get_state_name(new_state), new_io_state,
+                 fa ? fmt::format("(sid={})", fa->get_shard_id()) : "N/A",
+                 need_notify_out);
+  gate.dispatch_in_background(
+      "set_io_state", conn,
+      [this, new_io_state, fa=std::move(fa)]() mutable {
+    return seastar::smp::submit_to(
+        io_handler.get_shard_id(),
+        [this, new_io_state, fa=std::move(fa), set_notify_out=need_notify_out]() mutable {
+      io_handler.set_io_state(new_io_state, std::move(fa), set_notify_out);
+    });
+  });
 
   if (need_exit_io) {
     // from READY
@@ -375,8 +388,17 @@ void ProtocolV2::reset_session(bool full)
     client_cookie = generate_client_cookie();
     peer_global_seq = 0;
   }
+
+  logger().debug("{} IOHandler::reset_session({})", conn, full);
   io_states.reset_session(full);
-  io_handler.reset_session(full);
+  gate.dispatch_in_background(
+      "reset_session", conn, [this, full] {
+    return seastar::smp::submit_to(
+        io_handler.get_shard_id(), [this, full] {
+      io_handler.reset_session(full);
+    });
+  });
+  // user can make changes
 }
 
 seastar::future<std::tuple<entity_type_t, entity_addr_t>>
@@ -656,9 +678,23 @@ ProtocolV2::client_connect()
       case Tag::SERVER_IDENT:
         return frame_assembler->read_frame_payload(
         ).then([this](auto payload) {
+          if (unlikely(state != state_t::CONNECTING)) {
+            logger().debug("{} triggered {} at receiving SERVER_IDENT",
+                           conn, get_state_name(state));
+            abort_protocol();
+          }
+
           // handle_server_ident() logic
+          logger().debug("{} IOHandler::requeue_out_sent()", conn);
           io_states.requeue_out_sent();
-          io_handler.requeue_out_sent();
+          gate.dispatch_in_background(
+              "requeue_out_sent", conn, [this] {
+            return seastar::smp::submit_to(
+                io_handler.get_shard_id(), [this] {
+              io_handler.requeue_out_sent();
+            });
+          });
+
           auto server_ident = ServerIdentFrame::Decode(payload->back());
           logger().debug("{} GOT ServerIdentFrame:"
                          " addrs={}, gid={}, gs={},"
@@ -785,12 +821,28 @@ ProtocolV2::client_reconnect()
       case Tag::SESSION_RECONNECT_OK:
         return frame_assembler->read_frame_payload(
         ).then([this](auto payload) {
+          if (unlikely(state != state_t::CONNECTING)) {
+            logger().debug("{} triggered {} at receiving RECONNECT_OK",
+                           conn, get_state_name(state));
+            abort_protocol();
+          }
+
           // handle_reconnect_ok() logic
           auto reconnect_ok = ReconnectOkFrame::Decode(payload->back());
-          logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
+          logger().debug("{} GOT ReconnectOkFrame: msg_seq={}, "
+                         "IOHandler::requeue_out_sent_up_to()",
                          conn, reconnect_ok.msg_seq());
+
           io_states.requeue_out_sent_up_to();
-          io_handler.requeue_out_sent_up_to(reconnect_ok.msg_seq());
+          auto msg_seq = reconnect_ok.msg_seq();
+          gate.dispatch_in_background(
+              "requeue_out_reconnecting", conn, [this, msg_seq] {
+            return seastar::smp::submit_to(
+                io_handler.get_shard_id(), [this, msg_seq] {
+              io_handler.requeue_out_sent_up_to(msg_seq);
+            });
+          });
+
           return seastar::make_ready_future<next_step_t>(next_step_t::ready);
         });
       default: {
@@ -1763,11 +1815,19 @@ ProtocolV2::send_server_ident()
 
   // refered to async-conn v2: not assign gs to global_seq
   global_seq = messenger.get_global_seq();
-  logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
+  logger().debug("{} UPDATE: gs={} for server ident, "
+                 "IOHandler::reset_peer_state()",
+                 conn, global_seq);
 
   // this is required for the case when this connection is being replaced
   io_states.reset_peer_state();
-  io_handler.reset_peer_state();
+  gate.dispatch_in_background(
+      "reset_peer_state", conn, [this] {
+    return seastar::smp::submit_to(
+        io_handler.get_shard_id(), [this] {
+      io_handler.reset_peer_state();
+    });
+  });
 
   if (!conn.policy.lossy) {
     server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
@@ -1925,8 +1985,17 @@ void ProtocolV2::trigger_replacing(bool reconnect,
       if (reconnect) {
         connect_seq = new_connect_seq;
         // send_reconnect_ok() logic
+
+        logger().debug("{} IOHandler::requeue_out_sent_up_to({})", conn, new_msg_seq);
         io_states.requeue_out_sent_up_to();
-        io_handler.requeue_out_sent_up_to(new_msg_seq);
+        gate.dispatch_in_background(
+            "requeue_out_replacing", conn, [this, new_msg_seq] {
+          return seastar::smp::submit_to(
+              io_handler.get_shard_id(), [this, new_msg_seq] {
+            io_handler.requeue_out_sent_up_to(new_msg_seq);
+          });
+        });
+
         auto reconnect_ok = ReconnectOkFrame::Encode(io_states.in_seq);
         logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_states.in_seq);
         return frame_assembler->write_flush_frame(reconnect_ok);
index 576d91b7a15e4416be1857a68c5603d5dd813280..7072c4d0b2ef845b633e54ab054122c65da207c2 100644 (file)
@@ -215,10 +215,16 @@ void IOHandler::mark_down()
     return;
   }
 
-  logger().info("{} mark_down() with {}",
+  logger().info("{} mark_down() at {}, send notify_mark_down()",
                 conn, io_stat_printer{*this});
   set_io_state(io_state_t::drop);
-  handshake_listener->notify_mark_down();
+  shard_states->dispatch_in_background(
+      "notify_mark_down", conn, [this] {
+    return seastar::smp::submit_to(
+        conn.get_messenger_shard_id(), [this] {
+      handshake_listener->notify_mark_down();
+    });
+  });
 }
 
 void IOHandler::print_io_stat(std::ostream &out) const
@@ -671,8 +677,9 @@ IOHandler::do_out_dispatch(shard_states_t &ctx)
     }
 
     if (io_state == io_state_t::open) {
-      logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}",
-                    conn, io_state, e.what());
+      logger().info("{} do_out_dispatch(): fault at {}, {}, going to delay -- {}, "
+                    "send notify_out_fault()",
+                    conn, io_state, io_stat_printer{*this}, e.what());
       std::exception_ptr eptr;
       try {
         throw e;
@@ -680,9 +687,15 @@ IOHandler::do_out_dispatch(shard_states_t &ctx)
         eptr = std::current_exception();
       }
       set_io_state(io_state_t::delay);
-      auto states = get_states();
-      handshake_listener->notify_out_fault(
-          "do_out_dispatch", eptr, states);
+      shard_states->dispatch_in_background(
+          "notify_out_fault(out)", conn, [this, eptr] {
+        auto states = get_states();
+        return seastar::smp::submit_to(
+            conn.get_messenger_shard_id(), [this, eptr, states] {
+          handshake_listener->notify_out_fault(
+              "do_out_dispatch", eptr, states);
+        });
+      });
     } else {
       if (io_state != io_state_t::switched) {
         logger().info("{} do_out_dispatch(): fault at {}, {} -- {}",
@@ -708,7 +721,14 @@ void IOHandler::notify_out_dispatch()
 {
   assert(is_out_queued());
   if (need_notify_out) {
-    handshake_listener->notify_out();
+    logger().debug("{} send notify_out()", conn);
+    shard_states->dispatch_in_background(
+        "notify_out", conn, [this] {
+      return seastar::smp::submit_to(
+          conn.get_messenger_shard_id(), [this] {
+        handshake_listener->notify_out();
+      });
+    });
   }
   if (shard_states->try_enter_out_dispatching()) {
     shard_states->dispatch_in_background(
@@ -927,12 +947,19 @@ void IOHandler::do_in_dispatch()
 
       auto io_state = ctx.get_io_state();
       if (io_state == io_state_t::open) {
-        logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}",
-                      conn, io_state, e_what);
+        logger().info("{} do_in_dispatch(): fault at {}, {}, going to delay -- {}, "
+                      "send notify_out_fault()",
+                      conn, io_state, io_stat_printer{*this}, e_what);
         set_io_state(io_state_t::delay);
-        auto states = get_states();
-        handshake_listener->notify_out_fault(
-            "do_in_dispatch", eptr, states);
+        shard_states->dispatch_in_background(
+            "notify_out_fault(in)", conn, [this, eptr] {
+          auto states = get_states();
+          return seastar::smp::submit_to(
+              conn.get_messenger_shard_id(), [this, eptr, states] {
+            handshake_listener->notify_out_fault(
+                "do_in_dispatch", eptr, states);
+          });
+        });
       } else {
         if (io_state != io_state_t::switched) {
           logger().info("{} do_in_dispatch(): fault at {}, {} -- {}",
index acb171bde282efe342601299d41e1cd2d4df55ee..ecdd02f1573355e33c5ac88ee722a8708f60d93d 100644 (file)
@@ -62,7 +62,8 @@ struct io_handler_state {
  *
  * The interface class for IOHandler to notify the ProtocolV2.
  *
- * The notifications may be cross-core and asynchronous.
+ * The notifications may be cross-core and must be sent to
+ * SocketConnection::get_messenger_shard_id()
  */
 class HandshakeListener {
 public:
@@ -145,6 +146,10 @@ public:
  * The calls may be cross-core and asynchronous
  */
 public:
+  /*
+   * should not be called cross-core
+   */
+
   void set_handshake_listener(HandshakeListener &hl) {
     ceph_assert_always(handshake_listener == nullptr);
     handshake_listener = &hl;
@@ -159,6 +164,10 @@ public:
   };
   void print_io_stat(std::ostream &out) const;
 
+  /*
+   * may be called cross-core
+   */
+
   seastar::future<> close_io(bool is_dispatch_reset, bool is_replace);
 
   /**