From: myoungwon oh Date: Thu, 20 Jul 2023 02:50:48 +0000 (+0000) Subject: crimson/os/seastore: introduce RecordScanner to generalize scan_valid_records() X-Git-Tag: v18.2.1~160^2~6 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=597312dfdd67efdb1e343c3d6ae7cddd0d3b7df5;p=ceph.git crimson/os/seastore: introduce RecordScanner to generalize scan_valid_records() Signed-off-by: Myoungwon Oh (cherry picked from commit 22c747826b398cc0a0d68aab9bba1fd096a2de9d) --- diff --git a/src/crimson/os/seastore/CMakeLists.txt b/src/crimson/os/seastore/CMakeLists.txt index 5b1c6187ca2a2..5994a17783135 100644 --- a/src/crimson/os/seastore/CMakeLists.txt +++ b/src/crimson/os/seastore/CMakeLists.txt @@ -51,6 +51,7 @@ set(crimson_seastore_srcs journal.cc device.cc segment_manager_group.cc + record_scanner.cc journal/circular_bounded_journal.cc ../../../test/crimson/seastore/test_block.cc ${PROJECT_SOURCE_DIR}/src/os/Transaction.cc diff --git a/src/crimson/os/seastore/record_scanner.cc b/src/crimson/os/seastore/record_scanner.cc new file mode 100644 index 0000000000000..f3ed54d01644e --- /dev/null +++ b/src/crimson/os/seastore/record_scanner.cc @@ -0,0 +1,142 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#include "crimson/os/seastore/record_scanner.h" + +#include "crimson/os/seastore/logging.h" + +SET_SUBSYS(seastore_journal); + +namespace crimson::os::seastore { + +RecordScanner::scan_valid_records_ret +RecordScanner::scan_valid_records( + scan_valid_records_cursor &cursor, + segment_nonce_t nonce, + size_t budget, + found_record_handler_t &handler) +{ + LOG_PREFIX(RecordScanner::scan_valid_records); + initialize_cursor(cursor); + DEBUG("starting at {}, budget={}", cursor, budget); + auto retref = std::make_unique(0); + auto &budget_used = *retref; + return crimson::repeat( + [=, &cursor, &budget_used, &handler, this]() mutable + -> scan_valid_records_ertr::future { + return [=, &handler, &cursor, &budget_used, this] { + if (!cursor.last_valid_header_found) { + return read_validate_record_metadata(cursor.seq.offset, nonce + ).safe_then([=, &cursor](auto md) { + if (!md) { + cursor.last_valid_header_found = true; + if (cursor.is_complete()) { + INFO("complete at {}, invalid record group metadata", + cursor); + } else { + DEBUG("found invalid record group metadata at {}, " + "processing {} pending record groups", + cursor.seq, + cursor.pending_record_groups.size()); + } + return scan_valid_records_ertr::now(); + } else { + auto& [header, md_bl] = *md; + DEBUG("found valid {} at {}", header, cursor.seq); + cursor.emplace_record_group(header, std::move(md_bl)); + return scan_valid_records_ertr::now(); + } + }).safe_then([=, &cursor, &budget_used, &handler, this] { + DEBUG("processing committed record groups until {}, {} pending", + cursor.last_committed, + cursor.pending_record_groups.size()); + return crimson::repeat( + [=, &budget_used, &cursor, &handler, this] { + if (cursor.pending_record_groups.empty()) { + /* This is only possible if the segment is empty. + * A record's last_commited must be prior to its own + * location since it itself cannot yet have been committed + * at its own time of submission. Thus, the most recently + * read record must always fall after cursor.last_committed */ + return scan_valid_records_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::yes); + } + auto &next = cursor.pending_record_groups.front(); + journal_seq_t next_seq = {cursor.seq.segment_seq, next.offset}; + if (cursor.last_committed == JOURNAL_SEQ_NULL || + next_seq > cursor.last_committed) { + return scan_valid_records_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::yes); + } + return consume_next_records(cursor, handler, budget_used + ).safe_then([] { + return scan_valid_records_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::no); + }); + }); + }); + } else { + assert(!cursor.pending_record_groups.empty()); + auto &next = cursor.pending_record_groups.front(); + return read_validate_data(next.offset, next.header + ).safe_then([this, FNAME, &budget_used, &cursor, &handler, &next](auto valid) { + if (!valid) { + INFO("complete at {}, invalid record group data at {}, {}", + cursor, next.offset, next.header); + cursor.pending_record_groups.clear(); + return scan_valid_records_ertr::now(); + } + return consume_next_records(cursor, handler, budget_used); + }); + } + }().safe_then([=, &budget_used, &cursor] { + if (cursor.is_complete() || budget_used >= budget) { + DEBUG("finish at {}, budget_used={}, budget={}", + cursor, budget_used, budget); + return seastar::stop_iteration::yes; + } else { + return seastar::stop_iteration::no; + } + }); + }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret { + return scan_valid_records_ret( + scan_valid_records_ertr::ready_future_marker{}, + std::move(*retref)); + }); +} + +RecordScanner::consume_record_group_ertr::future<> +RecordScanner::consume_next_records( + scan_valid_records_cursor& cursor, + found_record_handler_t& handler, + std::size_t& budget_used) +{ + LOG_PREFIX(RecordScanner::consume_next_records); + auto& next = cursor.pending_record_groups.front(); + auto total_length = next.header.dlength + next.header.mdlength; + budget_used += total_length; + auto locator = record_locator_t{ + next.offset.add_offset(next.header.mdlength), + write_result_t{ + journal_seq_t{ + cursor.seq.segment_seq, + next.offset + }, + total_length + } + }; + DEBUG("processing {} at {}, budget_used={}", + next.header, locator, budget_used); + return handler( + locator, + next.header, + next.mdbuffer + ).safe_then([FNAME, &cursor] { + cursor.pop_record_group(); + if (cursor.is_complete()) { + INFO("complete at {}, no more record group", cursor); + } + }); +} + +} diff --git a/src/crimson/os/seastore/record_scanner.h b/src/crimson/os/seastore/record_scanner.h new file mode 100644 index 0000000000000..c8486f5901395 --- /dev/null +++ b/src/crimson/os/seastore/record_scanner.h @@ -0,0 +1,65 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#pragma once + +#include "crimson/common/errorator.h" +#include "crimson/os/seastore/seastore_types.h" +#include "crimson/os/seastore/segment_manager.h" + + +namespace crimson::os::seastore { + +class RecordScanner { +public: + using read_ertr = SegmentManager::read_ertr; + using scan_valid_records_ertr = read_ertr; + using scan_valid_records_ret = scan_valid_records_ertr::future< + size_t>; + using found_record_handler_t = std::function< + scan_valid_records_ertr::future<>( + record_locator_t record_locator, + // callee may assume header and bl will remain valid until + // returned future resolves + const record_group_header_t &header, + const bufferlist &mdbuf)>; + scan_valid_records_ret scan_valid_records( + scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call + segment_nonce_t nonce, ///< [in] nonce for segment + size_t budget, ///< [in] max budget to use + found_record_handler_t &handler ///< [in] handler for records + ); ///< @return used budget + +protected: + /// read record metadata for record starting at start + using read_validate_record_metadata_ertr = read_ertr; + using read_validate_record_metadata_ret = + read_validate_record_metadata_ertr::future< + std::optional> + >; + virtual read_validate_record_metadata_ret read_validate_record_metadata( + paddr_t start, + segment_nonce_t nonce) = 0; + + /// read and validate data + using read_validate_data_ertr = read_ertr; + using read_validate_data_ret = read_validate_data_ertr::future; + virtual read_validate_data_ret read_validate_data( + paddr_t record_base, + const record_group_header_t &header ///< caller must ensure lifetime through + /// future resolution + ) = 0; + + using consume_record_group_ertr = scan_valid_records_ertr; + consume_record_group_ertr::future<> consume_next_records( + scan_valid_records_cursor& cursor, + found_record_handler_t& handler, + std::size_t& budget_used); + + virtual void initialize_cursor(scan_valid_records_cursor &cursor) = 0; + + virtual ~RecordScanner() {} + +}; + +} diff --git a/src/crimson/os/seastore/segment_manager_group.cc b/src/crimson/os/seastore/segment_manager_group.cc index e78e299e71bc4..6fe56501c8a0a 100644 --- a/src/crimson/os/seastore/segment_manager_group.cc +++ b/src/crimson/os/seastore/segment_manager_group.cc @@ -91,14 +91,10 @@ SegmentManagerGroup::read_segment_header(segment_id_t segment) }); } -SegmentManagerGroup::scan_valid_records_ret -SegmentManagerGroup::scan_valid_records( - scan_valid_records_cursor &cursor, - segment_nonce_t nonce, - size_t budget, - found_record_handler_t &handler) +void SegmentManagerGroup::initialize_cursor( + scan_valid_records_cursor &cursor) { - LOG_PREFIX(SegmentManagerGroup::scan_valid_records); + LOG_PREFIX(SegmentManagerGroup::initialize_cursor); assert(has_device(cursor.get_segment_id().device_id())); auto& segment_manager = *segment_managers[cursor.get_segment_id().device_id()]; @@ -106,91 +102,6 @@ SegmentManagerGroup::scan_valid_records( INFO("start to scan segment {}", cursor.get_segment_id()); cursor.increment_seq(segment_manager.get_block_size()); } - DEBUG("starting at {}, budget={}", cursor, budget); - auto retref = std::make_unique(0); - auto &budget_used = *retref; - return crimson::repeat( - [=, &cursor, &budget_used, &handler, this]() mutable - -> scan_valid_records_ertr::future { - return [=, &handler, &cursor, &budget_used, this] { - if (!cursor.last_valid_header_found) { - return read_validate_record_metadata(cursor.seq.offset, nonce - ).safe_then([=, &cursor](auto md) { - if (!md) { - cursor.last_valid_header_found = true; - if (cursor.is_complete()) { - INFO("complete at {}, invalid record group metadata", - cursor); - } else { - DEBUG("found invalid record group metadata at {}, " - "processing {} pending record groups", - cursor.seq, - cursor.pending_record_groups.size()); - } - return scan_valid_records_ertr::now(); - } else { - auto& [header, md_bl] = *md; - DEBUG("found valid {} at {}", header, cursor.seq); - cursor.emplace_record_group(header, std::move(md_bl)); - return scan_valid_records_ertr::now(); - } - }).safe_then([=, &cursor, &budget_used, &handler, this] { - DEBUG("processing committed record groups until {}, {} pending", - cursor.last_committed, - cursor.pending_record_groups.size()); - return crimson::repeat( - [=, &budget_used, &cursor, &handler, this] { - if (cursor.pending_record_groups.empty()) { - /* This is only possible if the segment is empty. - * A record's last_commited must be prior to its own - * location since it itself cannot yet have been committed - * at its own time of submission. Thus, the most recently - * read record must always fall after cursor.last_committed */ - return scan_valid_records_ertr::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::yes); - } - auto &next = cursor.pending_record_groups.front(); - journal_seq_t next_seq = {cursor.seq.segment_seq, next.offset}; - if (cursor.last_committed == JOURNAL_SEQ_NULL || - next_seq > cursor.last_committed) { - return scan_valid_records_ertr::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::yes); - } - return consume_next_records(cursor, handler, budget_used - ).safe_then([] { - return scan_valid_records_ertr::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::no); - }); - }); - }); - } else { - assert(!cursor.pending_record_groups.empty()); - auto &next = cursor.pending_record_groups.front(); - return read_validate_data(next.offset, next.header - ).safe_then([this, FNAME, &budget_used, &cursor, &handler, &next](auto valid) { - if (!valid) { - INFO("complete at {}, invalid record group data at {}, {}", - cursor, next.offset, next.header); - cursor.pending_record_groups.clear(); - return scan_valid_records_ertr::now(); - } - return consume_next_records(cursor, handler, budget_used); - }); - } - }().safe_then([=, &budget_used, &cursor] { - if (cursor.is_complete() || budget_used >= budget) { - DEBUG("finish at {}, budget_used={}, budget={}", - cursor, budget_used, budget); - return seastar::stop_iteration::yes; - } else { - return seastar::stop_iteration::no; - } - }); - }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret { - return scan_valid_records_ret( - scan_valid_records_ertr::ready_future_marker{}, - std::move(*retref)); - }); } SegmentManagerGroup::read_validate_record_metadata_ret @@ -289,40 +200,6 @@ SegmentManagerGroup::read_validate_data( }); } -SegmentManagerGroup::consume_record_group_ertr::future<> -SegmentManagerGroup::consume_next_records( - scan_valid_records_cursor& cursor, - found_record_handler_t& handler, - std::size_t& budget_used) -{ - LOG_PREFIX(SegmentManagerGroup::consume_next_records); - auto& next = cursor.pending_record_groups.front(); - auto total_length = next.header.dlength + next.header.mdlength; - budget_used += total_length; - auto locator = record_locator_t{ - next.offset.add_offset(next.header.mdlength), - write_result_t{ - journal_seq_t{ - cursor.seq.segment_seq, - next.offset - }, - total_length - } - }; - DEBUG("processing {} at {}, budget_used={}", - next.header, locator, budget_used); - return handler( - locator, - next.header, - next.mdbuffer - ).safe_then([FNAME, &cursor] { - cursor.pop_record_group(); - if (cursor.is_complete()) { - INFO("complete at {}, no more record group", cursor); - } - }); -} - SegmentManagerGroup::find_journal_segment_headers_ret SegmentManagerGroup::find_journal_segment_headers() { diff --git a/src/crimson/os/seastore/segment_manager_group.h b/src/crimson/os/seastore/segment_manager_group.h index bd5af9601a567..826ab61a7e7a5 100644 --- a/src/crimson/os/seastore/segment_manager_group.h +++ b/src/crimson/os/seastore/segment_manager_group.h @@ -8,10 +8,11 @@ #include "crimson/common/errorator.h" #include "crimson/os/seastore/seastore_types.h" #include "crimson/os/seastore/segment_manager.h" +#include "crimson/os/seastore/record_scanner.h" namespace crimson::os::seastore { -class SegmentManagerGroup { +class SegmentManagerGroup : public RecordScanner { public: SegmentManagerGroup() { segment_managers.resize(DEVICE_ID_MAX, nullptr); @@ -96,24 +97,6 @@ public: segment_tail_t>; read_segment_tail_ret read_segment_tail(segment_id_t segment); - using read_ertr = SegmentManager::read_ertr; - using scan_valid_records_ertr = read_ertr; - using scan_valid_records_ret = scan_valid_records_ertr::future< - size_t>; - using found_record_handler_t = std::function< - scan_valid_records_ertr::future<>( - record_locator_t record_locator, - // callee may assume header and bl will remain valid until - // returned future resolves - const record_group_header_t &header, - const bufferlist &mdbuf)>; - scan_valid_records_ret scan_valid_records( - scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call - segment_nonce_t nonce, ///< [in] nonce for segment - size_t budget, ///< [in] max budget to use - found_record_handler_t &handler ///< [in] handler for records - ); ///< @return used budget - /* * read journal segment headers */ @@ -143,30 +126,17 @@ private: return device_ids.count(id) >= 1; } - /// read record metadata for record starting at start - using read_validate_record_metadata_ertr = read_ertr; - using read_validate_record_metadata_ret = - read_validate_record_metadata_ertr::future< - std::optional> - >; + void initialize_cursor(scan_valid_records_cursor &cursor) final; + read_validate_record_metadata_ret read_validate_record_metadata( paddr_t start, - segment_nonce_t nonce); + segment_nonce_t nonce) final; - /// read and validate data - using read_validate_data_ertr = read_ertr; - using read_validate_data_ret = read_validate_data_ertr::future; read_validate_data_ret read_validate_data( paddr_t record_base, const record_group_header_t &header ///< caller must ensure lifetime through /// future resolution - ); - - using consume_record_group_ertr = scan_valid_records_ertr; - consume_record_group_ertr::future<> consume_next_records( - scan_valid_records_cursor& cursor, - found_record_handler_t& handler, - std::size_t& budget_used); + ) final; std::vector segment_managers; std::set device_ids;