git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore: introduce background gc framework
author Samuel Just <sjust@redhat.com>
Sat, 13 Mar 2021 00:40:07 +0000 (16:40 -0800)
committer Samuel Just <sjust@redhat.com>
Wed, 24 Mar 2021 05:41:09 +0000 (22:41 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/os/seastore/ordering_handle.h
src/crimson/os/seastore/segment_cleaner.cc
src/crimson/os/seastore/segment_cleaner.h
src/crimson/os/seastore/transaction_manager.cc

index 0f6a77a5ceb632da632679a350119091ced917e7..b7d526b6a4b8e5971068b6da839d4173efac2b87 100644 (file)
@@ -55,6 +55,9 @@ inline OrderingHandle get_dummy_ordering_handle() {
 }
 
 struct WritePipeline {
+  OrderedExclusivePhase wait_throttle{
+    "TransactionManager::wait_throttle"
+  };
   OrderedExclusivePhase prepare{
     "TransactionManager::prepare_phase"
   };
index 3d43203a1adbdf426c98e17972511b41b8b8baab..7d19e5b885604734c3a0959ad9fbbe9de66031c6 100644 (file)
@@ -4,6 +4,7 @@
 #include "crimson/common/log.h"
 
 #include "crimson/os/seastore/segment_cleaner.h"
+#include "crimson/os/seastore/transaction_manager.h"
 
 namespace {
   seastar::logger& logger() {
@@ -169,6 +170,8 @@ void SegmentCleaner::update_journal_tail_target(journal_seq_t target)
   if (journal_tail_target == journal_seq_t() || target > journal_tail_target) {
     journal_tail_target = target;
   }
+  gc_process.maybe_wake_on_space_used();
+  maybe_wake_gc_blocked_io();
 }
 
 void SegmentCleaner::update_journal_tail_committed(journal_seq_t committed)
@@ -246,11 +249,6 @@ SegmentCleaner::rewrite_dirty_ret SegmentCleaner::rewrite_dirty(
   return ecb->get_next_dirty_extents(
     limit
   ).then([=, &t](auto dirty_list) {
-    if (dirty_list.empty()) {
-      return rewrite_dirty_ertr::now();
-    } else {
-      update_journal_tail_target(dirty_list.front()->get_dirty_from());
-    }
     return seastar::do_with(
       std::move(dirty_list),
       [this, &t](auto &dirty_list) {
@@ -258,7 +256,7 @@ SegmentCleaner::rewrite_dirty_ret SegmentCleaner::rewrite_dirty(
          dirty_list,
          [this, &t](auto &e) {
            logger().debug(
-             "SegmentCleaner::do_immediate_work cleaning {}",
+             "SegmentCleaner::rewrite_dirty cleaning {}",
              *e);
            return ecb->rewrite_extent(t, e);
          });
@@ -266,6 +264,139 @@ SegmentCleaner::rewrite_dirty_ret SegmentCleaner::rewrite_dirty(
   });
 }
 
+SegmentCleaner::gc_cycle_ret SegmentCleaner::GCProcess::run()
+{
+  return seastar::do_until(
+    [this] { return stopping; },
+    [this] {
+      return maybe_wait_should_run(
+      ).then([this] {
+       cleaner.log_gc_state("GCProcess::run");
+
+       if (stopping) {
+         return seastar::now();
+       } else {
+         return cleaner.do_gc_cycle();
+       }
+      });
+    });
+}
+
+SegmentCleaner::gc_cycle_ret SegmentCleaner::do_gc_cycle()
+{
+  if (gc_should_trim_journal()) {
+    return gc_trim_journal(
+    ).handle_error(
+      crimson::ct_error::assert_all{
+       "GCProcess::run encountered invalid error in gc_trim_journal"
+      }
+    );
+  } else if (gc_should_reclaim_space()) {
+    return gc_reclaim_space(
+    ).handle_error(
+      crimson::ct_error::assert_all{
+       "GCProcess::run encountered invalid error in gc_reclaim_space"
+      }
+    );
+  } else {
+    return seastar::now();
+  }
+}
+
+SegmentCleaner::gc_trim_journal_ret SegmentCleaner::gc_trim_journal()
+{
+  return repeat_eagain(
+    [this] {
+      return seastar::do_with(
+       make_transaction(),
+       [this](auto &t) {
+         return rewrite_dirty(*t, get_dirty_tail()
+         ).safe_then([this, &t] {
+           return ecb->submit_transaction_direct(
+             std::move(t));
+         });
+       });
+    });
+}
+
+SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space()
+{
+  if (!scan_cursor) {
+    paddr_t next = P_ADDR_NULL;
+    next.segment = get_next_gc_target();
+    if (next == P_ADDR_NULL) {
+      logger().debug(
+       "SegmentCleaner::do_gc: no segments to gc");
+      return seastar::now();
+    }
+    next.offset = 0;
+    scan_cursor =
+      std::make_unique<ExtentCallbackInterface::scan_extents_cursor>(
+       next);
+    logger().debug(
+      "SegmentCleaner::do_gc: starting gc on segment {}",
+      scan_cursor->get_offset().segment);
+  } else {
+    ceph_assert(!scan_cursor->is_complete());
+  }
+
+  return ecb->scan_extents(
+    *scan_cursor,
+    config.reclaim_bytes_stride
+  ).safe_then([this](auto &&_extents) {
+    return seastar::do_with(
+      std::move(_extents),
+      [this](auto &extents) {
+       return repeat_eagain([this, &extents]() mutable {
+         return seastar::do_with(
+           make_transaction(),
+           [this, &extents](auto &t) mutable {
+             return crimson::do_for_each(
+               extents,
+               [this, &t](auto &extent) {
+                 auto &[addr, info] = extent;
+                 logger().debug(
+                   "SegmentCleaner::gc_reclaim_space: checking extent {}",
+                   info);
+                 return ecb->get_extent_if_live(
+                   *t,
+                   info.type,
+                   addr,
+                   info.addr,
+                   info.len
+                 ).safe_then([addr=addr, &t, this](CachedExtentRef ext) {
+                   if (!ext) {
+                     logger().debug(
+                       "SegmentCleaner::gc_reclaim_space: addr {} dead, skipping",
+                       addr);
+                     return ExtentCallbackInterface::rewrite_extent_ertr::now();
+                   } else {
+                     logger().debug(
+                       "SegmentCleaner::gc_reclaim_space: addr {} alive, gc'ing {}",
+                       addr,
+                       *ext);
+                     return ecb->rewrite_extent(
+                       *t,
+                       ext);
+                   }
+                 });
+               }
+             ).safe_then([this, &t] {
+               if (scan_cursor->is_complete()) {
+                 t->mark_segment_to_release(scan_cursor->get_offset().segment);
+               }
+               return ecb->submit_transaction_direct(std::move(t));
+             });
+           });
+       });
+      });
+  }).safe_then([this] {
+    if (scan_cursor->is_complete()) {
+      scan_cursor.reset();
+    }
+  });
+}
+
 SegmentCleaner::do_gc_ret SegmentCleaner::do_gc(
   Transaction &t,
   size_t bytes)
index 2f8ae318ccf38184d06e6b54b6c1566e5018154e..205189c56ccf4b0a7784f4aa55167ca615555a0d 100644 (file)
@@ -7,6 +7,7 @@
 
 #include "common/ceph_time.h"
 
+#include "crimson/common/log.h"
 #include "crimson/os/seastore/cached_extent.h"
 #include "crimson/os/seastore/journal.h"
 #include "crimson/os/seastore/seastore_types.h"
@@ -212,26 +213,35 @@ public:
     size_t num_segments = 0;
     size_t segment_size = 0;
     size_t block_size = 0;
+
     size_t target_journal_segments = 0;
     size_t max_journal_segments = 0;
 
+    double available_ratio_gc_max = 0;
     double reclaim_ratio_hard_limit = 0;
-    // don't apply reclaim ratio with available space below this
+    double reclaim_ratio_gc_threshhold = 0;
+
+    // don't apply reclaim ratio with available space below this (TODO remove)
     double reclaim_ratio_usage_min = 0;
 
     double available_ratio_hard_limit = 0;
 
+    size_t reclaim_bytes_stride = 0; // Number of bytes to reclaim on each cycle
+
     static config_t default_from_segment_manager(
       SegmentManager &manager) {
       return config_t{
        manager.get_num_segments(),
        static_cast<size_t>(manager.get_segment_size()),
        (size_t)manager.get_block_size(),
-       2,
-       4,
-       .5,
-       .95,
-       .2
+         2,    // target_journal_segments
+         4,    // max_journal_segments
+         .9,   // available_ratio_gc_max
+         .6,   // reclaim_ratio_hard_limit
+         .3,   // reclaim_ratio_gc_threshhold
+         .95,  // reclaim_ratio_usage_min
+         .1,   // available_ratio_hard_limit
+         1<<20 // reclaim 1MB per gc cycle
        };
     }
   };
@@ -341,6 +351,9 @@ private:
 
   ExtentCallbackInterface *ecb = nullptr;
 
+  /// populated if there is an IO blocked on hard limits
+  std::optional<seastar::promise<>> blocked_io_wake;
+
 public:
   SegmentCleaner(config_t config, bool detailed = false)
     : config(config),
@@ -353,7 +366,8 @@ public:
        (SpaceTrackerI*)new SpaceTrackerSimple(
          config.num_segments)),
       segments(config.num_segments),
-      empty_segments(config.num_segments) {}
+      empty_segments(config.num_segments),
+      gc_process(*this) {}
 
   get_segment_ret get_segment() final;
 
@@ -381,6 +395,7 @@ public:
   void set_journal_head(journal_seq_t head) {
     assert(journal_head == journal_seq_t() || head >= journal_head);
     journal_head = head;
+    gc_process.maybe_wake_on_space_used();
   }
 
   void init_mark_segment_closed(segment_id_t segment, segment_seq_t seq) final {
@@ -414,6 +429,7 @@ public:
       addr.segment,
       addr.offset,
       len);
+    gc_process.maybe_wake_on_space_used();
     assert(ret > 0);
   }
 
@@ -430,6 +446,7 @@ public:
       addr.segment,
       addr.offset,
       len);
+    maybe_wake_gc_blocked_io();
     assert(ret >= 0);
   }
 
@@ -457,7 +474,22 @@ public:
     return space_tracker->make_empty();
   }
 
