]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd/pg_recovery: throttle backfills together with pg-log based 60883/head
authorXuehan Xu <xuxuehan@qianxin.com>
Wed, 27 Nov 2024 01:32:48 +0000 (09:32 +0800)
committerMatan Breizman <mbreizma@redhat.com>
Sun, 5 Jan 2025 15:58:18 +0000 (15:58 +0000)
recoveries

Signed-off-by: Xuehan Xu <xuxuehan@qianxin.com>
src/crimson/osd/backfill_state.cc
src/crimson/osd/backfill_state.h
src/crimson/osd/osd_operation.h
src/crimson/osd/pg_recovery.cc
src/crimson/osd/pg_recovery.h
src/crimson/osd/shard_services.h

index 1392ee330ac2077f37772a62140edb6998c40b89..0ea9e6372f0d020d0fe7f392925afad6466950f9 100644 (file)
@@ -342,6 +342,7 @@ BackfillState::Enqueuing::Enqueuing(my_context ctx)
 
   do {
     if (!backfill_listener().budget_available()) {
+      DEBUGDPP("throttle failed, turning to Waiting", pg());
       post_event(RequestWaiting{});
       return;
     } else if (should_rescan_replicas(backfill_state().peer_backfill_info,
index 463be4a7a2eb5ef1f7e8364427dd137352bceea0..0217886832df689b6d0725d46830dda135c8eb72 100644 (file)
@@ -62,6 +62,8 @@ struct BackfillState {
   struct CancelBackfill : sc::event<CancelBackfill> {
   };
 
+  struct ThrottleAcquired : sc::event<ThrottleAcquired> {
+  };
 private:
   // internal events
   struct RequestPrimaryScanning : sc::event<RequestPrimaryScanning> {
@@ -257,6 +259,7 @@ public:
       sc::transition<RequestDone, Done>,
       sc::custom_reaction<CancelBackfill>,
       sc::custom_reaction<Triggered>,
+      sc::transition<ThrottleAcquired, Enqueuing>,
       sc::transition<sc::event_base, Crashed>>;
     explicit Waiting(my_context);
     sc::result react(ObjectPushed);
index 6376dabd04dbe3d6b08247ee321e3f3046ca516f..394375c11297b383ca4b5435a5032844a0b9d331 100644 (file)
@@ -341,6 +341,18 @@ public:
       with_throttle_while(std::forward<Args>(args)...), *this);
   }
 
+  // Returns std::nullopt if the throttle is acquired immediately,
+  // returns the future for the acquiring otherwise
+  std::optional<seastar::future<>>
+  try_acquire_throttle_now(crimson::osd::scheduler::params_t params) {
+    if (!max_in_progress || in_progress < max_in_progress) {
+      ++in_progress;
+      --pending;
+      return std::nullopt;
+    }
+    return acquire_throttle(params);
+  }
+
 private:
   void dump_detail(Formatter *f) const final;
 
index ec3af0d2b00061bde2b5801435d0109faecb83a3..5eef584c77671858c9e2b728ad5a39aaa7b120f1 100644 (file)
@@ -67,8 +67,6 @@ PGRecovery::start_recovery_ops(
   if (max_to_start > 0) {
     max_to_start -= start_replica_recovery_ops(trigger, max_to_start, &started);
   }
-  using interruptor =
-    crimson::interruptible::interruptor<crimson::osd::IOInterruptCondition>;
   return interruptor::parallel_for_each(started,
                                        [] (auto&& ifut) {
     return std::move(ifut);
@@ -609,8 +607,21 @@ void PGRecovery::update_peers_last_backfill(
 
 bool PGRecovery::budget_available() const
 {
-  // TODO: the limits!
-  return true;
+  crimson::osd::scheduler::params_t params =
+    {1, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort};
+  auto &ss = pg->get_shard_services();
+  auto futopt = ss.try_acquire_throttle_now(std::move(params));
+  if (!futopt) {
+    return true;
+  }
+  std::ignore = interruptor::make_interruptible(std::move(*futopt)
+  ).then_interruptible([this] {
+    assert(!backfill_state->is_triggered());
+    using BackfillState = crimson::osd::BackfillState;
+    backfill_state->process_event(
+      BackfillState::ThrottleAcquired{}.intrusive_from_this());
+  });
+  return false;
 }
 
 void PGRecovery::on_pg_clean()
index 657e6d3e888c7385a15fae9f6799b8f158f046a4..5c7b5c5ef2bf26094a01acd543e38e0dd7c0b2d8 100644 (file)
@@ -25,6 +25,8 @@ class PGBackend;
 
 class PGRecovery : public crimson::osd::BackfillState::BackfillListener {
 public:
+  using interruptor =
+    crimson::interruptible::interruptor<crimson::osd::IOInterruptCondition>;
   template <typename T = void>
   using interruptible_future = RecoveryBackend::interruptible_future<T>;
   PGRecovery(PGRecoveryListener* pg) : pg(pg) {}
index f4d4b4c2eb4f5f18bf89f46c560cab781527cfd2..f1ed9b8d911294368f90d2dc8ce3cc36841432aa 100644 (file)
@@ -591,6 +591,7 @@ public:
 
   FORWARD_TO_OSD_SINGLETON(get_pool_info)
   FORWARD(with_throttle_while, with_throttle_while, local_state.throttler)
+  FORWARD(try_acquire_throttle_now, try_acquire_throttle_now, local_state.throttler)
 
   FORWARD_TO_OSD_SINGLETON(build_incremental_map_msg)
   FORWARD_TO_OSD_SINGLETON(send_incremental_map)