]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: split ConnectionPipeline::get_pg into 2 phases
author: Yingxin Cheng <yingxin.cheng@intel.com>
Mon, 25 Sep 2023 03:08:22 +0000 (11:08 +0800)
committer: Yingxin Cheng <yingxin.cheng@intel.com>
Thu, 2 Nov 2023 07:29:08 +0000 (15:29 +0800)
Split the cross-core phase into 2 independent core-local phases, and
preserve the ordering using a sequential ID instead.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
17 files changed:
src/crimson/osd/osd_connection_priv.h
src/crimson/osd/osd_operation.h
src/crimson/osd/osd_operation_external_tracking.h
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/osd_operations/client_request.h
src/crimson/osd/osd_operations/logmissing_request.cc
src/crimson/osd/osd_operations/logmissing_request.h
src/crimson/osd/osd_operations/logmissing_request_reply.cc
src/crimson/osd/osd_operations/logmissing_request_reply.h
src/crimson/osd/osd_operations/peering_event.cc
src/crimson/osd/osd_operations/peering_event.h
src/crimson/osd/osd_operations/recovery_subrequest.cc
src/crimson/osd/osd_operations/recovery_subrequest.h
src/crimson/osd/osd_operations/replicated_request.cc
src/crimson/osd/osd_operations/replicated_request.h
src/crimson/osd/pg_shard_manager.h
src/crimson/osd/shard_services.h

index 99f394b1e83f7bfbb0b4e2ac37c1db0c40c9ce88..2d2a459017bb7901c2588d15a650416542e5c899 100644 (file)
@@ -3,6 +3,8 @@
 
 #pragma once
 
