From 310ed9ee811aa8e91f6b28fb9e57f3b915291d1c Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Fri, 19 Nov 2021 13:17:16 +0800 Subject: [PATCH] crimson/os/seastore: refactor, introduce record_t and record_group_t with sizes Signed-off-by: Yingxin Cheng --- src/crimson/os/seastore/cache.cc | 18 +- .../os/seastore/extent_placement_manager.cc | 9 +- .../os/seastore/extent_placement_manager.h | 32 ++- src/crimson/os/seastore/extent_reader.cc | 60 +---- src/crimson/os/seastore/extent_reader.h | 9 - src/crimson/os/seastore/journal.cc | 172 ++++++-------- src/crimson/os/seastore/journal.h | 56 ++--- src/crimson/os/seastore/seastore_types.cc | 223 +++++++++++++++--- src/crimson/os/seastore/seastore_types.h | 210 ++++++++++++++--- 9 files changed, 493 insertions(+), 296 deletions(-) diff --git a/src/crimson/os/seastore/cache.cc b/src/crimson/os/seastore/cache.cc index 0e3cd7661f4..c24a1577277 100644 --- a/src/crimson/os/seastore/cache.cc +++ b/src/crimson/os/seastore/cache.cc @@ -977,7 +977,7 @@ record_t Cache::prepare_record(Transaction &t) if (i->get_type() == extent_types_t::ROOT) { root = t.root; DEBUGT("writing out root delta for {}", t, *t.root); - record.deltas.push_back( + record.push_back( delta_info_t{ extent_types_t::ROOT, paddr_t{}, @@ -989,7 +989,7 @@ record_t Cache::prepare_record(Transaction &t) t.root->get_delta() }); } else { - record.deltas.push_back( + record.push_back( delta_info_t{ i->get_type(), i->get_paddr(), @@ -1047,7 +1047,7 @@ record_t Cache::prepare_record(Transaction &t) } assert(bl.length() == i->get_length()); - record.extents.push_back(extent_t{ + record.push_back(extent_t{ i->get_type(), i->is_logical() ? i->cast()->get_laddr() @@ -1062,7 +1062,7 @@ record_t Cache::prepare_record(Transaction &t) delta_info_t delta; delta.type = extent_types_t::RBM_ALLOC_INFO; delta.bl = bl; - record.deltas.push_back(delta); + record.push_back(std::move(delta)); } for (auto &i: t.ool_block_list) { @@ -1087,13 +1087,13 @@ record_t Cache::prepare_record(Transaction &t) record_header_fullness.ool_stats.filled_bytes += ool_stats.header_raw_bytes; record_header_fullness.ool_stats.total_bytes += ool_stats.header_bytes; - auto record_size = get_encoded_record_length( - record, reader.get_block_size()); + auto record_size = record_group_size_t( + record.size, reader.get_block_size()); auto inline_overhead = - record_size.mdlength + record_size.dlength - record.get_raw_data_size(); + record_size.get_encoded_length() - record.get_raw_data_size(); efforts.inline_record_overhead_bytes += inline_overhead; - record_header_fullness.inline_stats.filled_bytes += record_size.raw_mdlength; - record_header_fullness.inline_stats.total_bytes += record_size.mdlength; + record_header_fullness.inline_stats.filled_bytes += record_size.get_raw_mdlength(); + record_header_fullness.inline_stats.total_bytes += record_size.get_mdlength(); return record; } diff --git a/src/crimson/os/seastore/extent_placement_manager.cc b/src/crimson/os/seastore/extent_placement_manager.cc index b78ec32eebc..a4d335a637a 100644 --- a/src/crimson/os/seastore/extent_placement_manager.cc +++ b/src/crimson/os/seastore/extent_placement_manager.cc @@ -71,10 +71,9 @@ SegmentedAllocator::Writer::_write( Transaction& t, ool_record_t& record) { - record_size_t record_size = record.get_encoded_record_length(); - allocated_to += record_size.mdlength + record_size.dlength; + auto record_size = record.get_encoded_record_length(); + allocated_to += record_size.get_encoded_length(); bufferlist bl = record.encode( - record_size, 
current_segment->segment->get_segment_id(), 0); seastar::promise<> pr; @@ -93,8 +92,8 @@ SegmentedAllocator::Writer::_write( auto& stats = t.get_ool_write_stats(); stats.extents.num += record.get_num_extents(); stats.extents.bytes += record_size.dlength; - stats.header_raw_bytes += record_size.raw_mdlength; - stats.header_bytes += record_size.mdlength; + stats.header_raw_bytes += record_size.get_raw_mdlength(); + stats.header_bytes += record_size.get_mdlength(); stats.num_records += 1; return trans_intr::make_interruptible( diff --git a/src/crimson/os/seastore/extent_placement_manager.h b/src/crimson/os/seastore/extent_placement_manager.h index 0b040d72f76..506edde8147 100644 --- a/src/crimson/os/seastore/extent_placement_manager.h +++ b/src/crimson/os/seastore/extent_placement_manager.h @@ -47,39 +47,38 @@ class ool_record_t { public: ool_record_t(size_t block_size) : block_size(block_size) {} - record_size_t get_encoded_record_length() { + record_group_size_t get_encoded_record_length() { assert(extents.size() == record.extents.size()); - return crimson::os::seastore::get_encoded_record_length(record, block_size); + return record_group_size_t(record.size, block_size); } size_t get_wouldbe_encoded_record_length(LogicalCachedExtentRef& extent) { - auto raw_mdlength = get_encoded_record_raw_mdlength(record, block_size); - auto wouldbe_mdlength = p2roundup( - raw_mdlength + ceph::encoded_sizeof_bounded(), - block_size); - return wouldbe_mdlength + extent_buf_len + extent->get_bptr().length(); + record_size_t rsize = record.size; + rsize.account_extent(extent->get_bptr().length()); + return record_group_size_t(rsize, block_size).get_encoded_length(); } - ceph::bufferlist encode(const record_size_t& rsize, - segment_id_t segment, + ceph::bufferlist encode(segment_id_t segment, segment_nonce_t nonce) { assert(extents.size() == record.extents.size()); - segment_off_t extent_offset = base + rsize.mdlength; + assert(!record.deltas.size()); + auto record_group = record_group_t(std::move(record), block_size); + segment_off_t extent_offset = base + record_group.size.get_mdlength(); for (auto& extent : extents) { extent.set_ool_paddr( paddr_t::make_seg_paddr(segment, extent_offset)); extent_offset += extent.get_bptr().length(); } - assert(extent_offset == (segment_off_t)(base + rsize.mdlength + rsize.dlength)); - return encode_record(rsize, std::move(record), block_size, journal_seq_t(), nonce); + assert(extent_offset == + (segment_off_t)(base + record_group.size.get_encoded_length())); + return encode_records(record_group, journal_seq_t(), nonce); } void add_extent(LogicalCachedExtentRef& extent) { extents.emplace_back(extent); ceph::bufferlist bl; bl.append(extent->get_bptr()); - record.extents.emplace_back(extent_t{ + record.push_back(extent_t{ extent->get_type(), extent->get_laddr(), std::move(bl)}); - extent_buf_len += extent->get_bptr().length(); } std::vector& get_extents() { return extents; @@ -91,10 +90,8 @@ public: return base; } void clear() { - record.extents.clear(); + record = {}; extents.clear(); - assert(!record.deltas.size()); - extent_buf_len = 0; base = MAX_SEG_OFF; } uint64_t get_num_extents() const { @@ -105,7 +102,6 @@ private: std::vector extents; record_t record; size_t block_size; - segment_off_t extent_buf_len = 0; segment_off_t base = MAX_SEG_OFF; }; diff --git a/src/crimson/os/seastore/extent_reader.cc b/src/crimson/os/seastore/extent_reader.cc index b257d266ea6..46b4165055f 100644 --- a/src/crimson/os/seastore/extent_reader.cc +++ b/src/crimson/os/seastore/extent_reader.cc 
@@ -77,7 +77,7 @@ ExtentReader::scan_extents_ret ExtentReader::scan_extents( const record_header_t& header, const bufferlist& mdbuf) mutable -> scan_valid_records_ertr::future<> { - auto maybe_record_extent_infos = try_decode_extent_infos(header, mdbuf); + auto maybe_record_extent_infos = try_decode_extent_info(header, mdbuf); if (!maybe_record_extent_infos) { // This should be impossible, we did check the crc on the mdbuf logger().error( @@ -88,8 +88,8 @@ ExtentReader::scan_extents_ret ExtentReader::scan_extents( paddr_t extent_offset = locator.record_block_base; logger().debug("ExtentReader::scan_extents: decoded {} extents", - maybe_record_extent_infos->size()); - for (const auto &i : *maybe_record_extent_infos) { + maybe_record_extent_infos->extent_infos.size()); + for (const auto &i : maybe_record_extent_infos->extent_infos) { extents->emplace_back(extent_offset, i); auto& seg_addr = extent_offset.as_seg_paddr(); seg_addr.set_segment_off( @@ -237,21 +237,14 @@ ExtentReader::read_validate_record_metadata( segment_manager.get_block_size()); bufferlist bl; bl.append(bptr); - auto bp = bl.cbegin(); - record_header_t header; - try { - decode(header, bp); - } catch (ceph::buffer::error &e) { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::nullopt); - } - if (header.segment_nonce != nonce) { + auto maybe_header = try_decode_record_header(bl, nonce); + if (!maybe_header.has_value()) { return read_validate_record_metadata_ret( read_validate_record_metadata_ertr::ready_future_marker{}, std::nullopt); } auto& seg_addr = start.as_seg_paddr(); + auto& header = *maybe_header; if (header.mdlength < block_size || header.mdlength % block_size != 0 || header.dlength % block_size != 0 || @@ -281,8 +274,8 @@ ExtentReader::read_validate_record_metadata( read_validate_record_metadata_ertr::ready_future_marker{}, std::make_pair(std::move(header), std::move(bl))); }); - }).safe_then([this](auto p) { - if (p && validate_metadata(p->second)) { + }).safe_then([](auto p) { + if (p && validate_record_metadata(p->second)) { return read_validate_record_metadata_ret( read_validate_record_metadata_ertr::ready_future_marker{}, std::move(*p) @@ -295,26 +288,6 @@ ExtentReader::read_validate_record_metadata( }); } -std::optional> -ExtentReader::try_decode_extent_infos( - record_header_t header, - const bufferlist &bl) -{ - auto bliter = bl.cbegin(); - bliter += ceph::encoded_sizeof_bounded(); - bliter += sizeof(checksum_t) /* crc */; - logger().debug("{}: decoding {} extents", __func__, header.extents); - std::vector extent_infos(header.extents); - for (auto &&i : extent_infos) { - try { - decode(i, bliter); - } catch (ceph::buffer::error &e) { - return std::nullopt; - } - } - return extent_infos; -} - ExtentReader::read_validate_data_ret ExtentReader::read_validate_data( paddr_t record_base, @@ -330,25 +303,10 @@ ExtentReader::read_validate_data( ).safe_then([=, &header](auto bptr) { bufferlist bl; bl.append(bptr); - return bl.crc32c(-1) == header.data_crc; + return validate_record_data(header, bl); }); } -bool ExtentReader::validate_metadata(const bufferlist &bl) -{ - auto bliter = bl.cbegin(); - auto test_crc = bliter.crc32c( - ceph::encoded_sizeof_bounded(), - -1); - ceph_le32 recorded_crc_le; - decode(recorded_crc_le, bliter); - uint32_t recorded_crc = recorded_crc_le; - test_crc = bliter.crc32c( - bliter.get_remaining(), - test_crc); - return test_crc == recorded_crc; -} - ExtentReader::consume_record_group_ertr::future<> 
ExtentReader::consume_next_records( scan_valid_records_cursor& cursor, diff --git a/src/crimson/os/seastore/extent_reader.h b/src/crimson/os/seastore/extent_reader.h index 243f3dc86d6..dfa2cd5b67d 100644 --- a/src/crimson/os/seastore/extent_reader.h +++ b/src/crimson/os/seastore/extent_reader.h @@ -98,11 +98,6 @@ private: paddr_t start, segment_nonce_t nonce); - /// attempts to decode extent infos from bl, return nullopt if unsuccessful - std::optional> try_decode_extent_infos( - record_header_t header, - const bufferlist &bl); - /// read and validate data using read_validate_data_ertr = read_ertr; using read_validate_data_ret = read_validate_data_ertr::future; @@ -118,10 +113,6 @@ private: found_record_handler_t& handler, std::size_t& budget_used); - - /// validate embedded metadata checksum - static bool validate_metadata(const bufferlist &bl); - friend class TransactionManager; }; diff --git a/src/crimson/os/seastore/journal.cc b/src/crimson/os/seastore/journal.cc index f0331970236..3d4eb66df56 100644 --- a/src/crimson/os/seastore/journal.cc +++ b/src/crimson/os/seastore/journal.cc @@ -144,26 +144,6 @@ Journal::prep_replay_segments( std::move(ret)); } -std::optional> Journal::try_decode_deltas( - record_header_t header, - const bufferlist &bl) -{ - auto bliter = bl.cbegin(); - bliter += ceph::encoded_sizeof_bounded(); - bliter += sizeof(checksum_t) /* crc */; - bliter += header.extents * ceph::encoded_sizeof_bounded(); - logger().debug("Journal::try_decode_deltas: decoding {} deltas", header.deltas); - std::vector deltas(header.deltas); - for (auto &&i : deltas) { - try { - decode(i, bliter); - } catch (ceph::buffer::error &e) { - return std::nullopt; - } - } - return deltas; -} - Journal::replay_ertr::future<> Journal::replay_segment( journal_seq_t seq, @@ -191,14 +171,18 @@ Journal::replay_segment( return seastar::do_with( std::move(*maybe_record_deltas_list), - [=](auto &deltas) + [locator, + this, + &handler](record_deltas_t& record_deltas) { logger().debug("Journal::replay_segment: decoded {} deltas at block_base {}", - deltas.size(), + record_deltas.deltas.size(), locator.record_block_base); return crimson::do_for_each( - deltas, - [=](auto &delta) + record_deltas.deltas, + [locator, + this, + &handler](delta_info_t& delta) { /* The journal may validly contain deltas for extents in * since released segments. 
We can detect those cases by @@ -213,7 +197,7 @@ Journal::replay_segment( auto& seg_addr = delta.paddr.as_seg_paddr(); if (delta.paddr != P_ADDR_NULL && (segment_provider->get_seq(seg_addr.get_segment_id()) > - seq.segment_seq)) { + locator.write_result.start_seq.segment_seq)) { return replay_ertr::now(); } else { return handler(locator, delta); @@ -438,21 +422,23 @@ Journal::JournalSegmentManager::initialize_segment(Segment& segment) Journal::RecordBatch::add_pending_ret Journal::RecordBatch::add_pending( record_t&& record, - const record_size_t& rsize) + extent_len_t block_size) { + auto new_encoded_length = get_encoded_length_after(record, block_size); logger().debug( "Journal::RecordBatch::add_pending: batches={}, write_size={}", - records.size() + 1, - get_encoded_length(rsize)); + pending.get_size() + 1, + new_encoded_length); assert(state != state_t::SUBMITTING); - assert(can_batch(rsize)); - - auto block_start_offset = encoded_length + rsize.mdlength; - records.push_back(std::move(record)); - record_sizes.push_back(rsize); - auto new_encoded_length = get_encoded_length(rsize); - assert(new_encoded_length < MAX_SEG_OFF); - encoded_length = new_encoded_length; + assert(can_batch(record, block_size) == new_encoded_length); + + auto block_start_offset = record.size.get_mdlength(block_size); + if (state != state_t::EMPTY) { + block_start_offset += pending.size.get_encoded_length(); + } + pending.push_back( + std::move(record), block_size); + assert(pending.size.get_encoded_length() == new_encoded_length); if (state == state_t::EMPTY) { assert(!io_promise.has_value()); io_promise = seastar::shared_promise(); @@ -477,33 +463,24 @@ Journal::RecordBatch::add_pending( }); } -ceph::bufferlist Journal::RecordBatch::encode_records( - size_t block_size, +ceph::bufferlist Journal::RecordBatch::encode_batch( const journal_seq_t& committed_to, segment_nonce_t segment_nonce) { logger().debug( - "Journal::RecordBatch::encode_records: batches={}, committed_to={}", - records.size(), + "Journal::RecordBatch::encode_batch: batches={}, committed_to={}", + pending.get_size(), committed_to); assert(state == state_t::PENDING); - assert(records.size()); - assert(records.size() == record_sizes.size()); + assert(pending.get_size() > 0); assert(io_promise.has_value()); state = state_t::SUBMITTING; - ceph::bufferlist bl; - std::size_t i = 0; - do { - auto record_bl = encode_record( - record_sizes[i], - std::move(records[i]), - block_size, - committed_to, - segment_nonce); - bl.claim_append(record_bl); - } while ((++i) < records.size()); - assert(bl.length() == (std::size_t)encoded_length); + submitting_size = pending.get_size(); + submitting_length = pending.size.get_encoded_length(); + auto bl = encode_records(pending, committed_to, segment_nonce); + // Note: pending is cleared here + assert(bl.length() == (std::size_t)submitting_length); return bl; } @@ -513,47 +490,45 @@ void Journal::RecordBatch::set_result( if (maybe_write_result.has_value()) { logger().debug( "Journal::RecordBatch::set_result: batches={}, write_start {} + {}", - records.size(), + submitting_size, maybe_write_result->start_seq, maybe_write_result->length); - assert(maybe_write_result->length == encoded_length); + assert(maybe_write_result->length == submitting_length); } else { logger().error( "Journal::RecordBatch::set_result: batches={}, write is failed!", - records.size()); + submitting_size); } assert(state == state_t::SUBMITTING); assert(io_promise.has_value()); state = state_t::EMPTY; - encoded_length = 0; - records.clear(); - 
record_sizes.clear(); + submitting_size = 0; + submitting_length = 0; io_promise->set_value(maybe_write_result); io_promise.reset(); } -ceph::bufferlist Journal::RecordBatch::submit_pending_fast( +std::pair +Journal::RecordBatch::submit_pending_fast( record_t&& record, - const record_size_t& rsize, - size_t block_size, + extent_len_t block_size, const journal_seq_t& committed_to, segment_nonce_t segment_nonce) { + auto encoded_length = get_encoded_length_after(record, block_size); logger().debug( "Journal::RecordBatch::submit_pending_fast: write_size={}", - get_encoded_length(rsize)); + encoded_length); assert(state == state_t::EMPTY); - assert(can_batch(rsize)); - - auto bl = encode_record( - rsize, - std::move(record), - block_size, - committed_to, - segment_nonce); - assert(bl.length() == get_encoded_length(rsize)); - return bl; + assert(can_batch(record, block_size) == encoded_length); + + auto group = record_group_t(std::move(record), block_size); + auto size = group.size; + auto bl = encode_records(group, committed_to, segment_nonce); + assert(bl.length() == encoded_length); + assert(bl.length() == size.get_encoded_length()); + return std::make_pair(bl, size); } Journal::RecordSubmitter::RecordSubmitter( @@ -583,20 +558,21 @@ Journal::RecordSubmitter::submit( OrderingHandle& handle) { assert(write_pipeline); - auto rsize = get_encoded_record_length( - record, journal_segment_manager.get_block_size()); - auto total = rsize.mdlength + rsize.dlength; + auto expected_size = record_group_size_t( + record.size, + journal_segment_manager.get_block_size() + ).get_encoded_length(); auto max_record_length = journal_segment_manager.get_max_write_length(); - if (total > max_record_length) { + if (expected_size > max_record_length) { logger().error( "Journal::RecordSubmitter::submit: record size {} exceeds max {}", - total, + expected_size, max_record_length ); return crimson::ct_error::erange::make(); } - return do_submit(std::move(record), rsize, handle); + return do_submit(std::move(record), handle); } void Journal::RecordSubmitter::update_state() @@ -630,8 +606,7 @@ void Journal::RecordSubmitter::flush_current_batch() pop_free_batch(); increment_io(); - ceph::bufferlist to_write = p_batch->encode_records( - journal_segment_manager.get_block_size(), + ceph::bufferlist to_write = p_batch->encode_batch( journal_segment_manager.get_committed_to(), journal_segment_manager.get_nonce()); std::ignore = journal_segment_manager.write(to_write @@ -655,27 +630,25 @@ void Journal::RecordSubmitter::flush_current_batch() Journal::RecordSubmitter::submit_pending_ret Journal::RecordSubmitter::submit_pending( record_t&& record, - const record_size_t& rsize, OrderingHandle& handle, bool flush) { assert(!p_current_batch->is_submitting()); record_batch_stats.increment( p_current_batch->get_num_records() + 1); - auto write_fut = [this, flush, record=std::move(record), &rsize]() mutable { + auto write_fut = [this, flush, record=std::move(record)]() mutable { if (flush && p_current_batch->is_empty()) { // fast path with direct write increment_io(); - ceph::bufferlist to_write = p_current_batch->submit_pending_fast( + auto [to_write, sizes] = p_current_batch->submit_pending_fast( std::move(record), - rsize, journal_segment_manager.get_block_size(), journal_segment_manager.get_committed_to(), journal_segment_manager.get_nonce()); return journal_segment_manager.write(to_write - ).safe_then([rsize](auto write_result) { + ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) { return record_locator_t{ - 
write_result.start_seq.offset.add_offset(rsize.mdlength), + write_result.start_seq.offset.add_offset(mdlength), write_result }; }).finally([this] { @@ -684,7 +657,7 @@ Journal::RecordSubmitter::submit_pending( } else { // indirect write with or without the existing pending records auto write_fut = p_current_batch->add_pending( - std::move(record), rsize); + std::move(record), journal_segment_manager.get_block_size()); if (flush) { flush_current_batch(); } @@ -707,35 +680,36 @@ Journal::RecordSubmitter::submit_pending( Journal::RecordSubmitter::do_submit_ret Journal::RecordSubmitter::do_submit( record_t&& record, - const record_size_t& rsize, OrderingHandle& handle) { assert(!p_current_batch->is_submitting()); if (state <= state_t::PENDING) { // can increment io depth assert(!wait_submit_promise.has_value()); - auto batched_size = p_current_batch->can_batch(rsize); + auto batched_size = p_current_batch->can_batch( + record, journal_segment_manager.get_block_size()); if (batched_size == 0 || batched_size > journal_segment_manager.get_max_write_length()) { assert(p_current_batch->is_pending()); flush_current_batch(); - return do_submit(std::move(record), rsize, handle); + return do_submit(std::move(record), handle); } else if (journal_segment_manager.needs_roll(batched_size)) { if (p_current_batch->is_pending()) { flush_current_batch(); } return journal_segment_manager.roll( - ).safe_then([this, record=std::move(record), rsize, &handle]() mutable { - return do_submit(std::move(record), rsize, handle); + ).safe_then([this, record=std::move(record), &handle]() mutable { + return do_submit(std::move(record), handle); }); } else { - return submit_pending(std::move(record), rsize, handle, true); + return submit_pending(std::move(record), handle, true); } } assert(state == state_t::FULL); // cannot increment io depth - auto batched_size = p_current_batch->can_batch(rsize); + auto batched_size = p_current_batch->can_batch( + record, journal_segment_manager.get_block_size()); if (batched_size == 0 || batched_size > journal_segment_manager.get_max_write_length() || journal_segment_manager.needs_roll(batched_size)) { @@ -743,11 +717,11 @@ Journal::RecordSubmitter::do_submit( wait_submit_promise = seastar::promise<>(); } return wait_submit_promise->get_future( - ).then([this, record=std::move(record), rsize, &handle]() mutable { - return do_submit(std::move(record), rsize, handle); + ).then([this, record=std::move(record), &handle]() mutable { + return do_submit(std::move(record), handle); }); } else { - return submit_pending(std::move(record), rsize, handle, false); + return submit_pending(std::move(record), handle, false); } } diff --git a/src/crimson/os/seastore/journal.h b/src/crimson/os/seastore/journal.h index 5aed2ed57e6..ba4c4498efe 100644 --- a/src/crimson/os/seastore/journal.h +++ b/src/crimson/os/seastore/journal.h @@ -250,19 +250,22 @@ private: } std::size_t get_num_records() const { - return records.size(); + return pending.get_size(); } // return the expected write size if allows to batch, // otherwise, return 0 - std::size_t can_batch(const record_size_t& rsize) const { + std::size_t can_batch( + const record_t& record, + extent_len_t block_size) const { assert(state != state_t::SUBMITTING); - if (records.size() >= batch_capacity || - static_cast(encoded_length) > batch_flush_size) { + if (pending.get_size() >= batch_capacity || + (pending.get_size() > 0 && + pending.size.get_encoded_length() > batch_flush_size)) { assert(state == state_t::PENDING); return 0; } - return 
get_encoded_length(rsize); + return get_encoded_length_after(record, block_size); } void initialize(std::size_t i, @@ -272,8 +275,7 @@ private: index = i; batch_capacity = _batch_capacity; batch_flush_size = _batch_flush_size; - records.reserve(batch_capacity); - record_sizes.reserve(batch_capacity); + pending.reserve(batch_capacity); } // Add to the batch, the future will be resolved after the batch is @@ -283,11 +285,12 @@ private: // in the batch. using add_pending_ertr = JournalSegmentManager::write_ertr; using add_pending_ret = add_pending_ertr::future; - add_pending_ret add_pending(record_t&&, const record_size_t&); + add_pending_ret add_pending( + record_t&&, + extent_len_t block_size); // Encode the batched records for write. - ceph::bufferlist encode_records( - size_t block_size, + ceph::bufferlist encode_batch( const journal_seq_t& committed_to, segment_nonce_t segment_nonce); @@ -298,31 +301,33 @@ private: // The fast path that is equivalent to submit a single record as a batch. // // Essentially, equivalent to the combined logic of: - // add_pending(), encode_records() and set_result() above without + // add_pending(), encode_batch() and set_result() above without // the intervention of the shared io_promise. // // Note the current RecordBatch can be reused afterwards. - ceph::bufferlist submit_pending_fast( + std::pair submit_pending_fast( record_t&&, - const record_size_t&, - size_t block_size, + extent_len_t block_size, const journal_seq_t& committed_to, segment_nonce_t segment_nonce); private: - std::size_t get_encoded_length(const record_size_t& rsize) const { - auto ret = encoded_length + rsize.mdlength + rsize.dlength; - assert(ret > 0); - return ret; + std::size_t get_encoded_length_after( + const record_t& record, + extent_len_t block_size) const { + return pending.size.get_encoded_length_after( + record.size, block_size); } state_t state = state_t::EMPTY; std::size_t index = 0; std::size_t batch_capacity = 0; std::size_t batch_flush_size = 0; - segment_off_t encoded_length = 0; - std::vector records; - std::vector record_sizes; + + record_group_t pending; + std::size_t submitting_size = 0; + segment_off_t submitting_length = 0; + std::optional > io_promise; }; @@ -416,11 +421,11 @@ private: using submit_pending_ret = submit_pending_ertr::future< record_locator_t>; submit_pending_ret submit_pending( - record_t&&, const record_size_t&, OrderingHandle &handle, bool flush); + record_t&&, OrderingHandle &handle, bool flush); using do_submit_ret = submit_pending_ret; do_submit_ret do_submit( - record_t&&, const record_size_t&, OrderingHandle&); + record_t&&, OrderingHandle&); state_t state = state_t::IDLE; std::size_t num_outstanding_io = 0; @@ -456,11 +461,6 @@ private: prep_replay_segments_fut prep_replay_segments( std::vector> segments); - /// attempts to decode deltas from bl, return nullopt if unsuccessful - std::optional> try_decode_deltas( - record_header_t header, - const bufferlist &bl); - /// replays records starting at start through end of segment replay_ertr::future<> replay_segment( diff --git a/src/crimson/os/seastore/seastore_types.cc b/src/crimson/os/seastore/seastore_types.cc index 6f77b3f1f8e..839b2cc1c3c 100644 --- a/src/crimson/os/seastore/seastore_types.cc +++ b/src/crimson/os/seastore/seastore_types.cc @@ -2,6 +2,15 @@ // vim: ts=8 sw=2 smarttab #include "crimson/os/seastore/seastore_types.h" +#include "crimson/common/log.h" + +namespace { + +seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_seastore); +} + +} namespace 
crimson::os::seastore { @@ -125,38 +134,42 @@ std::ostream &operator<<(std::ostream &lhs, const delta_info_t &rhs) << ")"; } -extent_len_t get_encoded_record_raw_mdlength( - const record_t &record, - size_t block_size) { - extent_len_t metadata = - (extent_len_t)ceph::encoded_sizeof_bounded(); - metadata += sizeof(checksum_t) /* crc */; - metadata += record.extents.size() * - ceph::encoded_sizeof_bounded(); - for (const auto &i: record.deltas) { - metadata += ceph::encoded_sizeof(i); - } - return metadata; +void record_size_t::account_extent(extent_len_t extent_len) +{ + assert(extent_len); + plain_mdlength += ceph::encoded_sizeof_bounded(); + dlength += extent_len; } -record_size_t get_encoded_record_length( - const record_t &record, - size_t block_size) { - extent_len_t raw_mdlength = - get_encoded_record_raw_mdlength(record, block_size); - extent_len_t mdlength = - p2roundup(raw_mdlength, (extent_len_t)block_size); - extent_len_t dlength = 0; - for (const auto &i: record.extents) { - dlength += i.bl.length(); - } - return record_size_t{raw_mdlength, mdlength, dlength}; +void record_size_t::account(const delta_info_t& delta) +{ + assert(delta.bl.length()); + plain_mdlength += ceph::encoded_sizeof(delta); +} + +extent_len_t record_size_t::get_raw_mdlength() const +{ + return plain_mdlength + + sizeof(checksum_t) + + ceph::encoded_sizeof_bounded(); +} + +void record_group_size_t::account( + const record_size_t& rsize, + extent_len_t _block_size) +{ + assert(_block_size > 0); + assert(rsize.dlength % _block_size == 0); + assert(block_size == 0 || block_size == _block_size); + raw_mdlength += rsize.get_raw_mdlength(); + mdlength += rsize.get_mdlength(_block_size); + dlength += rsize.dlength; + block_size = _block_size; } ceph::bufferlist encode_record( - record_size_t rsize, - record_t &&record, - size_t block_size, + record_t&& record, + extent_len_t block_size, const journal_seq_t& committed_to, segment_nonce_t current_segment_nonce) { @@ -167,17 +180,17 @@ ceph::bufferlist encode_record( bufferlist bl; record_header_t header{ - rsize.mdlength, - rsize.dlength, - (uint32_t)record.deltas.size(), - (uint32_t)record.extents.size(), + record.size.get_mdlength(block_size), + record.size.dlength, + (extent_len_t)record.deltas.size(), + (extent_len_t)record.extents.size(), current_segment_nonce, committed_to, data_bl.crc32c(-1) }; encode(header, bl); - auto metadata_crc_filler = bl.append_hole(sizeof(uint32_t)); + auto metadata_crc_filler = bl.append_hole(sizeof(checksum_t)); for (const auto &i: record.extents) { encode(extent_info_t(i), bl); @@ -185,19 +198,19 @@ ceph::bufferlist encode_record( for (const auto &i: record.deltas) { encode(i, bl); } - ceph_assert(bl.length() == rsize.raw_mdlength); + ceph_assert(bl.length() == record.size.get_raw_mdlength()); - if (bl.length() % block_size != 0) { - bl.append_zero( - block_size - (bl.length() % block_size)); + auto aligned_mdlength = record.size.get_mdlength(block_size); + if (bl.length() != aligned_mdlength) { + assert(bl.length() < aligned_mdlength); + bl.append_zero(aligned_mdlength - bl.length()); } - ceph_assert(bl.length() == rsize.mdlength); auto bliter = bl.cbegin(); auto metadata_crc = bliter.crc32c( ceph::encoded_sizeof_bounded(), -1); - bliter += sizeof(checksum_t); /* crc hole again */ + bliter += sizeof(checksum_t); /* metadata crc hole */ metadata_crc = bliter.crc32c( bliter.get_remaining(), metadata_crc); @@ -208,11 +221,143 @@ ceph::bufferlist encode_record( reinterpret_cast(&metadata_crc_le)); bl.claim_append(data_bl); - 
ceph_assert(bl.length() == (rsize.dlength + rsize.mdlength)); + ceph_assert(bl.length() == (record.size.get_encoded_length(block_size))); + + return bl; +} + +ceph::bufferlist encode_records( + record_group_t& record_group, + const journal_seq_t& committed_to, + segment_nonce_t current_segment_nonce) +{ + assert(record_group.size.block_size > 0); + assert(record_group.records.size() > 0); + + bufferlist bl; + for (auto& r: record_group.records) { + bl.claim_append( + encode_record( + std::move(r), + record_group.size.block_size, + committed_to, + current_segment_nonce)); + } + ceph_assert(bl.length() == record_group.size.get_encoded_length()); + record_group.clear(); return bl; } +std::optional +try_decode_record_header( + const ceph::bufferlist& header_bl, + segment_nonce_t expected_nonce) +{ + auto bp = header_bl.cbegin(); + record_header_t header; + try { + decode(header, bp); + } catch (ceph::buffer::error &e) { + logger().debug( + "try_decode_record_header: failed, " + "cannot decode record_header_t, got {}.", + e); + return std::nullopt; + } + if (header.segment_nonce != expected_nonce) { + logger().debug( + "try_decode_record_header: failed, record_header nonce mismatch, " + "read {}, expected {}!", + header.segment_nonce, + expected_nonce); + return std::nullopt; + } + return header; +} + +bool validate_record_metadata( + const ceph::bufferlist& md_bl) +{ + auto bliter = md_bl.cbegin(); + auto test_crc = bliter.crc32c( + ceph::encoded_sizeof_bounded(), + -1); + ceph_le32 recorded_crc_le; + decode(recorded_crc_le, bliter); + uint32_t recorded_crc = recorded_crc_le; + test_crc = bliter.crc32c( + bliter.get_remaining(), + test_crc); + bool success = (test_crc == recorded_crc); + if (!success) { + logger().debug("validate_record_metadata: failed, metadata crc mismatch."); + } + return success; +} + +bool validate_record_data( + const record_header_t& header, + const ceph::bufferlist& data_bl) +{ + bool success = (data_bl.crc32c(-1) == header.data_crc); + if (!success) { + logger().debug("validate_record_data: failed, data crc mismatch!"); + } + return success; +} + +std::optional +try_decode_extent_info( + const record_header_t& header, + const ceph::bufferlist& md_bl) +{ + auto bliter = md_bl.cbegin(); + bliter += ceph::encoded_sizeof_bounded(); + bliter += sizeof(checksum_t) /* metadata crc hole */; + + record_extent_infos_t record_extent_info; + record_extent_info.extent_infos.resize(header.extents); + for (auto& i: record_extent_info.extent_infos) { + try { + decode(i, bliter); + } catch (ceph::buffer::error &e) { + logger().debug( + "try_decode_extent_infos: failed, " + "cannot decode extent_info_t, got {}.", + e); + return std::nullopt; + } + } + return record_extent_info; +} + +std::optional +try_decode_deltas( + const record_header_t& header, + const ceph::bufferlist& md_bl) +{ + auto bliter = md_bl.cbegin(); + bliter += ceph::encoded_sizeof_bounded(); + bliter += sizeof(checksum_t) /* metadata crc hole */; + bliter += header.extents * ceph::encoded_sizeof_bounded(); + + record_deltas_t record_delta; + record_delta.deltas.resize(header.deltas); + for (auto& i: record_delta.deltas) { + try { + decode(i, bliter); + } catch (ceph::buffer::error &e) { + logger().debug( + "try_decode_deltas: failed, " + "cannot decode delta_info_t, got {}.", + e); + return std::nullopt; + } + } + return record_delta; +} + bool can_delay_allocation(device_type_t type) { // Some types of device may not support delayed allocation, for example PMEM. 
return type <= RANDOM_BLOCK; diff --git a/src/crimson/os/seastore/seastore_types.h b/src/crimson/os/seastore/seastore_types.h index 2c115df23c9..bd05d227159 100644 --- a/src/crimson/os/seastore/seastore_types.h +++ b/src/crimson/os/seastore/seastore_types.h @@ -5,7 +5,9 @@ #include #include +#include #include +#include #include "include/byteorder.h" #include "include/denc.h" @@ -822,27 +824,6 @@ struct delta_info_t { std::ostream &operator<<(std::ostream &lhs, const delta_info_t &rhs); -struct record_t { - std::vector extents; - std::vector deltas; - - std::size_t get_raw_data_size() const { - auto extent_size = std::accumulate( - extents.begin(), extents.end(), 0, - [](uint64_t sum, auto& extent) { - return sum + extent.bl.length(); - } - ); - auto delta_size = std::accumulate( - deltas.begin(), deltas.end(), 0, - [](uint64_t sum, auto& delta) { - return sum + delta.bl.length(); - } - ); - return extent_size + delta_size; - } -}; - class object_data_t { laddr_t reserved_data_base = L_ADDR_NULL; extent_len_t reserved_data_len = 0; @@ -1179,6 +1160,7 @@ struct extent_info_t { DENC_FINISH(p); } }; +std::ostream &operator<<(std::ostream &out, const extent_info_t &header); using segment_nonce_t = uint32_t; @@ -1210,6 +1192,71 @@ struct segment_header_t { }; std::ostream &operator<<(std::ostream &out, const segment_header_t &header); +struct record_size_t { + extent_len_t plain_mdlength = 0; // mdlength without the record header + extent_len_t dlength = 0; + + void account_extent(extent_len_t extent_len); + + void account(const extent_t& extent) { + account_extent(extent.bl.length()); + } + + void account(const delta_info_t& delta); + + // TODO: remove + extent_len_t get_raw_mdlength() const; + + extent_len_t get_mdlength(extent_len_t block_size) const { + assert(block_size > 0); + return p2roundup(get_raw_mdlength(), block_size); + } + + extent_len_t get_encoded_length(extent_len_t block_size) const { + assert(block_size > 0); + assert(dlength % block_size == 0); + return get_mdlength(block_size) + dlength; + } +}; + +struct record_t { + std::vector extents; + std::vector deltas; + record_size_t size; + + record_t() = default; + record_t(std::vector&& _extents, + std::vector&& _deltas) { + for (auto& e: _extents) { + push_back(std::move(e)); + } + for (auto& d: _deltas) { + push_back(std::move(d)); + } + } + + // the size of extents and delta buffers + std::size_t get_raw_data_size() const { + auto delta_size = std::accumulate( + deltas.begin(), deltas.end(), 0, + [](uint64_t sum, auto& delta) { + return sum + delta.bl.length(); + } + ); + return size.dlength + delta_size; + } + + void push_back(extent_t&& extent) { + size.account(extent); + extents.push_back(std::move(extent)); + } + + void push_back(delta_info_t&& delta) { + size.account(delta); + deltas.push_back(std::move(delta)); + } +}; + struct record_header_t { // Fixed portion extent_len_t mdlength; // block aligned, length of metadata @@ -1235,32 +1282,119 @@ struct record_header_t { } }; -std::ostream &operator<<(std::ostream &out, const extent_info_t &header); - -struct record_size_t { +struct record_group_size_t { extent_len_t raw_mdlength = 0; extent_len_t mdlength = 0; extent_len_t dlength = 0; + extent_len_t block_size = 0; + + record_group_size_t() = default; + record_group_size_t( + const record_size_t& rsize, + extent_len_t block_size) { + account(rsize, block_size); + } + + extent_len_t get_raw_mdlength() const { + assert(block_size > 0); + return raw_mdlength; + } + + extent_len_t get_mdlength() const { + assert(block_size 
> 0); + assert(mdlength % block_size == 0); + return mdlength; + } + + extent_len_t get_encoded_length() const { + assert(block_size > 0); + assert(dlength % block_size == 0); + return get_mdlength() + dlength; + } + + extent_len_t get_encoded_length_after( + const record_size_t& rsize, + extent_len_t block_size) const { + record_group_size_t tmp = *this; + tmp.account(rsize, block_size); + return tmp.get_encoded_length(); + } + + void account(const record_size_t& rsize, + extent_len_t block_size); }; -extent_len_t get_encoded_record_raw_mdlength( - const record_t &record, - size_t block_size); +struct record_group_t { + std::vector records; + record_group_size_t size; -/** - * Return pair denoting length of - * metadata and blocks respectively. - */ -record_size_t get_encoded_record_length( - const record_t &record, - size_t block_size); + record_group_t() = default; + record_group_t( + record_t&& record, + extent_len_t block_size) { + push_back(std::move(record), block_size); + } + + std::size_t get_size() const { + return records.size(); + } + + void push_back( + record_t&& record, + extent_len_t block_size) { + size.account(record.size, block_size); + records.push_back(std::move(record)); + assert(size.get_encoded_length() < MAX_SEG_OFF); + } + + void reserve(std::size_t limit) { + records.reserve(limit); + } + + void clear() { + records.clear(); + size = {}; + } +}; ceph::bufferlist encode_record( - record_size_t rsize, - record_t &&record, - size_t block_size, + record_t&& record, + extent_len_t block_size, + const journal_seq_t& committed_to, + segment_nonce_t current_segment_nonce); + +ceph::bufferlist encode_records( + record_group_t& record_group, const journal_seq_t& committed_to, - segment_nonce_t current_segment_nonce = 0); + segment_nonce_t current_segment_nonce); + +std::optional +try_decode_record_header( + const ceph::bufferlist& header_bl, + segment_nonce_t expected_nonce); + +bool validate_record_metadata( + const ceph::bufferlist& md_bl); + +bool validate_record_data( + const record_header_t& header, + const ceph::bufferlist& data_bl); + +struct record_extent_infos_t { + std::vector extent_infos; +}; +std::optional +try_decode_extent_info( + const record_header_t& header, + const ceph::bufferlist& md_bl); + +struct record_deltas_t { + std::vector deltas; +}; +std::optional +try_decode_deltas( + const record_header_t& header, + const ceph::bufferlist& md_bl); struct write_result_t { journal_seq_t start_seq; -- 2.39.5
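
Reviewer note, not part of the patch: a minimal usage sketch of the record_t / record_group_t size-tracking API introduced above, written against the declarations visible in the seastore_types.h hunks. The function name, block size, extent type (TEST_BLOCK), laddr and buffer contents are illustrative placeholders, not values taken from the patch.

// Sketch only: shows how a caller (e.g. Cache::prepare_record() or the
// journal submit path) is expected to build a record and derive its
// on-disk footprint with the new incremental size accounting.
#include <cassert>
#include "crimson/os/seastore/seastore_types.h"

using namespace crimson::os::seastore;

ceph::bufferlist encode_one_record_example(extent_len_t block_size,
                                           segment_nonce_t nonce)
{
  record_t record;

  // push_back() accumulates metadata/data lengths into record.size as
  // extents and deltas are added, replacing the removed
  // get_encoded_record_length() post-pass.
  ceph::bufferlist extent_bl;
  extent_bl.append_zero(block_size);            // extent data stays block aligned
  record.push_back(extent_t{
    extent_types_t::TEST_BLOCK,                 // placeholder extent type
    L_ADDR_NULL,                                // placeholder laddr
    std::move(extent_bl)});

  delta_info_t delta;
  delta.type = extent_types_t::TEST_BLOCK;      // placeholder delta type
  delta.bl.append("placeholder delta payload");
  record.push_back(std::move(delta));

  // Expected block-aligned footprint if this record were written alone.
  auto size = record_group_size_t(record.size, block_size);
  assert(size.get_encoded_length() ==
         size.get_mdlength() + record.size.dlength);

  // Group the record (a RecordBatch may hold several) and encode in one
  // step; encode_records() clears the group after encoding.
  auto group = record_group_t(std::move(record), block_size);
  return encode_records(group, journal_seq_t() /* committed_to */, nonce);
}

This mirrors the intent of the refactor: because sizes are accumulated by push_back()/account() as the record is built, RecordBatch::can_batch() and get_encoded_length_after() can report the expected write size of a pending group without re-walking or re-encoding the records.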