-  void complete_init() { init_complete = true; }
+  void start() {
+    gc_process.start();
+  }
+
+  void complete_init() {
+    init_complete = true;
+    start();
+  }
+
+  seastar::future<> stop() {
+    return gc_process.stop();
+  }
+
+  seastar::future<> run_until_halt() {
+    return gc_process.run_until_halt();
+  }
 
   void set_extent_callback(ExtentCallbackInterface *cb) {
     ecb = cb;
@@ -531,7 +563,99 @@ private:
   }
 
   // GC status helpers
-  std::unique_ptr<ExtentCallbackInterface::scan_extents_cursor> scan_cursor;
+  std::unique_ptr<
+    ExtentCallbackInterface::scan_extents_cursor
+    > scan_cursor;
+
+  /**
+   * GCProcess
+   *
+   * Background gc process.
+   */
+  using gc_cycle_ret = seastar::future<>;
+  class GCProcess {
+    std::optional<gc_cycle_ret> process_join;
+
+    SegmentCleaner &cleaner;
+
+    bool stopping = false;
+
+    std::optional<seastar::promise<>> blocking;
+
+    gc_cycle_ret run();
+
+    void wake() {
+      if (blocking) {
+       blocking->set_value();
+       blocking = std::nullopt;
+      }
+    }
+
+    seastar::future<> maybe_wait_should_run() {
+      return seastar::do_until(
+       [this] {
+         cleaner.log_gc_state("GCProcess::maybe_wait_should_run");
+         return stopping || cleaner.gc_should_run();
+       },
+       [this] {
+         ceph_assert(!blocking);
+         blocking = seastar::promise<>();
+         return blocking->get_future();
+       });
+    }
+  public:
+    GCProcess(SegmentCleaner &cleaner) : cleaner(cleaner) {}
+
+    void start() {
+      ceph_assert(!process_join);
+      process_join = run();
+    }
+
+    gc_cycle_ret stop() {
+      if (!process_join)
+       return seastar::now();
+      stopping = true;
+      wake();
+      ceph_assert(process_join);
+      auto ret = std::move(*process_join);
+      process_join = std::nullopt;
+      return ret.then([this] { stopping = false; });
+    }
+
+    gc_cycle_ret run_until_halt() {
+      ceph_assert(!process_join);
+      return seastar::do_until(
+       [this] {
+         cleaner.log_gc_state("GCProcess::run_until_halt");
+         return !cleaner.gc_should_run();
+       },
+       [this] {
+         return cleaner.do_gc_cycle();
+       });
+    }
+
+    void maybe_wake_on_space_used() {
+      cleaner.log_gc_state("GCProcess::maybe_wake_on_space_used");
+      if (cleaner.gc_should_run()) {
+       wake();
+      }
+    }
+  } gc_process;
+
+  using gc_ertr = ExtentCallbackInterface::extent_mapping_ertr::extend_ertr<
+    ExtentCallbackInterface::scan_extents_ertr
+    >;
+
+  gc_cycle_ret do_gc_cycle();
+
+  using gc_trim_journal_ertr = gc_ertr;
+  using gc_trim_journal_ret = gc_trim_journal_ertr::future<>;
+  gc_trim_journal_ret gc_trim_journal();
+
+  using gc_reclaim_space_ertr = gc_ertr;
+  using gc_reclaim_space_ret = gc_reclaim_space_ertr::future<>;
+  gc_reclaim_space_ret gc_reclaim_space();
+
 
   /**
    * do_gc
@@ -610,11 +734,119 @@ private:
     return (double)get_available_bytes() / (double)get_total_bytes();
   }
 
+  /**
+   * should_block_on_gc
+   *
+   * Encapsulates whether IO should block pending gc.
+   */
+  bool should_block_on_gc() const {
+    auto aratio = get_available_ratio();
+    return (
+      ((aratio < config.available_ratio_gc_max) &&
+       (get_reclaim_ratio() > config.reclaim_ratio_hard_limit ||
+       aratio < config.available_ratio_hard_limit)) ||
+      (get_dirty_tail_limit() > journal_tail_target)
+    );
+  }
+
+  void log_gc_state(const char *caller) const {
+    auto &logger = crimson::get_logger(ceph_subsys_filestore);
+    if (logger.is_enabled(seastar::log_level::debug)) {
+      logger.debug(
+       "SegmentCleaner::log_gc_state({}): "
+       "total {}, "
+       "available {}, "
+       "unavailable {}, "
+       "used {}, "
+       "reclaimable {}, "
+       "reclaim_ratio {}, "
+       "available_ratio {}, "
+       "should_block_on_gc {}, "
+       "gc_should_reclaim_space {}, "
+       "journal_head {}, "
+       "journal_tail_target {}, "
+       "dirty_tail {}, "
+       "dirty_tail_limit {}, "
+       "gc_should_trim_journal {}, ",
+       caller,
+       get_total_bytes(),
+       get_available_bytes(),
+       get_unavailable_bytes(),
+       get_used_bytes(),
+       get_reclaimable_bytes(),
+       get_reclaim_ratio(),
+       get_available_ratio(),
+       should_block_on_gc(),
+       gc_should_reclaim_space(),
+       journal_head,
+       journal_tail_target,
+       get_dirty_tail(),
+       get_dirty_tail_limit(),
+       gc_should_trim_journal()
+      );
+    }
+  }
+
+public:
+  seastar::future<> await_hard_limits() {
+    // The pipeline configuration prevents another IO from entering
+    // prepare until the prior one exits and clears this.
+    ceph_assert(!blocked_io_wake);
+    return seastar::do_until(
+      [this] {
+       log_gc_state("await_hard_limits");
+       return !should_block_on_gc();
+      },
+      [this] {
+       blocked_io_wake = seastar::promise<>();
+       return blocked_io_wake->get_future();
+      });
+  }
+private:
+  void maybe_wake_gc_blocked_io() {
+    if (!should_block_on_gc() && blocked_io_wake) {
+      blocked_io_wake->set_value();
+      blocked_io_wake = std::nullopt;
+    }
+  }
+
+  /**
+   * gc_should_reclaim_space
+   *
+   * Encapsulates logic for whether gc should be reclaiming segment space.
+   */
+  bool gc_should_reclaim_space() const {
+    auto aratio = get_available_ratio();
+    return (
+      (aratio < config.available_ratio_gc_max) &&
+      (get_reclaim_ratio() > config.reclaim_ratio_gc_threshhold ||
+       aratio < config.available_ratio_hard_limit)
+    );
+  }
+
+  /**
+   * gc_should_trim_journal
+   *
+   * Encapsulates logic for whether gc should be trimming the journal.
+   */
+  bool gc_should_trim_journal() const {
+    return get_dirty_tail() > journal_tail_target;
+  }
+
+  /**
+   * gc_should_run
+   *
+   * True if gc should be running.
+   */
+  bool gc_should_run() const {
+    return gc_should_reclaim_space() || gc_should_trim_journal();
+  }
+
   /**
    * get_immediate_bytes_to_gc_for_reclaim
    *
    * Returns the number of bytes to gc in order to bring the
-   * reclaim ratio below reclaim_ratio_usage_min.
+   * reclaim ratio below reclaim_ratio_hard_limit.
    */
   size_t get_immediate_bytes_to_gc_for_reclaim() const {
     if (get_reclaim_ratio() < config.reclaim_ratio_hard_limit)
@@ -689,6 +921,7 @@ private:
       assert(space_tracker->get_usage(segment) == 0);
     }
     segments[segment].state = Segment::segment_state_t::EMPTY;
