if (segment_info.is_empty()) {
auto old_usage = calc_utilization(seg_id);
segments.mark_open(seg_id, seq, type, category, generation);
- gc_process.maybe_wake_on_space_used();
+ gc_process.maybe_wake_background();
auto new_usage = calc_utilization(seg_id);
adjust_segment_util(old_usage, new_usage);
INFO("opened, {}", gc_stat_printer_t{this, false});
journal_alloc_tail = alloc_tail;
}
- gc_process.maybe_wake_on_space_used();
- maybe_wake_gc_blocked_io();
+ gc_process.maybe_wake_background();
+ gc_process.maybe_wake_blocked_io();
}
void AsyncCleaner::close_segment(segment_id_t segment)
adjust_segment_util(old_usage, new_usage);
INFO("released {}, {}",
segment_to_release, gc_stat_printer_t{this, false});
- maybe_wake_gc_blocked_io();
+ gc_process.maybe_wake_blocked_io();
});
} else {
return gc_reclaim_space_ertr::now();
auto new_usage = calc_utilization(seg_addr.get_segment_id());
adjust_segment_util(old_usage, new_usage);
- gc_process.maybe_wake_on_space_used();
+ gc_process.maybe_wake_background();
assert(ret > 0);
DEBUG("segment {} new len: {}~{}, live_bytes: {}",
seg_addr.get_segment_id(),
len);
auto new_usage = calc_utilization(seg_addr.get_segment_id());
adjust_segment_util(old_usage, new_usage);
- maybe_wake_gc_blocked_io();
+ gc_process.maybe_wake_blocked_io();
assert(ret >= 0);
DEBUG("segment {} free len: {}~{}, live_bytes: {}",
seg_addr.get_segment_id(),
ceph_assert(is_ready());
- // The pipeline configuration prevents another IO from entering
- // prepare until the prior one exits and clears this.
- ceph_assert(!blocked_io_wake);
++stats.io_count;
bool is_blocked = false;
if (should_block_on_trim()) {
++stats.io_blocked_count;
stats.io_blocked_sum += stats.io_blocking_num;
}
- return seastar::do_until(
- [this] {
- log_gc_state("await_hard_limits");
- return !should_block_on_gc();
- },
- [this] {
- blocked_io_wake = seastar::promise<>();
- return blocked_io_wake->get_future();
- }
+ return gc_process.io_await_hard_limits(
).then([this, projected_usage, is_blocked] {
- ceph_assert(!blocked_io_wake);
stats.projected_used_bytes += projected_usage;
++stats.projected_count;
stats.projected_used_bytes_sum += stats.projected_used_bytes;
ceph_assert(is_ready());
ceph_assert(stats.projected_used_bytes >= projected_usage);
stats.projected_used_bytes -= projected_usage;
- return maybe_wake_gc_blocked_io();
+ gc_process.maybe_wake_blocked_io();
}
std::ostream &operator<<(std::ostream &os, AsyncCleaner::gc_stat_printer_t stats)
}
};
+/**
+ * Callback interface to wake up background work
+ */
+struct BackgroundListener {
+ virtual ~BackgroundListener() = default;
+ virtual void maybe_wake_background() = 0;
+ virtual void maybe_wake_blocked_io() = 0;
+};
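+
+// Illustrative usage: AsyncCleaner::GCProcess below implements this
+// interface, so callers that only need to trigger wakeups can hold a
+// BackgroundListener reference instead of depending on GCProcess directly,
+// e.g.:
+//   BackgroundListener &listener = gc_process;
+//   listener.maybe_wake_background();
+//   listener.maybe_wake_blocked_io();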
+
/**
* Callback interface for journal trimming
*/
ExtentCallbackInterface *ecb = nullptr;
- /// populated if there is an IO blocked on hard limits
- std::optional<seastar::promise<>> blocked_io_wake;
-
SegmentSeqAllocatorRef ool_segment_seq_allocator;
public:
}
journal_head = head;
- gc_process.maybe_wake_on_space_used();
+ gc_process.maybe_wake_background();
}
journal_seq_t get_dirty_tail() const final {
void update_segment_avail_bytes(segment_type_t type, paddr_t offset) final {
segments.update_written_to(type, offset);
- gc_process.maybe_wake_on_space_used();
+ gc_process.maybe_wake_background();
}
void update_modify_time(
* GCProcess
*
* Background gc process.
+ *
+ * TODO: move up to EPM
*/
using gc_cycle_ret = seastar::future<>;
- class GCProcess {
+ class GCProcess : public BackgroundListener {
public:
GCProcess(AsyncCleaner &cleaner) : cleaner(cleaner) {}
assert(!is_stopping());
}
- void maybe_wake_on_space_used() {
- if (is_stopping()) {
- return;
- }
- if (cleaner.gc_should_run()) {
- wake();
- }
- }
-
gc_cycle_ret stop() {
if (is_stopping()) {
return seastar::now();
auto ret = std::move(*process_join);
process_join.reset();
assert(is_stopping());
- wake();
+ do_wake_background();
return ret;
}
);
}
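+ // Block the calling IO until hard limits are no longer exceeded.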
+ seastar::future<> io_await_hard_limits() {
+ // The pipeline configuration prevents another IO from entering
+ // prepare until the prior one exits and clears this.
+ ceph_assert(!blocking_io);
+ return seastar::do_until(
+ [this] {
+ cleaner.log_gc_state("GCProcess::io_await_hard_limits");
+ return !cleaner.should_block_on_gc();
+ },
+ [this] {
+ blocking_io = seastar::promise<>();
+ return blocking_io->get_future();
+ }
+ );
+ }
+
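+ // BackgroundListener: wake the background process if gc should run.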
+ void maybe_wake_background() final {
+ if (is_stopping()) {
+ return;
+ }
+ if (cleaner.gc_should_run()) {
+ do_wake_background();
+ }
+ }
+
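+ // BackgroundListener: wake an IO blocked on hard limits once they are cleared.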
+ void maybe_wake_blocked_io() final {
+ if (!cleaner.is_ready()) {
+ return;
+ }
+ if (!cleaner.should_block_on_gc() && blocking_io) {
+ blocking_io->set_value();
+ blocking_io = std::nullopt;
+ }
+ }
+
private:
bool is_stopping() const {
return !process_join;
gc_cycle_ret run();
- void wake() {
- if (blocking) {
- blocking->set_value();
- blocking = std::nullopt;
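+ // unconditionally wake the background process if it is waiting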
+ void do_wake_background() {
+ if (blocking_background) {
+ blocking_background->set_value();
+ blocking_background = std::nullopt;
}
}
AsyncCleaner &cleaner;
std::optional<gc_cycle_ret> process_join;
- std::optional<seastar::promise<>> blocking;
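+ /// populated if the background process is blocked waiting to be woken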
+ std::optional<seastar::promise<>> blocking_background;
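+ /// populated if there is an IO blocked on hard limits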
+ std::optional<seastar::promise<>> blocking_io;
bool is_running_until_halt = false;
} gc_process;
void log_gc_state(const char *caller) const;
- void maybe_wake_gc_blocked_io() {
- if (!is_ready()) {
- return;
- }
- if (!should_block_on_gc() && blocked_io_wake) {
- blocked_io_wake->set_value();
- blocked_io_wake = std::nullopt;
- }
- }
-
using scan_extents_ertr = SegmentManagerGroup::scan_valid_records_ertr;
using scan_extents_ret = scan_extents_ertr::future<>;
scan_extents_ret scan_no_tail_segment(