]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: make send/keepalive keep trying until the shard is correct
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 2 Jun 2023 07:55:10 +0000 (15:55 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Sun, 25 Jun 2023 03:57:19 +0000 (11:57 +0800)
It will be possible that during cross-core send/keepalive, the
connection shard is changed on-the-fly.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Connection.h
src/crimson/net/io_handler.cc
src/crimson/net/io_handler.h

index 4c90f6e6852c2f0deca5f17595a59bf00da44b91..d8336e524313daa62f953d8c9dd2835dddb2e4a0 100644 (file)
@@ -71,7 +71,9 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
    * send
    *
    * Send a message over a connection that has completed its handshake.
-   * May be invoked from any core.
+   *
+   * May be invoked from any core, but that requires to chain the returned
+   * future to preserve ordering.
    */
   virtual seastar::future<> send(MessageURef msg) = 0;
 
@@ -81,7 +83,8 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
    * Send a keepalive message over a connection that has completed its
    * handshake.
    *
-   * May be invoked from any core.
+   * May be invoked from any core, but that requires to chain the returned
+   * future to preserve ordering.
    */
   virtual seastar::future<> send_keepalive() = 0;
 
index b6c3cf694c3f9b2bbb5e22bad0804c2bd96fc1a1..49b13fe4484e0308841323261e759a29f0dbda23 100644 (file)
@@ -126,12 +126,30 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
 
 seastar::future<> IOHandler::send(MessageFRef msg)
 {
+  // sid may be changed on-the-fly during the submission
   if (seastar::this_shard_id() == get_shard_id()) {
     return do_send(std::move(msg));
   } else {
+    logger().trace("{} send() is directed to {} -- {}",
+                   conn, get_shard_id(), *msg);
     return seastar::smp::submit_to(
         get_shard_id(), [this, msg=std::move(msg)]() mutable {
-      return do_send(std::move(msg));
+      return send_redirected(std::move(msg));
+    });
+  }
+}
+
+seastar::future<> IOHandler::send_redirected(MessageFRef msg)
+{
+  // sid may be changed on-the-fly during the submission
+  if (seastar::this_shard_id() == get_shard_id()) {
+    return do_send(std::move(msg));
+  } else {
+    logger().debug("{} send() is redirected to {} -- {}",
+                   conn, get_shard_id(), *msg);
+    return seastar::smp::submit_to(
+        get_shard_id(), [this, msg=std::move(msg)]() mutable {
+      return send_redirected(std::move(msg));
     });
   }
 }
@@ -139,6 +157,7 @@ seastar::future<> IOHandler::send(MessageFRef msg)
 seastar::future<> IOHandler::do_send(MessageFRef msg)
 {
   assert(seastar::this_shard_id() == get_shard_id());
+  logger().trace("{} do_send() got message -- {}", conn, *msg);
   if (get_io_state() != io_state_t::drop) {
     out_pending_msgs.push_back(std::move(msg));
     notify_out_dispatch();
@@ -148,12 +167,28 @@ seastar::future<> IOHandler::do_send(MessageFRef msg)
 
 seastar::future<> IOHandler::send_keepalive()
 {
+  // sid may be changed on-the-fly during the submission
+  if (seastar::this_shard_id() == get_shard_id()) {
+    return do_send_keepalive();
+  } else {
+    logger().trace("{} send_keepalive() is directed to {}", conn, get_shard_id());
+    return seastar::smp::submit_to(
+        get_shard_id(), [this] {
+      return send_keepalive_redirected();
+    });
+  }
+}
+
+seastar::future<> IOHandler::send_keepalive_redirected()
+{
+  // sid may be changed on-the-fly during the submission
   if (seastar::this_shard_id() == get_shard_id()) {
     return do_send_keepalive();
   } else {
+    logger().debug("{} send_keepalive() is redirected to {}", conn, get_shard_id());
     return seastar::smp::submit_to(
         get_shard_id(), [this] {
-      return do_send_keepalive();
+      return send_keepalive_redirected();
     });
   }
 }
@@ -161,6 +196,7 @@ seastar::future<> IOHandler::send_keepalive()
 seastar::future<> IOHandler::do_send_keepalive()
 {
   assert(seastar::this_shard_id() == get_shard_id());
+  logger().trace("{} do_send_keeplive(): need_keepalive={}", conn, need_keepalive);
   if (!need_keepalive) {
     need_keepalive = true;
     notify_out_dispatch();
index 76e0cc010bc87a8a59ac0ed72e8652495b4094d1..122e6281829f61647eab2afd836b51d6ad31e21c 100644 (file)
@@ -331,8 +331,12 @@ public:
     return shard_states->get_io_state();
   }
 
+  seastar::future<> send_redirected(MessageFRef msg);
+
   seastar::future<> do_send(MessageFRef msg);
 
+  seastar::future<> send_keepalive_redirected();
+
   seastar::future<> do_send_keepalive();
 
   void dispatch_reset(bool is_replace);