From: Xuehan Xu Date: Thu, 23 Sep 2021 06:53:39 +0000 (+0800) Subject: crimson/os/seastore: change Scanner to ExtentReader X-Git-Tag: v17.1.0~712^2~10 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0f3fc5af0968a21d4b234b1d88c9ac16e051de89;p=ceph.git crimson/os/seastore: change Scanner to ExtentReader This commit makes Scanner an extent reader that route read requests to the corresponding backing devices according to the device ids encapsulated in the segment ids. Signed-off-by: Xuehan Xu --- diff --git a/src/crimson/os/seastore/CMakeLists.txt b/src/crimson/os/seastore/CMakeLists.txt index 146880a65998..6a03e8faa873 100644 --- a/src/crimson/os/seastore/CMakeLists.txt +++ b/src/crimson/os/seastore/CMakeLists.txt @@ -7,7 +7,7 @@ add_library(crimson-seastore STATIC transaction.cc journal.cc cache.cc - scanner.cc + extent_reader.cc lba_manager.cc segment_cleaner.cc lba_manager/btree/btree_lba_manager.cc diff --git a/src/crimson/os/seastore/cache.cc b/src/crimson/os/seastore/cache.cc index 4f810003b90e..20a8c2c16822 100644 --- a/src/crimson/os/seastore/cache.cc +++ b/src/crimson/os/seastore/cache.cc @@ -20,8 +20,11 @@ using std::string_view; namespace crimson::os::seastore { -Cache::Cache(SegmentManager &segment_manager) : - segment_manager(segment_manager) +Cache::Cache( + ExtentReader &reader, + segment_off_t block_size) + : reader(reader), + block_size(block_size) { register_metrics(); } @@ -942,7 +945,7 @@ record_t Cache::prepare_record(Transaction &t) efforts.num_ool_records += ool_stats.num_records; efforts.ool_record_overhead_bytes += ool_stats.overhead_bytes; auto record_size = get_encoded_record_length( - record, segment_manager.get_block_size()); + record, block_size); auto inline_overhead = record_size.mdlength + record_size.dlength - record.get_raw_data_size(); efforts.inline_record_overhead_bytes += inline_overhead; diff --git a/src/crimson/os/seastore/cache.h b/src/crimson/os/seastore/cache.h index f1449adb2211..d7d59948628b 100644 --- a/src/crimson/os/seastore/cache.h +++ b/src/crimson/os/seastore/cache.h @@ -89,7 +89,7 @@ public: crimson::ct_error::input_output_error>; using base_iertr = trans_iertr; - Cache(SegmentManager &segment_manager); + Cache(ExtentReader &reader, segment_off_t block_size); ~Cache(); /// Creates empty transaction by source @@ -612,7 +612,9 @@ public: void dump_contents(); private: - SegmentManager &segment_manager; ///< ref to segment_manager + ExtentReader &reader; ///< ref to extent reader + segment_off_t block_size; ///< block size of the segment + ///< manager holding journal records RootBlockRef root; ///< ref to current root ExtentIndex extents; ///< set of live extents @@ -779,7 +781,7 @@ private: TCachedExtentRef&& extent ) { extent->set_io_wait(); - return segment_manager.read( + return reader.read( extent->get_paddr(), extent->get_length(), extent->get_bptr() diff --git a/src/crimson/os/seastore/extent_reader.cc b/src/crimson/os/seastore/extent_reader.cc new file mode 100644 index 000000000000..c3b21ae57761 --- /dev/null +++ b/src/crimson/os/seastore/extent_reader.cc @@ -0,0 +1,336 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#include "crimson/os/seastore/segment_manager.h" +#include "crimson/os/seastore/extent_reader.h" +#include "crimson/common/log.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_seastore); + } +} + +namespace crimson::os::seastore { + +ExtentReader::read_segment_header_ret +ExtentReader::read_segment_header(segment_id_t segment) +{ + return segment_manager.read( + paddr_t{segment, 0}, + segment_manager.get_block_size() + ).handle_error( + read_segment_header_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error in ExtentReader::read_segment_header" + } + ).safe_then([=](bufferptr bptr) -> read_segment_header_ret { + logger().debug("segment {} bptr size {}", segment, bptr.length()); + + segment_header_t header; + bufferlist bl; + bl.push_back(bptr); + + logger().debug( + "ExtentReader::read_segment_header: segment {} block crc {}", + segment, + bl.begin().crc32c(segment_manager.get_block_size(), 0)); + + auto bp = bl.cbegin(); + try { + decode(header, bp); + } catch (ceph::buffer::error &e) { + logger().debug( + "ExtentReader::read_segment_header: segment {} unable to decode " + "header, skipping", + segment); + return crimson::ct_error::enodata::make(); + } + logger().debug( + "ExtentReader::read_segment_header: segment {} header {}", + segment, + header); + return read_segment_header_ret( + read_segment_header_ertr::ready_future_marker{}, + header); + }); +} +ExtentReader::scan_extents_ret ExtentReader::scan_extents( + scan_extents_cursor &cursor, + extent_len_t bytes_to_read) +{ + auto ret = std::make_unique(); + auto* extents = ret.get(); + return read_segment_header(cursor.get_offset().segment + ).handle_error( + scan_extents_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error in ExtentReader::scan_extents" + } + ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) { + auto segment_nonce = segment_header.segment_nonce; + return seastar::do_with( + found_record_handler_t( + [extents, this]( + paddr_t base, + const record_header_t &header, + const bufferlist &mdbuf) mutable { + + auto infos = try_decode_extent_infos( + header, + mdbuf); + if (!infos) { + // This should be impossible, we did check the crc on the mdbuf + logger().error( + "ExtentReader::scan_extents unable to decode extents for record {}", + base); + assert(infos); + } + + paddr_t extent_offset = base.add_offset(header.mdlength); + for (const auto &i : *infos) { + extents->emplace_back(extent_offset, i); + extent_offset.offset += i.len; + } + return scan_extents_ertr::now(); + }), + [=, &cursor](auto &dhandler) { + return scan_valid_records( + cursor, + segment_nonce, + bytes_to_read, + dhandler).discard_result(); + }); + }).safe_then([ret=std::move(ret)] { + return std::move(*ret); + }); +} + +ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records( + scan_valid_records_cursor &cursor, + segment_nonce_t nonce, + size_t budget, + found_record_handler_t &handler) +{ + if (cursor.offset.offset == 0) { + cursor.offset.offset = segment_manager.get_block_size(); + } + auto retref = std::make_unique(0); + auto &budget_used = *retref; + return crimson::repeat( + [=, &cursor, &budget_used, &handler]() mutable + -> scan_valid_records_ertr::future { + return [=, &handler, &cursor, &budget_used] { + if (!cursor.last_valid_header_found) { + return read_validate_record_metadata(cursor.offset, nonce + ).safe_then([=, &cursor](auto md) { + logger().debug( + "ExtentReader::scan_valid_records: read complete {}", + cursor.offset); + if (!md) { + logger().debug( + "ExtentReader::scan_valid_records: found invalid header at {}, presumably at end", + cursor.offset); + cursor.last_valid_header_found = true; + return scan_valid_records_ertr::now(); + } else { + logger().debug( + "ExtentReader::scan_valid_records: valid record read at {}", + cursor.offset); + cursor.last_committed = paddr_t{ + cursor.offset.segment, + md->first.committed_to}; + cursor.pending_records.emplace_back( + cursor.offset, + md->first, + md->second); + cursor.offset.offset += + md->first.dlength + md->first.mdlength; + return scan_valid_records_ertr::now(); + } + }).safe_then([=, &cursor, &budget_used, &handler] { + return crimson::repeat( + [=, &budget_used, &cursor, &handler] { + logger().debug( + "ExtentReader::scan_valid_records: valid record read, processing queue"); + if (cursor.pending_records.empty()) { + /* This is only possible if the segment is empty. + * A record's last_commited must be prior to its own + * location since it itself cannot yet have been committed + * at its own time of submission. Thus, the most recently + * read record must always fall after cursor.last_committed */ + return scan_valid_records_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::yes); + } + auto &next = cursor.pending_records.front(); + if (next.offset > cursor.last_committed) { + return scan_valid_records_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::yes); + } + budget_used += + next.header.dlength + next.header.mdlength; + return handler( + next.offset, + next.header, + next.mdbuffer + ).safe_then([&cursor] { + cursor.pending_records.pop_front(); + return scan_valid_records_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::no); + }); + }); + }); + } else { + assert(!cursor.pending_records.empty()); + auto &next = cursor.pending_records.front(); + return read_validate_data(next.offset, next.header + ).safe_then([=, &budget_used, &next, &cursor, &handler](auto valid) { + if (!valid) { + cursor.pending_records.clear(); + return scan_valid_records_ertr::now(); + } + budget_used += + next.header.dlength + next.header.mdlength; + return handler( + next.offset, + next.header, + next.mdbuffer + ).safe_then([&cursor] { + cursor.pending_records.pop_front(); + return scan_valid_records_ertr::now(); + }); + }); + } + }().safe_then([=, &budget_used, &cursor] { + if (cursor.is_complete() || budget_used >= budget) { + return seastar::stop_iteration::yes; + } else { + return seastar::stop_iteration::no; + } + }); + }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret { + return scan_valid_records_ret( + scan_valid_records_ertr::ready_future_marker{}, + std::move(*retref)); + }); +} + +ExtentReader::read_validate_record_metadata_ret +ExtentReader::read_validate_record_metadata( + paddr_t start, + segment_nonce_t nonce) +{ + auto block_size = segment_manager.get_block_size(); + if (start.offset + block_size > (int64_t)segment_manager.get_segment_size()) { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::nullopt); + } + return segment_manager.read(start, block_size + ).safe_then( + [=](bufferptr bptr) mutable + -> read_validate_record_metadata_ret { + logger().debug("read_validate_record_metadata: reading {}", start); + auto block_size = segment_manager.get_block_size(); + bufferlist bl; + bl.append(bptr); + auto bp = bl.cbegin(); + record_header_t header; + try { + decode(header, bp); + } catch (ceph::buffer::error &e) { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::nullopt); + } + if (header.segment_nonce != nonce) { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::nullopt); + } + if (header.mdlength > (extent_len_t)block_size) { + if (start.offset + header.mdlength > + (int64_t)segment_manager.get_segment_size()) { + return crimson::ct_error::input_output_error::make(); + } + return segment_manager.read( + {start.segment, start.offset + (segment_off_t)block_size}, + header.mdlength - block_size).safe_then( + [header=std::move(header), bl=std::move(bl)]( + auto &&bptail) mutable { + bl.push_back(bptail); + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::make_pair(std::move(header), std::move(bl))); + }); + } else { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::make_pair(std::move(header), std::move(bl)) + ); + } + }).safe_then([=](auto p) { + if (p && validate_metadata(p->second)) { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::move(*p) + ); + } else { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::nullopt); + } + }); +} + +std::optional> +ExtentReader::try_decode_extent_infos( + record_header_t header, + const bufferlist &bl) +{ + auto bliter = bl.cbegin(); + bliter += ceph::encoded_sizeof_bounded(); + bliter += sizeof(checksum_t) /* crc */; + logger().debug("{}: decoding {} extents", __func__, header.extents); + std::vector extent_infos(header.extents); + for (auto &&i : extent_infos) { + try { + decode(i, bliter); + } catch (ceph::buffer::error &e) { + return std::nullopt; + } + } + return extent_infos; +} + +ExtentReader::read_validate_data_ret +ExtentReader::read_validate_data( + paddr_t record_base, + const record_header_t &header) +{ + return segment_manager.read( + record_base.add_offset(header.mdlength), + header.dlength + ).safe_then([=, &header](auto bptr) { + bufferlist bl; + bl.append(bptr); + return bl.crc32c(-1) == header.data_crc; + }); +} + +bool ExtentReader::validate_metadata(const bufferlist &bl) +{ + auto bliter = bl.cbegin(); + auto test_crc = bliter.crc32c( + ceph::encoded_sizeof_bounded(), + -1); + ceph_le32 recorded_crc_le; + decode(recorded_crc_le, bliter); + uint32_t recorded_crc = recorded_crc_le; + test_crc = bliter.crc32c( + bliter.get_remaining(), + test_crc); + return test_crc == recorded_crc; +} + +} // namespace crimson::os::seastore diff --git a/src/crimson/os/seastore/extent_reader.h b/src/crimson/os/seastore/extent_reader.h new file mode 100644 index 000000000000..7f0d1ea653b6 --- /dev/null +++ b/src/crimson/os/seastore/extent_reader.h @@ -0,0 +1,112 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#pragma once + +#include "crimson/common/errorator.h" +#include "crimson/os/seastore/seastore_types.h" +#include "crimson/os/seastore/segment_manager.h" + +namespace crimson::os::seastore { + +class SegmentCleaner; + +class ExtentReader { +public: + using read_ertr = SegmentManager::read_ertr; + ExtentReader() { + segment_managers.resize(max_devices, nullptr); + } + using read_segment_header_ertr = crimson::errorator< + crimson::ct_error::enoent, + crimson::ct_error::enodata, + crimson::ct_error::input_output_error + >; + using read_segment_header_ret = read_segment_header_ertr::future< + segment_header_t>; + read_segment_header_ret read_segment_header(segment_id_t segment); + + /** + * scan_extents + * + * Scans records beginning at addr until the first record boundary after + * addr + bytes_to_read. + * + * Returns list + * cursor.is_complete() will be true when no further extents exist in segment. + */ + using scan_extents_cursor = scan_valid_records_cursor; + using scan_extents_ertr = read_ertr::extend; + using scan_extents_ret_bare = std::list>; + using scan_extents_ret = scan_extents_ertr::future; + scan_extents_ret scan_extents( + scan_extents_cursor &cursor, + extent_len_t bytes_to_read + ); + + using scan_valid_records_ertr = read_ertr::extend; + using scan_valid_records_ret = scan_valid_records_ertr::future< + size_t>; + using found_record_handler_t = std::function< + scan_valid_records_ertr::future<>( + paddr_t record_block_base, + // callee may assume header and bl will remain valid until + // returned future resolves + const record_header_t &header, + const bufferlist &bl)>; + scan_valid_records_ret scan_valid_records( + scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call + segment_nonce_t nonce, ///< [in] nonce for segment + size_t budget, ///< [in] max budget to use + found_record_handler_t &handler ///< [in] handler for records + ); ///< @return used budget + + void add_segment_manager(SegmentManager* segment_manager) { + assert(!segment_managers[segment_manager->get_device_id()] || + segment_manager == segment_managers[segment_manager->get_device_id()]); + segment_managers[segment_manager->get_device_id()] = segment_manager; + } + + read_ertr::future<> read( + paddr_t addr, + size_t len, + ceph::bufferptr &out) { + assert(segment_managers[addr.segment.device_id()]); + return segment_managers[addr.segment.device_id()]->read(addr, len, out); + } + +private: + SegmentManager& segment_manager; + + /// read record metadata for record starting at start + using read_validate_record_metadata_ertr = read_ertr; + using read_validate_record_metadata_ret = + read_validate_record_metadata_ertr::future< + std::optional> + >; + read_validate_record_metadata_ret read_validate_record_metadata( + paddr_t start, + segment_nonce_t nonce); + + /// attempts to decode extent infos from bl, return nullopt if unsuccessful + std::optional> try_decode_extent_infos( + record_header_t header, + const bufferlist &bl); + + /// read and validate data + using read_validate_data_ertr = read_ertr; + using read_validate_data_ret = read_validate_data_ertr::future; + read_validate_data_ret read_validate_data( + paddr_t record_base, + const record_header_t &header ///< caller must ensure lifetime through + /// future resolution + ); + + /// validate embedded metadata checksum + static bool validate_metadata(const bufferlist &bl); + +}; + +using ExtentReaderRef = std::unique_ptr; + +} // namespace crimson::os::seastore diff --git a/src/crimson/os/seastore/journal.cc b/src/crimson/os/seastore/journal.cc index 8c9a81c027b6..db59b7d1373a 100644 --- a/src/crimson/os/seastore/journal.cc +++ b/src/crimson/os/seastore/journal.cc @@ -50,7 +50,7 @@ segment_nonce_t generate_nonce( sizeof(meta.seastore_id.uuid)); } -Journal::Journal(SegmentManager &segment_manager, Scanner& scanner) +Journal::Journal(SegmentManager &segment_manager, ExtentReader& scanner) : segment_manager(segment_manager), scanner(scanner) {} @@ -306,7 +306,7 @@ Journal::replay_segment( logger().debug("replay_segment: starting at {}", seq); return seastar::do_with( scan_valid_records_cursor(seq.offset), - Scanner::found_record_handler_t( + ExtentReader::found_record_handler_t( [=, &handler](paddr_t base, const record_header_t &header, const bufferlist &mdbuf) { diff --git a/src/crimson/os/seastore/journal.h b/src/crimson/os/seastore/journal.h index 625a102347e0..a1917f3770f6 100644 --- a/src/crimson/os/seastore/journal.h +++ b/src/crimson/os/seastore/journal.h @@ -14,7 +14,7 @@ #include "include/denc.h" #include "crimson/common/log.h" -#include "crimson/os/seastore/scanner.h" +#include "crimson/os/seastore/extent_reader.h" #include "crimson/os/seastore/segment_manager.h" #include "crimson/os/seastore/ordering_handle.h" #include "crimson/os/seastore/seastore_types.h" @@ -30,7 +30,7 @@ class SegmentedAllocator; */ class Journal { public: - Journal(SegmentManager &segment_manager, Scanner& scanner); + Journal(SegmentManager &segment_manager, ExtentReader& scanner); /** * Sets the SegmentProvider. @@ -156,7 +156,7 @@ private: segment_off_t written_to = 0; segment_off_t committed_to = 0; - Scanner& scanner; + ExtentReader& scanner; WritePipeline *write_pipeline = nullptr; void reset_soft_state() { diff --git a/src/crimson/os/seastore/scanner.cc b/src/crimson/os/seastore/scanner.cc deleted file mode 100644 index 9970ee3456c9..000000000000 --- a/src/crimson/os/seastore/scanner.cc +++ /dev/null @@ -1,336 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- -// vim: ts=8 sw=2 smarttab expandtab - -#include "crimson/os/seastore/segment_manager.h" -#include "crimson/os/seastore/scanner.h" -#include "crimson/common/log.h" - -namespace { - seastar::logger& logger() { - return crimson::get_logger(ceph_subsys_seastore); - } -} - -namespace crimson::os::seastore { - -Scanner::read_segment_header_ret -Scanner::read_segment_header(segment_id_t segment) -{ - return segment_manager.read( - paddr_t{segment, 0}, - segment_manager.get_block_size() - ).handle_error( - read_segment_header_ertr::pass_further{}, - crimson::ct_error::assert_all{ - "Invalid error in Scanner::read_segment_header" - } - ).safe_then([=](bufferptr bptr) -> read_segment_header_ret { - logger().debug("segment {} bptr size {}", segment, bptr.length()); - - segment_header_t header; - bufferlist bl; - bl.push_back(bptr); - - logger().debug( - "Scanner::read_segment_header: segment {} block crc {}", - segment, - bl.begin().crc32c(segment_manager.get_block_size(), 0)); - - auto bp = bl.cbegin(); - try { - decode(header, bp); - } catch (ceph::buffer::error &e) { - logger().debug( - "Scanner::read_segment_header: segment {} unable to decode " - "header, skipping", - segment); - return crimson::ct_error::enodata::make(); - } - logger().debug( - "Scanner::read_segment_header: segment {} header {}", - segment, - header); - return read_segment_header_ret( - read_segment_header_ertr::ready_future_marker{}, - header); - }); -} -Scanner::scan_extents_ret Scanner::scan_extents( - scan_extents_cursor &cursor, - extent_len_t bytes_to_read) -{ - auto ret = std::make_unique(); - auto* extents = ret.get(); - return read_segment_header(cursor.get_offset().segment - ).handle_error( - scan_extents_ertr::pass_further{}, - crimson::ct_error::assert_all{ - "Invalid error in Scanner::scan_extents" - } - ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) { - auto segment_nonce = segment_header.segment_nonce; - return seastar::do_with( - found_record_handler_t( - [extents, this]( - paddr_t base, - const record_header_t &header, - const bufferlist &mdbuf) mutable { - - auto infos = try_decode_extent_infos( - header, - mdbuf); - if (!infos) { - // This should be impossible, we did check the crc on the mdbuf - logger().error( - "Scanner::scan_extents unable to decode extents for record {}", - base); - assert(infos); - } - - paddr_t extent_offset = base.add_offset(header.mdlength); - for (const auto &i : *infos) { - extents->emplace_back(extent_offset, i); - extent_offset.offset += i.len; - } - return scan_extents_ertr::now(); - }), - [=, &cursor](auto &dhandler) { - return scan_valid_records( - cursor, - segment_nonce, - bytes_to_read, - dhandler).discard_result(); - }); - }).safe_then([ret=std::move(ret)] { - return std::move(*ret); - }); -} - -Scanner::scan_valid_records_ret Scanner::scan_valid_records( - scan_valid_records_cursor &cursor, - segment_nonce_t nonce, - size_t budget, - found_record_handler_t &handler) -{ - if (cursor.offset.offset == 0) { - cursor.offset.offset = segment_manager.get_block_size(); - } - auto retref = std::make_unique(0); - auto &budget_used = *retref; - return crimson::repeat( - [=, &cursor, &budget_used, &handler]() mutable - -> scan_valid_records_ertr::future { - return [=, &handler, &cursor, &budget_used] { - if (!cursor.last_valid_header_found) { - return read_validate_record_metadata(cursor.offset, nonce - ).safe_then([=, &cursor](auto md) { - logger().debug( - "Scanner::scan_valid_records: read complete {}", - cursor.offset); - if (!md) { - logger().debug( - "Scanner::scan_valid_records: found invalid header at {}, presumably at end", - cursor.offset); - cursor.last_valid_header_found = true; - return scan_valid_records_ertr::now(); - } else { - logger().debug( - "Scanner::scan_valid_records: valid record read at {}", - cursor.offset); - cursor.last_committed = paddr_t{ - cursor.offset.segment, - md->first.committed_to}; - cursor.pending_records.emplace_back( - cursor.offset, - md->first, - md->second); - cursor.offset.offset += - md->first.dlength + md->first.mdlength; - return scan_valid_records_ertr::now(); - } - }).safe_then([=, &cursor, &budget_used, &handler] { - return crimson::repeat( - [=, &budget_used, &cursor, &handler] { - logger().debug( - "Scanner::scan_valid_records: valid record read, processing queue"); - if (cursor.pending_records.empty()) { - /* This is only possible if the segment is empty. - * A record's last_commited must be prior to its own - * location since it itself cannot yet have been committed - * at its own time of submission. Thus, the most recently - * read record must always fall after cursor.last_committed */ - return scan_valid_records_ertr::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::yes); - } - auto &next = cursor.pending_records.front(); - if (next.offset > cursor.last_committed) { - return scan_valid_records_ertr::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::yes); - } - budget_used += - next.header.dlength + next.header.mdlength; - return handler( - next.offset, - next.header, - next.mdbuffer - ).safe_then([&cursor] { - cursor.pending_records.pop_front(); - return scan_valid_records_ertr::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::no); - }); - }); - }); - } else { - assert(!cursor.pending_records.empty()); - auto &next = cursor.pending_records.front(); - return read_validate_data(next.offset, next.header - ).safe_then([=, &budget_used, &next, &cursor, &handler](auto valid) { - if (!valid) { - cursor.pending_records.clear(); - return scan_valid_records_ertr::now(); - } - budget_used += - next.header.dlength + next.header.mdlength; - return handler( - next.offset, - next.header, - next.mdbuffer - ).safe_then([&cursor] { - cursor.pending_records.pop_front(); - return scan_valid_records_ertr::now(); - }); - }); - } - }().safe_then([=, &budget_used, &cursor] { - if (cursor.is_complete() || budget_used >= budget) { - return seastar::stop_iteration::yes; - } else { - return seastar::stop_iteration::no; - } - }); - }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret { - return scan_valid_records_ret( - scan_valid_records_ertr::ready_future_marker{}, - std::move(*retref)); - }); -} - -Scanner::read_validate_record_metadata_ret -Scanner::read_validate_record_metadata( - paddr_t start, - segment_nonce_t nonce) -{ - auto block_size = segment_manager.get_block_size(); - if (start.offset + block_size > (int64_t)segment_manager.get_segment_size()) { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::nullopt); - } - return segment_manager.read(start, block_size - ).safe_then( - [=](bufferptr bptr) mutable - -> read_validate_record_metadata_ret { - logger().debug("read_validate_record_metadata: reading {}", start); - auto block_size = segment_manager.get_block_size(); - bufferlist bl; - bl.append(bptr); - auto bp = bl.cbegin(); - record_header_t header; - try { - decode(header, bp); - } catch (ceph::buffer::error &e) { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::nullopt); - } - if (header.segment_nonce != nonce) { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::nullopt); - } - if (header.mdlength > (extent_len_t)block_size) { - if (start.offset + header.mdlength > - (int64_t)segment_manager.get_segment_size()) { - return crimson::ct_error::input_output_error::make(); - } - return segment_manager.read( - {start.segment, start.offset + (segment_off_t)block_size}, - header.mdlength - block_size).safe_then( - [header=std::move(header), bl=std::move(bl)]( - auto &&bptail) mutable { - bl.push_back(bptail); - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::make_pair(std::move(header), std::move(bl))); - }); - } else { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::make_pair(std::move(header), std::move(bl)) - ); - } - }).safe_then([=](auto p) { - if (p && validate_metadata(p->second)) { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::move(*p) - ); - } else { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::nullopt); - } - }); -} - -std::optional> -Scanner::try_decode_extent_infos( - record_header_t header, - const bufferlist &bl) -{ - auto bliter = bl.cbegin(); - bliter += ceph::encoded_sizeof_bounded(); - bliter += sizeof(checksum_t) /* crc */; - logger().debug("{}: decoding {} extents", __func__, header.extents); - std::vector extent_infos(header.extents); - for (auto &&i : extent_infos) { - try { - decode(i, bliter); - } catch (ceph::buffer::error &e) { - return std::nullopt; - } - } - return extent_infos; -} - -Scanner::read_validate_data_ret -Scanner::read_validate_data( - paddr_t record_base, - const record_header_t &header) -{ - return segment_manager.read( - record_base.add_offset(header.mdlength), - header.dlength - ).safe_then([=, &header](auto bptr) { - bufferlist bl; - bl.append(bptr); - return bl.crc32c(-1) == header.data_crc; - }); -} - -bool Scanner::validate_metadata(const bufferlist &bl) -{ - auto bliter = bl.cbegin(); - auto test_crc = bliter.crc32c( - ceph::encoded_sizeof_bounded(), - -1); - ceph_le32 recorded_crc_le; - decode(recorded_crc_le, bliter); - uint32_t recorded_crc = recorded_crc_le; - test_crc = bliter.crc32c( - bliter.get_remaining(), - test_crc); - return test_crc == recorded_crc; -} - -} // namespace crimson::os::seastore diff --git a/src/crimson/os/seastore/scanner.h b/src/crimson/os/seastore/scanner.h deleted file mode 100644 index 36bec0858d40..000000000000 --- a/src/crimson/os/seastore/scanner.h +++ /dev/null @@ -1,102 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- -// vim: ts=8 sw=2 smarttab expandtab - -#pragma once - -#include "crimson/common/errorator.h" -#include "crimson/os/seastore/seastore_types.h" -#include "crimson/os/seastore/segment_manager.h" - -namespace crimson::os::seastore { - -class SegmentCleaner; - -class Scanner { -public: - using read_ertr = crimson::errorator< - crimson::ct_error::input_output_error, - crimson::ct_error::invarg, - crimson::ct_error::enoent, - crimson::ct_error::erange>; - - Scanner(SegmentManager& segment_manager) - : segment_manager(segment_manager) {} - using read_segment_header_ertr = crimson::errorator< - crimson::ct_error::enoent, - crimson::ct_error::enodata, - crimson::ct_error::input_output_error - >; - using read_segment_header_ret = read_segment_header_ertr::future< - segment_header_t>; - read_segment_header_ret read_segment_header(segment_id_t segment); - - /** - * scan_extents - * - * Scans records beginning at addr until the first record boundary after - * addr + bytes_to_read. - * - * Returns list - * cursor.is_complete() will be true when no further extents exist in segment. - */ - using scan_extents_cursor = scan_valid_records_cursor; - using scan_extents_ertr = read_ertr::extend; - using scan_extents_ret_bare = std::list>; - using scan_extents_ret = scan_extents_ertr::future; - scan_extents_ret scan_extents( - scan_extents_cursor &cursor, - extent_len_t bytes_to_read - ); - - using scan_valid_records_ertr = read_ertr::extend; - using scan_valid_records_ret = scan_valid_records_ertr::future< - size_t>; - using found_record_handler_t = std::function< - scan_valid_records_ertr::future<>( - paddr_t record_block_base, - // callee may assume header and bl will remain valid until - // returned future resolves - const record_header_t &header, - const bufferlist &bl)>; - scan_valid_records_ret scan_valid_records( - scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call - segment_nonce_t nonce, ///< [in] nonce for segment - size_t budget, ///< [in] max budget to use - found_record_handler_t &handler ///< [in] handler for records - ); ///< @return used budget - -private: - SegmentManager& segment_manager; - - /// read record metadata for record starting at start - using read_validate_record_metadata_ertr = read_ertr; - using read_validate_record_metadata_ret = - read_validate_record_metadata_ertr::future< - std::optional> - >; - read_validate_record_metadata_ret read_validate_record_metadata( - paddr_t start, - segment_nonce_t nonce); - - /// attempts to decode extent infos from bl, return nullopt if unsuccessful - std::optional> try_decode_extent_infos( - record_header_t header, - const bufferlist &bl); - - /// read and validate data - using read_validate_data_ertr = read_ertr; - using read_validate_data_ret = read_validate_data_ertr::future; - read_validate_data_ret read_validate_data( - paddr_t record_base, - const record_header_t &header ///< caller must ensure lifetime through - /// future resolution - ); - - /// validate embedded metadata checksum - static bool validate_metadata(const bufferlist &bl); - -}; - -using ScannerRef = std::unique_ptr; - -} // namespace crimson::os::seastore diff --git a/src/crimson/os/seastore/seastore.cc b/src/crimson/os/seastore/seastore.cc index b15b5138e31e..f06aa650d8ee 100644 --- a/src/crimson/os/seastore/seastore.cc +++ b/src/crimson/os/seastore/seastore.cc @@ -1181,7 +1181,7 @@ std::unique_ptr make_seastore( segment_manager::block::BlockSegmentManager >(device + "/block"); - auto scanner = std::make_unique(*sm); + auto scanner = std::make_unique(); auto& scanner_ref = *scanner.get(); auto segment_cleaner = std::make_unique( SegmentCleaner::config_t::get_default(), @@ -1189,7 +1189,7 @@ std::unique_ptr make_seastore( false /* detailed */); auto journal = std::make_unique(*sm, scanner_ref); - auto cache = std::make_unique(*sm); + auto cache = std::make_unique(scanner_ref, sm->get_block_size()); auto lba_manager = lba_manager::create_lba_manager(*sm, *cache); auto epm = std::make_unique(*cache, *lba_manager); diff --git a/src/crimson/os/seastore/segment_cleaner.cc b/src/crimson/os/seastore/segment_cleaner.cc index a912bfbb7752..d23c5407c769 100644 --- a/src/crimson/os/seastore/segment_cleaner.cc +++ b/src/crimson/os/seastore/segment_cleaner.cc @@ -146,7 +146,7 @@ void SpaceTrackerDetailed::dump_usage(segment_id_t id) const SegmentCleaner::SegmentCleaner( config_t config, - ScannerRef&& scr, + ExtentReaderRef&& scr, bool detailed) : detailed(detailed), config(config), @@ -309,7 +309,7 @@ SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space() } next.offset = 0; scan_cursor = - std::make_unique( + std::make_unique( next); logger().debug( "SegmentCleaner::do_gc: starting gc on segment {}", @@ -388,13 +388,13 @@ SegmentCleaner::init_segments_ret SegmentCleaner::init_segments() { return scanner->read_segment_header(segment_id) .safe_then([&segments, segment_id, this](auto header) { if (header.out_of_line) { - logger().debug("Scanner::init_segments: out-of-line segment {}", segment_id); + logger().debug("ExtentReader::init_segments: out-of-line segment {}", segment_id); init_mark_segment_closed( segment_id, header.journal_segment_seq, true); } else { - logger().debug("Scanner::init_segments: journal segment {}", segment_id); + logger().debug("ExtentReader::init_segments: journal segment {}", segment_id); segments.emplace_back(std::make_pair(segment_id, std::move(header))); } return seastar::now(); diff --git a/src/crimson/os/seastore/segment_cleaner.h b/src/crimson/os/seastore/segment_cleaner.h index 3099fa2a84ab..470504ac44c6 100644 --- a/src/crimson/os/seastore/segment_cleaner.h +++ b/src/crimson/os/seastore/segment_cleaner.h @@ -383,7 +383,7 @@ private: size_t segment_size = 0; size_t block_size = 0; - ScannerRef scanner; + ExtentReaderRef scanner; SpaceTrackerIRef space_tracker; std::vector segments; @@ -422,7 +422,7 @@ private: public: SegmentCleaner( config_t config, - ScannerRef&& scanner, + ExtentReaderRef&& scanner, bool detailed = false); void mount(SegmentManager &sm) { @@ -652,7 +652,7 @@ private: // GC status helpers std::unique_ptr< - Scanner::scan_extents_cursor + ExtentReader::scan_extents_cursor > scan_cursor; /** @@ -730,7 +730,7 @@ private: } gc_process; using gc_ertr = work_ertr::extend_ertr< - Scanner::scan_extents_ertr + ExtentReader::scan_extents_ertr >; gc_cycle_ret do_gc_cycle(); diff --git a/src/crimson/tools/store_nbd/tm_driver.cc b/src/crimson/tools/store_nbd/tm_driver.cc index c933448675e6..5e97007a563a 100644 --- a/src/crimson/tools/store_nbd/tm_driver.cc +++ b/src/crimson/tools/store_nbd/tm_driver.cc @@ -127,7 +127,8 @@ seastar::future TMDriver::read( void TMDriver::init() { - auto scanner = std::make_unique(*segment_manager); + auto scanner = std::make_unique(); + scanner->add_segment_manager(segment_manager.get()); auto& scanner_ref = *scanner.get(); auto segment_cleaner = std::make_unique( SegmentCleaner::config_t::get_default(), @@ -135,7 +136,7 @@ void TMDriver::init() false /* detailed */); segment_cleaner->mount(*segment_manager); auto journal = std::make_unique(*segment_manager, scanner_ref); - auto cache = std::make_unique(*segment_manager); + auto cache = std::make_unique(scanner_ref, segment_manager->get_block_size()); auto lba_manager = lba_manager::create_lba_manager(*segment_manager, *cache); auto epm = std::make_unique(*cache, *lba_manager); diff --git a/src/test/crimson/seastore/test_btree_lba_manager.cc b/src/test/crimson/seastore/test_btree_lba_manager.cc index 158fbd0f8f6c..7399bd26eac8 100644 --- a/src/test/crimson/seastore/test_btree_lba_manager.cc +++ b/src/test/crimson/seastore/test_btree_lba_manager.cc @@ -27,7 +27,7 @@ using namespace crimson::os::seastore::lba_manager::btree; struct btree_lba_manager_test : public seastar_test_suite_t, SegmentProvider { segment_manager::EphemeralSegmentManagerRef segment_manager; - ScannerRef scanner; + ExtentReaderRef scanner; Journal journal; Cache cache; BtreeLBAManagerRef lba_manager; @@ -38,12 +38,13 @@ struct btree_lba_manager_test : btree_lba_manager_test() : segment_manager(segment_manager::create_test_ephemeral()), - scanner(new Scanner(*segment_manager)), + scanner(new ExtentReader()), journal(*segment_manager, *scanner), - cache(*segment_manager), + cache(*scanner, segment_manager->get_block_size()), lba_manager(new BtreeLBAManager(*segment_manager, cache)), block_size(segment_manager->get_block_size()) { + scanner->add_segment_manager(segment_manager.get()); journal.set_segment_provider(this); journal.set_write_pipeline(&pipeline); } diff --git a/src/test/crimson/seastore/test_randomblock_manager.cc b/src/test/crimson/seastore/test_randomblock_manager.cc index 8bfa75573172..f4c2526203fb 100644 --- a/src/test/crimson/seastore/test_randomblock_manager.cc +++ b/src/test/crimson/seastore/test_randomblock_manager.cc @@ -29,6 +29,7 @@ constexpr uint64_t DEFAULT_BLOCK_SIZE = 4096; struct rbm_test_t : public seastar_test_suite_t, TMTestState { segment_manager::EphemeralSegmentManagerRef segment_manager; // Need to be deleted, just for Cache + ExtentReaderRef reader; Cache cache; std::unique_ptr rbm_manager; nvme_device::NVMeBlockDevice *device; @@ -57,7 +58,8 @@ struct rbm_test_t : public seastar_test_suite_t, rbm_test_t() : segment_manager(segment_manager::create_test_ephemeral()), - cache(*segment_manager) + reader(new ExtentReader()), + cache(*reader, segment_manager->get_block_size()) { device = new nvme_device::TestMemory(DEFAULT_TEST_SIZE); rbm_manager.reset(new NVMeManager(device, std::string())); diff --git a/src/test/crimson/seastore/test_seastore_cache.cc b/src/test/crimson/seastore/test_seastore_cache.cc index 2b08af2e83f2..28688d9feed6 100644 --- a/src/test/crimson/seastore/test_seastore_cache.cc +++ b/src/test/crimson/seastore/test_seastore_cache.cc @@ -21,13 +21,17 @@ namespace { struct cache_test_t : public seastar_test_suite_t { segment_manager::EphemeralSegmentManagerRef segment_manager; + ExtentReaderRef reader; Cache cache; paddr_t current{0, 0}; journal_seq_t seq; cache_test_t() : segment_manager(segment_manager::create_test_ephemeral()), - cache(*segment_manager) {} + reader(new ExtentReader()), + cache(*reader, segment_manager->get_block_size()) { + reader->add_segment_manager(segment_manager.get()); + } seastar::future submit_transaction( TransactionRef t) { diff --git a/src/test/crimson/seastore/test_seastore_journal.cc b/src/test/crimson/seastore/test_seastore_journal.cc index 29227887928b..3d937d5b67a5 100644 --- a/src/test/crimson/seastore/test_seastore_journal.cc +++ b/src/test/crimson/seastore/test_seastore_journal.cc @@ -74,13 +74,15 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider { const segment_off_t block_size; - ScannerRef scanner; + ExtentReaderRef scanner; journal_test_t() : segment_manager(segment_manager::create_test_ephemeral()), block_size(segment_manager->get_block_size()), - scanner(std::make_unique(*segment_manager)) - {} + scanner(new ExtentReader()) + { + scanner->add_segment_manager(segment_manager.get()); + } segment_id_t next = 0; get_segment_ret get_segment() final { diff --git a/src/test/crimson/seastore/transaction_manager_test_state.h b/src/test/crimson/seastore/transaction_manager_test_state.h index bec8cb1dcf39..a5428556eb77 100644 --- a/src/test/crimson/seastore/transaction_manager_test_state.h +++ b/src/test/crimson/seastore/transaction_manager_test_state.h @@ -70,14 +70,15 @@ protected: auto get_transaction_manager( SegmentManager &segment_manager) { - auto scanner = std::make_unique(segment_manager); + auto scanner = std::make_unique(); + scanner->add_segment_manager(&segment_manager); auto& scanner_ref = *scanner.get(); auto segment_cleaner = std::make_unique( SegmentCleaner::config_t::get_default(), std::move(scanner), true); auto journal = std::make_unique(segment_manager, scanner_ref); - auto cache = std::make_unique(segment_manager); + auto cache = std::make_unique(scanner_ref, segment_manager.get_block_size()); auto lba_manager = lba_manager::create_lba_manager(segment_manager, *cache); auto epm = std::make_unique(*cache, *lba_manager);