git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd/pg_recovery: use OperationThrottler to throttle object pushes/pulls
author: Xuehan Xu <xuxuehan@qianxin.com>
Tue, 1 Apr 2025 09:15:40 +0000 (17:15 +0800)
committer: Xuehan Xu <xuxuehan@qianxin.com>
Fri, 4 Apr 2025 06:15:10 +0000 (14:15 +0800)

Throttle individual object pushes/pulls instead of throttling whole recovery/backfill operations.

Fixes: https://tracker.ceph.com/issues/70180
Signed-off-by: Xuehan Xu <xuxuehan@qianxin.com>
src/crimson/osd/osd_operation.cc
src/crimson/osd/osd_operation.h
src/crimson/osd/osd_operations/background_recovery.cc
src/crimson/osd/osd_operations/background_recovery.h
src/crimson/osd/pg_recovery.cc
src/crimson/osd/pg_recovery.h
src/crimson/osd/shard_services.h

index 978009709761324fb48b9b6c2f8bcb374a686038..1416df4dac8a8504fd60a04e5402b6b40756c6a3 100644 (file)
@@ -158,8 +158,7 @@ OperationThrottler::OperationThrottler(ConfigProxy &conf)
 
 void OperationThrottler::wake()
 {
-  while ((!max_in_progress || in_progress < max_in_progress) &&
-        !scheduler->empty()) {
+  while (available() && !scheduler->empty()) {
     auto item = scheduler->dequeue();
     item.wake.set_value();
     ++in_progress;
index af983eae4ceaa4e4fa7b67a12892c4ee84d38606..6e75f2f826f25cc3e08140866509ffad26cf400c 100644 (file)
@@ -334,6 +334,22 @@ public:
                          const std::set<std::string> &changed) final;
   void update_from_config(const ConfigProxy &conf);
 
+  bool available() const {  // true when another throttled operation may start right now
+    return !max_in_progress || in_progress < max_in_progress;  // max_in_progress == 0 means no limit
+  }
+
+  template <typename F>
+  auto with_throttle(  // runs f after acquiring the throttle and releases it afterwards
+    crimson::osd::scheduler::params_t params,
+    F &&f) {
+    if (!max_in_progress) return f();  // no limit configured: run f directly, nothing to acquire
+    return acquire_throttle(params)
+      .then(std::forward<F>(f))
+      .finally([this] {
+       release_throttle();  // finally(): released whether f's future resolved or failed
+      });
+  }
+
   template <class OpT, class... Args>
   seastar::future<> with_throttle_while(
     BlockingEvent::Trigger<OpT>&& trigger,
index 7f93e42d53d8effd9bff74e36fc66e09e0bf10d9..dbfeab07e6b69978b94387b151b082a1db2307d6 100644 (file)
@@ -77,18 +77,19 @@ seastar::future<> BackgroundRecoveryT<T>::start()
       std::chrono::milliseconds(std::lround(delay * 1000)));
   }
   return maybe_delay.then([ref, this] {
-    return this->template with_blocking_event<OperationThrottler::BlockingEvent>(
-      [ref, this] (auto&& trigger) {
-      return ss.with_throttle_while(
-        std::move(trigger),
-        this, get_scheduler_params(), [this] {
-          return interruptor::with_interruption([this] {
-            return do_recovery();
-          }, [](std::exception_ptr) {
-            return seastar::make_ready_future<bool>(false);
-          }, pg, epoch_started);
-        });
+    return seastar::repeat([ref, this] {
+      return interruptor::with_interruption([this] {
+       return do_recovery();
+      }, [](std::exception_ptr) {
+       return seastar::make_ready_future<bool>(false);
+      }, pg, epoch_started).then([](bool recovery_done) {
+       if (recovery_done) {
+         return seastar::stop_iteration::yes;
+       } else {
+         return seastar::stop_iteration::no;
+       }
       });
+    });
   });
 }
 
@@ -117,7 +118,8 @@ UrgentRecovery::do_recovery()
   ).then_interruptible([this] {
     return with_blocking_event<RecoveryBackend::RecoveryBlockingEvent,
                               interruptor>([this] (auto&& trigger) {
-      return pg->get_recovery_handler()->recover_missing(trigger, soid, need);
+      return pg->get_recovery_handler()->recover_missing(
+       trigger, soid, need, false);
     }).then_interruptible([] {
       return seastar::make_ready_future<bool>(false);
     });
index 255a934cd49879263c3bc50ebaea9725af22dac9..2b3dbb121ae9f0598d6985eaf28f31b3bc7c411c 100644 (file)
@@ -66,7 +66,6 @@ public:
   void print(std::ostream&) const final;
 
   std::tuple<
-    OperationThrottler::BlockingEvent,
     RecoveryBackend::RecoveryBlockingEvent
   > tracking_events;
 
@@ -86,7 +85,6 @@ public:
     float delay = 0);
 
   std::tuple<
-    OperationThrottler::BlockingEvent,
     RecoveryBackend::RecoveryBlockingEvent
   > tracking_events;
 
index 8d5841758e364983141b5cf5e7ca54fb40539b48..f219411acda9d660faf52ec584413ab7549e35c0 100644 (file)
@@ -110,7 +110,7 @@ PGRecovery::start_recovery_ops(
       }
       pg->reset_pglog_based_recovery_op();
     }
-    return seastar::make_ready_future<bool>(!done);
+    return seastar::make_ready_future<bool>(done);
   });
 }
 