+#include <seastar/core/smp.hh>
+
 #include "crimson/net/Connection.h"
 #include "crimson/osd/osd_operation.h"
 #include "crimson/osd/osd_operations/client_request.h"
 
 namespace crimson::osd {
 
+/**
+ * crosscore_ordering_t
+ *
+ * To preserve the event order from 1 source to n target cores.
+ */
+class crosscore_ordering_t {
+public:
+  using seq_t = uint64_t;
+
+  crosscore_ordering_t()
+    : out_seqs(seastar::smp::count, 0),
+      in_controls(seastar::smp::count) {}
+
+  ~crosscore_ordering_t() = default;
+
+  // Called by the original core to get the ordering sequence
+  seq_t prepare_submit(core_id_t target_core) {
+    auto &out_seq = out_seqs[target_core];
+    ++out_seq;
+    return out_seq;
+  }
+
+  /*
+   * Called by the target core to preserve the ordering
+   */
+
+  seq_t get_in_seq() const {
+    auto core = seastar::this_shard_id();
+    return in_controls[core].seq;
+  }
+
+  bool proceed_or_wait(seq_t seq) {
+    auto core = seastar::this_shard_id();
+    auto &in_control = in_controls[core];
+    if (seq == in_control.seq + 1) {
+      ++in_control.seq;
+      if (unlikely(in_control.pr_wait.has_value())) {
+        in_control.pr_wait->set_value();
+        in_control.pr_wait = std::nullopt;
+      }
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  seastar::future<> wait(seq_t seq) {
+    auto core = seastar::this_shard_id();
+    auto &in_control = in_controls[core];
+    assert(seq != in_control.seq + 1);
+    if (!in_control.pr_wait.has_value()) {
+      in_control.pr_wait = seastar::shared_promise<>();
+    }
+    return in_control.pr_wait->get_shared_future();
+  }
+
+private:
+  struct in_control_t {
+    seq_t seq = 0;
+    std::optional<seastar::shared_promise<>> pr_wait;
+  };
+
+  // source-side
+  std::vector<seq_t> out_seqs;
+  // target-side
+  std::vector<in_control_t> in_controls;
+};
+
 struct OSDConnectionPriv : public crimson::net::Connection::user_private_t {
   ConnectionPipeline client_request_conn_pipeline;
   ConnectionPipeline peering_request_conn_pipeline;
   ConnectionPipeline replicated_request_conn_pipeline;
+  crosscore_ordering_t crosscore_ordering;
 };
 
 static inline OSDConnectionPriv &get_osd_priv(crimson::net::Connection *conn) {
index 8ef44ee9e78947569a18b0798e5b7a5658b6903b..7174143fe01e994cb3033208aba08a70470f2ffb 100644 (file)
@@ -27,10 +27,17 @@ struct ConnectionPipeline {
       "ConnectionPipeline::await_map";
   } await_map;
 
-  struct GetPG : OrderedExclusivePhaseT<GetPG> {
+  struct GetPGMapping : OrderedExclusivePhaseT<GetPGMapping> {
     static constexpr auto type_name =
-      "ConnectionPipeline::get_pg";
-  } get_pg;
+      "ConnectionPipeline::get_pg_mapping";
+  } get_pg_mapping;
+};
+
+struct PerShardPipeline {
+  struct CreateOrWaitPG : OrderedExclusivePhaseT<CreateOrWaitPG> {
+    static constexpr auto type_name =
+      "PerShardPipeline::create_or_wait_pg";
+  } create_or_wait_pg;
 };
 
 enum class OperationTypeCode {
index 4b6dbf4b71007fe62a351d8c9fe3057e0795c7c3..d5e2ed45328445c627f25544d5ac0b162a19ef44 100644 (file)
@@ -22,7 +22,8 @@ struct LttngBackend
   : ClientRequest::StartEvent::Backend,
     ConnectionPipeline::AwaitActive::BlockingEvent::Backend,
     ConnectionPipeline::AwaitMap::BlockingEvent::Backend,
-    ConnectionPipeline::GetPG::BlockingEvent::Backend,
+    ConnectionPipeline::GetPGMapping::BlockingEvent::Backend,
+    PerShardPipeline::CreateOrWaitPG::BlockingEvent::Backend,
     OSD_OSDMapGate::OSDMapBlocker::BlockingEvent::Backend,
     PGMap::PGCreationBlockingEvent::Backend,
     ClientRequest::PGPipeline::AwaitMap::BlockingEvent::Backend,
@@ -55,9 +56,14 @@ struct LttngBackend
               const OSD_OSDMapGate::OSDMapBlocker&) override {
   }
 
-  void handle(ConnectionPipeline::GetPG::BlockingEvent& ev,
+  void handle(ConnectionPipeline::GetPGMapping::BlockingEvent& ev,
               const Operation& op,
-              const ConnectionPipeline::GetPG& blocker) override {
+              const ConnectionPipeline::GetPGMapping& blocker) override {
+  }
+
+  void handle(PerShardPipeline::CreateOrWaitPG::BlockingEvent& ev,
+              const Operation& op,
+              const PerShardPipeline::CreateOrWaitPG& blocker) override {
   }
 
   void handle(PGMap::PGCreationBlockingEvent&,
@@ -122,7 +128,8 @@ struct HistoricBackend
   : ClientRequest::StartEvent::Backend,
     ConnectionPipeline::AwaitActive::BlockingEvent::Backend,
     ConnectionPipeline::AwaitMap::BlockingEvent::Backend,
-    ConnectionPipeline::GetPG::BlockingEvent::Backend,
+    ConnectionPipeline::GetPGMapping::BlockingEvent::Backend,
+    PerShardPipeline::CreateOrWaitPG::BlockingEvent::Backend,
     OSD_OSDMapGate::OSDMapBlocker::BlockingEvent::Backend,
     PGMap::PGCreationBlockingEvent::Backend,
     ClientRequest::PGPipeline::AwaitMap::BlockingEvent::Backend,
@@ -155,9 +162,14 @@ struct HistoricBackend
               const OSD_OSDMapGate::OSDMapBlocker&) override {
   }
 
-  void handle(ConnectionPipeline::GetPG::BlockingEvent& ev,
+  void handle(ConnectionPipeline::GetPGMapping::BlockingEvent& ev,
+              const Operation& op,
+              const ConnectionPipeline::GetPGMapping& blocker) override {
+  }
+
+  void handle(PerShardPipeline::CreateOrWaitPG::BlockingEvent& ev,
               const Operation& op,
-              const ConnectionPipeline::GetPG& blocker) override {
+              const PerShardPipeline::CreateOrWaitPG& blocker) override {
   }
 
   void handle(PGMap::PGCreationBlockingEvent&,
index f01f0c491f1a12627e4083709f9f5857fda98574..287072642953fdaf49722655bfcf1a2a2dcd300e 100644 (file)
@@ -81,6 +81,12 @@ ConnectionPipeline &ClientRequest::get_connection_pipeline()
   return get_osd_priv(conn.get()).client_request_conn_pipeline;
 }
 
+PerShardPipeline &ClientRequest::get_pershard_pipeline(
+    ShardServices &shard_services)
+{
+  return shard_services.get_client_request_pipeline();
+}
+
 ClientRequest::PGPipeline &ClientRequest::client_pp(PG &pg)
 {
   return pg.request_pg_pipeline;
index b2dce1e873e1839c811711a1c8657116a03c6729..d534fd6ac4fae9fa2463ce6b0e26450d2fee6457 100644 (file)
@@ -81,7 +81,8 @@ public:
     ConnectionPipeline::AwaitActive::BlockingEvent,
     ConnectionPipeline::AwaitMap::BlockingEvent,
     OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
-    ConnectionPipeline::GetPG::BlockingEvent,
+    ConnectionPipeline::GetPGMapping::BlockingEvent,
+    PerShardPipeline::CreateOrWaitPG::BlockingEvent,
     PGMap::PGCreationBlockingEvent,
     CompletionEvent
   > tracking_events;
@@ -206,6 +207,14 @@ public:
   epoch_t get_epoch() const { return m->get_min_epoch(); }
 
   ConnectionPipeline &get_connection_pipeline();
+
+  PerShardPipeline &get_pershard_pipeline(ShardServices &);
+
+  crimson::net::Connection &get_connection() {
+    assert(conn);
+    return *conn;
+  };
+
   seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
     assert(conn);
     return conn.get_foreign(
index ee83977cd8a2e5d1eeecc3ec29222107dd32c76b..7c8e1d7e499a201e1ac8c82c0856da7ae36446eb 100644 (file)
@@ -49,6 +49,12 @@ ConnectionPipeline &LogMissingRequest::get_connection_pipeline()
   return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
 }
 
+PerShardPipeline &LogMissingRequest::get_pershard_pipeline(
+    ShardServices &shard_services)
+{
+  return shard_services.get_replicated_request_pipeline();
+}
+
 ClientRequest::PGPipeline &LogMissingRequest::client_pp(PG &pg)
 {
   return pg.request_pg_pipeline;
index 71d0816fd201f247322790f67bae60d67246fd8b..5b01fee17b868adcf8502336202ad7c85f8cae78 100644 (file)
@@ -38,6 +38,14 @@ public:
   epoch_t get_epoch() const { return req->get_min_epoch(); }
 
   ConnectionPipeline &get_connection_pipeline();
+
+  PerShardPipeline &get_pershard_pipeline(ShardServices &);
+
+  crimson::net::Connection &get_connection() {
+    assert(conn);
+    return *conn;
+  };
+
   seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
     assert(conn);
     return conn.get_foreign(
@@ -58,7 +66,8 @@ public:
     StartEvent,
     ConnectionPipeline::AwaitActive::BlockingEvent,
     ConnectionPipeline::AwaitMap::BlockingEvent,
-    ConnectionPipeline::GetPG::BlockingEvent,
+    ConnectionPipeline::GetPGMapping::BlockingEvent,
+    PerShardPipeline::CreateOrWaitPG::BlockingEvent,
     ClientRequest::PGPipeline::AwaitMap::BlockingEvent,
     PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
     PGMap::PGCreationBlockingEvent,
index 16e61ab4a9858e33fd620549b921c976d8e3a4d8..5cfe5b215307ac0a8b4c43cd372cb02dd36dd52b 100644 (file)
@@ -49,6 +49,12 @@ ConnectionPipeline &LogMissingRequestReply::get_connection_pipeline()
   return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
 }
 
+PerShardPipeline &LogMissingRequestReply::get_pershard_pipeline(
+    ShardServices &shard_services)
+{
+  return shard_services.get_replicated_request_pipeline();
+}
+
 ClientRequest::PGPipeline &LogMissingRequestReply::client_pp(PG &pg)
 {
   return pg.request_pg_pipeline;
index c89131fec1d7deeb80b6f5ad8dabc1dea4bedfce..b01cae15421db4cdaf449d47d4ee04634ff1fab8 100644 (file)
@@ -38,6 +38,14 @@ public:
   epoch_t get_epoch() const { return req->get_min_epoch(); }
 
   ConnectionPipeline &get_connection_pipeline();
+
+  PerShardPipeline &get_pershard_pipeline(ShardServices &);
+
+  crimson::net::Connection &get_connection() {
+    assert(conn);
+    return *conn;
+  };
+
   seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
     assert(conn);
     return conn.get_foreign(
@@ -58,7 +66,8 @@ public:
     StartEvent,
     ConnectionPipeline::AwaitActive::BlockingEvent,
     ConnectionPipeline::AwaitMap::BlockingEvent,
-    ConnectionPipeline::GetPG::BlockingEvent,
+    ConnectionPipeline::GetPGMapping::BlockingEvent,
+    PerShardPipeline::CreateOrWaitPG::BlockingEvent,
     PGMap::PGCreationBlockingEvent,
     OSD_OSDMapGate::OSDMapBlocker::BlockingEvent
   > tracking_events;
index 0712147ab2b7b355a91a3ecb9fbee727cacc001a..9139e337f80a46ba577a32fc53685a6587a9fa5a 100644 (file)
@@ -128,6 +128,12 @@ ConnectionPipeline &RemotePeeringEvent::get_connection_pipeline()
   return get_osd_priv(conn.get()).peering_request_conn_pipeline;
 }
 
+PerShardPipeline &RemotePeeringEvent::get_pershard_pipeline(
+    ShardServices &shard_services)
+{
+  return shard_services.get_peering_request_pipeline();
+}
+
 void RemotePeeringEvent::on_pg_absent(ShardServices &shard_services)
 {
   if (auto& e = get_event().get_event();
index a780a26768ef3240ec2f08d07d5a62a3e591ed1a..6bbfe6c91174c3725fb57e7acdeab32e192eee07 100644 (file)
@@ -131,7 +131,8 @@ public:
     ConnectionPipeline::AwaitActive::BlockingEvent,
     ConnectionPipeline::AwaitMap::BlockingEvent,
     OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
-    ConnectionPipeline::GetPG::BlockingEvent,
+    ConnectionPipeline::GetPGMapping::BlockingEvent,
+    PerShardPipeline::CreateOrWaitPG::BlockingEvent,
     PGMap::PGCreationBlockingEvent,
     PGPeeringPipeline::AwaitMap::BlockingEvent,
     PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
@@ -148,6 +149,14 @@ public:
   epoch_t get_epoch() const { return evt.get_epoch_sent(); }
 
   ConnectionPipeline &get_connection_pipeline();
+
+  PerShardPipeline &get_pershard_pipeline(ShardServices &);
+
+  crimson::net::Connection &get_connection() {
+    assert(conn);
+    return *conn;
+  };
+
   seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
     assert(conn);
     return conn.get_foreign(
index dd310d8d72743df966617f8c035531b5e9bcb8b1..2e939880cbeab91cc2a3163309989c2bffc71379 100644 (file)
@@ -49,4 +49,10 @@ ConnectionPipeline &RecoverySubRequest::get_connection_pipeline()
   return get_osd_priv(conn.get()).peering_request_conn_pipeline;
 }
 
+PerShardPipeline &RecoverySubRequest::get_pershard_pipeline(
+    ShardServices &shard_services)
+{
+  return shard_services.get_peering_request_pipeline();
+}
+
 }
index 07c7c95b5e0fe1396015419e41434d3de12f6ce6..31e6045cb0eeb2c65be07fc197a95acab5d3c166 100644 (file)
@@ -41,6 +41,14 @@ public:
   epoch_t get_epoch() const { return m->get_min_epoch(); }
 
   ConnectionPipeline &get_connection_pipeline();
+
+  PerShardPipeline &get_pershard_pipeline(ShardServices &);
+
+  crimson::net::Connection &get_connection() {
+    assert(conn);
+    return *conn;
+  };
+
   seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
     assert(conn);
     return conn.get_foreign(
@@ -61,7 +69,8 @@ public:
     StartEvent,
     ConnectionPipeline::AwaitActive::BlockingEvent,
     ConnectionPipeline::AwaitMap::BlockingEvent,
-    ConnectionPipeline::GetPG::BlockingEvent,
+    ConnectionPipeline::GetPGMapping::BlockingEvent,
+    PerShardPipeline::CreateOrWaitPG::BlockingEvent,
     PGMap::PGCreationBlockingEvent,
     OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
     CompletionEvent
index 7e16b2ebd06af527a6962e88c0bb7caa93c07b98..7b8592b1e02d58682b615f66e4c3aa5d880a0bf3 100644 (file)
@@ -49,6 +49,12 @@ ConnectionPipeline &RepRequest::get_connection_pipeline()
   return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
 }
 
+PerShardPipeline &RepRequest::get_pershard_pipeline(
+    ShardServices &shard_services)
+{
+  return shard_services.get_replicated_request_pipeline();
+}
+
 ClientRequest::PGPipeline &RepRequest::client_pp(PG &pg)
 {
   return pg.request_pg_pipeline;
index c742888d9390a5c58012f389db8bb8eba6379b50..32cf271788ba8dd574650ca53805a1229336ceb9 100644 (file)
@@ -38,6 +38,14 @@ public:
   epoch_t get_epoch() const { return req->get_min_epoch(); }
 
   ConnectionPipeline &get_connection_pipeline();
+
+  PerShardPipeline &get_pershard_pipeline(ShardServices &);
+
+  crimson::net::Connection &get_connection() {
+    assert(conn);
+    return *conn;
+  };
+
   seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
     assert(conn);
     return conn.get_foreign(
@@ -58,7 +66,8 @@ public:
     StartEvent,
     ConnectionPipeline::AwaitActive::BlockingEvent,
     ConnectionPipeline::AwaitMap::BlockingEvent,
-    ConnectionPipeline::GetPG::BlockingEvent,
+    ConnectionPipeline::GetPGMapping::BlockingEvent,
+    PerShardPipeline::CreateOrWaitPG::BlockingEvent,
     ClientRequest::PGPipeline::AwaitMap::BlockingEvent,
     PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
     PGMap::PGCreationBlockingEvent,
index e080dc43e4add3fd0356ca9142e57cbe63e24b95..cf13cb52bbf70c400ebbbad59e6641f6c4135216 100644 (file)
@@ -7,6 +7,7 @@
 #include <seastar/core/shared_future.hh>
 #include <seastar/core/sharded.hh>
 
+#include "crimson/osd/osd_connection_priv.h"
 #include "crimson/osd/shard_services.h"
 #include "crimson/osd/pg_map.h"
 
@@ -148,6 +149,31 @@ public:
       });
   }
 
+  template <typename T, typename F>
+  auto process_ordered_op_remotely(
+      crosscore_ordering_t::seq_t cc_seq,
+      ShardServices &target_shard_services,
+      typename T::IRef &&op,
+      F &&f) {
+    auto &crosscore_ordering = get_osd_priv(&op->get_connection()).crosscore_ordering;
+    if (crosscore_ordering.proceed_or_wait(cc_seq)) {
+      return std::invoke(
+        std::move(f),
+        target_shard_services,
+        std::move(op));
+    } else {
+      auto &logger = crimson::get_logger(ceph_subsys_osd);
+      logger.debug("{} got {} at the remote pg, wait at {}",
+                   *op, cc_seq, crosscore_ordering.get_in_seq());
+      return crosscore_ordering.wait(cc_seq
+      ).then([this, cc_seq, &target_shard_services,
+              op=std::move(op), f=std::move(f)]() mutable {
+        return this->template process_ordered_op_remotely<T>(
+            cc_seq, target_shard_services, std::move(op), std::move(f));
+      });
+    }
+  }
+
   template <typename T, typename F>
   auto with_remote_shard_state_and_op(
       core_id_t core,
@@ -161,21 +187,28 @@ public:
         target_shard_services,
         std::move(op));
     }
+    // Note: the ordering is only preserved until f is invoked.
     auto &opref = *op;
-    get_local_state().registry.remove_from_registry(opref);
-    return opref.prepare_remote_submission(
-    ).then([op=std::move(op), f=std::move(f), this, core
-           ](auto f_conn) mutable {
+    auto &crosscore_ordering = get_osd_priv(&opref.get_connection()).crosscore_ordering;
+    auto cc_seq = crosscore_ordering.prepare_submit(core);
+    auto &logger = crimson::get_logger(ceph_subsys_osd);
+    logger.debug("{}: send {} to the remote pg core {}",
+                 opref, cc_seq, core);
+    return opref.get_handle().complete(
+    ).then([&opref, this] {
+      get_local_state().registry.remove_from_registry(opref);
+      return opref.prepare_remote_submission();
+    }).then([op=std::move(op), f=std::move(f), this, core, cc_seq
+            ](auto f_conn) mutable {
       return shard_services.invoke_on(
         core,
-        [f=std::move(f), op=std::move(op), f_conn=std::move(f_conn)
+        [this, cc_seq,
+         f=std::move(f), op=std::move(op), f_conn=std::move(f_conn)
         ](auto &target_shard_services) mutable {
         op->finish_remote_submission(std::move(f_conn));
         target_shard_services.local_state.registry.add_to_registry(*op);
-        return std::invoke(
-          std::move(f),
-          target_shard_services,
-          std::move(op));
+        return this->template process_ordered_op_remotely<T>(
+            cc_seq, target_shard_services, std::move(op), std::move(f));
       });
     });
   }
@@ -335,9 +368,9 @@ public:
              &get_shard_services());
       });
     }).then([&logger, &opref](auto epoch) {
-      logger.debug("{}: got map {}, entering get_pg", opref, epoch);
+      logger.debug("{}: got map {}, entering get_pg_mapping", opref, epoch);
       return opref.template enter_stage<>(
-       opref.get_connection_pipeline().get_pg);
+       opref.get_connection_pipeline().get_pg_mapping);
     }).then([this, &opref] {
       return get_pg_to_shard_mapping().maybe_create_pg(opref.get_pgid());
     }).then_wrapped([this, &logger, op=std::move(op)](auto fut) mutable {
@@ -353,14 +386,21 @@ public:
       return this->template with_remote_shard_state_and_op<T>(
         core, std::move(op),
         [this](ShardServices &target_shard_services,
-           typename T::IRef op) {
-        if constexpr (T::can_create()) {
-          return this->template run_with_pg_maybe_create<T>(
-              std::move(op), target_shard_services);
-        } else {
-          return this->template run_with_pg_maybe_wait<T>(
-              std::move(op), target_shard_services);
-        }
+               typename T::IRef op) {
+        auto &opref = *op;
+        auto &logger = crimson::get_logger(ceph_subsys_osd);
+        logger.debug("{}: entering create_or_wait_pg", opref);
+        return opref.template enter_stage<>(
+          opref.get_pershard_pipeline(target_shard_services).create_or_wait_pg
+        ).then([this, &target_shard_services, op=std::move(op)]() mutable {
+          if constexpr (T::can_create()) {
+            return this->template run_with_pg_maybe_create<T>(
+                std::move(op), target_shard_services);
+          } else {
+            return this->template run_with_pg_maybe_wait<T>(
+                std::move(op), target_shard_services);
+          }
+        });
       });
     });
     return std::make_pair(id, std::move(fut));
index 8786ec9626fd3b483bcf787f63570ecfab7c053f..d71513a6645efed5b6aeaa29d3194017856cd6ce 100644 (file)
@@ -70,6 +70,10 @@ class PerShardState {
   OSDState &osd_state;
   OSD_OSDMapGate osdmap_gate;
 
+  PerShardPipeline client_request_pipeline;
+  PerShardPipeline peering_request_pipeline;
+  PerShardPipeline replicated_request_pipeline;
+
   PerfCounters *perf = nullptr;
   PerfCounters *recoverystate_perf = nullptr;
 
@@ -453,6 +457,18 @@ public:
     return dispatch_context({}, std::move(ctx));
   }
 
+  PerShardPipeline &get_client_request_pipeline() {
+    return local_state.client_request_pipeline;
+  }
+
+  PerShardPipeline &get_peering_request_pipeline() {
+    return local_state.peering_request_pipeline;
+  }
+
+  PerShardPipeline &get_replicated_request_pipeline() {
+    return local_state.replicated_request_pipeline;
+  }
+
   /// Return per-core tid
   ceph_tid_t get_tid() { return local_state.get_tid(); }