* record_header_t to store the average modify time for dirty extents.
* Drop tracking rewrite-time.
* Drop the last-modify field in extent_info_t.
* Maintain modify-time during rewriting.
* Introduce 3 GC policies: greedy, benefit, and cost-benefit.
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
SET_SUBSYS(seastore_cleaner);
+namespace {
+
+enum class gc_formula_t {
+ GREEDY,
+ BENEFIT,
+ COST_BENEFIT,
+};
+constexpr auto gc_formula = gc_formula_t::COST_BENEFIT;
+
+}
+
namespace crimson::os::seastore {
void segment_info_t::set_open(
type = segment_type_t::NULL_SEG;
category = data_category_t::NUM;
generation = NULL_GENERATION;
- last_modified = {};
- last_rewritten = {};
+ modify_time = NULL_TIME;
+ num_extents = 0;
written_to = 0;
}
<< ", type=" << info.type
<< ", category=" << info.category
<< ", generation=" << reclaim_gen_printer_t{info.generation}
- << ", last_modified=" << info.last_modified.time_since_epoch()
- << ", last_rewritten=" << info.last_rewritten.time_since_epoch()
+ << ", modify_time=" << sea_time_point_printer_t{info.modify_time}
+ << ", num_extents=" << info.num_extents
<< ", written_to=" << info.written_to;
}
return out << ")";
total_bytes = 0;
avail_bytes_in_open = 0;
+
+ modify_times.clear();
}
void segments_info_t::add_segment_manager(
category, reclaim_gen_printer_t{generation},
segment_info, num_empty, num_open, num_closed);
ceph_assert(segment_info.is_empty());
- segment_info.init_closed(
- seq, type, category, generation, get_segment_size());
ceph_assert(num_empty > 0);
--num_empty;
++num_closed;
++num_type_ool;
}
// do not increment count_close_*;
+
+ if (segment_info.modify_time != NULL_TIME) {
+ modify_times.insert(segment_info.modify_time);
+ } else {
+ ceph_assert(segment_info.num_extents == 0);
+ }
+
+ segment_info.init_closed(
+ seq, type, category, generation, get_segment_size());
}
void segments_info_t::mark_open(
category, reclaim_gen_printer_t{generation},
segment_info, num_empty, num_open, num_closed);
ceph_assert(segment_info.is_empty());
- segment_info.set_open(seq, type, category, generation);
ceph_assert(num_empty > 0);
--num_empty;
++num_open;
++num_type_ool;
++count_open_ool;
}
- ceph_assert(segment_info.written_to == 0);
avail_bytes_in_open += get_segment_size();
+
+ segment_info.set_open(seq, type, category, generation);
}
void segments_info_t::mark_empty(
ceph_assert(segment_info.is_closed());
auto type = segment_info.type;
assert(type != segment_type_t::NULL_SEG);
- segment_info.set_empty();
ceph_assert(num_closed > 0);
--num_closed;
++num_empty;
--num_type_ool;
++count_release_ool;
}
+
+ if (segment_info.modify_time != NULL_TIME) {
+ auto to_erase = modify_times.find(segment_info.modify_time);
+ ceph_assert(to_erase != modify_times.end());
+ modify_times.erase(to_erase);
+ } else {
+ ceph_assert(segment_info.num_extents == 0);
+ }
+
+ segment_info.set_empty();
}
void segments_info_t::mark_closed(
segment, segment_info,
num_empty, num_open, num_closed);
ceph_assert(segment_info.is_open());
- segment_info.set_closed();
ceph_assert(num_open > 0);
--num_open;
++num_closed;
auto seg_avail_bytes = get_segment_size() - segment_info.written_to;
ceph_assert(avail_bytes_in_open >= seg_avail_bytes);
avail_bytes_in_open -= seg_avail_bytes;
+
+ if (segment_info.modify_time != NULL_TIME) {
+ modify_times.insert(segment_info.modify_time);
+ } else {
+ ceph_assert(segment_info.num_extents == 0);
+ }
+
+ segment_info.set_closed();
}
void segments_info_t::update_written_to(
);
}
+double AsyncCleaner::calc_gc_benefit_cost(
+ segment_id_t id,
+ const sea_time_point &now_time,
+ const sea_time_point &bound_time) const
+{
+ double util = calc_utilization(id);
+ ceph_assert(util >= 0 && util < 1);
+ if constexpr (gc_formula == gc_formula_t::GREEDY) {
+ return 1 - util;
+ }
+
+ if constexpr (gc_formula == gc_formula_t::COST_BENEFIT) {
+ if (util == 0) {
+ return std::numeric_limits<double>::max();
+ }
+ auto modify_time = segments[id].modify_time;
+ double age_segment = modify_time.time_since_epoch().count();
+ double age_now = now_time.time_since_epoch().count();
+ if (likely(age_now > age_segment)) {
+ return (1 - util) * (age_now - age_segment) / (2 * util);
+ } else {
+ // time is wrong
+ return (1 - util) / (2 * util);
+ }
+ }
+
+ assert(gc_formula == gc_formula_t::BENEFIT);
+ auto modify_time = segments[id].modify_time;
+ double age_factor = 0.5; // middle value if age is invalid
+ if (likely(bound_time != NULL_TIME &&
+ modify_time != NULL_TIME &&
+ now_time > modify_time)) {
+ assert(modify_time >= bound_time);
+ double age_bound = bound_time.time_since_epoch().count();
+ double age_now = now_time.time_since_epoch().count();
+ double age_segment = modify_time.time_since_epoch().count();
+ age_factor = (age_now - age_segment) / (age_now - age_bound);
+ }
+ return ((1 - 2 * age_factor) * util * util +
+ (2 * age_factor - 2) * util + 1);
+}
+
AsyncCleaner::rewrite_dirty_ret AsyncCleaner::rewrite_dirty(
Transaction &t,
journal_seq_t limit)
dirty_list,
[this, FNAME, &t](auto &e) {
DEBUGT("cleaning {}", t, *e);
- return ecb->rewrite_extent(t, e, DIRTY_GENERATION);
+ return ecb->rewrite_extent(t, e, DIRTY_GENERATION, NULL_TIME);
});
});
});
if (!reclaim_state) {
segment_id_t seg_id = get_next_reclaim_segment();
auto &segment_info = segments[seg_id];
- INFO("reclaim {} {} start, usage={}",
- seg_id, segment_info, space_tracker->calc_utilization(seg_id));
+ INFO("reclaim {} {} start, usage={}, time_bound={}",
+ seg_id, segment_info,
+ space_tracker->calc_utilization(seg_id),
+ sea_time_point_printer_t{segments.get_time_bound()});
ceph_assert(segment_info.is_closed());
reclaim_state = reclaim_state_t::create(
seg_id, segment_info.generation, segments.get_segment_size());
reclaim_state->start_pos,
reclaim_state->end_pos);
double pavail_ratio = get_projected_available_ratio();
- seastar::lowres_system_clock::time_point start = seastar::lowres_system_clock::now();
+ sea_time_point start = seastar::lowres_system_clock::now();
return seastar::do_with(
(size_t)0,
}
return fut;
}).si_then([&extents, this, &t, &reclaimed] {
+ auto modify_time = segments[reclaim_state->get_segment_id()].modify_time;
return trans_intr::do_for_each(
extents,
- [this, &t, &reclaimed](auto &ext) {
+ [this, modify_time, &t, &reclaimed](auto &ext) {
reclaimed += ext->get_length();
- return ecb->rewrite_extent(t, ext, reclaim_state->target_generation);
+ return ecb->rewrite_extent(
+ t, ext, reclaim_state->target_generation, modify_time);
});
});
}).si_then([this, &t, &seq] {
}
return sm_group->read_segment_tail(
segment_id
- ).safe_then([this, segment_id, header](auto tail)
+ ).safe_then([this, FNAME, segment_id, header](auto tail)
-> scan_extents_ertr::future<> {
if (tail.segment_nonce != header.segment_nonce) {
return scan_no_tail_segment(header, segment_id);
}
- time_point last_modified(duration(tail.last_modified));
- time_point last_rewritten(duration(tail.last_rewritten));
- segments.update_last_modified_rewritten(
- segment_id, last_modified, last_rewritten);
if (tail.get_type() == segment_type_t::JOURNAL) {
update_journal_tail_committed(tail.journal_tail);
update_journal_tail_target(
tail.journal_tail,
tail.alloc_replay_from);
}
+
+ sea_time_point modify_time = mod_to_timepoint(tail.modify_time);
+ std::size_t num_extents = tail.num_extents;
+ if ((modify_time == NULL_TIME && num_extents == 0) ||
+ (modify_time != NULL_TIME && num_extents != 0)) {
+ segments.update_modify_time(segment_id, modify_time, num_extents);
+ } else {
+ ERROR("illegal modify time {}", tail);
+ return crimson::ct_error::input_output_error::make();
+ }
+
init_mark_segment_closed(
segment_id,
header.segment_seq,
DEBUG("out-of-line segment {}, decodeing {} records",
segment_id,
header.records);
- auto maybe_headers = try_decode_record_headers(header, mdbuf);
- if (!maybe_headers) {
- ERROR("unable to decode record headers for record group {}",
- locator.record_block_base);
- return crimson::ct_error::input_output_error::make();
- }
-
- for (auto& header : *maybe_headers) {
- mod_time_point_t ctime = header.commit_time;
- auto commit_type = header.commit_type;
- if (!ctime) {
- ERROR("extent {} 0 commit_time",
- ctime);
- ceph_abort("0 commit_time");
- }
- time_point commit_time{duration(ctime)};
- assert(commit_type == record_commit_type_t::MODIFY
- || commit_type == record_commit_type_t::REWRITE);
- if (commit_type == record_commit_type_t::MODIFY) {
- segments.update_last_modified_rewritten(segment_id, commit_time, {});
- }
- if (commit_type == record_commit_type_t::REWRITE) {
- segments.update_last_modified_rewritten(segment_id, {}, commit_time);
- }
- }
} else {
DEBUG("inline segment {}, decodeing {} records",
segment_id,
}
}
}
+
+ auto maybe_headers = try_decode_record_headers(header, mdbuf);
+ if (!maybe_headers) {
+ // This should be impossible, we did check the crc on the mdbuf
+ ERROR("unable to decode record headers for record group {}",
+ locator.record_block_base);
+ return crimson::ct_error::input_output_error::make();
+ }
+
+ for (auto& header : *maybe_headers) {
+ auto modify_time = mod_to_timepoint(header.modify_time);
+ if (header.extents == 0 || modify_time != NULL_TIME) {
+ segments.update_modify_time(
+ segment_id, modify_time, header.extents);
+ } else {
+ ERROR("illegal modify time {}", header);
+ return crimson::ct_error::input_output_error::make();
+ }
+ }
return seastar::now();
}),
[this, header](auto &cursor, auto &handler)
init_complete = true;
return;
}
- INFO("done, start GC");
+ INFO("done, start GC, time_bound={}",
+ sea_time_point_printer_t{segments.get_time_bound()});
ceph_assert(segments.get_journal_head() != JOURNAL_SEQ_NULL);
init_complete = true;
gc_process.start();
void AsyncCleaner::mark_space_used(
paddr_t addr,
extent_len_t len,
- time_point last_modified,
- time_point last_rewritten,
bool init_scan)
{
LOG_PREFIX(AsyncCleaner::mark_space_used);
auto new_usage = calc_utilization(seg_addr.get_segment_id());
adjust_segment_util(old_usage, new_usage);
- // use the last extent's last modified time for the calculation of the projected
- // time the segments' live extents are to stay unmodified; this is an approximation
- // of the sprite lfs' segment "age".
-
- segments.update_last_modified_rewritten(
- seg_addr.get_segment_id(), last_modified, last_rewritten);
-
gc_process.maybe_wake_on_space_used();
assert(ret > 0);
DEBUG("segment {} new len: {}~{}, live_bytes: {}",
LOG_PREFIX(AsyncCleaner::get_next_reclaim_segment);
segment_id_t id = NULL_SEG_ID;
double max_benefit_cost = 0;
+ sea_time_point now_time;
+ if constexpr (gc_formula != gc_formula_t::GREEDY) {
+ now_time = seastar::lowres_system_clock::now();
+ } else {
+ now_time = NULL_TIME;
+ }
+ sea_time_point bound_time;
+ if constexpr (gc_formula == gc_formula_t::BENEFIT) {
+ bound_time = segments.get_time_bound();
+ if (bound_time == NULL_TIME) {
+ WARN("BENEFIT -- bound_time is NULL_TIME");
+ }
+ } else {
+ bound_time = NULL_TIME;
+ }
for (auto& [_id, segment_info] : segments) {
if (segment_info.is_closed() &&
!segment_info.is_in_journal(journal_tail_committed)) {
- double benefit_cost = calc_gc_benefit_cost(_id);
+ double benefit_cost = calc_gc_benefit_cost(_id, now_time, bound_time);
if (benefit_cost > max_benefit_cost) {
id = _id;
max_benefit_cost = benefit_cost;
* It is read-only outside segments_info_t.
*/
struct segment_info_t {
- using time_point = seastar::lowres_system_clock::time_point;
-
segment_id_t id = NULL_SEG_ID;
// segment_info_t is initiated as set_empty()
reclaim_gen_t generation = NULL_GENERATION;
- time_point last_modified;
- time_point last_rewritten;
+ sea_time_point modify_time = NULL_TIME;
+
+ std::size_t num_extents = 0;
std::size_t written_to = 0;
void set_closed();
- void update_last_modified_rewritten(
- time_point _last_modified, time_point _last_rewritten) {
- if (_last_modified != time_point() && last_modified < _last_modified) {
- last_modified = _last_modified;
- }
- if (_last_rewritten != time_point() && last_rewritten < _last_rewritten) {
- last_rewritten = _last_rewritten;
+ void update_modify_time(sea_time_point _modify_time, std::size_t _num_extents) {
+ ceph_assert(!is_closed());
+ assert(_modify_time != NULL_TIME);
+ assert(_num_extents != 0);
+ if (modify_time == NULL_TIME) {
+ modify_time = _modify_time;
+ num_extents = _num_extents;
+ } else {
+ modify_time = get_average_time(
+ modify_time, num_extents, _modify_time, _num_extents);
+ num_extents += _num_extents;
}
}
};
*/
class segments_info_t {
public:
- using time_point = seastar::lowres_system_clock::time_point;
-
segments_info_t() {
reset();
}
};
}
+ sea_time_point get_time_bound() const {
+ if (!modify_times.empty()) {
+ return *modify_times.begin();
+ } else {
+ return NULL_TIME;
+ }
+ }
+
void reset();
void add_segment_manager(SegmentManager &segment_manager);
void update_written_to(segment_type_t, paddr_t);
- void update_last_modified_rewritten(
- segment_id_t id, time_point last_modified, time_point last_rewritten) {
- segments[id].update_last_modified_rewritten(last_modified, last_rewritten);
+ void update_modify_time(
+ segment_id_t id, sea_time_point tp, std::size_t num) {
+ if (num == 0) {
+ return;
+ }
+
+ assert(tp != NULL_TIME);
+ segments[id].update_modify_time(tp, num);
}
private:
std::size_t total_bytes;
std::size_t avail_bytes_in_open;
+
+ std::multiset<sea_time_point> modify_times;
};
/**
virtual void update_segment_avail_bytes(segment_type_t, paddr_t) = 0;
+ virtual void update_modify_time(
+ segment_id_t, sea_time_point, std::size_t) = 0;
+
virtual SegmentManagerGroup* get_segment_manager_group() = 0;
virtual ~SegmentProvider() {}
class AsyncCleaner : public SegmentProvider {
public:
- using time_point = seastar::lowres_system_clock::time_point;
- using duration = seastar::lowres_system_clock::duration;
-
/// Config
struct config_t {
/// Number of minimum journal segments to stop trimming.
virtual rewrite_extent_ret rewrite_extent(
Transaction &t,
CachedExtentRef extent,
- reclaim_gen_t target_generation) = 0;
+ reclaim_gen_t target_generation,
+ sea_time_point modify_time) = 0;
/**
* get_extent_if_live
gc_process.maybe_wake_on_space_used();
}
+ void update_modify_time(
+ segment_id_t id, sea_time_point tp, std::size_t num_extents) final {
+ ceph_assert(num_extents == 0 || tp != NULL_TIME);
+ segments.update_modify_time(id, tp, num_extents);
+ }
+
SegmentManagerGroup* get_segment_manager_group() final {
return sm_group.get();
}
void mark_space_used(
paddr_t addr,
extent_len_t len,
- time_point last_modified = time_point(),
- time_point last_rewritten = time_point(),
bool init_scan = false);
void mark_space_free(
// journal status helpers
- double calc_gc_benefit_cost(segment_id_t id) const {
- double util = calc_utilization(id);
- ceph_assert(util >= 0 && util < 1);
- auto cur_time = seastar::lowres_system_clock::now();
- auto segment = segments[id];
- assert(cur_time >= segment.last_modified);
- auto segment_age =
- cur_time - std::max(segment.last_modified, segment.last_rewritten);
- uint64_t age = segment_age.count();
- return (1 - util) * age / (1 + util);
- }
+ double calc_gc_benefit_cost(
+ segment_id_t id,
+ const sea_time_point &now_time,
+ const sea_time_point &bound_time) const;
segment_id_t get_next_reclaim_segment() const;
0,
fixed_kv_extent.get_length(),
n_fixed_kv_extent->get_bptr().c_str());
+ n_fixed_kv_extent->set_modify_time(fixed_kv_extent.get_modify_time());
n_fixed_kv_extent->pin.set_range(n_fixed_kv_extent->get_node_meta());
/* This is a bit underhanded. Any relative addrs here must necessarily
{
assert(ref->is_dirty());
assert(!ref->primary_ref_list_hook.is_linked());
+ ceph_assert(ref->get_modify_time() != NULL_TIME);
intrusive_ptr_add_ref(&*ref);
dirty.push_back(*ref);
stats.dirty_bytes += ref->get_length();
record_t record;
auto commit_time = seastar::lowres_system_clock::now();
- record.commit_time = commit_time.time_since_epoch().count();
- record.commit_type = (t.get_src() == Transaction::src_t::MUTATE)
- ? record_commit_type_t::MODIFY
- : record_commit_type_t::REWRITE;
// Add new copy of mutated blocks, set_io_wait to block until written
record.deltas.reserve(t.mutated_block_list.size());
auto delta_length = delta_bl.length();
DEBUGT("mutated extent with {}B delta, commit replace extent ... -- {}, prior={}",
t, delta_length, *i, *i->prior_instance);
+ i->set_modify_time(commit_time);
commit_replace_extent(t, i, i->prior_instance);
i->prepare_write();
i->set_io_wait();
- i->set_last_modified(commit_time);
assert(i->get_version() > 0);
auto final_crc = i->get_crc32c();
if (i->get_type() == extent_types_t::ROOT) {
ceph_assert(0 == "ROOT never gets written as a fresh block");
}
- if (t.get_src() == Transaction::src_t::MUTATE) {
- i->set_last_modified(commit_time);
- } else {
- assert(is_cleaner_transaction(t.get_src()));
- i->set_last_rewritten(commit_time);
- }
-
assert(bl.length() == i->get_length());
+ auto modify_time = i->get_modify_time();
+ if (modify_time == NULL_TIME) {
+ modify_time = commit_time;
+ }
record.push_back(extent_t{
i->get_type(),
i->is_logical()
: (is_lba_node(i->get_type())
? i->cast<lba_manager::btree::LBANode>()->get_node_meta().begin
: L_ADDR_NULL),
- std::move(bl),
- i->get_last_modified().time_since_epoch().count()
- });
+ std::move(bl)
+ },
+ modify_time);
if (i->is_valid()
&& is_backref_mapped_extent_node(i)) {
alloc_delta.alloc_blk_ranges.emplace_back(
assert(ool_stats.is_clear());
}
+ if (record.modify_time == NULL_TIME) {
+ record.modify_time = commit_time;
+ }
+
SUBDEBUGT(seastore_t,
"commit H{} dirty_from={}, {} read, {} fresh with {} invalid, "
"{} delta, {} retire, {}(md={}B, data={}B) ool-records, "
- "{}B md, {}B data",
+ "{}B md, {}B data, modify_time={}",
t, (void*)&t.get_handle(),
get_oldest_dirty_from().value_or(JOURNAL_SEQ_NULL),
read_stat,
ool_stats.md_bytes,
ool_stats.get_data_bytes(),
record.size.get_raw_mdlength(),
- record.size.dlength);
+ record.size.dlength,
+ sea_time_point_printer_t{record.modify_time});
if (is_cleaner_transaction(trans_src)) {
// CLEANER transaction won't contain any onode tree operations
assert(t.onode_tree_stats.is_clear());
if (cleaner) {
cleaner->mark_space_used(
i->get_paddr(),
- i->get_length(),
- (t.get_src() == Transaction::src_t::MUTATE)
- ? i->last_modified
- : seastar::lowres_system_clock::time_point(),
- (is_cleaner_transaction(t.get_src()))
- ? i->last_rewritten
- : seastar::lowres_system_clock::time_point());
+ i->get_length());
}
if (is_backref_mapped_extent_node(i)) {
backref_list.emplace_back(
paddr_t record_base,
const delta_info_t &delta,
const journal_seq_t &alloc_replay_from,
- seastar::lowres_system_clock::time_point& last_modified)
+ sea_time_point &modify_time)
{
LOG_PREFIX(Cache::replay_delta);
assert(alloc_replay_from != JOURNAL_SEQ_NULL);
+ ceph_assert(modify_time != NULL_TIME);
if (delta.type == extent_types_t::ROOT) {
TRACE("replay root delta at {} {}, remove extent ... -- {}, prv_root={}",
journal_seq, record_base, delta, *root);
root->state = CachedExtent::extent_state_t::DIRTY;
DEBUG("replayed root delta at {} {}, add extent -- {}, root={}",
journal_seq, record_base, delta, *root);
- root->set_last_modified(last_modified);
+ root->set_modify_time(modify_time);
add_extent(root);
return replay_delta_ertr::now();
} else if (delta.type == extent_types_t::ALLOC_INFO) {
assert(extent->last_committed_crc == delta.prev_crc);
extent->apply_delta_and_adjust_crc(record_base, delta.bl);
- extent->set_last_modified(last_modified);
+ extent->set_modify_time(modify_time);
assert(extent->last_committed_crc == delta.final_crc);
extent->version++;
const delta_info_t &delta,
const journal_seq_t &, // journal seq from which alloc
// delta should be replayed
- seastar::lowres_system_clock::time_point& last_modified);
+ sea_time_point &modify_time);
/**
* init_cached_extents
CachedExtentRef prior_instance;
// time of the last modification
- seastar::lowres_system_clock::time_point last_modified;
-
- // time of the last rewrite
- seastar::lowres_system_clock::time_point last_rewritten;
+ sea_time_point modify_time = NULL_TIME;
public:
void init(extent_state_t _state,
reclaim_generation = gen;
}
- void set_last_modified(seastar::lowres_system_clock::duration d) {
- last_modified = seastar::lowres_system_clock::time_point(d);
- }
-
- void set_last_modified(seastar::lowres_system_clock::time_point t) {
- last_modified = t;
- }
-
- seastar::lowres_system_clock::time_point get_last_modified() const {
- return last_modified;
- }
-
- void set_last_rewritten(seastar::lowres_system_clock::duration d) {
- last_rewritten = seastar::lowres_system_clock::time_point(d);
+ void set_modify_time(sea_time_point t) {
+ modify_time = t;
}
- void set_last_rewritten(seastar::lowres_system_clock::time_point t) {
- last_rewritten = t;
+ sea_time_point get_modify_time() const {
+ return modify_time;
}
- seastar::lowres_system_clock::time_point get_last_rewritten() const {
- return last_rewritten;
- }
/**
* duplicate_for_write
*
<< ", type=" << get_type()
<< ", version=" << version
<< ", dirty_from_or_retired_at=" << dirty_from_or_retired_at
- << ", last_modified=" << last_modified.time_since_epoch()
- << ", last_rewritten=" << last_rewritten.time_since_epoch()
+ << ", modify_time=" << sea_time_point_printer_t{modify_time}
<< ", paddr=" << get_paddr()
<< ", length=" << get_length()
<< ", state=" << state
}
record_t record;
std::list<LogicalCachedExtentRef> pending_extents;
-
auto commit_time = seastar::lowres_system_clock::now();
- record_commit_type_t commit_type;
- if (t.get_src() == Transaction::src_t::MUTATE) {
- commit_type = record_commit_type_t::MODIFY;
- } else {
- assert(is_cleaner_transaction(t.get_src()));
- commit_type = record_commit_type_t::REWRITE;
- }
- record.commit_time = commit_time.time_since_epoch().count();
- record.commit_type = commit_type;
for (auto it = extents.begin(); it != extents.end();) {
auto& extent = *it;
TRACET("{} extents={} add extent to record -- {}",
t, segment_allocator.get_name(),
extents.size(), *extent);
- if (commit_type == record_commit_type_t::MODIFY) {
- extent->set_last_modified(commit_time);
- } else {
- assert(commit_type == record_commit_type_t::REWRITE);
- extent->set_last_rewritten(commit_time);
- }
ceph::bufferlist bl;
extent->prepare_write();
bl.append(extent->get_bptr());
assert(bl.length() == extent->get_length());
- record.push_back(extent_t{
- extent->get_type(),
- extent->get_laddr(),
- std::move(bl),
- extent->get_last_modified().time_since_epoch().count()});
+ auto modify_time = extent->get_modify_time();
+ if (modify_time == NULL_TIME) {
+ modify_time = commit_time;
+ }
+ record.push_back(
+ extent_t{
+ extent->get_type(),
+ extent->get_laddr(),
+ std::move(bl)},
+ modify_time);
pending_extents.push_back(extent);
it = extents.erase(it);
const delta_info_t&,
const journal_seq_t, // journal seq from which
// alloc delta should replayed
- seastar::lowres_system_clock::time_point last_modified)>;
+ sea_time_point modify_time)>;
virtual replay_ret replay(
delta_handler_t &&delta_handler) = 0;
record_deltas.deltas,
[locator,
&d_handler](auto& p) {
- auto& commit_time = p.first;
+ auto& modify_time = p.first;
auto& delta = p.second;
return d_handler(locator,
delta,
locator.write_result.start_seq,
- seastar::lowres_system_clock::time_point(
- seastar::lowres_system_clock::duration(commit_time))
- );
+ modify_time);
});
}).safe_then([this, &cursor_addr]() {
if (cursor_addr >= get_journal_end()) {
cur_journal_tail = NO_DELTAS;
new_alloc_replay_from = NO_DELTAS;
}
+ ceph_assert((close_seg_info.modify_time == NULL_TIME &&
+ close_seg_info.num_extents == 0) ||
+ (close_seg_info.modify_time != NULL_TIME &&
+ close_seg_info.num_extents != 0));
auto tail = segment_tail_t{
close_seg_info.seq,
close_segment_id,
new_alloc_replay_from,
current_segment_nonce,
type,
- close_seg_info.last_modified.time_since_epoch().count(),
- close_seg_info.last_rewritten.time_since_epoch().count()};
+ timepoint_to_mod(close_seg_info.modify_time),
+ close_seg_info.num_extents};
ceph::bufferlist bl;
encode(tail, bl);
INFO("{} close segment id={}, seq={}, written_to={}, nonce={}, journal_tail={}",
LOG_PREFIX(RecordSubmitter::submit);
assert(is_available());
assert(check_action(record.size) != action_t::ROLL);
+ segment_allocator.get_provider().update_modify_time(
+ segment_allocator.get_segment_id(),
+ record.modify_time,
+ record.extents.size());
auto eval = p_current_batch->evaluate_submit(
record.size, segment_allocator.get_block_size());
bool needs_flush = (
return print_name;
}
+ SegmentProvider &get_provider() {
+ return segment_provider;
+ }
+
seastore_off_t get_block_size() const {
return sm_group.get_block_size();
}
FNAME,
&handler](auto &p)
{
- auto& commit_time = p.first;
+ auto& modify_time = p.first;
auto& delta = p.second;
/* The journal may validly contain deltas for extents in
* since released segments. We can detect those cases by
locator,
delta,
segment_provider.get_alloc_info_replay_from(),
- seastar::lowres_system_clock::time_point(
- seastar::lowres_system_clock::duration(commit_time)));
+ modify_time);
});
});
});
}
}
+std::ostream &operator<<(std::ostream &out, sea_time_point_printer_t tp)
+{
+ if (tp.tp == NULL_TIME) {
+ return out << "tp(NULL)";
+ }
+ auto time = seastar::lowres_system_clock::to_time_t(tp.tp);
+ char buf[32];
+ std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", std::localtime(&time));
+ return out << "tp(" << buf << ")";
+}
+
+std::ostream &operator<<(std::ostream &out, mod_time_point_printer_t tp) {
+ auto time = mod_to_timepoint(tp.tp);
+ return out << "mod_" << sea_time_point_printer_t{time};
+}
+
std::ostream &operator<<(std::ostream &out, const laddr_list_t &rhs)
{
bool first = false;
<< ", segment_id=" << tail.physical_segment_id
<< ", journal_tail=" << tail.journal_tail
<< ", segment_nonce=" << tail.segment_nonce
- << ", last_modified=" << tail.last_modified
- << ", last_rewritten=" << tail.last_rewritten
+ << ", modify_time=" << mod_time_point_printer_t{tail.modify_time}
+ << ", num_extents=" << tail.num_extents
<< ")";
}
return out << "record_t("
<< "num_extents=" << r.extents.size()
<< ", num_deltas=" << r.deltas.size()
+ << ", modify_time=" << sea_time_point_printer_t{r.modify_time}
<< ")";
}
return out << "record_header_t("
<< "num_extents=" << r.extents
<< ", num_deltas=" << r.deltas
+ << ", modify_time=" << mod_time_point_printer_t{r.modify_time}
<< ")";
}
record_header_t rheader{
(extent_len_t)r.deltas.size(),
(extent_len_t)r.extents.size(),
- r.commit_time,
- r.commit_type
+ timepoint_to_mod(r.modify_time)
};
encode(rheader, bl);
}
for (auto& i: result_iter->deltas) {
try {
decode(i.second, bliter);
- i.first = r.header.commit_time;
+ i.first = mod_to_timepoint(r.header.modify_time);
} catch (ceph::buffer::error &e) {
journal_logger().debug(
"try_decode_deltas: failed, "
}
}
-enum class record_commit_type_t : uint8_t {
- NONE,
- MODIFY,
- REWRITE
-};
-
// type for extent modification time, milliseconds since the epoch
+using sea_time_point = seastar::lowres_system_clock::time_point;
+using sea_duration = seastar::lowres_system_clock::duration;
using mod_time_point_t = int64_t;
+constexpr mod_time_point_t
+timepoint_to_mod(const sea_time_point &t) {
+ return std::chrono::duration_cast<std::chrono::milliseconds>(
+ t.time_since_epoch()).count();
+}
+
+constexpr sea_time_point
+mod_to_timepoint(mod_time_point_t t) {
+ return sea_time_point(std::chrono::duration_cast<sea_duration>(
+ std::chrono::milliseconds(t)));
+}
+
+constexpr auto NULL_TIME = sea_time_point();
+constexpr auto NULL_MOD_TIME = timepoint_to_mod(NULL_TIME);
+
+struct sea_time_point_printer_t {
+ sea_time_point tp;
+};
+std::ostream &operator<<(std::ostream &out, sea_time_point_printer_t tp);
+
+struct mod_time_point_printer_t {
+ mod_time_point_t tp;
+};
+std::ostream &operator<<(std::ostream &out, mod_time_point_printer_t tp);
+
+constexpr sea_time_point
+get_average_time(const sea_time_point& t1, std::size_t n1,
+ const sea_time_point& t2, std::size_t n2) {
+ assert(t1 != NULL_TIME);
+ assert(t2 != NULL_TIME);
+ auto new_size = n1 + n2;
+ assert(new_size > 0);
+ auto c1 = t1.time_since_epoch().count();
+ auto c2 = t2.time_since_epoch().count();
+ auto c_ret = c1 / new_size * n1 + c2 / new_size * n2;
+ return sea_time_point(sea_duration(c_ret));
+}
+
/* description of a new physical extent */
struct extent_t {
extent_types_t type; ///< type of extent
laddr_t addr; ///< laddr of extent (L_ADDR_NULL for non-logical)
ceph::bufferlist bl; ///< payload, bl.length() == length, aligned
- mod_time_point_t last_modified;
};
using extent_version_t = uint32_t;
extent_types_t type = extent_types_t::NONE;
laddr_t addr = L_ADDR_NULL;
extent_len_t len = 0;
- mod_time_point_t last_modified;
extent_info_t() = default;
extent_info_t(const extent_t &et)
: type(et.type), addr(et.addr),
- len(et.bl.length()),
- last_modified(et.last_modified)
+ len(et.bl.length())
{}
DENC(extent_info_t, v, p) {
denc(v.type, p);
denc(v.addr, p);
denc(v.len, p);
- denc(v.last_modified, p);
DENC_FINISH(p);
}
};
segment_type_t type;
- mod_time_point_t last_modified;
- mod_time_point_t last_rewritten;
+ mod_time_point_t modify_time;
+ std::size_t num_extents;
segment_type_t get_type() const {
return type;
denc(v.alloc_replay_from, p);
denc(v.segment_nonce, p);
denc(v.type, p);
- denc(v.last_modified, p);
- denc(v.last_rewritten, p);
+ denc(v.modify_time, p);
+ denc(v.num_extents, p);
DENC_FINISH(p);
}
};
std::vector<extent_t> extents;
std::vector<delta_info_t> deltas;
record_size_t size;
- mod_time_point_t commit_time;
- record_commit_type_t commit_type;
+ sea_time_point modify_time = NULL_TIME;
record_t() = default;
+
+ // unit test only
record_t(std::vector<extent_t>&& _extents,
std::vector<delta_info_t>&& _deltas) {
+ auto modify_time = seastar::lowres_system_clock::now();
for (auto& e: _extents) {
- push_back(std::move(e));
+ push_back(std::move(e), modify_time);
}
for (auto& d: _deltas) {
push_back(std::move(d));
return delta_size;
}
- void push_back(extent_t&& extent) {
+ void push_back(extent_t&& extent, sea_time_point &t) {
+ ceph_assert(t != NULL_TIME);
+ if (extents.size() == 0) {
+ assert(modify_time == NULL_TIME);
+ modify_time = t;
+ } else {
+ modify_time = get_average_time(modify_time, extents.size(), t, 1);
+ }
size.account(extent);
extents.push_back(std::move(extent));
}
struct record_header_t {
uint32_t deltas; // number of deltas
uint32_t extents; // number of extents
- mod_time_point_t commit_time = 0;
- record_commit_type_t commit_type;
+ mod_time_point_t modify_time;
DENC(record_header_t, v, p) {
DENC_START(1, 1, p);
denc(v.deltas, p);
denc(v.extents, p);
- denc(v.commit_time, p);
- denc(v.commit_type, p);
+ denc(v.modify_time, p);
DENC_FINISH(p);
}
};
struct record_deltas_t {
paddr_t record_block_base;
- // the mod time here can only be modification time, not rewritten time
- std::vector<std::pair<mod_time_point_t, delta_info_t>> deltas;
+ std::vector<std::pair<sea_time_point, delta_info_t>> deltas;
};
std::optional<std::vector<record_deltas_t> >
try_decode_deltas(
const auto &offsets,
const auto &e,
const journal_seq_t alloc_replay_from,
- auto last_modified)
+ auto modify_time)
{
auto start_seq = offsets.write_result.start_seq;
async_cleaner->update_journal_tail_target(
offsets.record_block_base,
e,
alloc_replay_from,
- last_modified);
+ modify_time);
});
}).safe_then([this] {
return journal->open_for_write();
async_cleaner->mark_space_used(
addr,
len ,
- seastar::lowres_system_clock::time_point(),
- seastar::lowres_system_clock::time_point(),
/* init_scan = */ true);
}
if (is_backref_node(type)) {
async_cleaner->mark_space_used(
backref.paddr,
backref.len,
- seastar::lowres_system_clock::time_point(),
- seastar::lowres_system_clock::time_point(),
true);
cache->update_tree_extents_num(backref.type, 1);
}
nlextent->get_bptr().c_str());
nlextent->set_laddr(lextent->get_laddr());
nlextent->set_pin(lextent->get_pin().duplicate());
- nlextent->last_modified = lextent->last_modified;
+ nlextent->set_modify_time(lextent->get_modify_time());
DEBUGT("rewriting logical extent -- {} to {}", t, *lextent, *nlextent);
TransactionManager::rewrite_extent_ret TransactionManager::rewrite_extent(
Transaction &t,
CachedExtentRef extent,
- reclaim_gen_t target_generation)
+ reclaim_gen_t target_generation,
+ sea_time_point modify_time)
{
LOG_PREFIX(TransactionManager::rewrite_extent);
extent->set_reclaim_generation(DIRTY_GENERATION);
} else {
extent->set_reclaim_generation(target_generation);
+ ceph_assert(modify_time != NULL_TIME);
+ extent->set_modify_time(modify_time);
}
t.get_rewrite_version_stats().increment(extent->get_version());
rewrite_extent_ret rewrite_extent(
Transaction &t,
CachedExtentRef extent,
- reclaim_gen_t target_generation) final;
+ reclaim_gen_t target_generation,
+ sea_time_point modify_time) final;
using AsyncCleaner::ExtentCallbackInterface::get_extent_if_live_ret;
get_extent_if_live_ret get_extent_if_live(
void update_segment_avail_bytes(segment_type_t, paddr_t) final {}
+ void update_modify_time(segment_id_t, sea_time_point, std::size_t) final {}
+
SegmentManagerGroup* get_segment_manager_group() final { return sms.get(); }
journal_seq_t get_dirty_extents_replay_from() const final {
void update_segment_avail_bytes(segment_type_t, paddr_t) final {}
+ void update_modify_time(segment_id_t, sea_time_point, std::size_t) final {}
+
SegmentManagerGroup* get_segment_manager_group() final { return sms.get(); }
seastar::future<> set_up_fut() final {