@@ -196,10 +196,10 @@ size_t PGRecovery::start_primary_recovery_ops(
        auto it = missing.get_items().find(head);
        assert(it != missing.get_items().end());
        auto head_need = it->second.need;
-       out->emplace_back(recover_missing(trigger, head, head_need));
+       out->emplace_back(recover_missing(trigger, head, head_need, true));
        ++skipped;
       } else {
-       out->emplace_back(recover_missing(trigger, soid, item.need));
+       out->emplace_back(recover_missing(trigger, soid, item.need, true));
       }
       ++started;
     }
@@ -306,7 +306,9 @@ size_t PGRecovery::start_replica_recovery_ops(
 PGRecovery::interruptible_future<>
 PGRecovery::recover_missing(
   RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
-  const hobject_t &soid, eversion_t need)
+  const hobject_t &soid,
+  eversion_t need,
+  bool with_throttle)
 {
   logger().info("{} {} v {}", __func__, soid, need);
   auto [recovering, added] = pg->get_recovery_backend()->add_recovering(soid);
@@ -319,7 +321,9 @@ PGRecovery::recover_missing(
     } else {
       return recovering.wait_track_blocking(
        trigger,
-       pg->get_recovery_backend()->recover_object(soid, need)
+       (with_throttle  // parenthesized: '.' binds tighter than '?:', the handler below must cover both arms
+         ? recover_object_with_throttle(soid, need)
+         : recover_object(soid, need))
        .handle_exception_interruptible(
          [=, this, soid = std::move(soid)] (auto e) {
          on_failed_recover({ pg->get_pg_whoami() }, soid, need);
@@ -367,7 +371,7 @@ RecoveryBackend::interruptible_future<> PGRecovery::prep_object_replica_pushes(
     logger().info("{} {} v {}, new recovery", __func__, soid, need);
     return recovering.wait_track_blocking(
       trigger,
-      pg->get_recovery_backend()->recover_object(soid, need)
+      recover_object_with_throttle(soid, need)
       .handle_exception_interruptible(
        [=, this, soid = std::move(soid)] (auto e) {
        on_failed_recover({ pg->get_pg_whoami() }, soid, need);
@@ -516,6 +520,25 @@ void PGRecovery::request_primary_scan(
   });
 }
 
+// Recover soid at version need, running the recovery through ShardServices::with_throttle.
+PGRecovery::interruptible_future<>
+PGRecovery::recover_object_with_throttle(
+  const hobject_t &soid,
+  eversion_t need)
+{
+  crimson::osd::scheduler::params_t params =
+    {1, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort};
+  auto &ss = pg->get_shard_services();
+  logger().debug("{} {} v {}", __func__, soid, need);
+  return ss.with_throttle(
+    std::move(params),
+    [this, soid, need] {
+    logger().debug("got throttle: {} {}", soid, need);
+    // recover_object() asserts the backend exists, then forwards to it
+    return recover_object(soid, need);
+  });
+}
+
 void PGRecovery::enqueue_push(
   const hobject_t& obj,
   const eversion_t& v,
@@ -527,7 +550,7 @@ void PGRecovery::enqueue_push(
   if (!added)
     return;
   peering_state.prepare_backfill_for_missing(obj, v, peers);
-  std::ignore = pg->get_recovery_backend()->recover_object(obj, v).\
+  std::ignore = recover_object_with_throttle(obj, v).\
   handle_exception_interruptible([] (auto) {
     ceph_abort_msg("got exception on backfill's push");
     return seastar::make_ready_future<>();
@@ -605,8 +628,8 @@ void PGRecovery::update_peers_last_backfill(
 
 bool PGRecovery::budget_available() const
 {
-  // TODO: the limits!
-  return true;
+  auto &ss = pg->get_shard_services();
+  return ss.throttle_available();  // budget now follows the shard services' throttle instead of always being granted
 }
 
 void PGRecovery::on_pg_clean()
index 9d4a4874402f4fc22c782c46d1a336226ca64098..37fae278fa09f12cd64ba283a22a6451ede4537d 100644 (file)
@@ -65,7 +65,9 @@ private:
   }
   RecoveryBackend::interruptible_future<> recover_missing(
     RecoveryBackend::RecoveryBlockingEvent::TriggerI&,
-    const hobject_t &soid, eversion_t need);
+    const hobject_t &soid,
+    eversion_t need,
+    bool with_throttle);
   RecoveryBackend::interruptible_future<> prep_object_replica_deletes(
     RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
     const hobject_t& soid,
@@ -97,6 +99,18 @@ private:
   friend class ReplicatedRecoveryBackend;
   friend class crimson::osd::UrgentRecovery;
 
+  interruptible_future<> recover_object_with_throttle(
+    const hobject_t &soid,
+    eversion_t need);
+
+  interruptible_future<> recover_object(  // unthrottled path: forwards straight to the recovery backend
+    const hobject_t &soid,
+    eversion_t need) {
+    auto backend = pg->get_recovery_backend();
+    assert(backend);  // a PG running recovery is expected to have a backend
+    return backend->recover_object(soid, need);
+  }
+
   // backfill begin
   std::unique_ptr<crimson::osd::BackfillState> backfill_state;
   std::map<pg_shard_t,
index aa83d3496bc98fc37c4584323e6e27ef4b592e8f..38daf776b99250f5fc21f8b19fd8b2c84410f1ac 100644 (file)
@@ -593,7 +593,7 @@ public:
   }
 
   FORWARD_TO_OSD_SINGLETON(get_pool_info)
-  FORWARD(with_throttle_while, with_throttle_while, local_state.throttler)
+  FORWARD(with_throttle, with_throttle, local_state.throttler)
 
   FORWARD_TO_OSD_SINGLETON(build_incremental_map_msg)
   FORWARD_TO_OSD_SINGLETON(send_incremental_map)
@@ -617,6 +617,9 @@ public:
     snap_dump_reservations,
     snap_reserver.dump)
 
+  bool throttle_available() const {  // whether local_state's throttler can admit another operation now
+    return local_state.throttler.available();
+  }
 
   auto local_update_priority(
     singleton_orderer_t &orderer,