]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: keep the order of cross-core events in msgr v2
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 31 May 2023 06:34:17 +0000 (14:34 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Sun, 25 Jun 2023 03:57:19 +0000 (11:57 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
src/crimson/net/io_handler.cc
src/crimson/net/io_handler.h

index 15d3d565dd7784688cc1a994014870e4db16c1cb..5a9e0c21c5ada80723bf4d173e2cd408e935c5fd 100644 (file)
@@ -239,34 +239,40 @@ void ProtocolV2::trigger_state_phase2(
   } else {
     assert(new_io_state != io_state_t::open);
   }
-  logger().debug("{} IOHandler::set_io_state(): new_state={}, new_io_state={}, "
+
+  auto cc_seq = crosscore.prepare_submit();
+  logger().debug("{} send {} IOHandler::set_io_state(): new_state={}, new_io_state={}, "
                  "fa={}, set_notify_out={}",
-                 conn, get_state_name(new_state), new_io_state,
+                 conn, cc_seq, get_state_name(new_state), new_io_state,
                  fa ? fmt::format("(sid={})", fa->get_shard_id()) : "N/A",
                  need_notify_out);
   gate.dispatch_in_background(
       "set_io_state", conn,
-      [this, new_io_state, fa=std::move(fa)]() mutable {
+      [this, cc_seq, new_io_state, fa=std::move(fa)]() mutable {
     return seastar::smp::submit_to(
         io_handler.get_shard_id(),
-        [this, new_io_state, fa=std::move(fa), set_notify_out=need_notify_out]() mutable {
-      io_handler.set_io_state(new_io_state, std::move(fa), set_notify_out);
+        [this, cc_seq, new_io_state,
+         fa=std::move(fa), set_notify_out=need_notify_out]() mutable {
+      return io_handler.set_io_state(
+          cc_seq, new_io_state, std::move(fa), set_notify_out);
     });
   });
 
   if (need_exit_io) {
     // from READY
-    logger().debug("{} IOHandler::wait_io_exit_dispatching() start ...", conn);
+    auto cc_seq = crosscore.prepare_submit();
+    logger().debug("{} send {} IOHandler::wait_io_exit_dispatching() ...",
+                   conn, cc_seq);
     assert(pr_exit_io.has_value());
     assert(new_io_state != io_state_t::open);
     need_exit_io = false;
-    gate.dispatch_in_background("exit_io", conn, [this] {
+    gate.dispatch_in_background("exit_io", conn, [this, cc_seq] {
       return seastar::smp::submit_to(
-          io_handler.get_shard_id(), [this] {
-        return io_handler.wait_io_exit_dispatching();
-      }).then([this](auto ret) {
-        logger().debug("{} IOHandler::wait_io_exit_dispatching() finish, {}",
-                       conn, ret.io_states);
+          io_handler.get_shard_id(), [this, cc_seq] {
+        return io_handler.wait_io_exit_dispatching(cc_seq);
+      }).then([this, cc_seq](auto ret) {
+        logger().debug("{} finish {} IOHandler::wait_io_exit_dispatching(), {}",
+                       conn, cc_seq, ret.io_states);
         frame_assembler = std::move(ret.frame_assembler);
         assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
         ceph_assert_always(
@@ -391,13 +397,15 @@ void ProtocolV2::reset_session(bool full)
     peer_global_seq = 0;
   }
 
-  logger().debug("{} IOHandler::reset_session({})", conn, full);
+  auto cc_seq = crosscore.prepare_submit();
+  logger().debug("{} send {} IOHandler::reset_session({})",
+                 conn, cc_seq, full);
   io_states.reset_session(full);
   gate.dispatch_in_background(
-      "reset_session", conn, [this, full] {
+      "reset_session", conn, [this, cc_seq, full] {
     return seastar::smp::submit_to(
-        io_handler.get_shard_id(), [this, full] {
-      io_handler.reset_session(full);
+        io_handler.get_shard_id(), [this, cc_seq, full] {
+      return io_handler.reset_session(cc_seq, full);
     });
   });
   // user can make changes
@@ -687,13 +695,15 @@ ProtocolV2::client_connect()
           }
 
           // handle_server_ident() logic
-          logger().debug("{} IOHandler::requeue_out_sent()", conn);
+          auto cc_seq = crosscore.prepare_submit();
+          logger().debug("{} send {} IOHandler::requeue_out_sent()",
+                         conn, cc_seq);
           io_states.requeue_out_sent();
           gate.dispatch_in_background(
-              "requeue_out_sent", conn, [this] {
+              "requeue_out_sent", conn, [this, cc_seq] {
             return seastar::smp::submit_to(
-                io_handler.get_shard_id(), [this] {
-              io_handler.requeue_out_sent();
+                io_handler.get_shard_id(), [this, cc_seq] {
+              return io_handler.requeue_out_sent(cc_seq);
             });
           });
 
@@ -834,17 +844,18 @@ ProtocolV2::client_reconnect()
 
           // handle_reconnect_ok() logic
           auto reconnect_ok = ReconnectOkFrame::Decode(payload->back());
+          auto cc_seq = crosscore.prepare_submit();
           logger().debug("{} GOT ReconnectOkFrame: msg_seq={}, "
-                         "IOHandler::requeue_out_sent_up_to()",
-                         conn, reconnect_ok.msg_seq());
+                         "send {} IOHandler::requeue_out_sent_up_to()",
+                         conn, reconnect_ok.msg_seq(), cc_seq);
 
           io_states.requeue_out_sent_up_to();
           auto msg_seq = reconnect_ok.msg_seq();
           gate.dispatch_in_background(
-              "requeue_out_reconnecting", conn, [this, msg_seq] {
+              "requeue_out_reconnecting", conn, [this, cc_seq, msg_seq] {
             return seastar::smp::submit_to(
-                io_handler.get_shard_id(), [this, msg_seq] {
-              io_handler.requeue_out_sent_up_to(msg_seq);
+                io_handler.get_shard_id(), [this, cc_seq, msg_seq] {
+              return io_handler.requeue_out_sent_up_to(cc_seq, msg_seq);
             });
           });
 
@@ -982,12 +993,13 @@ void ProtocolV2::execute_connecting()
               abort_protocol();
             }
 
+            auto cc_seq = crosscore.prepare_submit();
             logger().info("{} connected: gs={}, pgs={}, cs={}, "
                           "client_cookie={}, server_cookie={}, {}, new_sid={}, "
-                          "IOHandler::dispatch_connect()",
+                          "send {} IOHandler::dispatch_connect()",
                           conn, global_seq, peer_global_seq, connect_seq,
                           client_cookie, server_cookie, io_states,
-                          frame_assembler->get_socket_shard_id());
+                          frame_assembler->get_socket_shard_id(), cc_seq);
 
             // set io_handler to a new shard
             auto new_io_shard = frame_assembler->get_socket_shard_id();
@@ -997,9 +1009,10 @@ void ProtocolV2::execute_connecting()
             pr_switch_io_shard = seastar::shared_promise<>();
             return seastar::smp::submit_to(
                 io_handler.get_shard_id(),
-                [this, new_io_shard, conn_fref=std::move(conn_fref)]() mutable {
+                [this, cc_seq, new_io_shard,
+                 conn_fref=std::move(conn_fref)]() mutable {
               return io_handler.dispatch_connect(
-                  new_io_shard, std::move(conn_fref));
+                  cc_seq, new_io_shard, std::move(conn_fref));
             }).then([this, new_io_shard] {
               ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
               pr_switch_io_shard->set_value();
@@ -1771,17 +1784,20 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
     ceph_assert_always(state == state_t::ESTABLISHING);
 
     // set io_handler to a new shard
+    auto cc_seq = crosscore.prepare_submit();
     auto new_io_shard = frame_assembler->get_socket_shard_id();
-    logger().debug("{} IOHandler::dispatch_accept({})", conn, new_io_shard);
+    logger().debug("{} send {} IOHandler::dispatch_accept({})",
+                   conn, cc_seq, new_io_shard);
     ConnectionFRef conn_fref = seastar::make_foreign(
         conn.shared_from_this());
     ceph_assert_always(!pr_switch_io_shard.has_value());
     pr_switch_io_shard = seastar::shared_promise<>();
     return seastar::smp::submit_to(
         io_handler.get_shard_id(),
-        [this, new_io_shard, conn_fref=std::move(conn_fref)]() mutable {
+        [this, cc_seq, new_io_shard,
+         conn_fref=std::move(conn_fref)]() mutable {
       return io_handler.dispatch_accept(
-          new_io_shard, std::move(conn_fref));
+          cc_seq, new_io_shard, std::move(conn_fref));
     }).then([this, new_io_shard] {
       ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
       pr_switch_io_shard->set_value();
@@ -1820,17 +1836,18 @@ ProtocolV2::send_server_ident()
 
   // refered to async-conn v2: not assign gs to global_seq
   global_seq = messenger.get_global_seq();
+  auto cc_seq = crosscore.prepare_submit();
   logger().debug("{} UPDATE: gs={} for server ident, "
-                 "IOHandler::reset_peer_state()",
-                 conn, global_seq);
+                 "send {} IOHandler::reset_peer_state()",
+                 conn, global_seq, cc_seq);
 
   // this is required for the case when this connection is being replaced
   io_states.reset_peer_state();
   gate.dispatch_in_background(
-      "reset_peer_state", conn, [this] {
+      "reset_peer_state", conn, [this, cc_seq] {
     return seastar::smp::submit_to(
-        io_handler.get_shard_id(), [this] {
-      io_handler.reset_peer_state();
+        io_handler.get_shard_id(), [this, cc_seq] {
+      return io_handler.reset_peer_state(cc_seq);
     });
   });
 
@@ -1938,16 +1955,19 @@ void ProtocolV2::trigger_replacing(bool reconnect,
 
       // set io_handler to a new shard
       // we should prevent parallel switching core attemps
-      logger().debug("{} IOHandler::dispatch_accept({})", conn, new_io_shard);
+      auto cc_seq = crosscore.prepare_submit();
+      logger().debug("{} send {} IOHandler::dispatch_accept({})",
+                     conn, cc_seq, new_io_shard);
       ConnectionFRef conn_fref = seastar::make_foreign(
           conn.shared_from_this());
       ceph_assert_always(!pr_switch_io_shard.has_value());
       pr_switch_io_shard = seastar::shared_promise<>();
       return seastar::smp::submit_to(
           io_handler.get_shard_id(),
-          [this, new_io_shard, conn_fref=std::move(conn_fref)]() mutable {
+          [this, cc_seq, new_io_shard,
+           conn_fref=std::move(conn_fref)]() mutable {
         return io_handler.dispatch_accept(
-            new_io_shard, std::move(conn_fref));
+            cc_seq, new_io_shard, std::move(conn_fref));
       }).then([this, new_io_shard] {
         ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
         pr_switch_io_shard->set_value();
@@ -1994,13 +2014,15 @@ void ProtocolV2::trigger_replacing(bool reconnect,
         connect_seq = new_connect_seq;
         // send_reconnect_ok() logic
 
-        logger().debug("{} IOHandler::requeue_out_sent_up_to({})", conn, new_msg_seq);
+        auto cc_seq = crosscore.prepare_submit();
+        logger().debug("{} send {} IOHandler::requeue_out_sent_up_to({})",
+                       conn, cc_seq, new_msg_seq);
         io_states.requeue_out_sent_up_to();
         gate.dispatch_in_background(
-            "requeue_out_replacing", conn, [this, new_msg_seq] {
+            "requeue_out_replacing", conn, [this, cc_seq, new_msg_seq] {
           return seastar::smp::submit_to(
-              io_handler.get_shard_id(), [this, new_msg_seq] {
-            io_handler.requeue_out_sent_up_to(new_msg_seq);
+              io_handler.get_shard_id(), [this, cc_seq, new_msg_seq] {
+            return io_handler.requeue_out_sent_up_to(cc_seq, new_msg_seq);
           });
         });
 
@@ -2041,15 +2063,27 @@ void ProtocolV2::trigger_replacing(bool reconnect,
 
 // READY state
 
-void ProtocolV2::notify_out_fault(
+seastar::future<> ProtocolV2::notify_out_fault(
+    crosscore_t::seq_t cc_seq,
     const char *where,
     std::exception_ptr eptr,
     io_handler_state _io_states)
 {
   assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} notify_out_fault(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, where, eptr, _io_states] {
+      return notify_out_fault(cc_seq, where, eptr, _io_states);
+    });
+  }
+
   io_states = _io_states;
-  logger().debug("{} got notify_out_fault(): io_states={}", conn, io_states);
+  logger().debug("{} got {} notify_out_fault(): io_states={}",
+                 conn, cc_seq, io_states);
   fault(state_t::READY, where, eptr);
+  return seastar::now();
 }
 
 void ProtocolV2::execute_ready()
@@ -2070,16 +2104,28 @@ void ProtocolV2::execute_standby()
   trigger_state(state_t::STANDBY, io_state_t::delay);
 }
 
-void ProtocolV2::notify_out()
+seastar::future<> ProtocolV2::notify_out(
+    crosscore_t::seq_t cc_seq)
 {
   assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
-  logger().debug("{} got notify_out(): at {}", conn, get_state_name(state));
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} notify_out(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq] {
+      return notify_out(cc_seq);
+    });
+  }
+
+  logger().debug("{} got {} notify_out(): at {}",
+                 conn, cc_seq, get_state_name(state));
   io_states.is_out_queued = true;
   if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
     logger().info("{} notify_out(): at {}, going to CONNECTING",
                   conn, get_state_name(state));
     execute_connecting();
   }
+  return seastar::now();
 }
 
 // WAIT state
@@ -2147,11 +2193,23 @@ void ProtocolV2::execute_server_wait()
 
 // CLOSING state
 
-void ProtocolV2::notify_mark_down()
+seastar::future<> ProtocolV2::notify_mark_down(
+    crosscore_t::seq_t cc_seq)
 {
   assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
-  logger().debug("{} got notify_mark_down()", conn);
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} notify_mark_down(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq] {
+      return notify_mark_down(cc_seq);
+    });
+  }
+
+  logger().debug("{} got {} notify_mark_down()",
+                 conn, cc_seq);
   do_close(false);
+  return seastar::now();
 }
 
 seastar::future<> ProtocolV2::close_clean_yielded()
@@ -2226,8 +2284,9 @@ void ProtocolV2::do_close(
     return wait_switch_io_shard(
     ).then([this, is_dispatch_reset, is_replace] {
       trigger_state_phase2(state_t::CLOSING, io_state_t::drop);
-      logger().debug("{} IOHandler::close_io(reset={}, replace={})",
-                     conn, is_dispatch_reset, is_replace);
+      auto cc_seq = crosscore.prepare_submit();
+      logger().debug("{} send {} IOHandler::close_io(reset={}, replace={})",
+                     conn, cc_seq, is_dispatch_reset, is_replace);
 
       std::ignore = gate.close(
       ).then([this] {
@@ -2262,8 +2321,8 @@ void ProtocolV2::do_close(
 
       return seastar::smp::submit_to(
           io_handler.get_shard_id(),
-          [this, is_dispatch_reset, is_replace] {
-        return io_handler.close_io(is_dispatch_reset, is_replace);
+          [this, cc_seq, is_dispatch_reset, is_replace] {
+        return io_handler.close_io(cc_seq, is_dispatch_reset, is_replace);
       });
       // user can make changes
     });
index f083d1721ac61eb5080c65362324d2390517c352..a9aa4ecdf76a708590931aad1b1e5add1d1530c2 100644 (file)
@@ -28,11 +28,17 @@ public:
  * as HandshakeListener
  */
 private:
-  void notify_out() final;
+  seastar::future<> notify_out(
+      crosscore_t::seq_t cc_seq) final;
 
-  void notify_out_fault(const char *where, std::exception_ptr, io_handler_state) final;
+  seastar::future<> notify_out_fault(
+      crosscore_t::seq_t cc_seq,
+      const char *where,
+      std::exception_ptr,
+      io_handler_state) final;
 
-  void notify_mark_down() final;
+  seastar::future<> notify_mark_down(
+      crosscore_t::seq_t cc_seq) final;
 
 /*
 * as ProtocolV2 to be called by SocketConnection
@@ -237,6 +243,8 @@ private:
   // asynchronously populated from io_handler
   io_handler_state io_states;
 
+  crosscore_t crosscore;
+
   bool has_socket = false;
 
   // the socket exists and it is not shutdown
index f04ffff3a006577472d58e5ff298c00b59a07c0b..adcb8611148b92ab94935f6b4456d37ebba6a4c6 100644 (file)
@@ -215,14 +215,15 @@ void IOHandler::mark_down()
     return;
   }
 
-  logger().info("{} mark_down() at {}, send notify_mark_down()",
-                conn, io_stat_printer{*this});
-  set_io_state(io_state_t::drop);
+  auto cc_seq = crosscore.prepare_submit();
+  logger().info("{} mark_down() at {}, send {} notify_mark_down()",
+                conn, io_stat_printer{*this}, cc_seq);
+  do_set_io_state(io_state_t::drop);
   shard_states->dispatch_in_background(
-      "notify_mark_down", conn, [this] {
+      "notify_mark_down", conn, [this, cc_seq] {
     return seastar::smp::submit_to(
-        conn.get_messenger_shard_id(), [this] {
-      handshake_listener->notify_mark_down();
+        conn.get_messenger_shard_id(), [this, cc_seq] {
+      return handshake_listener->notify_mark_down(cc_seq);
     });
   });
 }
@@ -255,16 +256,19 @@ void IOHandler::assign_frame_assembler(FrameAssemblerV2Ref fa)
   ceph_assert_always(frame_assembler->is_socket_valid());
 }
 
-void IOHandler::set_io_state(
+void IOHandler::do_set_io_state(
     io_state_t new_state,
+    std::optional<crosscore_t::seq_t> cc_seq,
     FrameAssemblerV2Ref fa,
     bool set_notify_out)
 {
   ceph_assert_always(seastar::this_shard_id() == get_shard_id());
   auto prv_state = get_io_state();
-  logger().debug("{} got set_io_state(): prv_state={}, new_state={}, "
+  logger().debug("{} got {}do_set_io_state(): prv_state={}, new_state={}, "
                  "fa={}, set_notify_out={}, at {}",
-                 conn, prv_state, new_state,
+                 conn,
+                 cc_seq.has_value() ? fmt::format("{} ", *cc_seq) : "",
+                 prv_state, new_state,
                  fa ? "present" : "N/A", set_notify_out,
                  io_stat_printer{*this});
   ceph_assert_always(!(
@@ -331,11 +335,43 @@ void IOHandler::set_io_state(
   }
 }
 
+seastar::future<> IOHandler::set_io_state(
+    crosscore_t::seq_t cc_seq,
+    io_state_t new_state,
+    FrameAssemblerV2Ref fa,
+    bool set_notify_out)
+{
+  assert(seastar::this_shard_id() == get_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} set_io_state(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, new_state,
+            fa=std::move(fa), set_notify_out]() mutable {
+      return set_io_state(cc_seq, new_state, std::move(fa), set_notify_out);
+    });
+  }
+
+  do_set_io_state(new_state, cc_seq, std::move(fa), set_notify_out);
+  return seastar::now();
+}
+
 seastar::future<IOHandler::exit_dispatching_ret>
-IOHandler::wait_io_exit_dispatching()
+IOHandler::wait_io_exit_dispatching(
+    crosscore_t::seq_t cc_seq)
 {
   assert(seastar::this_shard_id() == get_shard_id());
-  logger().debug("{} got wait_io_exit_dispatching()", conn);
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} wait_io_exit_dispatching(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq] {
+      return wait_io_exit_dispatching(cc_seq);
+    });
+  }
+
+  logger().debug("{} got {} wait_io_exit_dispatching()",
+                 conn, cc_seq);
   ceph_assert_always(get_io_state() != io_state_t::open);
   ceph_assert_always(frame_assembler != nullptr);
   ceph_assert_always(!frame_assembler->is_socket_valid());
@@ -365,31 +401,74 @@ IOHandler::wait_io_exit_dispatching()
   });
 }
 
-void IOHandler::reset_session(bool full)
+seastar::future<> IOHandler::reset_session(
+    crosscore_t::seq_t cc_seq,
+    bool full)
 {
   assert(seastar::this_shard_id() == get_shard_id());
-  logger().debug("{} got reset_session({})", conn, full);
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} reset_session(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, full] {
+      return reset_session(cc_seq, full);
+    });
+  }
+
+  logger().debug("{} got {} reset_session({})",
+                 conn, cc_seq, full);
   assert(get_io_state() != io_state_t::open);
   reset_in();
   if (full) {
     reset_out();
     dispatch_remote_reset();
   }
+  return seastar::now();
 }
 
-void IOHandler::reset_peer_state()
+seastar::future<> IOHandler::reset_peer_state(
+    crosscore_t::seq_t cc_seq)
 {
   assert(seastar::this_shard_id() == get_shard_id());
-  logger().debug("{} got reset_peer_state()", conn);
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} reset_peer_state(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq] {
+      return reset_peer_state(cc_seq);
+    });
+  }
+
+  logger().debug("{} got {} reset_peer_state()",
+                 conn, cc_seq);
   assert(get_io_state() != io_state_t::open);
   reset_in();
-  requeue_out_sent_up_to(0);
+  do_requeue_out_sent_up_to(0);
   discard_out_sent();
+  return seastar::now();
 }
 
-void IOHandler::requeue_out_sent()
+seastar::future<> IOHandler::requeue_out_sent(
+    crosscore_t::seq_t cc_seq)
 {
   assert(seastar::this_shard_id() == get_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} requeue_out_sent(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq] {
+      return requeue_out_sent(cc_seq);
+    });
+  }
+
+  logger().debug("{} got {} requeue_out_sent()",
+                 conn, cc_seq);
+  do_requeue_out_sent();
+  return seastar::now();
+}
+
+void IOHandler::do_requeue_out_sent()
+{
   assert(get_io_state() != io_state_t::open);
   if (out_sent_msgs.empty()) {
     return;
@@ -410,9 +489,28 @@ void IOHandler::requeue_out_sent()
   maybe_notify_out_dispatch();
 }
 
-void IOHandler::requeue_out_sent_up_to(seq_num_t seq)
+seastar::future<> IOHandler::requeue_out_sent_up_to(
+    crosscore_t::seq_t cc_seq,
+    seq_num_t msg_seq)
 {
   assert(seastar::this_shard_id() == get_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} requeue_out_sent_up_to(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, msg_seq] {
+      return requeue_out_sent_up_to(cc_seq, msg_seq);
+    });
+  }
+
+  logger().debug("{} got {} requeue_out_sent_up_to({})",
+                 conn, cc_seq, msg_seq);
+  do_requeue_out_sent_up_to(msg_seq);
+  return seastar::now();
+}
+
+void IOHandler::do_requeue_out_sent_up_to(seq_num_t seq)
+{
   assert(get_io_state() != io_state_t::open);
   if (out_sent_msgs.empty() && out_pending_msgs.empty()) {
     logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
@@ -430,7 +528,7 @@ void IOHandler::requeue_out_sent_up_to(seq_num_t seq)
       out_sent_msgs.pop_front();
     }
   }
-  requeue_out_sent();
+  do_requeue_out_sent();
 }
 
 void IOHandler::reset_in()
@@ -458,12 +556,23 @@ void IOHandler::discard_out_sent()
 
 seastar::future<>
 IOHandler::dispatch_accept(
+    crosscore_t::seq_t cc_seq,
     seastar::shard_id new_sid,
     ConnectionFRef conn_fref)
 {
   ceph_assert_always(seastar::this_shard_id() == get_shard_id());
-  logger().debug("{} got dispatch_accept({}) at {}",
-                 conn, new_sid, io_stat_printer{*this});
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} dispatch_accept(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, new_sid,
+            conn_fref=std::move(conn_fref)]() mutable {
+      return dispatch_accept(cc_seq, new_sid, std::move(conn_fref));
+    });
+  }
+
+  logger().debug("{} got {} dispatch_accept({}) at {}",
+                 conn, cc_seq, new_sid, io_stat_printer{*this});
   if (get_io_state() == io_state_t::drop) {
     assert(!protocol_is_connected);
     // it is possible that both io_handler and protocolv2 are
@@ -485,12 +594,23 @@ IOHandler::dispatch_accept(
 
 seastar::future<>
 IOHandler::dispatch_connect(
+    crosscore_t::seq_t cc_seq,
     seastar::shard_id new_sid,
     ConnectionFRef conn_fref)
 {
   ceph_assert_always(seastar::this_shard_id() == get_shard_id());
-  logger().debug("{} got dispatch_connect({}) at {}",
-                 conn, new_sid, io_stat_printer{*this});
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} dispatch_connect(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, new_sid,
+            conn_fref=std::move(conn_fref)]() mutable {
+      return dispatch_connect(cc_seq, new_sid, std::move(conn_fref));
+    });
+  }
+
+  logger().debug("{} got {} dispatch_connect({}) at {}",
+                 conn, cc_seq, new_sid, io_stat_printer{*this});
   if (get_io_state() == io_state_t::drop) {
     assert(!protocol_is_connected);
     // it is possible that both io_handler and protocolv2 are
@@ -693,23 +813,24 @@ IOHandler::do_out_dispatch(shard_states_t &ctx)
     }
 
     if (io_state == io_state_t::open) {
+      auto cc_seq = crosscore.prepare_submit();
       logger().info("{} do_out_dispatch(): fault at {}, {}, going to delay -- {}, "
-                    "send notify_out_fault()",
-                    conn, io_state, io_stat_printer{*this}, e.what());
+                    "send {} notify_out_fault()",
+                    conn, io_state, io_stat_printer{*this}, e.what(), cc_seq);
       std::exception_ptr eptr;
       try {
         throw e;
       } catch(...) {
         eptr = std::current_exception();
       }
-      set_io_state(io_state_t::delay);
+      do_set_io_state(io_state_t::delay);
       shard_states->dispatch_in_background(
-          "notify_out_fault(out)", conn, [this, eptr] {
+          "notify_out_fault(out)", conn, [this, cc_seq, eptr] {
         auto states = get_states();
         return seastar::smp::submit_to(
-            conn.get_messenger_shard_id(), [this, eptr, states] {
-          handshake_listener->notify_out_fault(
-              "do_out_dispatch", eptr, states);
+            conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] {
+          return handshake_listener->notify_out_fault(
+              cc_seq, "do_out_dispatch", eptr, states);
         });
       });
     } else {
@@ -739,12 +860,14 @@ void IOHandler::notify_out_dispatch()
   ceph_assert_always(seastar::this_shard_id() == get_shard_id());
   assert(is_out_queued());
   if (need_notify_out) {
-    logger().debug("{} send notify_out()", conn);
+    auto cc_seq = crosscore.prepare_submit();
+    logger().debug("{} send {} notify_out()",
+                   conn, cc_seq);
     shard_states->dispatch_in_background(
-        "notify_out", conn, [this] {
+        "notify_out", conn, [this, cc_seq] {
       return seastar::smp::submit_to(
-          conn.get_messenger_shard_id(), [this] {
-        handshake_listener->notify_out();
+          conn.get_messenger_shard_id(), [this, cc_seq] {
+        return handshake_listener->notify_out(cc_seq);
       });
     });
   }
@@ -967,17 +1090,18 @@ void IOHandler::do_in_dispatch()
 
       auto io_state = ctx.get_io_state();
       if (io_state == io_state_t::open) {
+        auto cc_seq = crosscore.prepare_submit();
         logger().info("{} do_in_dispatch(): fault at {}, {}, going to delay -- {}, "
-                      "send notify_out_fault()",
-                      conn, io_state, io_stat_printer{*this}, e_what);
-        set_io_state(io_state_t::delay);
+                      "send {} notify_out_fault()",
+                      conn, io_state, io_stat_printer{*this}, e_what, cc_seq);
+        do_set_io_state(io_state_t::delay);
         shard_states->dispatch_in_background(
-            "notify_out_fault(in)", conn, [this, eptr] {
+            "notify_out_fault(in)", conn, [this, cc_seq, eptr] {
           auto states = get_states();
           return seastar::smp::submit_to(
-              conn.get_messenger_shard_id(), [this, eptr, states] {
-            handshake_listener->notify_out_fault(
-                "do_in_dispatch", eptr, states);
+              conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] {
+            return handshake_listener->notify_out_fault(
+                cc_seq, "do_in_dispatch", eptr, states);
           });
         });
       } else {
@@ -996,13 +1120,24 @@ void IOHandler::do_in_dispatch()
 }
 
 seastar::future<>
-IOHandler::close_io(bool is_dispatch_reset, bool is_replace)
+IOHandler::close_io(
+    crosscore_t::seq_t cc_seq,
+    bool is_dispatch_reset,
+    bool is_replace)
 {
   ceph_assert_always(seastar::this_shard_id() == get_shard_id());
-  ceph_assert_always(get_io_state() == io_state_t::drop);
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} close_io(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, is_dispatch_reset, is_replace] {
+      return close_io(cc_seq, is_dispatch_reset, is_replace);
+    });
+  }
 
-  logger().debug("{} got close_io(reset={}, replace={})",
-                 conn, is_dispatch_reset, is_replace);
+  logger().debug("{} got {} close_io(reset={}, replace={})",
+                 conn, cc_seq, is_dispatch_reset, is_replace);
+  ceph_assert_always(get_io_state() == io_state_t::drop);
 
   if (is_dispatch_reset) {
     dispatch_reset(is_replace);
index f478f14296300bfd221c161d0f71b5edc5526579..108386fc7ae40343a1596a9659213cf74e21ea03 100644 (file)
@@ -3,6 +3,7 @@
 
 #pragma once
 
+#include <seastar/core/shared_future.hh>
 #include <seastar/util/later.hh>
 
 #include "crimson/common/gated.h"
 
 namespace crimson::net {
 
+/**
+ * crosscore_t
+ *
+ * To preserve the event order across cores.
+ */
+class crosscore_t {
+public:
+  using seq_t = uint64_t;
+
+  crosscore_t() = default;
+  ~crosscore_t() = default;
+
+  seq_t get_in_seq() const {
+    return in_seq;
+  }
+
+  seq_t prepare_submit() {
+    ++out_seq;
+    return out_seq;
+  }
+
+  bool proceed_or_wait(seq_t seq) {
+    if (seq == in_seq + 1) {
+      ++in_seq;
+      if (unlikely(in_pr_wait.has_value())) {
+        in_pr_wait->set_value();
+        in_pr_wait = std::nullopt;
+      }
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  seastar::future<> wait(seq_t seq) {
+    assert(seq != in_seq + 1);
+    if (!in_pr_wait.has_value()) {
+      in_pr_wait = seastar::shared_promise<>();
+    }
+    return in_pr_wait->get_shared_future();
+  }
+
+private:
+  seq_t out_seq = 0;
+  seq_t in_seq = 0;
+  std::optional<seastar::shared_promise<>> in_pr_wait;
+};
+
 /**
  * io_handler_state
  *
@@ -74,14 +123,17 @@ public:
   HandshakeListener &operator=(const HandshakeListener &) = delete;
   HandshakeListener &operator=(HandshakeListener &&) = delete;
 
-  virtual void notify_out() = 0;
+  virtual seastar::future<> notify_out(
+      crosscore_t::seq_t cc_seq) = 0;
 
-  virtual void notify_out_fault(
+  virtual seastar::future<> notify_out_fault(
+      crosscore_t::seq_t cc_seq,
       const char *where,
       std::exception_ptr,
       io_handler_state) = 0;
 
-  virtual void notify_mark_down() = 0;
+  virtual seastar::future<> notify_mark_down(
+      crosscore_t::seq_t cc_seq) = 0;
 
 protected:
   HandshakeListener() = default;
@@ -157,7 +209,7 @@ public:
   }
 
   io_handler_state get_states() const {
-    assert(seastar::this_shard_id() == get_shard_id());
+    // might be called from prv_sid during wait_io_exit_dispatching()
     return {in_seq, is_out_queued(), has_out_sent()};
   }
 
@@ -170,7 +222,10 @@ public:
    * may be called cross-core
    */
 
-  seastar::future<> close_io(bool is_dispatch_reset, bool is_replace);
+  seastar::future<> close_io(
+      crosscore_t::seq_t cc_seq,
+      bool is_dispatch_reset,
+      bool is_replace);
 
   /**
    * io_state_t
@@ -188,30 +243,43 @@ public:
   };
   friend class fmt::formatter<io_state_t>;
 
-  void set_io_state(
+  seastar::future<> set_io_state(
+      crosscore_t::seq_t cc_seq,
       io_state_t new_state,
-      FrameAssemblerV2Ref fa = nullptr,
-      bool set_notify_out = false);
+      FrameAssemblerV2Ref fa,
+      bool set_notify_out);
 
   struct exit_dispatching_ret {
     FrameAssemblerV2Ref frame_assembler;
     io_handler_state io_states;
   };
-  seastar::future<exit_dispatching_ret> wait_io_exit_dispatching();
+  seastar::future<exit_dispatching_ret>
+  wait_io_exit_dispatching(
+      crosscore_t::seq_t cc_seq);
 
-  void reset_session(bool full);
+  seastar::future<> reset_session(
+      crosscore_t::seq_t cc_seq,
+      bool full);
 
-  void reset_peer_state();
+  seastar::future<> reset_peer_state(
+      crosscore_t::seq_t cc_seq);
 
-  void requeue_out_sent_up_to(seq_num_t seq);
+  seastar::future<> requeue_out_sent_up_to(
+      crosscore_t::seq_t cc_seq,
+      seq_num_t msg_seq);
 
-  void requeue_out_sent();
+  seastar::future<> requeue_out_sent(
+      crosscore_t::seq_t cc_seq);
 
   seastar::future<> dispatch_accept(
-      seastar::shard_id new_sid, ConnectionFRef);
+      crosscore_t::seq_t cc_seq,
+      seastar::shard_id new_sid,
+      ConnectionFRef);
 
   seastar::future<> dispatch_connect(
-      seastar::shard_id new_sid, ConnectionFRef);
+      crosscore_t::seq_t cc_seq,
+      seastar::shard_id new_sid,
+      ConnectionFRef);
 
  private:
   class shard_states_t;
@@ -348,10 +416,20 @@ public:
     std::optional<seastar::promise<>> in_exit_dispatching;
   };
 
+  void do_set_io_state(
+      io_state_t new_state,
+      std::optional<crosscore_t::seq_t> cc_seq = std::nullopt,
+      FrameAssemblerV2Ref fa = nullptr,
+      bool set_notify_out = false);
+
   io_state_t get_io_state() const {
     return shard_states->get_io_state();
   }
 
+  void do_requeue_out_sent();
+
+  void do_requeue_out_sent_up_to(seq_num_t seq);
+
   void assign_frame_assembler(FrameAssemblerV2Ref);
 
   seastar::future<> send_redirected(MessageFRef msg);
@@ -411,6 +489,8 @@ public:
 private:
   shard_states_ref_t shard_states;
 
+  crosscore_t crosscore;
+
   // drop was happening in the previous sid
   std::optional<seastar::shard_id> maybe_dropped_sid;