From: myoungwon oh Date: Sat, 22 Jul 2023 06:03:04 +0000 (+0000) Subject: crimson/os/seastore: introduce generalized scan_valid_records for RBM X-Git-Tag: v19.0.0~659^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a580e516965ff8ae57dbaab202854f9ebe38ba5f;p=ceph-ci.git crimson/os/seastore: introduce generalized scan_valid_records for RBM Signed-off-by: Myoungwon Oh --- diff --git a/src/crimson/os/seastore/journal/circular_bounded_journal.cc b/src/crimson/os/seastore/journal/circular_bounded_journal.cc index 8d0de4e18ec..a92b9ecbd6b 100644 --- a/src/crimson/os/seastore/journal/circular_bounded_journal.cc +++ b/src/crimson/os/seastore/journal/circular_bounded_journal.cc @@ -111,115 +111,175 @@ CircularBoundedJournal::do_submit_record( }); } +RecordScanner::read_validate_record_metadata_ret CircularBoundedJournal::read_validate_record_metadata( + scan_valid_records_cursor &cursor, + segment_nonce_t nonce) +{ + LOG_PREFIX(CircularBoundedJournal::read_validate_record_metadata); + paddr_t start = cursor.seq.offset; + return read_record(start, 0 + ).safe_then([FNAME, &cursor](auto ret) { + if (!ret.has_value()) { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::nullopt); + } + auto [r_header, bl] = *ret; + if ((cursor.last_committed != JOURNAL_SEQ_NULL && + cursor.last_committed > r_header.committed_to) || + r_header.committed_to.segment_seq != cursor.seq.segment_seq) { + DEBUG("invalid header: {}", r_header); + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::nullopt); + } + + bufferlist mdbuf; + mdbuf.substr_of(bl, 0, r_header.mdlength); + DEBUG("header: {}", r_header); + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::make_pair(std::move(r_header), std::move(mdbuf))); + }); +} + +RecordScanner::read_validate_data_ret CircularBoundedJournal::read_validate_data( + paddr_t record_base, + const record_group_header_t &header) +{ + return read_record(record_base, header.segment_nonce + ).safe_then([](auto ret) { + // read_record would return non-empty value if the record is valid + if (!ret.has_value()) { + return read_validate_data_ret( + read_validate_data_ertr::ready_future_marker{}, + false); + } + return read_validate_data_ertr::make_ready_future(true); + }); +} + +Journal::replay_ret CircularBoundedJournal::replay_segment( + cbj_delta_handler_t &handler, scan_valid_records_cursor& cursor) +{ + LOG_PREFIX(Journal::replay_segment); + return seastar::do_with( + RecordScanner::found_record_handler_t( + [this, &handler, FNAME]( + record_locator_t locator, + const record_group_header_t& r_header, + const bufferlist& mdbuf) + -> RecordScanner::scan_valid_records_ertr::future<> + { + auto maybe_record_deltas_list = try_decode_deltas( + r_header, mdbuf, locator.record_block_base); + if (!maybe_record_deltas_list) { + // This should be impossible, we did check the crc on the mdbuf + ERROR("unable to decode deltas for record {} at {}", + r_header, locator.record_block_base); + return crimson::ct_error::input_output_error::make(); + } + auto cursor_addr = convert_paddr_to_abs_addr(r_header.committed_to.offset); + DEBUG("{} at {}", r_header, cursor_addr); + auto write_result = write_result_t{ + r_header.committed_to, + r_header.mdlength + r_header.dlength + }; + auto expected_seq = r_header.committed_to.segment_seq; + cursor_addr += (r_header.mdlength + r_header.dlength); + if (cursor_addr >= get_journal_end()) { + cursor_addr = get_records_start(); + ++expected_seq; + paddr_t addr = convert_abs_addr_to_paddr( + cursor_addr, + get_device_id()); + write_result.start_seq.offset = addr; + write_result.start_seq.segment_seq = expected_seq; + } + paddr_t addr = convert_abs_addr_to_paddr( + cursor_addr, + get_device_id()); + set_written_to( + journal_seq_t{expected_seq, addr}); + return seastar::do_with( + std::move(*maybe_record_deltas_list), + [write_result, + &handler, + FNAME](auto& record_deltas_list) { + return crimson::do_for_each( + record_deltas_list, + [write_result, + &handler, FNAME](record_deltas_t& record_deltas) { + auto locator = record_locator_t{ + record_deltas.record_block_base, + write_result + }; + DEBUG("processing {} deltas at block_base {}", + record_deltas.deltas.size(), + locator); + return crimson::do_for_each( + record_deltas.deltas, + [locator, + &handler](auto& p) { + auto& modify_time = p.first; + auto& delta = p.second; + return handler( + locator, + delta, + modify_time).discard_result(); + }); + }); + }); + }), + [=, this, &cursor](auto &dhandler) { + return scan_valid_records( + cursor, + 0, + std::numeric_limits::max(), + dhandler).safe_then([](auto){} + ).handle_error( + replay_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "shouldn't meet with any other error other replay_ertr" + } + ); + } + ); +} + + Journal::replay_ret CircularBoundedJournal::scan_valid_record_delta( - cbj_delta_handler_t &&delta_handler, journal_seq_t tail) + cbj_delta_handler_t &&handler, journal_seq_t tail) { - LOG_PREFIX(CircularBoundedJournal::scan_valid_record_delta); + LOG_PREFIX(Journal::scan_valid_record_delta); + INFO("starting at {} ", tail); return seastar::do_with( + scan_valid_records_cursor(tail), + std::move(handler), bool(false), - rbm_abs_addr(get_rbm_addr(tail)), - std::move(delta_handler), - segment_seq_t(NULL_SEG_SEQ), - [this, FNAME](auto &is_rolled, auto &cursor_addr, auto &d_handler, auto &expected_seq) { - return crimson::repeat( - [this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME]() mutable - -> replay_ertr::future { - paddr_t record_paddr = convert_abs_addr_to_paddr( - cursor_addr, - get_device_id()); - return read_record(record_paddr, expected_seq - ).safe_then([this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME](auto ret) - -> replay_ertr::future { - if (!ret.has_value()) { - if (expected_seq == NULL_SEG_SEQ || is_rolled) { - DEBUG("no more records, stop replaying"); - return replay_ertr::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::yes); - } else { - cursor_addr = get_records_start(); - ++expected_seq; - is_rolled = true; - return replay_ertr::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::no); - } - } - auto [r_header, bl] = *ret; - bufferlist mdbuf; - mdbuf.substr_of(bl, 0, r_header.mdlength); - paddr_t record_block_base = paddr_t::make_blk_paddr( - get_device_id(), cursor_addr + r_header.mdlength); - auto maybe_record_deltas_list = try_decode_deltas( - r_header, mdbuf, record_block_base); - if (!maybe_record_deltas_list) { - // This should be impossible, we did check the crc on the mdbuf - ERROR("unable to decode deltas for record {} at {}", - r_header, record_block_base); - return crimson::ct_error::input_output_error::make(); - } - DEBUG("{} at {}", r_header, cursor_addr); - auto write_result = write_result_t{ - r_header.committed_to, - bl.length() - }; - if (expected_seq == NULL_SEG_SEQ) { - expected_seq = r_header.committed_to.segment_seq; - } else { - assert(expected_seq == r_header.committed_to.segment_seq); - } - cursor_addr += bl.length(); - if (cursor_addr >= get_journal_end()) { - assert(cursor_addr == get_journal_end()); - cursor_addr = get_records_start(); - ++expected_seq; - paddr_t addr = convert_abs_addr_to_paddr( - cursor_addr, - get_device_id()); - write_result.start_seq.offset = addr; - write_result.start_seq.segment_seq = expected_seq; - is_rolled = true; - } - paddr_t addr = convert_abs_addr_to_paddr( - cursor_addr, - get_device_id()); - set_written_to( - journal_seq_t{expected_seq, addr}); - return seastar::do_with( - std::move(*maybe_record_deltas_list), - [write_result, - &d_handler, - FNAME](auto& record_deltas_list) { - return crimson::do_for_each( - record_deltas_list, - [write_result, - &d_handler, FNAME](record_deltas_t& record_deltas) { - auto locator = record_locator_t{ - record_deltas.record_block_base, - write_result - }; - DEBUG("processing {} deltas at block_base {}", - record_deltas.deltas.size(), - locator); - return crimson::do_for_each( - record_deltas.deltas, - [locator, - &d_handler](auto& p) { - auto& modify_time = p.first; - auto& delta = p.second; - return d_handler( - locator, - delta, - modify_time).discard_result(); - }); - }).safe_then([]() { - return replay_ertr::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::no); - }); - }); + [this] (auto &cursor, auto &handler, auto &rolled) { + return crimson::repeat([this, &handler, &cursor, &rolled]() + -> replay_ertr::future + { + return replay_segment(handler, cursor + ).safe_then([this, &cursor, &rolled] { + if (!rolled) { + cursor.last_valid_header_found = false; + } + if (!cursor.is_complete()) { + try_read_rolled_header(cursor); + rolled = true; + return replay_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::no); + } + return replay_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::yes); }); }); }); } + Journal::replay_ret CircularBoundedJournal::replay( delta_handler_t &&delta_handler) { diff --git a/src/crimson/os/seastore/journal/circular_bounded_journal.h b/src/crimson/os/seastore/journal/circular_bounded_journal.h index bb3e2a86065..3c696f99704 100644 --- a/src/crimson/os/seastore/journal/circular_bounded_journal.h +++ b/src/crimson/os/seastore/journal/circular_bounded_journal.h @@ -21,6 +21,7 @@ #include #include "crimson/os/seastore/journal/record_submitter.h" #include "crimson/os/seastore/journal/circular_journal_space.h" +#include "crimson/os/seastore/record_scanner.h" namespace crimson::os::seastore::journal { @@ -55,7 +56,7 @@ using RBMDevice = random_block_device::RBMDevice; constexpr uint64_t DEFAULT_BLOCK_SIZE = 4096; -class CircularBoundedJournal : public Journal { +class CircularBoundedJournal : public Journal, RecordScanner { public: CircularBoundedJournal( JournalTrimmer &trimmer, RBMDevice* device, const std::string &path); @@ -179,6 +180,27 @@ public: submit_record_ret do_submit_record(record_t &&record, OrderingHandle &handle); + void try_read_rolled_header(scan_valid_records_cursor &cursor) { + paddr_t addr = convert_abs_addr_to_paddr( + get_records_start(), + get_device_id()); + cursor.seq.offset = addr; + cursor.seq.segment_seq += 1; + } + + void initialize_cursor(scan_valid_records_cursor& cursor) final {}; + + Journal::replay_ret replay_segment( + cbj_delta_handler_t &handler, scan_valid_records_cursor& cursor); + + read_validate_record_metadata_ret read_validate_record_metadata( + scan_valid_records_cursor &cursor, + segment_nonce_t nonce) final; + + read_validate_data_ret read_validate_data( + paddr_t record_base, + const record_group_header_t &header) final; + // Test interfaces CircularJournalSpace& get_cjs() { diff --git a/src/crimson/os/seastore/record_scanner.cc b/src/crimson/os/seastore/record_scanner.cc index f3ed54d0164..74bfdeb7cfa 100644 --- a/src/crimson/os/seastore/record_scanner.cc +++ b/src/crimson/os/seastore/record_scanner.cc @@ -26,7 +26,7 @@ RecordScanner::scan_valid_records( -> scan_valid_records_ertr::future { return [=, &handler, &cursor, &budget_used, this] { if (!cursor.last_valid_header_found) { - return read_validate_record_metadata(cursor.seq.offset, nonce + return read_validate_record_metadata(cursor, nonce ).safe_then([=, &cursor](auto md) { if (!md) { cursor.last_valid_header_found = true; diff --git a/src/crimson/os/seastore/record_scanner.h b/src/crimson/os/seastore/record_scanner.h index c8486f59013..10569ef4e5d 100644 --- a/src/crimson/os/seastore/record_scanner.h +++ b/src/crimson/os/seastore/record_scanner.h @@ -38,7 +38,7 @@ protected: std::optional> >; virtual read_validate_record_metadata_ret read_validate_record_metadata( - paddr_t start, + scan_valid_records_cursor &cursor, segment_nonce_t nonce) = 0; /// read and validate data diff --git a/src/crimson/os/seastore/seastore_types.h b/src/crimson/os/seastore/seastore_types.h index 2dc038dfa7b..9fd008f4bec 100644 --- a/src/crimson/os/seastore/seastore_types.h +++ b/src/crimson/os/seastore/seastore_types.h @@ -2083,9 +2083,7 @@ struct scan_valid_records_cursor { } void increment_seq(segment_off_t off) { - auto& seg_addr = seq.offset.as_seg_paddr(); - seg_addr.set_segment_off( - seg_addr.get_segment_off() + off); + seq.offset = seq.offset.add_offset(off); } void emplace_record_group(const record_group_header_t&, ceph::bufferlist&&); diff --git a/src/crimson/os/seastore/segment_manager_group.cc b/src/crimson/os/seastore/segment_manager_group.cc index 6fe56501c8a..efbbb0c888c 100644 --- a/src/crimson/os/seastore/segment_manager_group.cc +++ b/src/crimson/os/seastore/segment_manager_group.cc @@ -106,10 +106,11 @@ void SegmentManagerGroup::initialize_cursor( SegmentManagerGroup::read_validate_record_metadata_ret SegmentManagerGroup::read_validate_record_metadata( - paddr_t start, + scan_valid_records_cursor &cursor, segment_nonce_t nonce) { LOG_PREFIX(SegmentManagerGroup::read_validate_record_metadata); + paddr_t start = cursor.seq.offset; auto& seg_addr = start.as_seg_paddr(); assert(has_device(seg_addr.get_segment_id().device_id())); auto& segment_manager = *segment_managers[seg_addr.get_segment_id().device_id()]; diff --git a/src/crimson/os/seastore/segment_manager_group.h b/src/crimson/os/seastore/segment_manager_group.h index 826ab61a7e7..10d8d4f292c 100644 --- a/src/crimson/os/seastore/segment_manager_group.h +++ b/src/crimson/os/seastore/segment_manager_group.h @@ -129,7 +129,7 @@ private: void initialize_cursor(scan_valid_records_cursor &cursor) final; read_validate_record_metadata_ret read_validate_record_metadata( - paddr_t start, + scan_valid_records_cursor &cursor, segment_nonce_t nonce) final; read_validate_data_ret read_validate_data(