From: Yingxin Cheng
Date: Fri, 19 Nov 2021 05:17:43 +0000 (+0800)
Subject: crimson/os/seastore: merge records metadata if they are grouped
X-Git-Tag: v17.1.0~268^2~10
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=91f46e79a7abe3108dfff330fbf2a40de4a5bf0b;p=ceph.git

crimson/os/seastore: merge records metadata if they are grouped

Signed-off-by: Yingxin Cheng
---

diff --git a/src/crimson/os/seastore/cache.cc b/src/crimson/os/seastore/cache.cc
index 57a3c8cf795e..9ab7c9e6f198 100644
--- a/src/crimson/os/seastore/cache.cc
+++ b/src/crimson/os/seastore/cache.cc
@@ -1087,6 +1087,7 @@ record_t Cache::prepare_record(Transaction &t)
   record_header_fullness.ool_stats.filled_bytes += ool_stats.header_raw_bytes;
   record_header_fullness.ool_stats.total_bytes += ool_stats.header_bytes;
 
+  // TODO: move to Journal to get accurate result
   auto record_size = record_group_size_t(
       record.size, reader.get_block_size());
   auto inline_overhead =
diff --git a/src/crimson/os/seastore/extent_reader.cc b/src/crimson/os/seastore/extent_reader.cc
index 46b4165055fe..dd245d5eaf7b 100644
--- a/src/crimson/os/seastore/extent_reader.cc
+++ b/src/crimson/os/seastore/extent_reader.cc
@@ -72,12 +72,14 @@ ExtentReader::scan_extents_ret ExtentReader::scan_extents(
   ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
     auto segment_nonce = segment_header.segment_nonce;
     return seastar::do_with(
-      found_record_handler_t([extents, this](
+      found_record_handler_t([extents](
        record_locator_t locator,
-       const record_header_t& header,
+       const record_group_header_t& header,
        const bufferlist& mdbuf) mutable
        -> scan_valid_records_ertr::future<> {
-        auto maybe_record_extent_infos = try_decode_extent_info(header, mdbuf);
+        logger().debug("ExtentReader::scan_extents: decoding {} records",
+                       header.records);
+        auto maybe_record_extent_infos = try_decode_extent_infos(header, mdbuf);
         if (!maybe_record_extent_infos) {
           // This should be impossible, we did check the crc on the mdbuf
           logger().error(
@@ -87,17 +89,19 @@ ExtentReader::scan_extents_ret ExtentReader::scan_extents(
 
        }
        paddr_t extent_offset = locator.record_block_base;
-        logger().debug("ExtentReader::scan_extents: decoded {} extents",
-                       maybe_record_extent_infos->extent_infos.size());
-        for (const auto &i : maybe_record_extent_infos->extent_infos) {
-          extents->emplace_back(extent_offset, i);
-          auto& seg_addr = extent_offset.as_seg_paddr();
-          seg_addr.set_segment_off(
-            seg_addr.get_segment_off() + i.len);
+        for (auto& r: *maybe_record_extent_infos) {
+          logger().debug("ExtentReader::scan_extents: decoded {} extents",
+                         r.extent_infos.size());
+          for (const auto &i : r.extent_infos) {
+            extents->emplace_back(extent_offset, i);
+            auto& seg_addr = extent_offset.as_seg_paddr();
+            seg_addr.set_segment_off(
+              seg_addr.get_segment_off() + i.len);
+          }
        }
        return scan_extents_ertr::now();
      }),
-      [=, &cursor](auto &dhandler) {
+      [bytes_to_read, segment_nonce, &cursor, this](auto &dhandler) {
        return scan_valid_records(
          cursor,
          segment_nonce,
@@ -150,7 +154,7 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
          ceph_assert(cursor.last_committed == journal_seq_t() ||
                      cursor.last_committed <= new_committed_to);
          cursor.last_committed = new_committed_to;
-          cursor.pending_records.emplace_back(
+          cursor.pending_record_groups.emplace_back(
            cursor.seq.offset,
            header,
            std::move(md_bl));
@@ -164,7 +168,7 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
        [=, &budget_used, &cursor, &handler] {
          logger().debug(
            "ExtentReader::scan_valid_records: valid record read, processing queue");
-          if (cursor.pending_records.empty()) {
+          if (cursor.pending_record_groups.empty()) {
            /* This is only possible if the segment is empty.
             * A record's last_commited must be prior to its own
             * location since it itself cannot yet have been committed
@@ -173,7 +177,7 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
            return scan_valid_records_ertr::make_ready_future<
              seastar::stop_iteration>(seastar::stop_iteration::yes);
          }
-          auto &next = cursor.pending_records.front();
+          auto &next = cursor.pending_record_groups.front();
          journal_seq_t next_seq = {cursor.seq.segment_seq, next.offset};
          if (cursor.last_committed == journal_seq_t() ||
              next_seq > cursor.last_committed) {
@@ -188,12 +192,12 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
            });
        });
    } else {
-      assert(!cursor.pending_records.empty());
-      auto &next = cursor.pending_records.front();
+      assert(!cursor.pending_record_groups.empty());
+      auto &next = cursor.pending_record_groups.front();
      return read_validate_data(next.offset, next.header
      ).safe_then([this, &budget_used, &cursor, &handler](auto valid) {
        if (!valid) {
-          cursor.pending_records.clear();
+          cursor.pending_record_groups.clear();
          return scan_valid_records_ertr::now();
        }
        return consume_next_records(cursor, handler, budget_used);
@@ -237,7 +241,7 @@ ExtentReader::read_validate_record_metadata(
      segment_manager.get_block_size());
    bufferlist bl;
    bl.append(bptr);
-    auto maybe_header = try_decode_record_header(bl, nonce);
+    auto maybe_header = try_decode_records_header(bl, nonce);
    if (!maybe_header.has_value()) {
      return read_validate_record_metadata_ret(
        read_validate_record_metadata_ertr::ready_future_marker{},
@@ -275,7 +279,7 @@ ExtentReader::read_validate_record_metadata(
        std::make_pair(std::move(header), std::move(bl)));
    });
  }).safe_then([](auto p) {
-    if (p && validate_record_metadata(p->second)) {
+    if (p && validate_records_metadata(p->second)) {
      return read_validate_record_metadata_ret(
        read_validate_record_metadata_ertr::ready_future_marker{},
        std::move(*p)
@@ -291,7 +295,7 @@ ExtentReader::read_validate_record_metadata(
 ExtentReader::read_validate_data_ret
 ExtentReader::read_validate_data(
  paddr_t record_base,
-  const record_header_t &header)
+  const record_group_header_t &header)
 {
  auto& segment_manager = *segment_managers[record_base.get_device_id()];
  auto data_addr = record_base.add_offset(header.mdlength);
@@ -303,7 +307,7 @@ ExtentReader::read_validate_data(
  ).safe_then([=, &header](auto bptr) {
    bufferlist bl;
    bl.append(bptr);
-    return validate_record_data(header, bl);
+    return validate_records_data(header, bl);
  });
 }
 
@@ -313,7 +317,7 @@ ExtentReader::consume_next_records(
  found_record_handler_t& handler,
  std::size_t& budget_used)
 {
-  auto& next = cursor.pending_records.front();
+  auto& next = cursor.pending_record_groups.front();
  auto total_length = next.header.dlength + next.header.mdlength;
  budget_used += total_length;
  auto locator = record_locator_t{
@@ -331,7 +335,7 @@ ExtentReader::consume_next_records(
    next.header,
    next.mdbuffer
  ).safe_then([&cursor] {
-    cursor.pending_records.pop_front();
+    cursor.pending_record_groups.pop_front();
  });
 }
 
diff --git a/src/crimson/os/seastore/extent_reader.h b/src/crimson/os/seastore/extent_reader.h
index dfa2cd5b67dc..b6e5013cba79 100644
--- a/src/crimson/os/seastore/extent_reader.h
+++ b/src/crimson/os/seastore/extent_reader.h
@@ -59,7 +59,7 @@ public:
      record_locator_t record_locator,
      // callee may assume header and bl will remain valid until
      // returned future resolves
-      const record_header_t &header,
+      const record_group_header_t &header,
      const bufferlist &mdbuf)>;
  scan_valid_records_ret scan_valid_records(
    scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
@@ -92,7 +92,7 @@ private:
  using read_validate_record_metadata_ertr = read_ertr;
  using read_validate_record_metadata_ret =
    read_validate_record_metadata_ertr::future<
-      std::optional<std::pair<record_header_t, bufferlist>>
+      std::optional<std::pair<record_group_header_t, bufferlist>>
    >;
  read_validate_record_metadata_ret read_validate_record_metadata(
    paddr_t start,
@@ -103,8 +103,8 @@ private:
  using read_validate_data_ret = read_validate_data_ertr::future<bool>;
  read_validate_data_ret read_validate_data(
    paddr_t record_base,
-    const record_header_t &header  ///< caller must ensure lifetime through
-                                   /// future resolution
+    const record_group_header_t &header  ///< caller must ensure lifetime through
+                                         /// future resolution
  );
 
  using consume_record_group_ertr = scan_valid_records_ertr;
diff --git a/src/crimson/os/seastore/journal.cc b/src/crimson/os/seastore/journal.cc
index 3d4eb66df561..71534cbc1e8b 100644
--- a/src/crimson/os/seastore/journal.cc
+++ b/src/crimson/os/seastore/journal.cc
@@ -155,12 +155,14 @@ Journal::replay_segment(
    scan_valid_records_cursor(seq),
    ExtentReader::found_record_handler_t([=, &handler](
      record_locator_t locator,
-      const record_header_t& header,
+      const record_group_header_t& header,
      const bufferlist& mdbuf)
      -> ExtentReader::scan_valid_records_ertr::future<>
    {
+      logger().debug("Journal::replay_segment: decoding {} records",
+                     header.records);
      auto maybe_record_deltas_list = try_decode_deltas(
-        header, mdbuf);
+        header, mdbuf, locator.record_block_base);
      if (!maybe_record_deltas_list) {
        // This should be impossible, we did check the crc on the mdbuf
        logger().error(
@@ -171,37 +173,48 @@ Journal::replay_segment(
 
      return seastar::do_with(
        std::move(*maybe_record_deltas_list),
-        [locator,
+        [write_result=locator.write_result,
         this,
-         &handler](record_deltas_t& record_deltas)
+         &handler](auto& record_deltas_list)
      {
-        logger().debug("Journal::replay_segment: decoded {} deltas at block_base {}",
-          record_deltas.deltas.size(),
-          locator.record_block_base);
        return crimson::do_for_each(
-          record_deltas.deltas,
-          [locator,
+          record_deltas_list,
+          [write_result,
           this,
-           &handler](delta_info_t& delta)
+           &handler](record_deltas_t& record_deltas)
        {
-          /* The journal may validly contain deltas for extents in
-           * since released segments.  We can detect those cases by
-           * checking whether the segment in question currently has a
-           * sequence number > the current journal segment seq. We can
-           * safetly skip these deltas because the extent must already
-           * have been rewritten.
-           *
-           * Note, this comparison exploits the fact that
-           * SEGMENT_SEQ_NULL is a large number.
-           */
-          auto& seg_addr = delta.paddr.as_seg_paddr();
-          if (delta.paddr != P_ADDR_NULL &&
-              (segment_provider->get_seq(seg_addr.get_segment_id()) >
-               locator.write_result.start_seq.segment_seq)) {
-            return replay_ertr::now();
-          } else {
-            return handler(locator, delta);
-          }
+          logger().debug("Journal::replay_segment: decoded {} deltas at block_base {}",
+            record_deltas.deltas.size(),
+            record_deltas.record_block_base);
+          auto locator = record_locator_t{
+            record_deltas.record_block_base,
+            write_result
+          };
+          return crimson::do_for_each(
+            record_deltas.deltas,
+            [locator,
+             this,
+             &handler](delta_info_t& delta)
+          {
+            /* The journal may validly contain deltas for extents in
+             * since released segments.  We can detect those cases by
+             * checking whether the segment in question currently has a
+             * sequence number > the current journal segment seq. We can
+             * safetly skip these deltas because the extent must already
+             * have been rewritten.
+             *
+             * Note, this comparison exploits the fact that
+             * SEGMENT_SEQ_NULL is a large number.
+             */
+            auto& seg_addr = delta.paddr.as_seg_paddr();
+            if (delta.paddr != P_ADDR_NULL &&
+                (segment_provider->get_seq(seg_addr.get_segment_id()) >
+                 locator.write_result.start_seq.segment_seq)) {
+              return replay_ertr::now();
+            } else {
+              return handler(locator, delta);
+            }
+          });
        });
      });
    }),
@@ -432,30 +445,29 @@ Journal::RecordBatch::add_pending(
  assert(state != state_t::SUBMITTING);
  assert(can_batch(record, block_size) == new_encoded_length);
 
-  auto block_start_offset = record.size.get_mdlength(block_size);
-  if (state != state_t::EMPTY) {
-    block_start_offset += pending.size.get_encoded_length();
-  }
+  auto dlength_offset = pending.current_dlength;
  pending.push_back(
      std::move(record), block_size);
  assert(pending.size.get_encoded_length() == new_encoded_length);
  if (state == state_t::EMPTY) {
    assert(!io_promise.has_value());
-    io_promise = seastar::shared_promise<maybe_result_t>();
+    io_promise = seastar::shared_promise<maybe_promise_result_t>();
  } else {
    assert(io_promise.has_value());
  }
  state = state_t::PENDING;
 
  return io_promise->get_shared_future(
-  ).then([block_start_offset
-         ](auto maybe_write_result) -> add_pending_ret {
-    if (!maybe_write_result.has_value()) {
+  ).then([dlength_offset
+         ](auto maybe_promise_result) -> add_pending_ret {
+    if (!maybe_promise_result.has_value()) {
      return crimson::ct_error::input_output_error::make();
    }
+    auto write_result = maybe_promise_result->write_result;
    auto submit_result = record_locator_t{
-      maybe_write_result->start_seq.offset.add_offset(block_start_offset),
-      *maybe_write_result
+      write_result.start_seq.offset.add_offset(
+          maybe_promise_result->mdlength + dlength_offset),
+      write_result
    };
    return add_pending_ret(
      add_pending_ertr::ready_future_marker{},
@@ -478,6 +490,7 @@ ceph::bufferlist Journal::RecordBatch::encode_batch(
  state = state_t::SUBMITTING;
  submitting_size = pending.get_size();
  submitting_length = pending.size.get_encoded_length();
+  submitting_mdlength = pending.size.get_mdlength();
  auto bl = encode_records(pending, committed_to, segment_nonce);
  // Note: pending is cleared here
  assert(bl.length() == (std::size_t)submitting_length);
@@ -487,6 +500,7 @@ ceph::bufferlist Journal::RecordBatch::encode_batch(
 void Journal::RecordBatch::set_result(
  maybe_result_t maybe_write_result)
 {
+  maybe_promise_result_t result;
  if (maybe_write_result.has_value()) {
    logger().debug(
      "Journal::RecordBatch::set_result: batches={}, write_start {} + {}",
@@ -494,6 +508,10 @@ void Journal::RecordBatch::set_result(
      maybe_write_result->start_seq,
      maybe_write_result->length);
    assert(maybe_write_result->length == submitting_length);
+    result = promise_result_t{
+      *maybe_write_result,
+      submitting_mdlength
+    };
  } else {
    logger().error(
      "Journal::RecordBatch::set_result: batches={}, write is failed!",
@@ -505,7 +523,8 @@ void Journal::RecordBatch::set_result(
  state = state_t::EMPTY;
  submitting_size = 0;
  submitting_length = 0;
-  io_promise->set_value(maybe_write_result);
+  submitting_mdlength = 0;
+  io_promise->set_value(result);
  io_promise.reset();
 }
 
diff --git a/src/crimson/os/seastore/journal.h b/src/crimson/os/seastore/journal.h
index ba4c4498efee..60484afce613 100644
--- a/src/crimson/os/seastore/journal.h
+++ b/src/crimson/os/seastore/journal.h
@@ -327,8 +327,14 @@ private:
    record_group_t pending;
    std::size_t submitting_size = 0;
    segment_off_t submitting_length = 0;
+    segment_off_t submitting_mdlength = 0;
 
-    std::optional<seastar::shared_promise<maybe_result_t> > io_promise;
+    struct promise_result_t {
+      write_result_t write_result;
+      segment_off_t mdlength;
+    };
+    using maybe_promise_result_t = std::optional<promise_result_t>;
+    std::optional<seastar::shared_promise<maybe_promise_result_t> > io_promise;
  };
 
  class RecordSubmitter {
diff --git a/src/crimson/os/seastore/seastore_types.cc b/src/crimson/os/seastore/seastore_types.cc
index 2901ab1a6f6a..e51f9f08689a 100644
--- a/src/crimson/os/seastore/seastore_types.cc
+++ b/src/crimson/os/seastore/seastore_types.cc
@@ -147,11 +147,11 @@ void record_size_t::account(const delta_info_t& delta)
  plain_mdlength += ceph::encoded_sizeof(delta);
 }
 
-extent_len_t record_size_t::get_raw_mdlength() const
+extent_len_t record_group_size_t::get_raw_mdlength() const
 {
  return plain_mdlength +
         sizeof(checksum_t) +
-         ceph::encoded_sizeof_bounded<record_header_t>();
+         ceph::encoded_sizeof_bounded<record_group_header_t>();
 }
 
 void record_group_size_t::account(
@@ -163,8 +163,10 @@ void record_group_size_t::account(
  assert(_block_size > 0);
  assert(rsize.dlength % _block_size == 0);
  assert(block_size == 0 || block_size == _block_size);
-  raw_mdlength += rsize.get_raw_mdlength();
-  mdlength += rsize.get_mdlength(_block_size);
+  plain_mdlength += (
+    rsize.plain_mdlength +
+    ceph::encoded_sizeof_bounded<record_header_t>()
+  );
  dlength += rsize.dlength;
  block_size = _block_size;
 }
@@ -175,17 +177,34 @@ ceph::bufferlist encode_record(
  const journal_seq_t& committed_to,
  segment_nonce_t current_segment_nonce)
 {
+  record_group_t record_group(std::move(record), block_size);
+  return encode_records(
+      record_group,
+      committed_to,
+      current_segment_nonce);
+}
+
+ceph::bufferlist encode_records(
+  record_group_t& record_group,
+  const journal_seq_t& committed_to,
+  segment_nonce_t current_segment_nonce)
+{
+  assert(record_group.size.block_size > 0);
+  assert(record_group.records.size() > 0);
+
  bufferlist data_bl;
-  for (auto &i: record.extents) {
-    data_bl.append(i.bl);
+  for (auto& r: record_group.records) {
+    for (auto& i: r.extents) {
+      assert(i.bl.length());
+      data_bl.append(i.bl);
+    }
  }
 
  bufferlist bl;
-  record_header_t header{
-    record.size.get_mdlength(block_size),
-    record.size.dlength,
-    (extent_len_t)record.deltas.size(),
-    (extent_len_t)record.extents.size(),
+  record_group_header_t header{
+    static_cast<extent_len_t>(record_group.records.size()),
+    record_group.size.get_mdlength(),
+    record_group.size.dlength,
    current_segment_nonce,
    committed_to,
    data_bl.crc32c(-1)
@@ -194,15 +213,26 @@ ceph::bufferlist encode_record(
 
  auto metadata_crc_filler = bl.append_hole(sizeof(checksum_t));
 
-  for (const auto &i: record.extents) {
-    encode(extent_info_t(i), bl);
+  for (auto& r: record_group.records) {
+    record_header_t rheader{
+      (extent_len_t)r.deltas.size(),
+      (extent_len_t)r.extents.size(),
+    };
+    encode(rheader, bl);
  }
-  for (const auto &i: record.deltas) {
-    encode(i, bl);
+  for (auto& r: record_group.records) {
+    for (const auto& i: r.extents) {
+      encode(extent_info_t(i), bl);
+    }
  }
-  ceph_assert(bl.length() == record.size.get_raw_mdlength());
+  for (auto& r: record_group.records) {
+    for (const auto& i: r.deltas) {
+      encode(i, bl);
+    }
+  }
+  ceph_assert(bl.length() == record_group.size.get_raw_mdlength());
 
-  auto aligned_mdlength = record.size.get_mdlength(block_size);
+  auto aligned_mdlength = record_group.size.get_mdlength();
  if (bl.length() != aligned_mdlength) {
    assert(bl.length() < aligned_mdlength);
    bl.append_zero(aligned_mdlength - bl.length());
@@ -210,7 +240,7 @@ ceph::bufferlist encode_record(
 
  auto bliter = bl.cbegin();
  auto metadata_crc = bliter.crc32c(
-    ceph::encoded_sizeof_bounded<record_header_t>(),
+    ceph::encoded_sizeof_bounded<record_group_header_t>(),
    -1);
  bliter += sizeof(checksum_t); /* metadata crc hole */
  metadata_crc = bliter.crc32c(
@@ -223,53 +253,31 @@ ceph::bufferlist encode_record(
    reinterpret_cast<const char *>(&metadata_crc_le));
 
  bl.claim_append(data_bl);
-  ceph_assert(bl.length() == (record.size.get_encoded_length(block_size)));
-
-  return bl;
-}
-
-ceph::bufferlist encode_records(
-  record_group_t& record_group,
-  const journal_seq_t& committed_to,
-  segment_nonce_t current_segment_nonce)
-{
-  assert(record_group.size.block_size > 0);
-  assert(record_group.records.size() > 0);
-
-  bufferlist bl;
-  for (auto& r: record_group.records) {
-    bl.claim_append(
-      encode_record(
-        std::move(r),
-        record_group.size.block_size,
-        committed_to,
-        current_segment_nonce));
-  }
+  ceph_assert(bl.length() == record_group.size.get_encoded_length());
 
  record_group.clear();
  return bl;
 }
 
-std::optional<record_header_t>
-try_decode_record_header(
+std::optional<record_group_header_t>
+try_decode_records_header(
  const ceph::bufferlist& header_bl,
  segment_nonce_t expected_nonce)
 {
  auto bp = header_bl.cbegin();
-  record_header_t header;
+  record_group_header_t header;
  try {
    decode(header, bp);
  } catch (ceph::buffer::error &e) {
    logger().debug(
-      "try_decode_record_header: failed, "
-      "cannot decode record_header_t, got {}.",
+      "try_decode_records_header: failed, "
+      "cannot decode record_group_header_t, got {}.",
      e);
    return std::nullopt;
  }
  if (header.segment_nonce != expected_nonce) {
    logger().debug(
-      "try_decode_record_header: failed, record_header nonce mismatch, "
+      "try_decode_records_header: failed, record_group_header nonce mismatch, "
      "read {}, expected {}!",
      header.segment_nonce,
      expected_nonce);
@@ -278,12 +286,12 @@ try_decode_record_header(
  return header;
 }
 
-bool validate_record_metadata(
+bool validate_records_metadata(
  const ceph::bufferlist& md_bl)
 {
  auto bliter = md_bl.cbegin();
  auto test_crc = bliter.crc32c(
-    ceph::encoded_sizeof_bounded<record_header_t>(),
+    ceph::encoded_sizeof_bounded<record_group_header_t>(),
    -1);
  ceph_le32 recorded_crc_le;
  decode(recorded_crc_le, bliter);
@@ -293,71 +301,136 @@ bool validate_record_metadata(
    test_crc);
  bool success = (test_crc == recorded_crc);
  if (!success) {
-    logger().debug("validate_record_metadata: failed, metadata crc mismatch.");
+    logger().debug("validate_records_metadata: failed, metadata crc mismatch.");
  }
  return success;
 }
 
-bool validate_record_data(
-  const record_header_t& header,
+bool validate_records_data(
+  const record_group_header_t& header,
  const ceph::bufferlist& data_bl)
 {
  bool success = (data_bl.crc32c(-1) == header.data_crc);
  if (!success) {
-    logger().debug("validate_record_data: failed, data crc mismatch!");
+    logger().debug("validate_records_data: failed, data crc mismatch!");
  }
  return success;
 }
 
-std::optional<record_extent_infos_t>
-try_decode_extent_info(
-  const record_header_t& header,
+namespace {
+
+std::optional<std::vector<record_header_t>>
+try_decode_record_headers(
+  const record_group_header_t& header,
  const ceph::bufferlist& md_bl)
 {
  auto bliter = md_bl.cbegin();
-  bliter += ceph::encoded_sizeof_bounded<record_header_t>();
-  bliter += sizeof(checksum_t) /* metadata crc hole */;
-
-  record_extent_infos_t record_extent_info;
-  record_extent_info.extent_infos.resize(header.extents);
-  for (auto& i: record_extent_info.extent_infos) {
+  bliter += ceph::encoded_sizeof_bounded<record_group_header_t>();
+  bliter += sizeof(checksum_t); /* metadata crc hole */
+  std::vector<record_header_t> record_headers(header.records);
+  for (auto &&i: record_headers) {
    try {
      decode(i, bliter);
    } catch (ceph::buffer::error &e) {
      logger().debug(
-        "try_decode_extent_infos: failed, "
-        "cannot decode extent_info_t, got {}.",
+        "try_decode_record_headers: failed, "
+        "cannot decode record_header_t, got {}.",
        e);
      return std::nullopt;
    }
  }
-  return record_extent_info;
+  return record_headers;
 }
 
-std::optional<record_deltas_t>
-try_decode_deltas(
-  const record_header_t& header,
+}
+
+std::optional<std::vector<record_extent_infos_t> >
+try_decode_extent_infos(
+  const record_group_header_t& header,
  const ceph::bufferlist& md_bl)
 {
+  auto maybe_headers = try_decode_record_headers(header, md_bl);
+  if (!maybe_headers) {
+    logger().debug(
+      "try_decode_extent_infos: failed, cannot decode record headers.");
+    return std::nullopt;
+  }
+
  auto bliter = md_bl.cbegin();
-  bliter += ceph::encoded_sizeof_bounded<record_header_t>();
-  bliter += sizeof(checksum_t) /* metadata crc hole */;
-  bliter += header.extents * ceph::encoded_sizeof_bounded<extent_info_t>();
+  bliter += ceph::encoded_sizeof_bounded<record_group_header_t>();
+  bliter += sizeof(checksum_t); /* metadata crc hole */
+  bliter += (ceph::encoded_sizeof_bounded<record_header_t>() *
+             maybe_headers->size());
+
+  std::vector<record_extent_infos_t> record_extent_infos(
+      maybe_headers->size());
+  auto result_iter = record_extent_infos.begin();
+  for (auto& h: *maybe_headers) {
+    result_iter->header = h;
+    result_iter->extent_infos.resize(h.extents);
+    for (auto& i: result_iter->extent_infos) {
+      try {
+        decode(i, bliter);
+      } catch (ceph::buffer::error &e) {
+        logger().debug(
+          "try_decode_extent_infos: failed, "
+          "cannot decode extent_info_t, got {}.",
+          e);
+        return std::nullopt;
+      }
+    }
+    ++result_iter;
+  }
+  return record_extent_infos;
+}
 
-  record_deltas_t record_delta;
-  record_delta.deltas.resize(header.deltas);
-  for (auto& i: record_delta.deltas) {
-    try {
-      decode(i, bliter);
-    } catch (ceph::buffer::error &e) {
-      logger().debug(
-        "try_decode_deltas: failed, "
-        "cannot decode delta_info_t, got {}.",
-        e);
-      return std::nullopt;
+std::optional<std::vector<record_deltas_t> >
+try_decode_deltas(
+  const record_group_header_t& header,
+  const ceph::bufferlist& md_bl,
+  paddr_t record_block_base)
+{
+  auto maybe_record_extent_infos = try_decode_extent_infos(header, md_bl);
+  if (!maybe_record_extent_infos) {
+    logger().debug(
+      "try_decode_deltas: failed, cannot decode extent_infos.");
+    return std::nullopt;
+  }
+
+  auto bliter = md_bl.cbegin();
+  bliter += ceph::encoded_sizeof_bounded<record_group_header_t>();
+  bliter += sizeof(checksum_t); /* metadata crc hole */
+  bliter += (ceph::encoded_sizeof_bounded<record_header_t>() *
+             maybe_record_extent_infos->size());
+  for (auto& r: *maybe_record_extent_infos) {
+    bliter += (ceph::encoded_sizeof_bounded<extent_info_t>() *
+               r.extent_infos.size());
+  }
+
+  std::vector<record_deltas_t> record_deltas(
+      maybe_record_extent_infos->size());
+  auto result_iter = record_deltas.begin();
+  for (auto& r: *maybe_record_extent_infos) {
+    result_iter->record_block_base = record_block_base;
+    result_iter->deltas.resize(r.header.deltas);
+    for (auto& i: result_iter->deltas) {
+      try {
+        decode(i, bliter);
+      } catch (ceph::buffer::error &e) {
+        logger().debug(
+          "try_decode_deltas: failed, "
+          "cannot decode delta_info_t, got {}.",
+          e);
+        return std::nullopt;
+      }
+    }
+    for (auto& i: r.extent_infos) {
+      auto& seg_addr = record_block_base.as_seg_paddr();
+      seg_addr.set_segment_off(seg_addr.get_segment_off() + i.len);
    }
+    ++result_iter;
  }
-  return record_delta;
+  return record_deltas;
 }
 
 bool can_delay_allocation(device_type_t type) {
diff --git a/src/crimson/os/seastore/seastore_types.h b/src/crimson/os/seastore/seastore_types.h
index 63582ce3d312..9d9119a2ec09 100644
--- a/src/crimson/os/seastore/seastore_types.h
+++ b/src/crimson/os/seastore/seastore_types.h
@@ -1208,20 +1208,6 @@ struct record_size_t {
  }
 
  void account(const delta_info_t& delta);
-
-  // TODO: remove
-  extent_len_t get_raw_mdlength() const;
-
-  extent_len_t get_mdlength(extent_len_t block_size) const {
-    assert(block_size > 0);
-    return p2roundup(get_raw_mdlength(), block_size);
-  }
-
-  extent_len_t get_encoded_length(extent_len_t block_size) const {
-    assert(block_size > 0);
-    assert(dlength % block_size == 0);
-    return get_mdlength(block_size) + dlength;
-  }
 };
 
 struct record_t {
@@ -1268,23 +1254,33 @@ struct record_t {
 };
 
 struct record_header_t {
-  // Fixed portion
-  extent_len_t mdlength;       // block aligned, length of metadata
-  extent_len_t dlength;        // block aligned, length of data
  uint32_t deltas;                // number of deltas
  uint32_t extents;               // number of extents
+
+
+  DENC(record_header_t, v, p) {
+    DENC_START(1, 1, p);
+    denc(v.deltas, p);
+    denc(v.extents, p);
+    DENC_FINISH(p);
+  }
+};
+
+struct record_group_header_t {
+  uint32_t records;
+  extent_len_t mdlength;       // block aligned, length of metadata
+  extent_len_t dlength;        // block aligned, length of data
  segment_nonce_t segment_nonce;// nonce of containing segment
  journal_seq_t committed_to;   // records prior to committed_to have been
                                // fully written, maybe in another segment.
  checksum_t data_crc;          // crc of data payload
 
-  DENC(record_header_t, v, p) {
+  DENC(record_group_header_t, v, p) {
    DENC_START(1, 1, p);
+    denc(v.records, p);
    denc(v.mdlength, p);
    denc(v.dlength, p);
-    denc(v.deltas, p);
-    denc(v.extents, p);
    denc(v.segment_nonce, p);
    denc(v.committed_to, p);
    denc(v.data_crc, p);
@@ -1293,8 +1289,7 @@ struct record_header_t {
 };
 
 struct record_group_size_t {
-  extent_len_t raw_mdlength = 0;
-  extent_len_t mdlength = 0;
+  extent_len_t plain_mdlength = 0; // mdlength without the group header
  extent_len_t dlength = 0;
  extent_len_t block_size = 0;
 
@@ -1305,15 +1300,11 @@ struct record_group_size_t {
    account(rsize, block_size);
  }
 
-  extent_len_t get_raw_mdlength() const {
-    assert(block_size > 0);
-    return raw_mdlength;
-  }
+  extent_len_t get_raw_mdlength() const;
 
  extent_len_t get_mdlength() const {
    assert(block_size > 0);
-    assert(mdlength % block_size == 0);
-    return mdlength;
+    return p2roundup(get_raw_mdlength(), block_size);
  }
 
  extent_len_t get_encoded_length() const {
@@ -1337,6 +1328,7 @@ struct record_group_size_t {
 struct record_group_t {
  std::vector<record_t> records;
  record_group_size_t size;
+  extent_len_t current_dlength = 0;
 
  record_group_t() = default;
  record_group_t(
@@ -1353,6 +1345,7 @@ struct record_group_t {
      record_t&& record,
      extent_len_t block_size) {
    size.account(record.size, block_size);
+    current_dlength += record.size.dlength;
    records.push_back(std::move(record));
    assert(size.get_encoded_length() < MAX_SEG_OFF);
  }
@@ -1364,6 +1357,7 @@ struct record_group_t {
  void clear() {
    records.clear();
    size = {};
+    current_dlength = 0;
  }
 };
 
@@ -1378,33 +1372,36 @@ ceph::bufferlist encode_records(
  const journal_seq_t& committed_to,
  segment_nonce_t current_segment_nonce);
 
-std::optional<record_header_t>
-try_decode_record_header(
+std::optional<record_group_header_t>
+try_decode_records_header(
  const ceph::bufferlist& header_bl,
  segment_nonce_t expected_nonce);
 
-bool validate_record_metadata(
+bool validate_records_metadata(
  const ceph::bufferlist& md_bl);
 
-bool validate_record_data(
-  const record_header_t& header,
+bool validate_records_data(
+  const record_group_header_t& header,
  const ceph::bufferlist& data_bl);
 
 struct record_extent_infos_t {
+  record_header_t header;
  std::vector<extent_info_t> extent_infos;
 };
-std::optional<record_extent_infos_t>
-try_decode_extent_info(
-  const record_header_t& header,
+std::optional<std::vector<record_extent_infos_t> >
+try_decode_extent_infos(
+  const record_group_header_t& header,
  const ceph::bufferlist& md_bl);
 
 struct record_deltas_t {
+  paddr_t record_block_base;
  std::vector<delta_info_t> deltas;
 };
-std::optional<record_deltas_t>
+std::optional<std::vector<record_deltas_t> >
 try_decode_deltas(
-  const record_header_t& header,
-  const ceph::bufferlist& md_bl);
+  const record_group_header_t& header,
+  const ceph::bufferlist& md_bl,
+  paddr_t record_block_base);
 
 struct write_result_t {
  journal_seq_t start_seq;
@@ -1426,21 +1423,21 @@ struct scan_valid_records_cursor {
  journal_seq_t seq;
  journal_seq_t last_committed;
 
-  struct found_record_t {
+  struct found_record_group_t {
    paddr_t offset;
-    record_header_t header;
+    record_group_header_t header;
    bufferlist mdbuffer;
 
-    found_record_t(
+    found_record_group_t(
      paddr_t offset,
-      const record_header_t &header,
+      const record_group_header_t &header,
      const bufferlist &mdbuffer)
    : offset(offset), header(header), mdbuffer(mdbuffer) {}
  };
-  std::deque<found_record_t> pending_records;
+  std::deque<found_record_group_t> pending_record_groups;
 
  bool is_complete() const {
-    return last_valid_header_found && pending_records.empty();
+    return last_valid_header_found && pending_record_groups.empty();
  }
 
  segment_id_t get_segment_id() const {
@@ -1524,6 +1521,7 @@ WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::paddr_t)
 WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::journal_seq_t)
 WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::delta_info_t)
 WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::record_header_t)
+WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::record_group_header_t)
 WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::extent_info_t)
 WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::segment_header_t)
 WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::rbm_alloc_delta_t)
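
Editorial note on the format change, for readers following the diff above: after this patch a journal write carries a single record_group_header_t for the whole batch, followed by the per-record record_header_t entries, then every record's extent_info_t entries, then every record's delta_info_t entries, with the metadata padded up to the block size, and finally the concatenated data payloads. The locator handed back to each waiting transaction in RecordBatch::add_pending is therefore the group's block-aligned mdlength plus the summed dlength of the records queued before it (the dlength_offset captured before push_back). The snippet below is a minimal standalone sketch of that offset arithmetic only; the SimpleRecord struct, round_up helper, group_header_bytes constant and sample numbers are illustrative stand-ins, not the seastore types or their real encoded sizes.

// Standalone sketch of how a merged record group lays out its metadata and
// data, and where each record's data begins.  The types below are simplified
// stand-ins for the seastore structures (record_group_size_t, record_t, ...).
#include <cassert>
#include <cstdint>
#include <iostream>
#include <vector>

using extent_len_t = uint32_t;

// Round x up to a multiple of align (align must be a power of two),
// mirroring what p2roundup() does in the real code.
static extent_len_t round_up(extent_len_t x, extent_len_t align) {
  return (x + align - 1) & ~(align - 1);
}

struct SimpleRecord {
  extent_len_t metadata_bytes; // encoded record_header_t + extent/delta infos
  extent_len_t dlength;        // block-aligned data payload length
};

int main() {
  const extent_len_t block_size = 4096;
  const extent_len_t group_header_bytes = 64; // stand-in for the encoded
                                              // record_group_header_t + crc
  std::vector<SimpleRecord> group = {
    {100, 8192},   // record 0
    {200, 4096},   // record 1
    {150, 12288},  // record 2
  };

  // Group metadata is written once, up front, and padded to the block size.
  extent_len_t raw_mdlength = group_header_bytes;
  for (const auto& r : group) {
    raw_mdlength += r.metadata_bytes;
  }
  extent_len_t mdlength = round_up(raw_mdlength, block_size);

  // Each record's data starts at mdlength plus the dlength of the records
  // queued before it -- the same arithmetic as dlength_offset in add_pending.
  extent_len_t dlength_offset = 0;
  for (std::size_t i = 0; i < group.size(); ++i) {
    assert(group[i].dlength % block_size == 0);
    std::cout << "record " << i << " data starts at offset "
              << (mdlength + dlength_offset) << "\n";
    dlength_offset += group[i].dlength;
  }
  std::cout << "encoded group length: "
            << (mdlength + dlength_offset) << "\n";
  return 0;
}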