]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os/seastore/async_cleaner: introduce BackgroundListener callbacks
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 25 Aug 2022 13:29:08 +0000 (21:29 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Fri, 26 Aug 2022 09:47:30 +0000 (17:47 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/async_cleaner.cc
src/crimson/os/seastore/async_cleaner.h

index 83ec658e50ebd2dd2a02d165e9b1794f93d888b7..81b2ddb8fcf3e897cb6a71d5bc9645bc0b1ebc4d 100644 (file)
@@ -630,7 +630,7 @@ segment_id_t AsyncCleaner::allocate_segment(
     if (segment_info.is_empty()) {
       auto old_usage = calc_utilization(seg_id);
       segments.mark_open(seg_id, seq, type, category, generation);
-      gc_process.maybe_wake_on_space_used();
+      gc_process.maybe_wake_background();
       auto new_usage = calc_utilization(seg_id);
       adjust_segment_util(old_usage, new_usage);
       INFO("opened, {}", gc_stat_printer_t{this, false});
@@ -684,8 +684,8 @@ void AsyncCleaner::update_journal_tails(
     journal_alloc_tail = alloc_tail;
   }
 
-  gc_process.maybe_wake_on_space_used();
-  maybe_wake_gc_blocked_io();
+  gc_process.maybe_wake_background();
+  gc_process.maybe_wake_blocked_io();
 }
 
 void AsyncCleaner::close_segment(segment_id_t segment)
@@ -1062,7 +1062,7 @@ AsyncCleaner::gc_reclaim_space_ret AsyncCleaner::gc_reclaim_space()
             adjust_segment_util(old_usage, new_usage);
             INFO("released {}, {}",
                  segment_to_release, gc_stat_printer_t{this, false});
-            maybe_wake_gc_blocked_io();
+            gc_process.maybe_wake_blocked_io();
           });
         } else {
           return gc_reclaim_space_ertr::now();
@@ -1309,7 +1309,7 @@ void AsyncCleaner::mark_space_used(
   auto new_usage = calc_utilization(seg_addr.get_segment_id());
   adjust_segment_util(old_usage, new_usage);
 
-  gc_process.maybe_wake_on_space_used();
+  gc_process.maybe_wake_background();
   assert(ret > 0);
   DEBUG("segment {} new len: {}~{}, live_bytes: {}",
         seg_addr.get_segment_id(),
@@ -1343,7 +1343,7 @@ void AsyncCleaner::mark_space_free(
     len);
   auto new_usage = calc_utilization(seg_addr.get_segment_id());
   adjust_segment_util(old_usage, new_usage);
-  maybe_wake_gc_blocked_io();
+  gc_process.maybe_wake_blocked_io();
   assert(ret >= 0);
   DEBUG("segment {} free len: {}~{}, live_bytes: {}",
         seg_addr.get_segment_id(),
@@ -1408,7 +1408,6 @@ AsyncCleaner::reserve_projected_usage(std::size_t projected_usage)
   ceph_assert(is_ready());
   // The pipeline configuration prevents another IO from entering
   // prepare until the prior one exits and clears this.
-  ceph_assert(!blocked_io_wake);
   ++stats.io_count;
   bool is_blocked = false;
   if (should_block_on_trim()) {
@@ -1424,17 +1423,8 @@ AsyncCleaner::reserve_projected_usage(std::size_t projected_usage)
     ++stats.io_blocked_count;
     stats.io_blocked_sum += stats.io_blocking_num;
   }
-  return seastar::do_until(
-    [this] {
-      log_gc_state("await_hard_limits");
-      return !should_block_on_gc();
-    },
-    [this] {
-      blocked_io_wake = seastar::promise<>();
-      return blocked_io_wake->get_future();
-    }
+  return gc_process.io_await_hard_limits(
   ).then([this, projected_usage, is_blocked] {
-    ceph_assert(!blocked_io_wake);
     stats.projected_used_bytes += projected_usage;
     ++stats.projected_count;
     stats.projected_used_bytes_sum += stats.projected_used_bytes;
@@ -1450,7 +1440,7 @@ void AsyncCleaner::release_projected_usage(std::size_t projected_usage)
   ceph_assert(is_ready());
   ceph_assert(stats.projected_used_bytes >= projected_usage);
   stats.projected_used_bytes -= projected_usage;
-  return maybe_wake_gc_blocked_io();
+  gc_process.maybe_wake_blocked_io();
 }
 
 std::ostream &operator<<(std::ostream &os, AsyncCleaner::gc_stat_printer_t stats)
index 110e7ae8b20f3c9eca2b4dc31b6492edef448b5d..f4a5b09efd7b3f90ce2d7b3bf95ab346a5699b96 100644 (file)
@@ -382,6 +382,15 @@ private:
   }
 };
 
+/**
+ * Callback interface to wake up background works
+ */
+struct BackgroundListener {
+  virtual ~BackgroundListener() = default;
+  virtual void maybe_wake_background() = 0;
+  virtual void maybe_wake_blocked_io() = 0;
+};
+
 /**
  * Callback interface for journal trimming
  */
@@ -816,9 +825,6 @@ private:
 
   ExtentCallbackInterface *ecb = nullptr;
 
-  /// populated if there is an IO blocked on hard limits
-  std::optional<seastar::promise<>> blocked_io_wake;
-
   SegmentSeqAllocatorRef ool_segment_seq_allocator;
 
 public:
@@ -883,7 +889,7 @@ public:
     }
 
     journal_head = head;
-    gc_process.maybe_wake_on_space_used();
+    gc_process.maybe_wake_background();
   }
 
   journal_seq_t get_dirty_tail() const final {
@@ -912,7 +918,7 @@ public:
 
   void update_segment_avail_bytes(segment_type_t type, paddr_t offset) final {
     segments.update_written_to(type, offset);
-    gc_process.maybe_wake_on_space_used();
+    gc_process.maybe_wake_background();
   }
 
   void update_modify_time(
@@ -1074,9 +1080,11 @@ private:
    * GCProcess
    *
    * Background gc process.
+   *
+   * TODO: move up to EPM
    */
   using gc_cycle_ret = seastar::future<>;
-  class GCProcess {
+  class GCProcess : public BackgroundListener {
   public:
     GCProcess(AsyncCleaner &cleaner) : cleaner(cleaner) {}
 
@@ -1087,15 +1095,6 @@ private:
       assert(!is_stopping());
     }
 
-    void maybe_wake_on_space_used() {
-      if (is_stopping()) {
-        return;
-      }
-      if (cleaner.gc_should_run()) {
-       wake();
-      }
-    }
-
     gc_cycle_ret stop() {
       if (is_stopping()) {
         return seastar::now();
@@ -1103,7 +1102,7 @@ private:
       auto ret = std::move(*process_join);
       process_join.reset();
       assert(is_stopping());
-      wake();
+      do_wake_background();
       return ret;
     }
 
@@ -1130,6 +1129,39 @@ private:
       );
     }
 
+    seastar::future<> io_await_hard_limits() {
+      ceph_assert(!blocking_io);
+      return seastar::do_until(
+        [this] {
+          cleaner.log_gc_state("GCProcess::io_await_hard_limits");
+          return !cleaner.should_block_on_gc();
+        },
+        [this] {
+          blocking_io = seastar::promise<>();
+          return blocking_io->get_future();
+        }
+      );
+    }
+
+    void maybe_wake_background() final {
+      if (is_stopping()) {
+        return;
+      }
+      if (cleaner.gc_should_run()) {
+        do_wake_background();
+      }
+    }
+
+    void maybe_wake_blocked_io() final {
+      if (!cleaner.is_ready()) {
+        return;
+      }
+      if (!cleaner.should_block_on_gc() && blocking_io) {
+        blocking_io->set_value();
+        blocking_io = std::nullopt;
+      }
+    }
+
   private:
     bool is_stopping() const {
       return !process_join;
@@ -1137,16 +1169,17 @@ private:
 
     gc_cycle_ret run();
 
-    void wake() {
-      if (blocking) {
-       blocking->set_value();
-       blocking = std::nullopt;
+    void do_wake_background() {
+      if (blocking_background) {
+       blocking_background->set_value();
+       blocking_background = std::nullopt;
       }
     }
 
     AsyncCleaner &cleaner;
     std::optional<gc_cycle_ret> process_join;
-    std::optional<seastar::promise<>> blocking;
+    std::optional<seastar::promise<>> blocking_background;
+    std::optional<seastar::promise<>> blocking_io;
     bool is_running_until_halt = false;
   } gc_process;
 
@@ -1300,16 +1333,6 @@ private:
 
   void log_gc_state(const char *caller) const;
 
-  void maybe_wake_gc_blocked_io() {
-    if (!is_ready()) {
-      return;
-    }
-    if (!should_block_on_gc() && blocked_io_wake) {
-      blocked_io_wake->set_value();
-      blocked_io_wake = std::nullopt;
-    }
-  }
-
   using scan_extents_ertr = SegmentManagerGroup::scan_valid_records_ertr;
   using scan_extents_ret = scan_extents_ertr::future<>;
   scan_extents_ret scan_no_tail_segment(