]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: allow to send messages concurrently 54304/head
authorYingxin Cheng <yingxin.cheng@intel.com>
Tue, 31 Oct 2023 01:36:23 +0000 (09:36 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Mon, 20 Nov 2023 02:55:36 +0000 (10:55 +0800)
The ordering is now guaranteed upon calling send(), so there is no
reason to couple the crosscore send future with the operation phases --
exclusive phases will limit the send concurrency, potentially causing
OSD starvation.

Decouple the crosscore send futures in the IO pathes, mostly in
ClientRequest and OSDSingletonState::send_to_osd().

Issue-identified-by: Chunmei Liu <chunmei.liu@intel.com>
see PR53934.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Connection.h
src/crimson/net/Fwd.h
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/shard_services.cc

index 41596987b09f545516fd473d6a023f8f17df446b..c19bfb1ff57a110519a938e969c312f06fa2fc0b 100644 (file)
@@ -83,8 +83,33 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
    *
    * May be invoked from any core, and the send order will be preserved upon
    * the call.
+   *
+   * The returned future will be resolved only after the message is enqueued
+   * remotely.
    */
-  virtual seastar::future<> send(MessageURef msg) = 0;
+  virtual seastar::future<> send(
+      MessageURef msg) = 0;
+
+  /**
+   * send_with_throttling
+   *
+   * Send a message over a connection that has completed its handshake.
+   *
+   * May be invoked from any core, and the send order will be preserved upon
+   * the call.
+   *
+   * TODO:
+   *
+   * The returned future is reserved for throttling.
+   *
+   * Gating is needed for graceful shutdown, to wait until the message is
+   * enqueued remotely.
+   */
+  seastar::future<> send_with_throttling(
+      MessageURef msg /* , seastar::gate & */) {
+    std::ignore = send(std::move(msg));
+    return seastar::now();
+  }
 
   /**
    * send_keepalive
index 2b159514193c7f23b7c0faff3c872d0cac9267a8..3a56cf5bb0a63c5b17037d274c70886aa2ad0dc8 100644 (file)
@@ -21,7 +21,7 @@
 #include <seastar/core/sharded.hh>
 
 #include "msg/Connection.h"
-#include "msg/MessageRef.h"
+#include "msg/Message.h"
 #include "msg/msg_types.h"
 
 #include "crimson/common/errorator.h"
index d208e2e53d97e8a3ba7ea3eb37ecc4319c893f70..c6dbd498c66dd477492b78bee35d0414eadd58e7 100644 (file)
@@ -204,7 +204,8 @@ ClientRequest::process_pg_op(
   return pg->do_pg_ops(
     m
   ).then_interruptible([this, pg=std::move(pg)](MURef<MOSDOpReply> reply) {
-    return conn->send(std::move(reply));
+    // TODO: gate the crosscore sending
+    return conn->send_with_throttling(std::move(reply));
   });
 }
 
@@ -218,7 +219,8 @@ auto ClientRequest::reply_op_error(const Ref<PG>& pg, int err)
     !m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
   reply->set_reply_versions(eversion_t(), 0);
   reply->set_op_returns(std::vector<pg_log_op_return_item_t>{});
-  return conn->send(std::move(reply));
+  // TODO: gate the crosscore sending
+  return conn->send_with_throttling(std::move(reply));
 }
 
 ClientRequest::interruptible_future<>
@@ -246,7 +248,8 @@ ClientRequest::process_op(instance_handle_t &ihref, Ref<PG> &pg)
           m.get(), completed->err, pg->get_osdmap_epoch(),
           CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false);
        reply->set_reply_versions(completed->version, completed->user_version);
-        return conn->send(std::move(reply));
+        // TODO: gate the crosscore sending
+        return conn->send_with_throttling(std::move(reply));
       } else {
         return ihref.enter_stage<interruptor>(client_pp(*pg).get_obc, *this
        ).then_interruptible(
@@ -319,13 +322,13 @@ ClientRequest::do_process(
 
   SnapContext snapc = get_snapc(pg,obc);
 
-  if ((m->has_flag(CEPH_OSD_FLAG_ORDERSNAP)) &&
-       snapc.seq < obc->ssc->snapset.seq) {
-        DEBUGI("{} ORDERSNAP flag set and snapc seq {}",
-                       " < snapset seq {} on {}",
-                       __func__, snapc.seq, obc->ssc->snapset.seq,
-                       obc->obs.oi.soid);
-     return reply_op_error(pg, -EOLDSNAPC);
+  if (m->has_flag(CEPH_OSD_FLAG_ORDERSNAP) &&
+      snapc.seq < obc->ssc->snapset.seq) {
+    DEBUGI("{} ORDERSNAP flag set and snapc seq {}",
+           " < snapset seq {} on {}",
+           __func__, snapc.seq, obc->ssc->snapset.seq,
+           obc->obs.oi.soid);
+    return reply_op_error(pg, -EOLDSNAPC);
   }
 
   if (!pg->is_primary()) {
@@ -357,8 +360,10 @@ ClientRequest::do_process(
                [this, reply=std::move(reply)]() mutable {
                   LOG_PREFIX(ClientRequest::do_process);
                  DEBUGI("{}: sending response", *this);
-                 return conn->send(std::move(reply));
-               });
+                 // TODO: gate the crosscore sending
+                 return conn->send_with_throttling(std::move(reply));
+               }
+             );
            }, crimson::ct_error::eagain::handle([this, pg, &ihref]() mutable {
              return process_op(ihref, pg);
            }));
index 404f28d7d7f3b4ccd5515b6cc2146ff1893b01d9..ae36d7f6ea008d8d4a80787c65cea3db41b78c95 100644 (file)
@@ -166,7 +166,8 @@ seastar::future<> OSDSingletonState::send_to_osd(
   } else {
     auto conn = cluster_msgr.connect(
         osdmap->get_cluster_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
-    return conn->send(std::move(m));
+    // TODO: gate the crosscore sending
+    return conn->send_with_throttling(std::move(m));
   }
 }