From e135b79e6eb6c32a9f0fb230549b50b2a56d625f Mon Sep 17 00:00:00 2001 From: myoungwon oh Date: Wed, 2 Nov 2022 16:10:31 +0900 Subject: [PATCH] crimson/os/seastore/cbj: add deallocation map during replay to filter out out-dated delta Signed-off-by: Myoungwon Oh --- src/crimson/os/seastore/cache.cc | 12 +- .../journal/circular_bounded_journal.cc | 249 +++++++++++------- .../journal/circular_bounded_journal.h | 10 + 3 files changed, 159 insertions(+), 112 deletions(-) diff --git a/src/crimson/os/seastore/cache.cc b/src/crimson/os/seastore/cache.cc index 80d09889b1b2e..4cedc835311eb 100644 --- a/src/crimson/os/seastore/cache.cc +++ b/src/crimson/os/seastore/cache.cc @@ -1760,17 +1760,7 @@ Cache::replay_delta( DEBUG("replay extent delta at {} {} ... -- {}, prv_extent={}", journal_seq, record_base, delta, *extent); - if (extent->last_committed_crc != delta.prev_crc) { - // FIXME: we can't rely on crc to detect whether is delta is - // out-of-date. - ERROR("identified delta crc {} doesn't match the extent at {} {}, " - "probably is out-dated -- {}", - delta, journal_seq, record_base, *extent); - ceph_assert(epm.get_journal_type() == journal_type_t::RANDOM_BLOCK); - remove_extent(extent); - return replay_delta_ertr::make_ready_future(false); - } - + assert(extent->last_committed_crc == delta.prev_crc); assert(extent->version == delta.pversion); extent->apply_delta_and_adjust_crc(record_base, delta.bl); extent->set_modify_time(modify_time); diff --git a/src/crimson/os/seastore/journal/circular_bounded_journal.cc b/src/crimson/os/seastore/journal/circular_bounded_journal.cc index 003d6546bce25..a4e234a8a90d5 100644 --- a/src/crimson/os/seastore/journal/circular_bounded_journal.cc +++ b/src/crimson/os/seastore/journal/circular_bounded_journal.cc @@ -245,6 +245,110 @@ CircularBoundedJournal::read_header() }); } +Journal::replay_ret CircularBoundedJournal::scan_valid_record_delta( + cbj_delta_handler_t &&delta_handler, journal_seq_t tail) +{ + 
LOG_PREFIX(CircularBoundedJournal::scan_valid_record_delta); + return seastar::do_with( + bool(false), + rbm_abs_addr(get_rbm_addr(tail)), + std::move(delta_handler), + segment_seq_t(NULL_SEG_SEQ), + [this, FNAME](auto &is_rolled, auto &cursor_addr, auto &d_handler, auto &expected_seq) { + return crimson::repeat( + [this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME]() mutable + -> replay_ertr::future { + paddr_t record_paddr = convert_abs_addr_to_paddr( + cursor_addr, + get_device_id()); + return read_record(record_paddr, expected_seq + ).safe_then([this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME](auto ret) + -> replay_ertr::future { + if (!ret.has_value()) { + if (expected_seq == NULL_SEG_SEQ || is_rolled) { + DEBUG("no more records, stop replaying"); + return replay_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::yes); + } else { + cursor_addr = get_records_start(); + ++expected_seq; + is_rolled = true; + return replay_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::no); + } + } + auto [r_header, bl] = *ret; + bufferlist mdbuf; + mdbuf.substr_of(bl, 0, r_header.mdlength); + paddr_t record_block_base = paddr_t::make_blk_paddr( + get_device_id(), cursor_addr + r_header.mdlength); + auto maybe_record_deltas_list = try_decode_deltas( + r_header, mdbuf, record_block_base); + if (!maybe_record_deltas_list) { + // This should be impossible, we did check the crc on the mdbuf + ERROR("unable to decode deltas for record {} at {}", + r_header, record_block_base); + return crimson::ct_error::input_output_error::make(); + } + DEBUG("{} at {}", r_header, cursor_addr); + auto write_result = write_result_t{ + r_header.committed_to, + bl.length() + }; + if (expected_seq == NULL_SEG_SEQ) { + expected_seq = r_header.committed_to.segment_seq; + } else { + assert(expected_seq == r_header.committed_to.segment_seq); + } + cursor_addr += bl.length(); + if (cursor_addr >= get_journal_end()) { + 
assert(cursor_addr == get_journal_end()); + cursor_addr = get_records_start(); + ++expected_seq; + is_rolled = true; + } + paddr_t addr = convert_abs_addr_to_paddr( + cursor_addr, + get_device_id()); + set_written_to( + journal_seq_t{expected_seq, addr}); + return seastar::do_with( + std::move(*maybe_record_deltas_list), + [write_result, + &d_handler, + FNAME](auto& record_deltas_list) { + return crimson::do_for_each( + record_deltas_list, + [write_result, + &d_handler, FNAME](record_deltas_t& record_deltas) { + auto locator = record_locator_t{ + record_deltas.record_block_base, + write_result + }; + DEBUG("processing {} deltas at block_base {}", + record_deltas.deltas.size(), + locator); + return crimson::do_for_each( + record_deltas.deltas, + [locator, + &d_handler](auto& p) { + auto& modify_time = p.first; + auto& delta = p.second; + return d_handler( + locator, + delta, + modify_time).discard_result(); + }); + }).safe_then([]() { + return replay_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::no); + }); + }); + }); + }); + }); +} + Journal::replay_ret CircularBoundedJournal::replay( delta_handler_t &&delta_handler) { @@ -257,116 +361,59 @@ Journal::replay_ret CircularBoundedJournal::replay( open_for_mount_ertr::pass_further{}, crimson::ct_error::assert_all{ "Invalid error read_header" - }).safe_then([this, FNAME, delta_handler=std::move(delta_handler)](auto p) mutable { + }).safe_then([this, FNAME, delta_handler=std::move(delta_handler)](auto p) + mutable { auto &[head, bl] = *p; header = head; DEBUG("header : {}", header); initialized = true; - written_to.segment_seq = NULL_SEG_SEQ; - auto tail = get_dirty_tail() <= get_alloc_tail() ? 
- get_dirty_tail() : get_alloc_tail(); - set_written_to(tail); return seastar::do_with( - bool(false), - rbm_abs_addr(get_rbm_addr(tail)), std::move(delta_handler), - segment_seq_t(NULL_SEG_SEQ), - [this, FNAME](auto &is_rolled, auto &cursor_addr, auto &d_handler, auto &expected_seq) { - return crimson::repeat( - [this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME]() mutable - -> replay_ertr::future { - paddr_t record_paddr = convert_abs_addr_to_paddr( - cursor_addr, - get_device_id()); - return read_record(record_paddr, expected_seq - ).safe_then([this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME](auto ret) - -> replay_ertr::future { - if (!ret.has_value()) { - if (expected_seq == NULL_SEG_SEQ || is_rolled) { - DEBUG("no more records, stop replaying"); - return replay_ertr::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::yes); - } else { - cursor_addr = get_records_start(); - ++expected_seq; - is_rolled = true; - return replay_ertr::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::no); + std::map(), + [this](auto &d_handler, auto &map) { + auto build_paddr_seq_map = [&map]( + const auto &offsets, + const auto &e, + sea_time_point modify_time) + { + if (e.type == extent_types_t::ALLOC_INFO) { + alloc_delta_t alloc_delta; + decode(alloc_delta, e.bl); + if (alloc_delta.op == alloc_delta_t::op_types_t::CLEAR) { + for (auto &alloc_blk : alloc_delta.alloc_blk_ranges) { + map[alloc_blk.paddr] = offsets.write_result.start_seq; } } - auto [r_header, bl] = *ret; - bufferlist mdbuf; - mdbuf.substr_of(bl, 0, r_header.mdlength); - paddr_t record_block_base = paddr_t::make_blk_paddr( - get_device_id(), cursor_addr + r_header.mdlength); - auto maybe_record_deltas_list = try_decode_deltas( - r_header, mdbuf, record_block_base); - if (!maybe_record_deltas_list) { - // This should be impossible, we did check the crc on the mdbuf - ERROR("unable to decode deltas for record {} at {}", - r_header, 
record_block_base); - return crimson::ct_error::input_output_error::make(); + } + return replay_ertr::make_ready_future(true); + }; + written_to.segment_seq = NULL_SEG_SEQ; + auto tail = get_dirty_tail() <= get_alloc_tail() ? + get_dirty_tail() : get_alloc_tail(); + set_written_to(tail); + // The first pass to build the paddr->journal_seq_t map + // from extent deallocations (ALLOC_INFO deltas with op CLEAR) + return scan_valid_record_delta(std::move(build_paddr_seq_map), tail + ).safe_then([this, &map, &d_handler, tail]() { + auto call_d_handler_if_valid = [this, &map, &d_handler]( + const auto &offsets, + const auto &e, + sea_time_point modify_time) + { + if (map.find(e.paddr) == map.end() || + map[e.paddr] <= offsets.write_result.start_seq) { + return d_handler( + offsets, + e, + header.dirty_tail, + header.alloc_tail, + modify_time + ); } - DEBUG("{} at {}", r_header, cursor_addr); - auto write_result = write_result_t{ - r_header.committed_to, - bl.length() - }; - if (expected_seq == NULL_SEG_SEQ) { - expected_seq = r_header.committed_to.segment_seq; - } else { - assert(expected_seq == r_header.committed_to.segment_seq); - } - cursor_addr += bl.length(); - if (cursor_addr >= get_journal_end()) { - assert(cursor_addr == get_journal_end()); - cursor_addr = get_records_start(); - ++expected_seq; - is_rolled = true; - } - paddr_t addr = convert_abs_addr_to_paddr( - cursor_addr, - get_device_id()); - set_written_to( - journal_seq_t{expected_seq, addr}); - return seastar::do_with( - std::move(*maybe_record_deltas_list), - [this, - write_result, - &d_handler, - FNAME](auto& record_deltas_list) { - return crimson::do_for_each( - record_deltas_list, - [this, - write_result, - &d_handler, FNAME](record_deltas_t& record_deltas) { - auto locator = record_locator_t{ - record_deltas.record_block_base, - write_result - }; - DEBUG("processing {} deltas at block_base {}", - record_deltas.deltas.size(), - locator); - return crimson::do_for_each( - record_deltas.deltas, - [this, - locator, - &d_handler](auto& p) { 
- auto& modify_time = p.first; - auto& delta = p.second; - return d_handler( - locator, - delta, - header.dirty_tail, - header.alloc_tail, - modify_time).discard_result(); - }); - }).safe_then([]() { - return replay_ertr::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::no); - }); - }); - }); + return replay_ertr::make_ready_future(true); + }; + // The second pass to replay deltas + return scan_valid_record_delta(std::move(call_d_handler_if_valid), tail); }); }).safe_then([this]() { trimmer.update_journal_tails( diff --git a/src/crimson/os/seastore/journal/circular_bounded_journal.h b/src/crimson/os/seastore/journal/circular_bounded_journal.h index 09be167d97062..2977872a5e749 100644 --- a/src/crimson/os/seastore/journal/circular_bounded_journal.h +++ b/src/crimson/os/seastore/journal/circular_bounded_journal.h @@ -277,6 +277,16 @@ public: } seastar::future<> finish_commit(transaction_type_t type) final; + using cbj_delta_handler_t = std::function< + replay_ertr::future( + const record_locator_t&, + const delta_info_t&, + sea_time_point modify_time)>; + + Journal::replay_ret scan_valid_record_delta( + cbj_delta_handler_t &&delta_handler, + journal_seq_t tail); + private: cbj_header_t header; JournalTrimmer &trimmer; -- 2.39.5