#include "crimson/common/log.h"
#include "crimson/os/seastore/segment_cleaner.h"
+#include "crimson/os/seastore/transaction_manager.h"
namespace {
seastar::logger& logger() {
if (journal_tail_target == journal_seq_t() || target > journal_tail_target) {
journal_tail_target = target;
}
+ gc_process.maybe_wake_on_space_used();
+ maybe_wake_gc_blocked_io();
}
void SegmentCleaner::update_journal_tail_committed(journal_seq_t committed)
return ecb->get_next_dirty_extents(
limit
).then([=, &t](auto dirty_list) {
- if (dirty_list.empty()) {
- return rewrite_dirty_ertr::now();
- } else {
- update_journal_tail_target(dirty_list.front()->get_dirty_from());
- }
return seastar::do_with(
std::move(dirty_list),
[this, &t](auto &dirty_list) {
dirty_list,
[this, &t](auto &e) {
logger().debug(
- "SegmentCleaner::do_immediate_work cleaning {}",
+ "SegmentCleaner::rewrite_dirty cleaning {}",
*e);
return ecb->rewrite_extent(t, e);
});
});
}
+// Background gc loop: block until there is gc work to do (or a stop
+// request), then run one bounded gc cycle, and repeat until stop().
+SegmentCleaner::gc_cycle_ret SegmentCleaner::GCProcess::run()
+{
+ return seastar::do_until(
+ [this] { return stopping; },
+ [this] {
+ return maybe_wait_should_run(
+ ).then([this] {
+ cleaner.log_gc_state("GCProcess::run");
+
+ // stop() may have woken us while we were parked in
+ // maybe_wait_should_run(); re-check before doing work.
+ if (stopping) {
+ return seastar::now();
+ } else {
+ return cleaner.do_gc_cycle();
+ }
+ });
+ });
+}
+
+// Runs a single gc step: journal trimming takes priority over segment
+// reclamation; no-op if neither predicate holds.  Errors from either
+// path are considered fatal (assert_all) since gc has no caller to
+// propagate them to.
+SegmentCleaner::gc_cycle_ret SegmentCleaner::do_gc_cycle()
+{
+ if (gc_should_trim_journal()) {
+ return gc_trim_journal(
+ ).handle_error(
+ crimson::ct_error::assert_all{
+ "GCProcess::run encountered invalid error in gc_trim_journal"
+ }
+ );
+ } else if (gc_should_reclaim_space()) {
+ return gc_reclaim_space(
+ ).handle_error(
+ crimson::ct_error::assert_all{
+ "GCProcess::run encountered invalid error in gc_reclaim_space"
+ }
+ );
+ } else {
+ // NOTE(review): assert messages above say "GCProcess::run" but this
+ // is also reached via GCProcess::run_until_halt -- consider renaming.
+ return seastar::now();
+ }
+}
+
+// Advances the journal tail by rewriting all dirty extents older than
+// get_dirty_tail() into a fresh transaction.  Wrapped in repeat_eagain
+// so a transaction conflict simply retries with a new transaction.
+SegmentCleaner::gc_trim_journal_ret SegmentCleaner::gc_trim_journal()
+{
+ return repeat_eagain(
+ [this] {
+ return seastar::do_with(
+ make_transaction(),
+ [this](auto &t) {
+ return rewrite_dirty(*t, get_dirty_tail()
+ ).safe_then([this, &t] {
+ return ecb->submit_transaction_direct(
+ std::move(t));
+ });
+ });
+ });
+}
+
+// Reclaims space from the current gc target segment: scans up to
+// reclaim_bytes_stride bytes of extent metadata from scan_cursor,
+// rewrites the extents that are still live, and -- once the cursor has
+// covered the whole segment -- marks the segment for release with the
+// final transaction.  scan_cursor persists across calls so one segment
+// is reclaimed incrementally over several gc cycles.
+SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space()
+{
+ if (!scan_cursor) {
+ // No reclaim in progress: pick the next target segment, if any.
+ paddr_t next = P_ADDR_NULL;
+ next.segment = get_next_gc_target();
+ if (next == P_ADDR_NULL) {
+ logger().debug(
+ "SegmentCleaner::gc_reclaim_space: no segments to gc");
+ return seastar::now();
+ }
+ next.offset = 0;
+ scan_cursor =
+ std::make_unique<ExtentCallbackInterface::scan_extents_cursor>(
+ next);
+ logger().debug(
+ "SegmentCleaner::gc_reclaim_space: starting gc on segment {}",
+ scan_cursor->get_offset().segment);
+ } else {
+ // A completed cursor must have been reset at the end of the prior call.
+ ceph_assert(!scan_cursor->is_complete());
+ }
+
+ return ecb->scan_extents(
+ *scan_cursor,
+ config.reclaim_bytes_stride
+ ).safe_then([this](auto &&_extents) {
+ return seastar::do_with(
+ std::move(_extents),
+ [this](auto &extents) {
+ // repeat_eagain: retry the whole batch in a fresh transaction
+ // on conflict.
+ return repeat_eagain([this, &extents]() mutable {
+ return seastar::do_with(
+ make_transaction(),
+ [this, &extents](auto &t) mutable {
+ return crimson::do_for_each(
+ extents,
+ [this, &t](auto &extent) {
+ auto &[addr, info] = extent;
+ logger().debug(
+ "SegmentCleaner::gc_reclaim_space: checking extent {}",
+ info);
+ return ecb->get_extent_if_live(
+ *t,
+ info.type,
+ addr,
+ info.addr,
+ info.len
+ ).safe_then([addr=addr, &t, this](CachedExtentRef ext) {
+ if (!ext) {
+ // Extent superseded since it was written; nothing to move.
+ logger().debug(
+ "SegmentCleaner::gc_reclaim_space: addr {} dead, skipping",
+ addr);
+ return ExtentCallbackInterface::rewrite_extent_ertr::now();
+ } else {
+ logger().debug(
+ "SegmentCleaner::gc_reclaim_space: addr {} alive, gc'ing {}",
+ addr,
+ *ext);
+ return ecb->rewrite_extent(
+ *t,
+ ext);
+ }
+ });
+ }
+ ).safe_then([this, &t] {
+ if (scan_cursor->is_complete()) {
+ // Whole segment scanned: release it when this txn commits.
+ t->mark_segment_to_release(scan_cursor->get_offset().segment);
+ }
+ return ecb->submit_transaction_direct(std::move(t));
+ });
+ });
+ });
+ });
+ }).safe_then([this] {
+ // Only drop the cursor after a successful commit so a retried cycle
+ // resumes from the same position.
+ if (scan_cursor->is_complete()) {
+ scan_cursor.reset();
+ }
+ });
+}
+
SegmentCleaner::do_gc_ret SegmentCleaner::do_gc(
Transaction &t,
size_t bytes)
#include "common/ceph_time.h"
+#include "crimson/common/log.h"
#include "crimson/os/seastore/cached_extent.h"
#include "crimson/os/seastore/journal.h"
#include "crimson/os/seastore/seastore_types.h"
size_t num_segments = 0;
size_t segment_size = 0;
size_t block_size = 0;
+
size_t target_journal_segments = 0;
size_t max_journal_segments = 0;
+ double available_ratio_gc_max = 0;
double reclaim_ratio_hard_limit = 0;
- // don't apply reclaim ratio with available space below this
+ double reclaim_ratio_gc_threshhold = 0;
+
+ // don't apply reclaim ratio with available space below this (TODO remove)
double reclaim_ratio_usage_min = 0;
double available_ratio_hard_limit = 0;
+ size_t reclaim_bytes_stride = 0; // Number of bytes to reclaim on each cycle
+
// Builds a default gc config from the segment manager's geometry.
// Field order must match config_t's member declaration order.
static config_t default_from_segment_manager(
SegmentManager &manager) {
return config_t{
manager.get_num_segments(),
static_cast<size_t>(manager.get_segment_size()),
(size_t)manager.get_block_size(),
- 2,
- 4,
- .5,
- .95,
- .2
+ 2, // target_journal_segments
+ 4, // max_journal_segments
+ .9, // available_ratio_gc_max
+ .6, // reclaim_ratio_hard_limit
+ .3, // reclaim_ratio_gc_threshhold
+ .95, // reclaim_ratio_usage_min
+ .1, // available_ratio_hard_limit
+ 1<<20 // reclaim 1MB per gc cycle
};
}
};
ExtentCallbackInterface *ecb = nullptr;
+ /// populated if there is an IO blocked on hard limits
+ std::optional<seastar::promise<>> blocked_io_wake;
+
public:
SegmentCleaner(config_t config, bool detailed = false)
: config(config),
(SpaceTrackerI*)new SpaceTrackerSimple(
config.num_segments)),
segments(config.num_segments),
- empty_segments(config.num_segments) {}
+ empty_segments(config.num_segments),
+ gc_process(*this) {}
get_segment_ret get_segment() final;
// Records the new journal head (must be monotonically non-decreasing)
// and pokes the background gc, since advancing the head may make
// journal trimming necessary.
void set_journal_head(journal_seq_t head) {
assert(journal_head == journal_seq_t() || head >= journal_head);
journal_head = head;
+ gc_process.maybe_wake_on_space_used();
}
void init_mark_segment_closed(segment_id_t segment, segment_seq_t seq) final {
addr.segment,
addr.offset,
len);
+ gc_process.maybe_wake_on_space_used();
assert(ret > 0);
}
addr.segment,
addr.offset,
len);
+ maybe_wake_gc_blocked_io();
assert(ret >= 0);
}
return space_tracker->make_empty();
}
- void complete_init() { init_complete = true; }
+ // Launches the background gc fiber; paired with stop().
+ void start() {
+ gc_process.start();
+ }
+
+ // Called once mount/replay is done; gc must not run before this point
+ // since space accounting is incomplete during init.
+ void complete_init() {
+ init_complete = true;
+ start();
+ }
+
+ // Resolves once the background gc fiber has exited.
+ seastar::future<> stop() {
+ return gc_process.stop();
+ }
+
+ // Synchronously drives gc until no work remains (e.g. for tests);
+ // requires that the background fiber is not running.
+ seastar::future<> run_until_halt() {
+ return gc_process.run_until_halt();
+ }
void set_extent_callback(ExtentCallbackInterface *cb) {
ecb = cb;
}
// GC status helpers
- std::unique_ptr<ExtentCallbackInterface::scan_extents_cursor> scan_cursor;
+ std::unique_ptr<
+ ExtentCallbackInterface::scan_extents_cursor
+ > scan_cursor;
+
+ /**
+ * GCProcess
+ *
+ * Background gc process.  Runs as a single fiber (see run()) which
+ * sleeps while gc_should_run() is false and is woken via
+ * maybe_wake_on_space_used() or stop().
+ */
+ using gc_cycle_ret = seastar::future<>;
+ class GCProcess {
+ // Future for the running fiber; populated between start() and stop().
+ std::optional<gc_cycle_ret> process_join;
+
+ SegmentCleaner &cleaner;
+
+ // Set by stop(); checked by run()'s loop condition and after wakeup.
+ bool stopping = false;
+
+ // Populated while the fiber is parked waiting for gc work.
+ std::optional<seastar::promise<>> blocking;
+
+ gc_cycle_ret run();
+
+ // Fulfills the parked fiber's promise, if any.  Idempotent.
+ void wake() {
+ if (blocking) {
+ blocking->set_value();
+ blocking = std::nullopt;
+ }
+ }
+
+ // Parks the fiber until gc work appears or stop() is requested.
+ seastar::future<> maybe_wait_should_run() {
+ return seastar::do_until(
+ [this] {
+ cleaner.log_gc_state("GCProcess::maybe_wait_should_run");
+ return stopping || cleaner.gc_should_run();
+ },
+ [this] {
+ ceph_assert(!blocking);
+ blocking = seastar::promise<>();
+ return blocking->get_future();
+ });
+ }
+ public:
+ GCProcess(SegmentCleaner &cleaner) : cleaner(cleaner) {}
+
+ void start() {
+ ceph_assert(!process_join);
+ process_join = run();
+ }
+
+ // Requests shutdown and returns the fiber's join future; safe to call
+ // when the fiber was never started.  Resets `stopping` on completion
+ // so the process can be restarted.
+ gc_cycle_ret stop() {
+ if (!process_join)
+ return seastar::now();
+ stopping = true;
+ wake();
+ ceph_assert(process_join);
+ auto ret = std::move(*process_join);
+ process_join = std::nullopt;
+ return ret.then([this] { stopping = false; });
+ }
+
+ // Foreground variant: drives gc cycles until there is no work left.
+ // Must not run concurrently with the background fiber.
+ gc_cycle_ret run_until_halt() {
+ ceph_assert(!process_join);
+ return seastar::do_until(
+ [this] {
+ cleaner.log_gc_state("GCProcess::run_until_halt");
+ return !cleaner.gc_should_run();
+ },
+ [this] {
+ return cleaner.do_gc_cycle();
+ });
+ }
+
+ // Called whenever space usage changes in a way that could create gc
+ // work; wakes the fiber iff gc should now run.
+ void maybe_wake_on_space_used() {
+ cleaner.log_gc_state("GCProcess::maybe_wake_on_space_used");
+ if (cleaner.gc_should_run()) {
+ wake();
+ }
+ }
+ } gc_process;
+
+ using gc_ertr = ExtentCallbackInterface::extent_mapping_ertr::extend_ertr<
+ ExtentCallbackInterface::scan_extents_ertr
+ >;
+
+ gc_cycle_ret do_gc_cycle();
+
+ using gc_trim_journal_ertr = gc_ertr;
+ using gc_trim_journal_ret = gc_trim_journal_ertr::future<>;
+ gc_trim_journal_ret gc_trim_journal();
+
+ using gc_reclaim_space_ertr = gc_ertr;
+ using gc_reclaim_space_ret = gc_reclaim_space_ertr::future<>;
+ gc_reclaim_space_ret gc_reclaim_space();
+
/**
* do_gc
return (double)get_available_bytes() / (double)get_total_bytes();
}
+ /**
+ * should_block_on_gc
+ *
+ * Encapsulates whether an incoming IO should block until gc has made
+ * progress: either the hard space limits are exceeded (low available
+ * ratio with high reclaimable ratio, or available ratio below the hard
+ * limit), or the journal tail has fallen behind its limit.
+ */
+ bool should_block_on_gc() const {
+ auto aratio = get_available_ratio();
+ return (
+ ((aratio < config.available_ratio_gc_max) &&
+ (get_reclaim_ratio() > config.reclaim_ratio_hard_limit ||
+ aratio < config.available_ratio_hard_limit)) ||
+ (get_dirty_tail_limit() > journal_tail_target)
+ );
+ }
+
+ // Dumps the full gc accounting state at debug level; `caller`
+ // identifies the call site.  The is_enabled() guard avoids evaluating
+ // the accessors when debug logging is off.
+ // NOTE(review): logs under ceph_subsys_filestore -- confirm this is
+ // the intended subsystem for seastore.
+ void log_gc_state(const char *caller) const {
+ auto &logger = crimson::get_logger(ceph_subsys_filestore);
+ if (logger.is_enabled(seastar::log_level::debug)) {
+ logger.debug(
+ "SegmentCleaner::log_gc_state({}): "
+ "total {}, "
+ "available {}, "
+ "unavailable {}, "
+ "used {}, "
+ "reclaimable {}, "
+ "reclaim_ratio {}, "
+ "available_ratio {}, "
+ "should_block_on_gc {}, "
+ "gc_should_reclaim_space {}, "
+ "journal_head {}, "
+ "journal_tail_target {}, "
+ "dirty_tail {}, "
+ "dirty_tail_limit {}, "
+ "gc_should_trim_journal {}, ",
+ caller,
+ get_total_bytes(),
+ get_available_bytes(),
+ get_unavailable_bytes(),
+ get_used_bytes(),
+ get_reclaimable_bytes(),
+ get_reclaim_ratio(),
+ get_available_ratio(),
+ should_block_on_gc(),
+ gc_should_reclaim_space(),
+ journal_head,
+ journal_tail_target,
+ get_dirty_tail(),
+ get_dirty_tail_limit(),
+ gc_should_trim_journal()
+ );
+ }
+ }
+
+public:
+ // Blocks the caller until gc has brought usage back under the hard
+ // limits.  A fresh promise is installed per wait iteration and is
+ // fulfilled by maybe_wake_gc_blocked_io() as space is freed.
+ seastar::future<> await_hard_limits() {
+ // The pipeline configuration prevents another IO from entering
+ // prepare until the prior one exits and clears this.
+ ceph_assert(!blocked_io_wake);
+ return seastar::do_until(
+ [this] {
+ log_gc_state("await_hard_limits");
+ return !should_block_on_gc();
+ },
+ [this] {
+ blocked_io_wake = seastar::promise<>();
+ return blocked_io_wake->get_future();
+ });
+ }
+private:
+ // Releases the IO blocked in await_hard_limits(), if any, once the
+ // hard limits are no longer exceeded.  Called whenever space is freed.
+ void maybe_wake_gc_blocked_io() {
+ if (!should_block_on_gc() && blocked_io_wake) {
+ blocked_io_wake->set_value();
+ blocked_io_wake = std::nullopt;
+ }
+ }
+
+ /**
+ * gc_should_reclaim_space
+ *
+ * Encapsulates logic for whether gc should be reclaiming segment space:
+ * available space is below the gc ceiling AND either enough of the
+ * unavailable space is reclaimable or we are under the hard limit.
+ */
+ bool gc_should_reclaim_space() const {
+ auto aratio = get_available_ratio();
+ return (
+ (aratio < config.available_ratio_gc_max) &&
+ (get_reclaim_ratio() > config.reclaim_ratio_gc_threshhold ||
+ aratio < config.available_ratio_hard_limit)
+ );
+ }
+
+ /**
+ * gc_should_trim_journal
+ *
+ * Encapsulates logic for whether gc should be trimming the journal:
+ * true while the journal tail target lags behind the desired dirty
+ * tail.
+ */
+ bool gc_should_trim_journal() const {
+ return get_dirty_tail() > journal_tail_target;
+ }
+
+ /**
+ * gc_should_run
+ *
+ * True if gc should be running; i.e. there is either segment space to
+ * reclaim or journal to trim.
+ */
+ bool gc_should_run() const {
+ return gc_should_reclaim_space() || gc_should_trim_journal();
+ }
+
/**
* get_immediate_bytes_to_gc_for_reclaim
*
* Returns the number of bytes to gc in order to bring the
- * reclaim ratio below reclaim_ratio_usage_min.
+ * reclaim ratio below reclaim_ratio_hard_limit.
*/
size_t get_immediate_bytes_to_gc_for_reclaim() const {
if (get_reclaim_ratio() < config.reclaim_ratio_hard_limit)
assert(space_tracker->get_usage(segment) == 0);
}
segments[segment].state = Segment::segment_state_t::EMPTY;
+ maybe_wake_gc_blocked_io();
}
void mark_open(segment_id_t segment) {
}
// Shuts down in dependency order: the background gc is stopped first so
// no gc transaction is in flight while the cache and journal close.
TransactionManager::close_ertr::future<> TransactionManager::close() {
- return cache->close(
- ).safe_then([this] {
+ return segment_cleaner->stop(
+ ).then([this] {
+ return cache->close();
+ }).safe_then([this] {
return journal->close();
});
}
{
logger().debug("TransactionManager::submit_transaction");
auto &tref = *t;
- return tref.handle.enter(write_pipeline.prepare
- ).then([this, &tref]() mutable {
- return segment_cleaner->do_immediate_work(tref);
- }).safe_then([this, &tref]() mutable
- -> submit_transaction_ertr::future<> {
- logger().debug("TransactionManager::submit_transaction after do_immediate");
- auto record = cache->try_construct_record(tref);
- if (!record) {
- return crimson::ct_error::eagain::make();
- }
-
- return journal->submit_record(std::move(*record), tref.handle
- ).safe_then([this, &tref](auto p) mutable {
- auto [addr, journal_seq] = p;
- segment_cleaner->set_journal_head(journal_seq);
- cache->complete_commit(tref, addr, journal_seq, segment_cleaner.get());
- lba_manager->complete_transaction(tref);
- auto to_release = tref.get_segment_to_release();
- if (to_release != NULL_SEG_ID) {
- segment_cleaner->mark_segment_released(to_release);
- return segment_manager.release(to_release);
- } else {
- return SegmentManager::release_ertr::now();
- }
- }).safe_then([&tref] {
- return tref.handle.complete();
- }).handle_error(
- submit_transaction_ertr::pass_further{},
- crimson::ct_error::all_same_way([](auto e) {
- ceph_assert(0 == "Hit error submitting to journal");
- }));
- }).finally([t=std::move(t)]() mutable {
- t->handle.exit();
- });
+ return tref.handle.enter(write_pipeline.wait_throttle
+ ).then([this] {
+ return segment_cleaner->await_hard_limits();
+ }).then([this, t=std::move(t)]() mutable {
+ return submit_transaction_direct(std::move(t));
+ });
}
TransactionManager::submit_transaction_direct_ret