+    maybe_wake_gc_blocked_io();
   }
 
   void mark_open(segment_id_t segment) {
index d7be39d33f407db0df964021de15f2ea96475209..cb89d28faaedb590381f790200a457289ba35792 100644 (file)
@@ -104,8 +104,10 @@ TransactionManager::mount_ertr::future<> TransactionManager::mount()
 }
 
 TransactionManager::close_ertr::future<> TransactionManager::close() {
-  return cache->close(
-  ).safe_then([this] {
+  return segment_cleaner->stop(
+  ).then([this] {
+    return cache->close();
+  }).safe_then([this] {
     return journal->close();
   });
 }
@@ -195,40 +197,12 @@ TransactionManager::submit_transaction(
 {
   logger().debug("TransactionManager::submit_transaction");
   auto &tref = *t;
-  return tref.handle.enter(write_pipeline.prepare
-  ).then([this, &tref]() mutable {
-    return segment_cleaner->do_immediate_work(tref);
-  }).safe_then([this, &tref]() mutable
-              -> submit_transaction_ertr::future<> {
-    logger().debug("TransactionManager::submit_transaction after do_immediate");
-    auto record = cache->try_construct_record(tref);
-    if (!record) {
-      return crimson::ct_error::eagain::make();
-    }
-
-    return journal->submit_record(std::move(*record), tref.handle
-    ).safe_then([this, &tref](auto p) mutable {
-      auto [addr, journal_seq] = p;
-      segment_cleaner->set_journal_head(journal_seq);
-      cache->complete_commit(tref, addr, journal_seq, segment_cleaner.get());
-      lba_manager->complete_transaction(tref);
-      auto to_release = tref.get_segment_to_release();
-      if (to_release != NULL_SEG_ID) {
-       segment_cleaner->mark_segment_released(to_release);
-       return segment_manager.release(to_release);
-      } else {
-       return SegmentManager::release_ertr::now();
-      }
-    }).safe_then([&tref] {
-      return tref.handle.complete();
-    }).handle_error(
-      submit_transaction_ertr::pass_further{},
-      crimson::ct_error::all_same_way([](auto e) {
-       ceph_assert(0 == "Hit error submitting to journal");
-      }));
-    }).finally([t=std::move(t)]() mutable {
-      t->handle.exit();
-    });
+  return tref.handle.enter(write_pipeline.wait_throttle
+  ).then([this] {
+    return segment_cleaner->await_hard_limits();
+  }).then([this, t=std::move(t)]() mutable {
+    return submit_transaction_direct(std::move(t));
+  });
 }
 
 TransactionManager::submit_transaction_direct_ret