record.push_back(std::move(delta));
}
+ if (t.is_cleaner_transaction()) {
+ bufferlist bl;
+ encode(get_oldest_backref_dirty_from().value_or(JOURNAL_SEQ_NULL), bl);
+ delta_info_t delta;
+ delta.type = extent_types_t::ALLOC_TAIL;
+ delta.bl = bl;
+ record.push_back(std::move(delta));
+ }
+
ceph_assert(t.get_fresh_block_stats().num ==
t.inline_block_list.size() +
t.ool_block_list.size() +
journal_seq_t journal_seq,
paddr_t record_base,
const delta_info_t &delta,
+ const journal_seq_t &alloc_replay_from,
seastar::lowres_system_clock::time_point& last_modified)
{
LOG_PREFIX(Cache::replay_delta);
+ assert(alloc_replay_from != JOURNAL_SEQ_NULL);
if (delta.type == extent_types_t::ROOT) {
TRACE("replay root delta at {} {}, remove extent ... -- {}, prv_root={}",
journal_seq, record_base, delta, *root);
add_extent(root);
return replay_delta_ertr::now();
} else if (delta.type == extent_types_t::ALLOC_INFO) {
+ if (journal_seq < alloc_replay_from) {
+ DEBUG("journal_seq {} < alloc_replay_from {}, don't replay {}",
+ journal_seq, alloc_replay_from, delta);
+ return replay_delta_ertr::now();
+ }
may_roll_backref_buffer(journal_seq.offset);
alloc_delta_t alloc_delta;
decode(alloc_delta, delta.bl);
if (!backref_list.empty())
backref_batch_update(std::move(backref_list), journal_seq);
return replay_delta_ertr::now();
+ } else if (delta.type == extent_types_t::ALLOC_TAIL) {
+ // this delta should have been dealt with during segment cleaner mounting
+ return replay_delta_ertr::now();
} else {
auto _get_extent_if_cached = [this](paddr_t addr)
-> get_extent_ertr::future<CachedExtentRef> {
journal_seq_t seq,
paddr_t record_block_base,
const delta_info_t &delta,
+ const journal_seq_t &, // journal seq from which alloc
+ // delta should be replayed
seastar::lowres_system_clock::time_point& last_modified);
/**
journal_seq_t seq,
size_t max_bytes);
+ /// Returns the key of the first entry of the front buffer queued in
+ /// backref_bufs_to_flush or, when nothing is queued, of the current
+ /// backref_buffer. Returns std::nullopt if no such key is found.
+ std::optional<journal_seq_t> get_oldest_backref_dirty_from() const {
+   LOG_PREFIX(Cache::get_oldest_backref_dirty_from);
+   journal_seq_t oldest_seq = JOURNAL_SEQ_NULL;
+   if (!backref_bufs_to_flush.empty()) {
+     // NOTE(review): unlike the backref_buffer branch below, there is no
+     // emptiness check before begin() is dereferenced; presumably a queued
+     // buffer always holds at least one entry -- confirm.
+     const auto &front_buf = backref_bufs_to_flush.front();
+     oldest_seq = front_buf->backrefs.begin()->first;
+   } else if (backref_buffer && !backref_buffer->backrefs.empty()) {
+     oldest_seq = backref_buffer->backrefs.begin()->first;
+   }
+   if (oldest_seq != JOURNAL_SEQ_NULL) {
+     SUBDEBUG(seastore_cache, "backref_oldest: {}",
+       oldest_seq);
+     return oldest_seq;
+   }
+   SUBDEBUG(seastore_cache, "backref_oldest: null");
+   return std::nullopt;
+ }
+
/// returns std::nullopt if no dirty extents or get_dirty_from() for oldest
std::optional<journal_seq_t> get_oldest_dirty_from() const {
+ LOG_PREFIX(Cache::get_oldest_dirty_from);
if (dirty.empty()) {
+ SUBDEBUG(seastore_cache, "oldest: null");
return std::nullopt;
} else {
auto oldest = dirty.begin()->get_dirty_from();
if (oldest == JOURNAL_SEQ_NULL) {
+ SUBDEBUG(seastore_cache, "oldest: null");
return std::nullopt;
} else {
+ SUBDEBUG(seastore_cache, "oldest: {}", oldest);
return oldest;
}
}
using delta_handler_t = std::function<
replay_ret(const record_locator_t&,
const delta_info_t&,
+ const journal_seq_t, // journal seq from which
+ // alloc delta should be replayed
seastar::lowres_system_clock::time_point last_modified)>;
virtual replay_ret replay(
delta_handler_t &&delta_handler) = 0;
).safe_then([this, FNAME, new_segment_seq](auto sref) {
// initialize new segment
journal_seq_t new_journal_tail;
+ journal_seq_t new_alloc_replay_from;
if (type == segment_type_t::JOURNAL) {
new_journal_tail = segment_provider.get_journal_tail_target();
+ new_alloc_replay_from = segment_provider.get_alloc_info_replay_from();
} else { // OOL
new_journal_tail = NO_DELTAS;
+ new_alloc_replay_from = NO_DELTAS;
}
segment_id_t segment_id = sref->get_segment_id();
auto header = segment_header_t{
new_segment_seq,
segment_id,
new_journal_tail,
+ new_alloc_replay_from,
current_segment_nonce,
type};
INFO("{} writing header to new segment ... -- {}",
}
auto close_seg_info = segment_provider.get_seg_info(close_segment_id);
journal_seq_t cur_journal_tail;
+ journal_seq_t new_alloc_replay_from;
if (type == segment_type_t::JOURNAL) {
cur_journal_tail = segment_provider.get_journal_tail_target();
+ new_alloc_replay_from = segment_provider.get_alloc_info_replay_from();
} else { // OOL
cur_journal_tail = NO_DELTAS;
+ new_alloc_replay_from = NO_DELTAS;
}
auto tail = segment_tail_t{
close_seg_info.seq,
close_segment_id,
cur_journal_tail,
+ new_alloc_replay_from,
current_segment_nonce,
type,
close_seg_info.last_modified.time_since_epoch().count(),
return handler(
locator,
delta,
+ segment_provider.get_alloc_info_replay_from(),
seastar::lowres_system_clock::time_point(
seastar::lowres_system_clock::duration(commit_time)));
});
// the following two types are not extent types,
// they are just used to indicates paddr allocation deltas
ALLOC_INFO = 9,
+ ALLOC_TAIL = 10,
// Test Block Types
- TEST_BLOCK = 10,
- TEST_BLOCK_PHYSICAL = 11,
- BACKREF_INTERNAL = 12,
- BACKREF_LEAF = 13,
+ TEST_BLOCK = 11,
+ TEST_BLOCK_PHYSICAL = 12,
+ BACKREF_INTERNAL = 13,
+ BACKREF_LEAF = 14,
// None and the number of valid extent_types_t
- NONE = 14,
+ NONE = 15,
};
using extent_types_le_t = uint8_t;
constexpr auto EXTENT_TYPES_MAX = static_cast<uint8_t>(extent_types_t::NONE);
segment_id_t physical_segment_id; // debugging
journal_seq_t journal_tail;
+ journal_seq_t alloc_replay_from;
segment_nonce_t segment_nonce;
segment_type_t type;
denc(v.segment_seq, p);
denc(v.physical_segment_id, p);
denc(v.journal_tail, p);
+ denc(v.alloc_replay_from, p);
denc(v.segment_nonce, p);
denc(v.type, p);
DENC_FINISH(p);
segment_id_t physical_segment_id; // debugging
journal_seq_t journal_tail;
+ journal_seq_t alloc_replay_from;
segment_nonce_t segment_nonce;
segment_type_t type;
denc(v.segment_seq, p);
denc(v.physical_segment_id, p);
denc(v.journal_tail, p);
+ denc(v.alloc_replay_from, p);
denc(v.segment_nonce, p);
denc(v.type, p);
denc(v.last_modified, p);
return NULL_SEG_ID;
}
-void SegmentCleaner::update_journal_tail_target(journal_seq_t target)
+/// Record new replay-from candidates and advance the journal tail target.
+///
+/// dirty_replay_from is a candidate for dirty_extents_replay_from;
+/// alloc_replay_from is forwarded to update_alloc_info_replay_from().
+/// The tail target candidate is the smaller of the two arguments.
+void SegmentCleaner::update_journal_tail_target(
+ journal_seq_t dirty_replay_from,
+ journal_seq_t alloc_replay_from)
{
+ logger().debug(
+ "{}: {}, current dirty_extents_replay_from {}",
+ __func__,
+ dirty_replay_from,
+ dirty_extents_replay_from);
+ // only move forward: accept the candidate when nothing is stored yet
+ // or when it compares greater than the stored value
+ if (dirty_extents_replay_from == JOURNAL_SEQ_NULL
+ || dirty_replay_from > dirty_extents_replay_from) {
+ dirty_extents_replay_from = dirty_replay_from;
+ }
+
+ update_alloc_info_replay_from(alloc_replay_from);
+
+ // computed from the arguments, not from the stored members updated above
+ journal_seq_t target = std::min(dirty_replay_from, alloc_replay_from);
logger().debug(
"{}: {}, current tail target {}",
__func__,
target,
journal_tail_target);
- assert(journal_tail_target == JOURNAL_SEQ_NULL || target >= journal_tail_target);
+ // a target that is not greater than the stored one is ignored rather
+ // than asserted against
if (journal_tail_target == JOURNAL_SEQ_NULL || target > journal_tail_target) {
journal_tail_target = target;
}
maybe_wake_gc_blocked_io();
}
+/// Store alloc_replay_from in alloc_info_replay_from when the stored value
+/// is still JOURNAL_SEQ_NULL or the new value compares greater; otherwise
+/// leave the stored value untouched.
+///
+/// NOTE(review): the ALLOC_TAIL delta payload is encoded with
+/// value_or(JOURNAL_SEQ_NULL) and is passed here after decoding. If
+/// JOURNAL_SEQ_NULL orders above valid sequences, such an input replaces a
+/// valid stored position with null -- confirm this is intended, since
+/// replay_delta asserts that its alloc_replay_from is not null.
+void SegmentCleaner::update_alloc_info_replay_from(
+  journal_seq_t alloc_replay_from)
+{
+  logger().debug(
+    "{}: {}, current alloc_info_replay_from {}",
+    __func__,
+    alloc_replay_from,
+    alloc_info_replay_from);
+  const bool is_unset = (alloc_info_replay_from == JOURNAL_SEQ_NULL);
+  if (!is_unset && !(alloc_replay_from > alloc_info_replay_from)) {
+    // the stored position is already at or beyond the candidate
+    return;
+  }
+  alloc_info_replay_from = alloc_replay_from;
+}
+
void SegmentCleaner::update_journal_tail_committed(journal_seq_t committed)
{
if (journal_tail_committed == JOURNAL_SEQ_NULL ||
journal_tail_target = JOURNAL_SEQ_NULL;
journal_tail_committed = JOURNAL_SEQ_NULL;
journal_head = JOURNAL_SEQ_NULL;
+ dirty_extents_replay_from = JOURNAL_SEQ_NULL;
+ alloc_info_replay_from = JOURNAL_SEQ_NULL;
space_tracker.reset(
detailed ?
time_point last_rewritten(duration(tail.last_rewritten));
segments.update_last_modified_rewritten(
segment_id, last_modified, last_rewritten);
+ if (tail.get_type() == segment_type_t::JOURNAL) {
+ update_journal_tail_committed(tail.journal_tail);
+ update_journal_tail_target(
+ tail.journal_tail,
+ tail.alloc_replay_from);
+ }
init_mark_segment_closed(
segment_id,
header.segment_seq,
scan_extents_ret_bare& segment_set,
segment_id_t segment_id)
{
- if (header.get_type() == segment_type_t::OOL) {
- logger().info(
- "SegmentCleaner::scan_nonfull_segment: out-of-line segment {}",
- segment_id);
+ return seastar::do_with(
+ scan_valid_records_cursor({
+ segments[segment_id].seq,
+ paddr_t::make_seg_paddr(segment_id, 0)}),
+ [this, segment_id, segment_header=header](auto& cursor) {
return seastar::do_with(
- scan_valid_records_cursor({
- segments[segment_id].seq,
- paddr_t::make_seg_paddr(segment_id, 0)}),
- [this, segment_id, header](auto& cursor) {
- return seastar::do_with(
- SegmentManagerGroup::found_record_handler_t([this, segment_id](
- record_locator_t locator,
- const record_group_header_t& header,
- const bufferlist& mdbuf
- ) mutable -> SegmentManagerGroup::scan_valid_records_ertr::future<> {
- LOG_PREFIX(SegmentCleaner::scan_nonfull_segment);
- DEBUG("decodeing {} records", header.records);
+ SegmentManagerGroup::found_record_handler_t(
+ [this, segment_id, segment_header](
+ record_locator_t locator,
+ const record_group_header_t& header,
+ const bufferlist& mdbuf
+ ) mutable -> SegmentManagerGroup::scan_valid_records_ertr::future<> {
+ LOG_PREFIX(SegmentCleaner::scan_nonfull_segment);
+ if (segment_header.get_type() == segment_type_t::OOL) {
+ DEBUG("out-of-line segment {}, decodeing {} records",
+ segment_id,
+ header.records);
auto maybe_headers = try_decode_record_headers(header, mdbuf);
if (!maybe_headers) {
ERROR("unable to decode record headers for record group {}",
segments.update_last_modified_rewritten(segment_id, {}, commit_time);
}
}
- return seastar::now();
- }),
- [&cursor, header, this](auto& handler) {
- return sm_group->scan_valid_records(
- cursor,
- header.segment_nonce,
- segments.get_segment_size(),
- handler);
+ } else {
+ DEBUG("inline segment {}, decodeing {} records",
+ segment_id,
+ header.records);
+ auto maybe_record_deltas_list = try_decode_deltas(
+ header, mdbuf, locator.record_block_base);
+ if (!maybe_record_deltas_list) {
+ ERROR("unable to decode deltas for record {} at {}",
+ header, locator);
+ return crimson::ct_error::input_output_error::make();
+ }
+ for (auto &record_deltas : *maybe_record_deltas_list) {
+ for (auto &[ctime, delta] : record_deltas.deltas) {
+ if (delta.type == extent_types_t::ALLOC_TAIL) {
+ journal_seq_t seq;
+ decode(seq, delta.bl);
+ update_alloc_info_replay_from(seq);
+ }
+ }
+ }
}
- );
- }).safe_then([this, segment_id, header](auto) {
- init_mark_segment_closed(
- segment_id,
- header.segment_seq,
- header.type);
- return seastar::now();
- });
- } else if (header.get_type() == segment_type_t::JOURNAL) {
- logger().info(
- "SegmentCleaner::scan_nonfull_segment: journal segment {}",
- segment_id);
- segment_set.emplace_back(std::make_pair(segment_id, std::move(header)));
- } else {
- ceph_abort("unexpected segment type");
- }
- init_mark_segment_closed(
- segment_id,
- header.segment_seq,
- header.type);
- return seastar::now();
+ return seastar::now();
+ }),
+ [&cursor, segment_header, this](auto& handler) {
+ return sm_group->scan_valid_records(
+ cursor,
+ segment_header.segment_nonce,
+ segments.get_segment_size(),
+ handler);
+ }
+ );
+ }).safe_then([this, segment_id, header](auto) {
+ init_mark_segment_closed(
+ segment_id,
+ header.segment_seq,
+ header.type);
+ return seastar::now();
+ });
}
SegmentCleaner::release_ertr::future<>
virtual segment_id_t allocate_segment(
segment_seq_t seq, segment_type_t type) = 0;
+ /// replay_from position for dirty extents; implementations may
+ /// return JOURNAL_SEQ_NULL
+ virtual journal_seq_t get_dirty_extents_replay_from() const = 0;
+
+ /// journal seq from which alloc deltas should be replayed;
+ /// implementations may return JOURNAL_SEQ_NULL
+ virtual journal_seq_t get_alloc_info_replay_from() const = 0;
+
virtual void close_segment(segment_id_t) = 0;
virtual void update_journal_tail_committed(journal_seq_t tail_committed) = 0;
/// target journal_tail for next fresh segment
journal_seq_t journal_tail_target;
+ /// target replay_from for dirty extents
+ journal_seq_t dirty_extents_replay_from;
+
+ /// target replay_from for alloc infos
+ journal_seq_t alloc_info_replay_from;
+
/// most recently committed journal_tail
journal_seq_t journal_tail_committed;
return sm_group.get();
}
- void update_journal_tail_target(journal_seq_t target);
+ /// accessor for the stored dirty_extents_replay_from
+ journal_seq_t get_dirty_extents_replay_from() const final {
+   return this->dirty_extents_replay_from;
+ }
+
+ /// accessor for the stored alloc_info_replay_from
+ journal_seq_t get_alloc_info_replay_from() const final {
+   return this->alloc_info_replay_from;
+ }
+
+ /// updates dirty_extents_replay_from and alloc_info_replay_from from the
+ /// arguments, then raises journal_tail_target to the smaller argument
+ /// if that is greater than the current target (or the target is unset)
+ void update_journal_tail_target(
+ journal_seq_t dirty_replay_from,
+ journal_seq_t alloc_replay_from);
+
+ /// sets alloc_info_replay_from when it is unset or the argument
+ /// compares greater than the stored value
+ void update_alloc_info_replay_from(
+ journal_seq_t alloc_replay_from);
void init_mkfs(journal_seq_t head) {
journal_tail_target = head;
return src;
}
+ /// true when src is CLEANER_TRIM or any enumerator that compares greater;
+ /// the result therefore depends on the declaration order of
+ /// Transaction::src_t
+ bool is_cleaner_transaction() const {
+   return !(src < Transaction::src_t::CLEANER_TRIM);
+ }
+
bool is_weak() const {
return weak;
}
return segment_cleaner->mount(
).safe_then([this] {
return journal->replay(
- [this](const auto &offsets, const auto &e, auto last_modified) {
+ [this](
+ const auto &offsets,
+ const auto &e,
+ const journal_seq_t alloc_replay_from,
+ auto last_modified)
+ {
auto start_seq = offsets.write_result.start_seq;
segment_cleaner->update_journal_tail_target(
- cache->get_oldest_dirty_from().value_or(start_seq));
+ cache->get_oldest_dirty_from().value_or(start_seq),
+ cache->get_oldest_backref_dirty_from().value_or(start_seq));
return cache->replay_delta(
start_seq,
offsets.record_block_base,
e,
+ alloc_replay_from,
last_modified);
});
}).safe_then([this] {
backref_manager->complete_transaction(tref, backref_to_clear, backref_to_link);
segment_cleaner->update_journal_tail_target(
- cache->get_oldest_dirty_from().value_or(start_seq));
+ cache->get_oldest_dirty_from().value_or(start_seq),
+ cache->get_oldest_backref_dirty_from().value_or(start_seq));
return segment_cleaner->maybe_release_segment(tref);
}).safe_then([FNAME, &tref] {
SUBTRACET(seastore_t, "completed", tref);
SegmentManagerGroup* get_segment_manager_group() final { return sms.get(); }
+ /// this provider keeps no dirty-extent replay position
+ journal_seq_t get_dirty_extents_replay_from() const final
+ { return JOURNAL_SEQ_NULL; }
+
+ /// this provider keeps no alloc-info replay position
+ journal_seq_t get_alloc_info_replay_from() const final
+ { return JOURNAL_SEQ_NULL; }
+
virtual void complete_commit(Transaction &t) {}
seastar::future<> submit_transaction(TransactionRef t)
{
return tmp_info;
}
+ /// this provider keeps no dirty-extent replay position
+ journal_seq_t get_dirty_extents_replay_from() const final
+ { return JOURNAL_SEQ_NULL; }
+
+ /// this provider keeps no alloc-info replay position
+ journal_seq_t get_alloc_info_replay_from() const final
+ { return JOURNAL_SEQ_NULL; }
+
segment_id_t allocate_segment(
segment_seq_t seq,
segment_type_t type
replay(
[&advance,
&delta_checker]
- (const auto &offsets, const auto &di, auto t) mutable {
+ (const auto &offsets,
+ const auto &di,
+ const journal_seq_t,
+ auto t) mutable {
if (!delta_checker) {
EXPECT_FALSE("No Deltas Left");
}