]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os/seastore/EPM/BackgroundProcess: add promote and demote process
authorZhang Song <zhangsong02@qianxin.com>
Thu, 7 Aug 2025 11:22:58 +0000 (19:22 +0800)
committerXuehan Xu <xuxuehan@qianxin.com>
Sat, 23 May 2026 09:11:36 +0000 (17:11 +0800)
Signed-off-by: Zhang Song <zhangsong02@qianxin.com>
Signed-off-by: Xuehan Xu <xuxuehan@qianxin.com>
src/crimson/os/seastore/async_cleaner.h
src/crimson/os/seastore/extent_pinboard.cc
src/crimson/os/seastore/extent_placement_manager.cc
src/crimson/os/seastore/extent_placement_manager.h
src/crimson/os/seastore/logical_bucket.cc

index a55bdf3574ba48aa84256bbc1b6ec19d82e183d8..3399a5d3c90b27545edeaa617d4d34482b995283 100644 (file)
@@ -453,6 +453,7 @@ struct BackgroundListener {
   virtual ~BackgroundListener() = default;
   virtual void maybe_wake_background() = 0;
   virtual void maybe_wake_blocked_io() = 0;
+  virtual void maybe_wake_promote() = 0;
   virtual state_t get_state() const = 0;
 
   bool is_ready() const {
index a0773ca12d08a55ba35f8936bb355f1f5d0fba81..d45bb0efcfc021943ade272ed1644515467da9b6 100644 (file)
@@ -319,7 +319,8 @@ public:
       remove_extent(list.front(), extent_pin_state_t::Fresh);
     }
     if (should_run_promote()) {
-      // TODO: wake promote background process
+      assert(listener);
+      listener->maybe_wake_promote();
     }
   }
 
index c83cafd4cc9f73500d7244567c8e371ce7d6317c..eb926072d7012a549e8961a9f3d33c8024b4e235 100644 (file)
@@ -631,41 +631,49 @@ void ExtentPlacementManager::BackgroundProcess::start_background()
   ceph_assert(state == state_t::SCAN_SPACE);
   assert(!is_running());
   process_join = seastar::now();
+  promote_process_join = seastar::now();
   state = state_t::RUNNING;
   assert(is_running());
   process_join = run();
+  if (has_cold_tier()) {
+    promote_process_join = run_promote();
+  }
 }
 
 seastar::future<>
 ExtentPlacementManager::BackgroundProcess::stop_background()
 {
   LOG_PREFIX(BackgroundProcess::stop_background);
-  return seastar::futurize_invoke([this, FNAME] {
-    if (!is_running()) {
-      if (state != state_t::HALT) {
-        INFO("isn't RUNNING or HALT, STOP");
-        state = state_t::STOP;
-      } else {
-        INFO("isn't RUNNING, already HALT");
-      }
-      return seastar::now();
-    }
-    INFO("is RUNNING, going to HALT...");
-    auto ret = std::move(*process_join);
-    process_join.reset();
-    state = state_t::HALT;
-    assert(!is_running());
-    do_wake_background();
-    return ret;
-  }).then([this, FNAME] {
-    INFO("done, {}, {}",
-         JournalTrimmerImpl::stat_printer_t{*trimmer, true},
-         AsyncCleaner::stat_printer_t{*main_cleaner, true});
-    if (has_cold_tier()) {
-      INFO("done, cold_cleaner: {}",
-           AsyncCleaner::stat_printer_t{*cold_cleaner, true});
+  if (!is_running()) {
+    if (state != state_t::HALT) {
+      INFO("isn't RUNNING or HALT, STOP");
+      state = state_t::STOP;
+    } else {
+      INFO("isn't RUNNING, already HALT");
     }
-  });
+    co_return;
+  }
+  INFO("is RUNNING, going to HALT...");
+  std::vector<seastar::future<>> futs;
+  futs.emplace_back(std::move(*process_join));
+  process_join.reset();
+  if (promote_process_join) {
+    futs.emplace_back(std::move(*promote_process_join));
+    promote_process_join.reset();
+  }
+  state = state_t::HALT;
+  assert(!is_running());
+  do_wake_background();
+  do_wake_promote();
+  co_await seastar::when_all(futs.begin(), futs.end());
+  INFO("done, {}, {}",
+       JournalTrimmerImpl::stat_printer_t{*trimmer, true},
+       AsyncCleaner::stat_printer_t{*main_cleaner, true});
+  if (has_cold_tier()) {
+    INFO("done, cold_cleaner: {}",
+         AsyncCleaner::stat_printer_t{*cold_cleaner, true});
+  }
+  co_return;
 }
 
 seastar::future<>
@@ -987,7 +995,15 @@ ExtentPlacementManager::BackgroundProcess::do_background_cycle()
       proceed_clean_cold = true;
     }
 
