From 22411110876f7bba83040732e75a69015bfff8ae Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Tue, 16 Nov 2021 14:23:15 +0800 Subject: [PATCH] crimson/os/seastore: misc cleanup and reformat Signed-off-by: Yingxin Cheng --- src/crimson/os/seastore/extent_reader.cc | 223 ++++++++++++----------- src/crimson/os/seastore/extent_reader.h | 7 + src/crimson/os/seastore/journal.cc | 114 ++++++------ 3 files changed, 182 insertions(+), 162 deletions(-) diff --git a/src/crimson/os/seastore/extent_reader.cc b/src/crimson/os/seastore/extent_reader.cc index 8b407eb2c69..bd95eb9dae7 100644 --- a/src/crimson/os/seastore/extent_reader.cc +++ b/src/crimson/os/seastore/extent_reader.cc @@ -72,39 +72,40 @@ ExtentReader::scan_extents_ret ExtentReader::scan_extents( ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) { auto segment_nonce = segment_header.segment_nonce; return seastar::do_with( - found_record_handler_t( - [extents, this]( - paddr_t base, - const record_header_t &header, - const bufferlist &mdbuf) mutable { + found_record_handler_t([extents, this]( + paddr_t base, + const record_header_t& header, + const bufferlist& mdbuf) mutable -> scan_valid_records_ertr::future<> + { + auto maybe_record_extent_infos = try_decode_extent_infos(header, mdbuf); + if (!maybe_record_extent_infos) { + // This should be impossible, we did check the crc on the mdbuf + logger().error( + "ExtentReader::scan_extents: unable to decode extents for record {}", + base); + return crimson::ct_error::input_output_error::make(); + } - auto infos = try_decode_extent_infos( - header, - mdbuf); - if (!infos) { - // This should be impossible, we did check the crc on the mdbuf - logger().error( - "ExtentReader::scan_extents unable to decode extents for record {}", - base); - assert(infos); - } - - paddr_t extent_offset = base.add_offset(header.mdlength); - for (const auto &i : *infos) { - extents->emplace_back(extent_offset, i); - auto& seg_addr = extent_offset.as_seg_paddr(); - seg_addr.set_segment_off( - seg_addr.get_segment_off() + i.len); - } - return scan_extents_ertr::now(); - }), + paddr_t extent_offset = base.add_offset(header.mdlength); + logger().debug("ExtentReader::scan_extents: decoded {} extents", + maybe_record_extent_infos->size()); + for (const auto &i : *maybe_record_extent_infos) { + extents->emplace_back(extent_offset, i); + auto& seg_addr = extent_offset.as_seg_paddr(); + seg_addr.set_segment_off( + seg_addr.get_segment_off() + i.len); + } + return scan_extents_ertr::now(); + }), [=, &cursor](auto &dhandler) { - return scan_valid_records( - cursor, - segment_nonce, - bytes_to_read, - dhandler).discard_result(); - }); + return scan_valid_records( + cursor, + segment_nonce, + bytes_to_read, + dhandler + ).discard_result(); + } + ); }).safe_then([ret=std::move(ret)] { return std::move(*ret); }); @@ -140,7 +141,8 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records( cursor.last_valid_header_found = true; return scan_valid_records_ertr::now(); } else { - auto new_committed_to = md->first.committed_to; + auto& [header, md_bl] = *md; + auto new_committed_to = header.committed_to; logger().debug( "ExtentReader::scan_valid_records: valid record read at {}, now committed at {}", cursor.seq, @@ -150,9 +152,9 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records( cursor.last_committed = new_committed_to; cursor.pending_records.emplace_back( cursor.seq.offset, - md->first, - md->second); - cursor.increment(md->first.dlength + md->first.mdlength); + header, + std::move(md_bl)); + cursor.increment(header.dlength + header.mdlength); ceph_assert(new_committed_to == journal_seq_t() || new_committed_to < cursor.seq); return scan_valid_records_ertr::now(); @@ -178,14 +180,8 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records( return scan_valid_records_ertr::make_ready_future< seastar::stop_iteration>(seastar::stop_iteration::yes); } - budget_used += - next.header.dlength + next.header.mdlength; - return handler( - next.offset, - next.header, - next.mdbuffer - ).safe_then([&cursor] { - cursor.pending_records.pop_front(); + return consume_next_records(cursor, handler, budget_used + ).safe_then([] { return scan_valid_records_ertr::make_ready_future< seastar::stop_iteration>(seastar::stop_iteration::no); }); @@ -195,21 +191,12 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records( assert(!cursor.pending_records.empty()); auto &next = cursor.pending_records.front(); return read_validate_data(next.offset, next.header - ).safe_then([=, &budget_used, &next, &cursor, &handler](auto valid) { + ).safe_then([this, &budget_used, &cursor, &handler](auto valid) { if (!valid) { cursor.pending_records.clear(); return scan_valid_records_ertr::now(); } - budget_used += - next.header.dlength + next.header.mdlength; - return handler( - next.offset, - next.header, - next.mdbuffer - ).safe_then([&cursor] { - cursor.pending_records.pop_front(); - return scan_valid_records_ertr::now(); - }); + return consume_next_records(cursor, handler, budget_used); }); } }().safe_then([=, &budget_used, &cursor] { @@ -244,61 +231,63 @@ ExtentReader::read_validate_record_metadata( logger().debug("read_validate_record_metadata: reading header block {}...", start); return segment_manager.read(start, block_size - ).safe_then( - [=, &segment_manager](bufferptr bptr) mutable - -> read_validate_record_metadata_ret { - auto block_size = segment_manager.get_block_size(); - bufferlist bl; - bl.append(bptr); - auto bp = bl.cbegin(); - record_header_t header; - try { - decode(header, bp); - } catch (ceph::buffer::error &e) { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::nullopt); - } - if (header.segment_nonce != nonce) { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::nullopt); - } - auto& seg_addr = start.as_seg_paddr(); - if (header.mdlength > (extent_len_t)block_size) { - if (seg_addr.get_segment_off() + header.mdlength > - (int64_t)segment_manager.get_segment_size()) { - return crimson::ct_error::input_output_error::make(); - } - return segment_manager.read( - paddr_t::make_seg_paddr(seg_addr.get_segment_id(), - seg_addr.get_segment_off() + (segment_off_t)block_size), - header.mdlength - block_size).safe_then( - [header=std::move(header), bl=std::move(bl)]( - auto &&bptail) mutable { - bl.push_back(bptail); - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::make_pair(std::move(header), std::move(bl))); - }); - } else { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::make_pair(std::move(header), std::move(bl)) - ); - } - }).safe_then([=](auto p) { - if (p && validate_metadata(p->second)) { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::move(*p) - ); - } else { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::nullopt); - } + ).safe_then([=, &segment_manager](bufferptr bptr) mutable + -> read_validate_record_metadata_ret { + auto block_size = static_cast( + segment_manager.get_block_size()); + bufferlist bl; + bl.append(bptr); + auto bp = bl.cbegin(); + record_header_t header; + try { + decode(header, bp); + } catch (ceph::buffer::error &e) { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::nullopt); + } + if (header.segment_nonce != nonce) { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::nullopt); + } + auto& seg_addr = start.as_seg_paddr(); + if (seg_addr.get_segment_off() + header.mdlength > + (int64_t)segment_manager.get_segment_size()) { + logger().error("read_validate_record_metadata: failed, invalid header"); + return crimson::ct_error::input_output_error::make(); + } + if (header.mdlength == block_size) { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::make_pair(std::move(header), std::move(bl)) + ); + } + return segment_manager.read( + paddr_t::make_seg_paddr( + seg_addr.get_segment_id(), + seg_addr.get_segment_off() + (segment_off_t)block_size + ), + header.mdlength - block_size + ).safe_then([header=std::move(header), bl=std::move(bl) + ](auto&& bptail) mutable { + bl.push_back(bptail); + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::make_pair(std::move(header), std::move(bl))); }); + }).safe_then([this](auto p) { + if (p && validate_metadata(p->second)) { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::move(*p) + ); + } else { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::nullopt); + } + }); } std::optional> @@ -355,4 +344,22 @@ bool ExtentReader::validate_metadata(const bufferlist &bl) return test_crc == recorded_crc; } +ExtentReader::consume_record_group_ertr::future<> +ExtentReader::consume_next_records( + scan_valid_records_cursor& cursor, + found_record_handler_t& handler, + std::size_t& budget_used) +{ + auto& next = cursor.pending_records.front(); + auto total_length = next.header.dlength + next.header.mdlength; + budget_used += total_length; + return handler( + next.offset, + next.header, + next.mdbuffer + ).safe_then([&cursor] { + cursor.pending_records.pop_front(); + }); +} + } // namespace crimson::os::seastore diff --git a/src/crimson/os/seastore/extent_reader.h b/src/crimson/os/seastore/extent_reader.h index e0d66495af5..99c5895847d 100644 --- a/src/crimson/os/seastore/extent_reader.h +++ b/src/crimson/os/seastore/extent_reader.h @@ -112,6 +112,13 @@ private: /// future resolution ); + using consume_record_group_ertr = scan_valid_records_ertr; + consume_record_group_ertr::future<> consume_next_records( + scan_valid_records_cursor& cursor, + found_record_handler_t& handler, + std::size_t& budget_used); + + /// validate embedded metadata checksum static bool validate_metadata(const bufferlist &bl); diff --git a/src/crimson/os/seastore/journal.cc b/src/crimson/os/seastore/journal.cc index 90bf37dcd1e..c83cd4a494d 100644 --- a/src/crimson/os/seastore/journal.cc +++ b/src/crimson/os/seastore/journal.cc @@ -121,7 +121,7 @@ Journal::prep_replay_segments( } else { replay_from = paddr_t::make_seg_paddr( from->first, - (segment_off_t)journal_segment_manager.get_block_size()); + journal_segment_manager.get_block_size()); } auto ret = replay_segments_t(segments.end() - from); std::transform( @@ -131,7 +131,8 @@ Journal::prep_replay_segments( p.second.journal_segment_seq, paddr_t::make_seg_paddr( p.first, - (segment_off_t)journal_segment_manager.get_block_size())}; + journal_segment_manager.get_block_size()) + }; logger().debug( "Journal::prep_replay_segments: replaying from {}", ret); @@ -172,57 +173,61 @@ Journal::replay_segment( logger().debug("Journal::replay_segment: starting at {}", seq); return seastar::do_with( scan_valid_records_cursor(seq), - ExtentReader::found_record_handler_t( - [=, &handler](paddr_t base, - const record_header_t &header, - const bufferlist &mdbuf) { - auto deltas = try_decode_deltas( - header, - mdbuf); - if (!deltas) { - // This should be impossible, we did check the crc on the mdbuf - logger().error( - "Journal::replay_segment: unable to decode deltas for record {}", - base); - assert(deltas); - } + ExtentReader::found_record_handler_t([=, &handler]( + paddr_t base, + const record_header_t& header, + const bufferlist& mdbuf) + -> ExtentReader::scan_valid_records_ertr::future<> + { + auto maybe_record_deltas_list = try_decode_deltas( + header, mdbuf); + if (!maybe_record_deltas_list) { + // This should be impossible, we did check the crc on the mdbuf + logger().error( + "Journal::replay_segment: unable to decode deltas for record {}", + base); + return crimson::ct_error::input_output_error::make(); + } - return seastar::do_with( - std::move(*deltas), - [=](auto &deltas) { - return crimson::do_for_each( - deltas, - [=](auto &delta) { - /* The journal may validly contain deltas for extents in - * since released segments. We can detect those cases by - * checking whether the segment in question currently has a - * sequence number > the current journal segment seq. We can - * safetly skip these deltas because the extent must already - * have been rewritten. - * - * Note, this comparison exploits the fact that - * SEGMENT_SEQ_NULL is a large number. - */ - auto& seg_addr = delta.paddr.as_seg_paddr(); - if (delta.paddr != P_ADDR_NULL && - (segment_provider->get_seq(seg_addr.get_segment_id()) > - seq.segment_seq)) { - return replay_ertr::now(); - } else { - auto offsets = submit_result_t{ - base.add_offset(header.mdlength), - write_result_t{ - journal_seq_t{seq.segment_seq, base}, - static_cast(header.mdlength + header.dlength) - } - }; - return handler( - offsets, - delta); - } - }); - }); - }), + return seastar::do_with( + std::move(*maybe_record_deltas_list), + [=](auto &deltas) + { + logger().debug("Journal::replay_segment: decoded {} deltas at base {}", + deltas.size(), + base); + return crimson::do_for_each( + deltas, + [=](auto &delta) + { + /* The journal may validly contain deltas for extents in + * since released segments. We can detect those cases by + * checking whether the segment in question currently has a + * sequence number > the current journal segment seq. We can + * safetly skip these deltas because the extent must already + * have been rewritten. + * + * Note, this comparison exploits the fact that + * SEGMENT_SEQ_NULL is a large number. + */ + auto& seg_addr = delta.paddr.as_seg_paddr(); + if (delta.paddr != P_ADDR_NULL && + (segment_provider->get_seq(seg_addr.get_segment_id()) > + seq.segment_seq)) { + return replay_ertr::now(); + } else { + auto offsets = submit_result_t{ + base.add_offset(header.mdlength), + write_result_t{ + journal_seq_t{seq.segment_seq, base}, + static_cast(header.mdlength + header.dlength) + } + }; + return handler(offsets, delta); + } + }); + }); + }), [=](auto &cursor, auto &dhandler) { return scanner.scan_valid_records( cursor, @@ -234,8 +239,9 @@ Journal::replay_segment( crimson::ct_error::assert_all{ "shouldn't meet with any other error other replay_ertr" } - );; - }); + ); + } + ); } Journal::replay_ret Journal::replay( -- 2.39.5