From: Yingxin Cheng Date: Tue, 16 Nov 2021 08:47:12 +0000 (+0800) Subject: crimson/os/seastore: scan records based on record_locator_t X-Git-Tag: v17.1.0~268^2~13 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=28fec462610720f6c07769351730d0d559d508b1;p=ceph-ci.git crimson/os/seastore: scan records based on record_locator_t Record may not have its own base if headers are merged. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/os/seastore/extent_reader.cc b/src/crimson/os/seastore/extent_reader.cc index e3c3a5a45c5..b257d266ea6 100644 --- a/src/crimson/os/seastore/extent_reader.cc +++ b/src/crimson/os/seastore/extent_reader.cc @@ -73,7 +73,7 @@ ExtentReader::scan_extents_ret ExtentReader::scan_extents( auto segment_nonce = segment_header.segment_nonce; return seastar::do_with( found_record_handler_t([extents, this]( - paddr_t base, + record_locator_t locator, const record_header_t& header, const bufferlist& mdbuf) mutable -> scan_valid_records_ertr::future<> { @@ -82,11 +82,11 @@ ExtentReader::scan_extents_ret ExtentReader::scan_extents( // This should be impossible, we did check the crc on the mdbuf logger().error( "ExtentReader::scan_extents: unable to decode extents for record {}", - base); + locator.record_block_base); return crimson::ct_error::input_output_error::make(); } - paddr_t extent_offset = base.add_offset(header.mdlength); + paddr_t extent_offset = locator.record_block_base; logger().debug("ExtentReader::scan_extents: decoded {} extents", maybe_record_extent_infos->size()); for (const auto &i : *maybe_record_extent_infos) { @@ -358,8 +358,18 @@ ExtentReader::consume_next_records( auto& next = cursor.pending_records.front(); auto total_length = next.header.dlength + next.header.mdlength; budget_used += total_length; + auto locator = record_locator_t{ + next.offset.add_offset(next.header.mdlength), + write_result_t{ + journal_seq_t{ + cursor.seq.segment_seq, + next.offset + }, + static_cast(total_length) + } + }; return handler( - next.offset, + locator, next.header, next.mdbuffer ).safe_then([&cursor] { diff --git a/src/crimson/os/seastore/extent_reader.h b/src/crimson/os/seastore/extent_reader.h index 99c5895847d..243f3dc86d6 100644 --- a/src/crimson/os/seastore/extent_reader.h +++ b/src/crimson/os/seastore/extent_reader.h @@ -56,11 +56,11 @@ public: size_t>; using found_record_handler_t = std::function< scan_valid_records_ertr::future<>( - paddr_t record_block_base, + record_locator_t record_locator, // callee may assume header and bl will remain valid until // returned future resolves const record_header_t &header, - const bufferlist &bl)>; + const bufferlist &mdbuf)>; scan_valid_records_ret scan_valid_records( scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call segment_nonce_t nonce, ///< [in] nonce for segment diff --git a/src/crimson/os/seastore/journal.cc b/src/crimson/os/seastore/journal.cc index c83cd4a494d..f0331970236 100644 --- a/src/crimson/os/seastore/journal.cc +++ b/src/crimson/os/seastore/journal.cc @@ -174,7 +174,7 @@ Journal::replay_segment( return seastar::do_with( scan_valid_records_cursor(seq), ExtentReader::found_record_handler_t([=, &handler]( - paddr_t base, + record_locator_t locator, const record_header_t& header, const bufferlist& mdbuf) -> ExtentReader::scan_valid_records_ertr::future<> @@ -185,7 +185,7 @@ Journal::replay_segment( // This should be impossible, we did check the crc on the mdbuf logger().error( "Journal::replay_segment: unable to decode deltas for record {}", - base); + locator.record_block_base); return crimson::ct_error::input_output_error::make(); } @@ -193,9 +193,9 @@ Journal::replay_segment( std::move(*maybe_record_deltas_list), [=](auto &deltas) { - logger().debug("Journal::replay_segment: decoded {} deltas at base {}", + logger().debug("Journal::replay_segment: decoded {} deltas at block_base {}", deltas.size(), - base); + locator.record_block_base); return crimson::do_for_each( deltas, [=](auto &delta) @@ -216,14 +216,7 @@ Journal::replay_segment( seq.segment_seq)) { return replay_ertr::now(); } else { - auto offsets = submit_result_t{ - base.add_offset(header.mdlength), - write_result_t{ - journal_seq_t{seq.segment_seq, base}, - static_cast(header.mdlength + header.dlength) - } - }; - return handler(offsets, delta); + return handler(locator, delta); } }); }); @@ -474,7 +467,7 @@ Journal::RecordBatch::add_pending( if (!maybe_write_result.has_value()) { return crimson::ct_error::input_output_error::make(); } - auto submit_result = submit_result_t{ + auto submit_result = record_locator_t{ maybe_write_result->start_seq.offset.add_offset(block_start_offset), *maybe_write_result }; @@ -681,7 +674,7 @@ Journal::RecordSubmitter::submit_pending( journal_segment_manager.get_nonce()); return journal_segment_manager.write(to_write ).safe_then([rsize](auto write_result) { - return submit_result_t{ + return record_locator_t{ write_result.start_seq.offset.add_offset(rsize.mdlength), write_result }; diff --git a/src/crimson/os/seastore/journal.h b/src/crimson/os/seastore/journal.h index a8bf5372f05..5aed2ed57e6 100644 --- a/src/crimson/os/seastore/journal.h +++ b/src/crimson/os/seastore/journal.h @@ -85,24 +85,12 @@ public: * * write record with the ordering handle */ - struct write_result_t { - journal_seq_t start_seq; - segment_off_t length; - - journal_seq_t get_end_seq() const { - return start_seq.add_offset(length); - } - }; - struct submit_result_t { - paddr_t record_block_base; - write_result_t write_result; - }; using submit_record_ertr = crimson::errorator< crimson::ct_error::erange, crimson::ct_error::input_output_error >; using submit_record_ret = submit_record_ertr::future< - submit_result_t + record_locator_t >; submit_record_ret submit_record( record_t &&record, @@ -120,7 +108,7 @@ public: using replay_ertr = SegmentManager::read_ertr; using replay_ret = replay_ertr::future<>; using delta_handler_t = std::function< - replay_ret(const submit_result_t&, + replay_ret(const record_locator_t&, const delta_info_t&)>; replay_ret replay( std::vector>&& segment_headers, @@ -294,7 +282,7 @@ private: // Set write_result_t::write_length to 0 if the record is not the first one // in the batch. using add_pending_ertr = JournalSegmentManager::write_ertr; - using add_pending_ret = add_pending_ertr::future; + using add_pending_ret = add_pending_ertr::future; add_pending_ret add_pending(record_t&&, const record_size_t&); // Encode the batched records for write. @@ -426,7 +414,7 @@ private: using submit_pending_ertr = JournalSegmentManager::write_ertr; using submit_pending_ret = submit_pending_ertr::future< - submit_result_t>; + record_locator_t>; submit_pending_ret submit_pending( record_t&&, const record_size_t&, OrderingHandle &handle, bool flush); diff --git a/src/crimson/os/seastore/seastore_types.h b/src/crimson/os/seastore/seastore_types.h index b8275cda4a0..2c115df23c9 100644 --- a/src/crimson/os/seastore/seastore_types.h +++ b/src/crimson/os/seastore/seastore_types.h @@ -1262,6 +1262,20 @@ ceph::bufferlist encode_record( const journal_seq_t& committed_to, segment_nonce_t current_segment_nonce = 0); +struct write_result_t { + journal_seq_t start_seq; + segment_off_t length; + + journal_seq_t get_end_seq() const { + return start_seq.add_offset(length); + } +}; + +struct record_locator_t { + paddr_t record_block_base; + write_result_t write_result; +}; + /// scan segment for end incrementally struct scan_valid_records_cursor { bool last_valid_header_found = false;