-    if (!proceed_clean_main && !proceed_clean_cold) {
+    bool proceed_demote = false;
+    if (has_cold_tier() &&
+        logical_bucket->could_demote() &&
+        (eviction_state.is_fast_mode() ||
+         logical_bucket->should_demote())) {
+      proceed_demote = true;
+    }
+
+    if (!proceed_clean_main && !proceed_clean_cold && !proceed_demote) {
       ceph_abort_msg("no background process will start");
     }
     return seastar::when_all(
@@ -1029,11 +1045,51 @@ ExtentPlacementManager::BackgroundProcess::do_background_cycle()
         ).finally([FNAME] {
           DEBUG("finished clean cold");
         });
+      },
+      [this, proceed_demote] {
+        if (!proceed_demote) {
+          return seastar::now();
+        }
+        return logical_bucket->demote();
       }
     ).discard_result();
   }
 }
 
+seastar::future<> ExtentPlacementManager::BackgroundProcess::run_promote()
+{
+  assert(pinboard);
+  assert(is_running());
+  return seastar::repeat([this] {
+    if (!is_running()) {
+      return seastar::make_ready_future<seastar::stop_iteration>(
+          seastar::stop_iteration::yes);
+    }
+
+    return seastar::futurize_invoke([this] {
+      if (pinboard->should_promote()) {
+        auto usage = cleaner_usage_t{pinboard->get_promotion_size(), 0};
+        auto res = try_reserve_cleaner(usage);
+        if (res.is_successful()) {
+          return pinboard->promote(
+          ).finally([this, usage, res] {
+            abort_cleaner_usage(usage, res);
+          });
+        } else {
+          // reserve usage failed, block
+          abort_cleaner_usage(usage, res);
+        }
+      } // shouldn't promote, block
+
+      ceph_assert(!blocking_promote);
+      blocking_promote = seastar::promise<>();
+      return blocking_promote->get_future();
+    }).then([] {
+      return seastar::stop_iteration::no;
+    });
+  });
+}
+
 void ExtentPlacementManager::BackgroundProcess::register_metrics(store_index_t store_index)
 {
   namespace sm = seastar::metrics;
index 6cb9e7a3c8b521414336c9b0fc66ea15f407824d..01e5923646f53370d2f931a17220249888b58877 100644 (file)
@@ -949,6 +949,15 @@ private:
 
     void maybe_wake_blocked_io() final;
 
+    void maybe_wake_promote() final {
+      if (!is_ready()) {
+        return;
+      }
+      if (pinboard && pinboard->should_promote()) {
+        do_wake_promote();
+      }
+    }
+
   private:
     // reserve helpers
     bool try_reserve_cold(std::size_t usage);
@@ -983,6 +992,13 @@ private:
       }
     }
 
+    void do_wake_promote() {
+      if (blocking_promote) {
+        blocking_promote->set_value();
+        blocking_promote = std::nullopt;
+      }
+    }
+
     // background_should_run() should be atomic with do_background_cycle()
     // to make sure the condition is consistent.
     bool background_should_run() {
@@ -1141,6 +1157,7 @@ private:
     };
 
     seastar::future<> do_background_cycle();
+    seastar::future<> run_promote();
 
     void register_metrics(store_index_t store_index);
 
@@ -1171,6 +1188,8 @@ private:
     std::optional<seastar::future<>> process_join;
     std::optional<seastar::promise<>> blocking_background;
     std::optional<seastar::promise<>> blocking_io;
+    std::optional<seastar::future<>> promote_process_join;
+    std::optional<seastar::promise<>> blocking_promote;
     bool is_running_until_halt = false;
     state_t state = state_t::STOP;
     eviction_state_t eviction_state;
index 7f45ac870d81a7084fe47bcdb128c2019a54ca47..ecbb7ff5c596362c3aae346c18edae8ce6e3a93f 100644 (file)
@@ -41,6 +41,10 @@ public:
     } else {
       TRACE("create bucket: {}", laddr);
       index[laddr] = lru.emplace(lru.end(), laddr);
+      if (should_demote()) {
+       assert(listener);
+       listener->maybe_wake_background();
+      }
     }
   }