This commit makes Scanner an extent reader that routes read requests to the corresponding
backing devices according to the device ids encapsulated in the segment ids.
Signed-off-by: Xuehan Xu <xxhdx1985126@gmail.com>
transaction.cc
journal.cc
cache.cc
- scanner.cc
+ extent_reader.cc
lba_manager.cc
segment_cleaner.cc
lba_manager/btree/btree_lba_manager.cc
namespace crimson::os::seastore {
-Cache::Cache(SegmentManager &segment_manager) :
- segment_manager(segment_manager)
+Cache::Cache(
+ ExtentReader &reader,
+ segment_off_t block_size)
+ : reader(reader),
+ block_size(block_size)
{
register_metrics();
}
efforts.num_ool_records += ool_stats.num_records;
efforts.ool_record_overhead_bytes += ool_stats.overhead_bytes;
auto record_size = get_encoded_record_length(
- record, segment_manager.get_block_size());
+ record, block_size);
auto inline_overhead =
record_size.mdlength + record_size.dlength - record.get_raw_data_size();
efforts.inline_record_overhead_bytes += inline_overhead;
crimson::ct_error::input_output_error>;
using base_iertr = trans_iertr<base_ertr>;
- Cache(SegmentManager &segment_manager);
+ Cache(ExtentReader &reader, segment_off_t block_size);
~Cache();
/// Creates empty transaction by source
void dump_contents();
private:
- SegmentManager &segment_manager; ///< ref to segment_manager
+ ExtentReader &reader; ///< ref to extent reader
+ segment_off_t block_size; ///< block size of the segment
+ ///< manager holding journal records
RootBlockRef root; ///< ref to current root
ExtentIndex extents; ///< set of live extents
TCachedExtentRef<T>&& extent
) {
extent->set_io_wait();
- return segment_manager.read(
+ return reader.read(
extent->get_paddr(),
extent->get_length(),
extent->get_bptr()
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#include "crimson/os/seastore/segment_manager.h"
+#include "crimson/os/seastore/extent_reader.h"
+#include "crimson/common/log.h"
+
+namespace {
+
+/// Logger used by every function in this translation unit; it is the
+/// logger registered for ceph_subsys_seastore.
+seastar::logger& logger()
+{
+  auto& seastore_logger = crimson::get_logger(ceph_subsys_seastore);
+  return seastore_logger;
+}
+
+} // anonymous namespace
+
+namespace crimson::os::seastore {
+
+/**
+ * read_segment_header
+ *
+ * Reads the first block of `segment` from the segment manager registered
+ * for the device id carried by the segment id, and decodes a
+ * segment_header_t from it.
+ *
+ * Fails with enodata when the block does not decode as a segment header.
+ * Read errors that are part of read_segment_header_ertr are passed through;
+ * any other read error trips assert_all.
+ */
+ExtentReader::read_segment_header_ret
+ExtentReader::read_segment_header(segment_id_t segment)
+{
+  // Route the read to the device that owns this segment.
+  auto* const owner = segment_managers[segment.device_id()];
+  assert(owner);
+  auto& segment_manager = *owner;
+  return segment_manager.read(
+    paddr_t{segment, 0},
+    segment_manager.get_block_size()
+  ).handle_error(
+    read_segment_header_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error in ExtentReader::read_segment_header"
+    }
+  // segment_manager is captured by reference on purpose: a by-copy capture of
+  // a reference variable would try to copy the SegmentManager itself.
+  ).safe_then([segment, &segment_manager](bufferptr bptr)
+              -> read_segment_header_ret {
+    logger().debug("segment {} bptr size {}", segment, bptr.length());
+
+    segment_header_t header;
+    bufferlist bl;
+    bl.push_back(bptr);
+
+    logger().debug(
+      "ExtentReader::read_segment_header: segment {} block crc {}",
+      segment,
+      bl.begin().crc32c(segment_manager.get_block_size(), 0));
+
+    auto bp = bl.cbegin();
+    try {
+      decode(header, bp);
+    } catch (const ceph::buffer::error &) {
+      // An undecodable first block is reported as enodata, not as a failure
+      // of the read itself.
+      logger().debug(
+        "ExtentReader::read_segment_header: segment {} unable to decode "
+        "header, skipping",
+        segment);
+      return crimson::ct_error::enodata::make();
+    }
+    logger().debug(
+      "ExtentReader::read_segment_header: segment {} header {}",
+      segment,
+      header);
+    return read_segment_header_ret(
+      read_segment_header_ertr::ready_future_marker{},
+      header);
+  });
+}
+/**
+ * scan_extents
+ *
+ * Reads the header of the segment the cursor points into to obtain its
+ * nonce, then runs scan_valid_records() with a handler that decodes each
+ * record's extent infos and appends one (paddr, extent_info) pair per extent
+ * to the result list. The first extent of a record is placed at
+ * record base + mdlength; each following extent starts where the previous
+ * one ends.
+ *
+ * @param cursor         [in, out] scan position, advanced by the scan
+ * @param bytes_to_read  budget handed to scan_valid_records()
+ * @return the collected list of (paddr, extent_info) pairs
+ */
+ExtentReader::scan_extents_ret ExtentReader::scan_extents(
+  scan_extents_cursor &cursor,
+  extent_len_t bytes_to_read)
+{
+  // The list is heap allocated so the record handler can append through a
+  // stable raw pointer while the owning unique_ptr travels down the chain.
+  auto ret = std::make_unique<scan_extents_ret_bare>();
+  auto* extents = ret.get();
+  return read_segment_header(cursor.get_offset().segment
+  ).handle_error(
+    scan_extents_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error in ExtentReader::scan_extents"
+    }
+  ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
+    auto segment_nonce = segment_header.segment_nonce;
+    return seastar::do_with(
+      found_record_handler_t(
+        [extents, this](
+          paddr_t base,
+          const record_header_t &header,
+          const bufferlist &mdbuf) mutable
+          -> scan_extents_ertr::future<> {
+
+          auto infos = try_decode_extent_infos(
+            header,
+            mdbuf);
+          if (!infos) {
+            // This should be impossible, we did check the crc on the mdbuf
+            logger().error(
+              "ExtentReader::scan_extents unable to decode extents for record {}",
+              base);
+            assert(infos);
+            // With asserts compiled out, fail the scan instead of
+            // dereferencing an empty optional below.
+            return crimson::ct_error::input_output_error::make();
+          }
+
+          paddr_t extent_offset = base.add_offset(header.mdlength);
+          for (const auto &info : *infos) {
+            extents->emplace_back(extent_offset, info);
+            extent_offset.offset += info.len;
+          }
+          return scan_extents_ertr::now();
+        }),
+      [this, segment_nonce, bytes_to_read, &cursor](auto &dhandler) {
+        return scan_valid_records(
+          cursor,
+          segment_nonce,
+          bytes_to_read,
+          dhandler).discard_result();
+      });
+  }).safe_then([ret=std::move(ret)] {
+    return std::move(*ret);
+  });
+}
+
+/**
+ * scan_valid_records
+ *
+ * Walks the records of the segment `cursor` points into and feeds them to
+ * `handler`.
+ *
+ * While no invalid header has been seen, each iteration reads and validates
+ * the metadata of the record at cursor.offset, queues it in
+ * cursor.pending_records, and hands to the handler every queued record that
+ * is not past cursor.last_committed. Once an invalid header has been seen,
+ * the remaining queued records are handed over one per iteration after
+ * their data checksum is verified; the first data checksum mismatch drops
+ * the rest of the queue.
+ *
+ * @param cursor   [in, out] scan state, updated during the call
+ * @param nonce    nonce of the segment; record headers must match it
+ * @param budget   the scan stops once mdlength + dlength of the records
+ *                 handed to the handler reaches this value
+ * @param handler  invoked for every accepted record
+ * @return budget actually used
+ */
+ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
+  scan_valid_records_cursor &cursor,
+  segment_nonce_t nonce,
+  size_t budget,
+  found_record_handler_t &handler)
+{
+  if (cursor.offset.offset == 0) {
+    // The block at offset 0 is the one read_segment_header() decodes, so
+    // records are scanned starting one block in. The block size is that of
+    // the device owning the cursor's segment.
+    auto* const owner = segment_managers[cursor.offset.segment.device_id()];
+    assert(owner);
+    cursor.offset.offset = owner->get_block_size();
+  }
+  // Heap allocated so the counter outlives this frame; the final
+  // continuation owns it and returns its value.
+  auto retref = std::make_unique<size_t>(0);
+  auto &budget_used = *retref;
+  return crimson::repeat(
+    [=, &cursor, &budget_used, &handler]() mutable
+    -> scan_valid_records_ertr::future<seastar::stop_iteration> {
+      return [=, &handler, &cursor, &budget_used] {
+        if (!cursor.last_valid_header_found) {
+          return read_validate_record_metadata(cursor.offset, nonce
+          ).safe_then([=, &cursor](auto md) {
+            logger().debug(
+              "ExtentReader::scan_valid_records: read complete {}",
+              cursor.offset);
+            if (!md) {
+              logger().debug(
+                "ExtentReader::scan_valid_records: found invalid header at {}, presumably at end",
+                cursor.offset);
+              cursor.last_valid_header_found = true;
+              return scan_valid_records_ertr::now();
+            } else {
+              logger().debug(
+                "ExtentReader::scan_valid_records: valid record read at {}",
+                cursor.offset);
+              cursor.last_committed = paddr_t{
+                cursor.offset.segment,
+                md->first.committed_to};
+              cursor.pending_records.emplace_back(
+                cursor.offset,
+                md->first,
+                md->second);
+              cursor.offset.offset +=
+                md->first.dlength + md->first.mdlength;
+              return scan_valid_records_ertr::now();
+            }
+          }).safe_then([=, &cursor, &budget_used, &handler] {
+            return crimson::repeat(
+              [=, &budget_used, &cursor, &handler] {
+                logger().debug(
+                  "ExtentReader::scan_valid_records: valid record read, processing queue");
+                if (cursor.pending_records.empty()) {
+                  /* This is only possible if the segment is empty.
+                   * A record's last_committed must be prior to its own
+                   * location since it itself cannot yet have been committed
+                   * at its own time of submission.  Thus, the most recently
+                   * read record must always fall after cursor.last_committed */
+                  return scan_valid_records_ertr::make_ready_future<
+                    seastar::stop_iteration>(seastar::stop_iteration::yes);
+                }
+                auto &next = cursor.pending_records.front();
+                if (next.offset > cursor.last_committed) {
+                  // Not yet covered by last_committed: leave it queued.
+                  return scan_valid_records_ertr::make_ready_future<
+                    seastar::stop_iteration>(seastar::stop_iteration::yes);
+                }
+                budget_used +=
+                  next.header.dlength + next.header.mdlength;
+                return handler(
+                  next.offset,
+                  next.header,
+                  next.mdbuffer
+                ).safe_then([&cursor] {
+                  cursor.pending_records.pop_front();
+                  return scan_valid_records_ertr::make_ready_future<
+                    seastar::stop_iteration>(seastar::stop_iteration::no);
+                });
+              });
+          });
+        } else {
+          assert(!cursor.pending_records.empty());
+          auto &next = cursor.pending_records.front();
+          return read_validate_data(next.offset, next.header
+          ).safe_then([=, &budget_used, &next, &cursor, &handler](auto valid) {
+            if (!valid) {
+              // Data checksum mismatch: drop this record and everything
+              // queued after it.
+              cursor.pending_records.clear();
+              return scan_valid_records_ertr::now();
+            }
+            budget_used +=
+              next.header.dlength + next.header.mdlength;
+            return handler(
+              next.offset,
+              next.header,
+              next.mdbuffer
+            ).safe_then([&cursor] {
+              cursor.pending_records.pop_front();
+              return scan_valid_records_ertr::now();
+            });
+          });
+        }
+      }().safe_then([=, &budget_used, &cursor] {
+        if (cursor.is_complete() || budget_used >= budget) {
+          return seastar::stop_iteration::yes;
+        } else {
+          return seastar::stop_iteration::no;
+        }
+      });
+    }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret {
+      return scan_valid_records_ret(
+        scan_valid_records_ertr::ready_future_marker{},
+        std::move(*retref));
+    });
+}
+
+/**
+ * read_validate_record_metadata
+ *
+ * Reads the record header found at `start` on the device owning
+ * start.segment, plus the remainder of the metadata when mdlength exceeds
+ * one block, and checks the embedded metadata checksum.
+ *
+ * Resolves to std::nullopt when less than one block remains in the segment,
+ * the header does not decode, its nonce differs from `nonce`, or the
+ * metadata checksum does not match. Fails with input_output_error when the
+ * decoded mdlength would run past the end of the segment.
+ */
+ExtentReader::read_validate_record_metadata_ret
+ExtentReader::read_validate_record_metadata(
+  paddr_t start,
+  segment_nonce_t nonce)
+{
+  // Route the reads to the device that owns this segment.
+  auto* const owner = segment_managers[start.segment.device_id()];
+  assert(owner);
+  auto& segment_manager = *owner;
+  auto block_size = segment_manager.get_block_size();
+  if (start.offset + block_size > (int64_t)segment_manager.get_segment_size()) {
+    return read_validate_record_metadata_ret(
+      read_validate_record_metadata_ertr::ready_future_marker{},
+      std::nullopt);
+  }
+  return segment_manager.read(start, block_size
+  ).safe_then(
+    // start, nonce and block_size are captured by copy; segment_manager must
+    // be captured by reference, as a by-copy capture would copy the referent.
+    [=, &segment_manager](bufferptr bptr) mutable
+    -> read_validate_record_metadata_ret {
+      logger().debug("read_validate_record_metadata: reading {}", start);
+      bufferlist bl;
+      bl.append(bptr);
+      auto bp = bl.cbegin();
+      record_header_t header;
+      try {
+        decode(header, bp);
+      } catch (const ceph::buffer::error &) {
+        return read_validate_record_metadata_ret(
+          read_validate_record_metadata_ertr::ready_future_marker{},
+          std::nullopt);
+      }
+      if (header.segment_nonce != nonce) {
+        return read_validate_record_metadata_ret(
+          read_validate_record_metadata_ertr::ready_future_marker{},
+          std::nullopt);
+      }
+      if (header.mdlength > (extent_len_t)block_size) {
+        // Metadata spills past the first block: fetch the tail and append it.
+        if (start.offset + header.mdlength >
+            (int64_t)segment_manager.get_segment_size()) {
+          return crimson::ct_error::input_output_error::make();
+        }
+        return segment_manager.read(
+          {start.segment, start.offset + (segment_off_t)block_size},
+          header.mdlength - block_size).safe_then(
+            [header=std::move(header), bl=std::move(bl)](
+              auto &&bptail) mutable {
+              bl.push_back(bptail);
+              return read_validate_record_metadata_ret(
+                read_validate_record_metadata_ertr::ready_future_marker{},
+                std::make_pair(std::move(header), std::move(bl)));
+            });
+      } else {
+        return read_validate_record_metadata_ret(
+          read_validate_record_metadata_ertr::ready_future_marker{},
+          std::make_pair(std::move(header), std::move(bl))
+        );
+      }
+    }).safe_then([](auto p) {
+      // A checksum mismatch is reported like any other invalid header.
+      if (p && validate_metadata(p->second)) {
+        return read_validate_record_metadata_ret(
+          read_validate_record_metadata_ertr::ready_future_marker{},
+          std::move(*p)
+        );
+      } else {
+        return read_validate_record_metadata_ret(
+          read_validate_record_metadata_ertr::ready_future_marker{},
+          std::nullopt);
+      }
+    });
+}
+
+/**
+ * try_decode_extent_infos
+ *
+ * Decodes header.extents extent_info_t entries from `bl`, starting after the
+ * encoded record header and the checksum that follows it.
+ *
+ * @return the decoded infos, or std::nullopt when the buffer cannot be
+ *         decoded, including when it is too short to skip the header and
+ *         checksum
+ */
+std::optional<std::vector<extent_info_t>>
+ExtentReader::try_decode_extent_infos(
+  record_header_t header,
+  const bufferlist &bl)
+{
+  logger().debug("{}: decoding {} extents", __func__, header.extents);
+  std::vector<extent_info_t> extent_infos(header.extents);
+  try {
+    // The skips sit inside the try as well, so a buffer too short to hold
+    // the header and crc yields nullopt rather than an escaping exception.
+    auto bliter = bl.cbegin();
+    bliter += ceph::encoded_sizeof_bounded<record_header_t>();
+    bliter += sizeof(checksum_t) /* crc */;
+    for (auto &info : extent_infos) {
+      decode(info, bliter);
+    }
+  } catch (const ceph::buffer::error &) {
+    return std::nullopt;
+  }
+  return extent_infos;
+}
+
+/**
+ * read_validate_data
+ *
+ * Reads header.dlength bytes located header.mdlength past `record_base`
+ * from the device owning record_base.segment, and compares their crc32c
+ * (seed -1) with header.data_crc.
+ *
+ * `header` is referenced from the continuation, so the caller must keep it
+ * alive until the returned future resolves.
+ *
+ * @return true when the checksum matches
+ */
+ExtentReader::read_validate_data_ret
+ExtentReader::read_validate_data(
+  paddr_t record_base,
+  const record_header_t &header)
+{
+  // Route the read to the device that owns this segment.
+  auto* const owner = segment_managers[record_base.segment.device_id()];
+  assert(owner);
+  return owner->read(
+    record_base.add_offset(header.mdlength),
+    header.dlength
+  ).safe_then([&header](auto bptr) {
+    bufferlist bl;
+    bl.append(bptr);
+    return bl.crc32c(-1) == header.data_crc;
+  });
+}
+
+/**
+ * validate_metadata
+ *
+ * Checks the checksum embedded in a record's metadata buffer. The crc32c
+ * (seed -1) is computed over the encoded record header, the stored
+ * little-endian checksum that follows is read but left out of the
+ * computation, and the crc is then continued over the rest of the buffer.
+ *
+ * @return true when the computed crc equals the stored one
+ */
+bool ExtentReader::validate_metadata(const bufferlist &bl)
+{
+  auto cursor = bl.cbegin();
+
+  // Part one: the header bytes that precede the stored checksum.
+  auto computed = cursor.crc32c(
+    ceph::encoded_sizeof_bounded<record_header_t>(),
+    -1);
+
+  // The stored checksum itself; decoding it moves the cursor past it.
+  ceph_le32 stored_le;
+  decode(stored_le, cursor);
+  uint32_t stored = stored_le;
+
+  // Part two: everything after the stored checksum.
+  computed = cursor.crc32c(cursor.get_remaining(), computed);
+
+  return computed == stored;
+}
+
+} // namespace crimson::os::seastore
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include "crimson/common/errorator.h"
+#include "crimson/os/seastore/seastore_types.h"
+#include "crimson/os/seastore/segment_manager.h"
+
+namespace crimson::os::seastore {
+
+class SegmentCleaner;
+
+/**
+ * ExtentReader
+ *
+ * Reads extents, segment headers and records from a set of segment
+ * managers. Segment managers are registered with add_segment_manager() and
+ * looked up by the device id carried in a segment id.
+ *
+ * The reader stores raw, non-owning pointers, so registered segment
+ * managers must outlive it.
+ */
+class ExtentReader {
+public:
+  using read_ertr = SegmentManager::read_ertr;
+
+  /// Creates a reader with one empty slot per possible device id.
+  ExtentReader() {
+    segment_managers.resize(max_devices, nullptr);
+  }
+  using read_segment_header_ertr = crimson::errorator<
+    crimson::ct_error::enoent,
+    crimson::ct_error::enodata,
+    crimson::ct_error::input_output_error
+    >;
+  using read_segment_header_ret = read_segment_header_ertr::future<
+    segment_header_t>;
+  /// Reads and decodes the header of segment
+  read_segment_header_ret read_segment_header(segment_id_t segment);
+
+  /**
+   * scan_extents
+   *
+   * Scans records beginning at addr until the first record boundary after
+   * addr + bytes_to_read.
+   *
+   * Returns list<extent, extent_info>
+   * cursor.is_complete() will be true when no further extents exist in segment.
+   */
+  using scan_extents_cursor = scan_valid_records_cursor;
+  using scan_extents_ertr = read_ertr::extend<crimson::ct_error::enodata>;
+  using scan_extents_ret_bare = std::list<std::pair<paddr_t, extent_info_t>>;
+  using scan_extents_ret = scan_extents_ertr::future<scan_extents_ret_bare>;
+  scan_extents_ret scan_extents(
+    scan_extents_cursor &cursor,
+    extent_len_t bytes_to_read
+  );
+
+  using scan_valid_records_ertr = read_ertr::extend<crimson::ct_error::enodata>;
+  using scan_valid_records_ret = scan_valid_records_ertr::future<
+    size_t>;
+  using found_record_handler_t = std::function<
+    scan_valid_records_ertr::future<>(
+      paddr_t record_block_base,
+      // callee may assume header and bl will remain valid until
+      // returned future resolves
+      const record_header_t &header,
+      const bufferlist &bl)>;
+  scan_valid_records_ret scan_valid_records(
+    scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
+    segment_nonce_t nonce,             ///< [in] nonce for segment
+    size_t budget,                     ///< [in] max budget to use
+    found_record_handler_t &handler    ///< [in] handler for records
+  ); ///< @return used budget
+
+  /**
+   * add_segment_manager
+   *
+   * Registers segment_manager under its own device id. Registering the same
+   * manager again is allowed; registering a different manager for an
+   * occupied device id trips the assert.
+   */
+  void add_segment_manager(SegmentManager* segment_manager) {
+    auto &slot = segment_managers[segment_manager->get_device_id()];
+    assert(!slot || segment_manager == slot);
+    slot = segment_manager;
+  }
+
+  /**
+   * read
+   *
+   * Forwards the read of len bytes at addr into out to the segment manager
+   * registered for the device id of addr.segment, which must have been
+   * added beforehand.
+   */
+  read_ertr::future<> read(
+    paddr_t addr,
+    size_t len,
+    ceph::bufferptr &out) {
+    auto* const owner = segment_managers[addr.segment.device_id()];
+    assert(owner);
+    return owner->read(addr, len, out);
+  }
+
+private:
+  /// Registered segment managers indexed by device id; nullptr marks a
+  /// device id with no manager. Sized to max_devices by the constructor.
+  std::vector<SegmentManager*> segment_managers;
+
+  /// read record metadata for record starting at start
+  using read_validate_record_metadata_ertr = read_ertr;
+  using read_validate_record_metadata_ret =
+    read_validate_record_metadata_ertr::future<
+      std::optional<std::pair<record_header_t, bufferlist>>
+    >;
+  read_validate_record_metadata_ret read_validate_record_metadata(
+    paddr_t start,
+    segment_nonce_t nonce);
+
+  /// attempts to decode extent infos from bl, return nullopt if unsuccessful
+  std::optional<std::vector<extent_info_t>> try_decode_extent_infos(
+    record_header_t header,
+    const bufferlist &bl);
+
+  /// read and validate data
+  using read_validate_data_ertr = read_ertr;
+  using read_validate_data_ret = read_validate_data_ertr::future<bool>;
+  read_validate_data_ret read_validate_data(
+    paddr_t record_base,
+    const record_header_t &header  ///< caller must ensure lifetime through
+                                   ///  future resolution
+  );
+
+  /// validate embedded metadata checksum
+  static bool validate_metadata(const bufferlist &bl);
+
+};
+
+using ExtentReaderRef = std::unique_ptr<ExtentReader>;
+
+} // namespace crimson::os::seastore
sizeof(meta.seastore_id.uuid));
}
-Journal::Journal(SegmentManager &segment_manager, Scanner& scanner)
+Journal::Journal(SegmentManager &segment_manager, ExtentReader& scanner)
: segment_manager(segment_manager), scanner(scanner) {}
logger().debug("replay_segment: starting at {}", seq);
return seastar::do_with(
scan_valid_records_cursor(seq.offset),
- Scanner::found_record_handler_t(
+ ExtentReader::found_record_handler_t(
[=, &handler](paddr_t base,
const record_header_t &header,
const bufferlist &mdbuf) {
#include "include/denc.h"
#include "crimson/common/log.h"
-#include "crimson/os/seastore/scanner.h"
+#include "crimson/os/seastore/extent_reader.h"
#include "crimson/os/seastore/segment_manager.h"
#include "crimson/os/seastore/ordering_handle.h"
#include "crimson/os/seastore/seastore_types.h"
*/
class Journal {
public:
- Journal(SegmentManager &segment_manager, Scanner& scanner);
+ Journal(SegmentManager &segment_manager, ExtentReader& scanner);
/**
* Sets the SegmentProvider.
segment_off_t written_to = 0;
segment_off_t committed_to = 0;
- Scanner& scanner;
+ ExtentReader& scanner;
WritePipeline *write_pipeline = nullptr;
void reset_soft_state() {
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
-// vim: ts=8 sw=2 smarttab expandtab
-
-#include "crimson/os/seastore/segment_manager.h"
-#include "crimson/os/seastore/scanner.h"
-#include "crimson/common/log.h"
-
-namespace {
- seastar::logger& logger() {
- return crimson::get_logger(ceph_subsys_seastore);
- }
-}
-
-namespace crimson::os::seastore {
-
-Scanner::read_segment_header_ret
-Scanner::read_segment_header(segment_id_t segment)
-{
- return segment_manager.read(
- paddr_t{segment, 0},
- segment_manager.get_block_size()
- ).handle_error(
- read_segment_header_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in Scanner::read_segment_header"
- }
- ).safe_then([=](bufferptr bptr) -> read_segment_header_ret {
- logger().debug("segment {} bptr size {}", segment, bptr.length());
-
- segment_header_t header;
- bufferlist bl;
- bl.push_back(bptr);
-
- logger().debug(
- "Scanner::read_segment_header: segment {} block crc {}",
- segment,
- bl.begin().crc32c(segment_manager.get_block_size(), 0));
-
- auto bp = bl.cbegin();
- try {
- decode(header, bp);
- } catch (ceph::buffer::error &e) {
- logger().debug(
- "Scanner::read_segment_header: segment {} unable to decode "
- "header, skipping",
- segment);
- return crimson::ct_error::enodata::make();
- }
- logger().debug(
- "Scanner::read_segment_header: segment {} header {}",
- segment,
- header);
- return read_segment_header_ret(
- read_segment_header_ertr::ready_future_marker{},
- header);
- });
-}
-Scanner::scan_extents_ret Scanner::scan_extents(
- scan_extents_cursor &cursor,
- extent_len_t bytes_to_read)
-{
- auto ret = std::make_unique<scan_extents_ret_bare>();
- auto* extents = ret.get();
- return read_segment_header(cursor.get_offset().segment
- ).handle_error(
- scan_extents_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in Scanner::scan_extents"
- }
- ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
- auto segment_nonce = segment_header.segment_nonce;
- return seastar::do_with(
- found_record_handler_t(
- [extents, this](
- paddr_t base,
- const record_header_t &header,
- const bufferlist &mdbuf) mutable {
-
- auto infos = try_decode_extent_infos(
- header,
- mdbuf);
- if (!infos) {
- // This should be impossible, we did check the crc on the mdbuf
- logger().error(
- "Scanner::scan_extents unable to decode extents for record {}",
- base);
- assert(infos);
- }
-
- paddr_t extent_offset = base.add_offset(header.mdlength);
- for (const auto &i : *infos) {
- extents->emplace_back(extent_offset, i);
- extent_offset.offset += i.len;
- }
- return scan_extents_ertr::now();
- }),
- [=, &cursor](auto &dhandler) {
- return scan_valid_records(
- cursor,
- segment_nonce,
- bytes_to_read,
- dhandler).discard_result();
- });
- }).safe_then([ret=std::move(ret)] {
- return std::move(*ret);
- });
-}
-
-Scanner::scan_valid_records_ret Scanner::scan_valid_records(
- scan_valid_records_cursor &cursor,
- segment_nonce_t nonce,
- size_t budget,
- found_record_handler_t &handler)
-{
- if (cursor.offset.offset == 0) {
- cursor.offset.offset = segment_manager.get_block_size();
- }
- auto retref = std::make_unique<size_t>(0);
- auto &budget_used = *retref;
- return crimson::repeat(
- [=, &cursor, &budget_used, &handler]() mutable
- -> scan_valid_records_ertr::future<seastar::stop_iteration> {
- return [=, &handler, &cursor, &budget_used] {
- if (!cursor.last_valid_header_found) {
- return read_validate_record_metadata(cursor.offset, nonce
- ).safe_then([=, &cursor](auto md) {
- logger().debug(
- "Scanner::scan_valid_records: read complete {}",
- cursor.offset);
- if (!md) {
- logger().debug(
- "Scanner::scan_valid_records: found invalid header at {}, presumably at end",
- cursor.offset);
- cursor.last_valid_header_found = true;
- return scan_valid_records_ertr::now();
- } else {
- logger().debug(
- "Scanner::scan_valid_records: valid record read at {}",
- cursor.offset);
- cursor.last_committed = paddr_t{
- cursor.offset.segment,
- md->first.committed_to};
- cursor.pending_records.emplace_back(
- cursor.offset,
- md->first,
- md->second);
- cursor.offset.offset +=
- md->first.dlength + md->first.mdlength;
- return scan_valid_records_ertr::now();
- }
- }).safe_then([=, &cursor, &budget_used, &handler] {
- return crimson::repeat(
- [=, &budget_used, &cursor, &handler] {
- logger().debug(
- "Scanner::scan_valid_records: valid record read, processing queue");
- if (cursor.pending_records.empty()) {
- /* This is only possible if the segment is empty.
- * A record's last_commited must be prior to its own
- * location since it itself cannot yet have been committed
- * at its own time of submission. Thus, the most recently
- * read record must always fall after cursor.last_committed */
- return scan_valid_records_ertr::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::yes);
- }
- auto &next = cursor.pending_records.front();
- if (next.offset > cursor.last_committed) {
- return scan_valid_records_ertr::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::yes);
- }
- budget_used +=
- next.header.dlength + next.header.mdlength;
- return handler(
- next.offset,
- next.header,
- next.mdbuffer
- ).safe_then([&cursor] {
- cursor.pending_records.pop_front();
- return scan_valid_records_ertr::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::no);
- });
- });
- });
- } else {
- assert(!cursor.pending_records.empty());
- auto &next = cursor.pending_records.front();
- return read_validate_data(next.offset, next.header
- ).safe_then([=, &budget_used, &next, &cursor, &handler](auto valid) {
- if (!valid) {
- cursor.pending_records.clear();
- return scan_valid_records_ertr::now();
- }
- budget_used +=
- next.header.dlength + next.header.mdlength;
- return handler(
- next.offset,
- next.header,
- next.mdbuffer
- ).safe_then([&cursor] {
- cursor.pending_records.pop_front();
- return scan_valid_records_ertr::now();
- });
- });
- }
- }().safe_then([=, &budget_used, &cursor] {
- if (cursor.is_complete() || budget_used >= budget) {
- return seastar::stop_iteration::yes;
- } else {
- return seastar::stop_iteration::no;
- }
- });
- }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret {
- return scan_valid_records_ret(
- scan_valid_records_ertr::ready_future_marker{},
- std::move(*retref));
- });
-}
-
-Scanner::read_validate_record_metadata_ret
-Scanner::read_validate_record_metadata(
- paddr_t start,
- segment_nonce_t nonce)
-{
- auto block_size = segment_manager.get_block_size();
- if (start.offset + block_size > (int64_t)segment_manager.get_segment_size()) {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::nullopt);
- }
- return segment_manager.read(start, block_size
- ).safe_then(
- [=](bufferptr bptr) mutable
- -> read_validate_record_metadata_ret {
- logger().debug("read_validate_record_metadata: reading {}", start);
- auto block_size = segment_manager.get_block_size();
- bufferlist bl;
- bl.append(bptr);
- auto bp = bl.cbegin();
- record_header_t header;
- try {
- decode(header, bp);
- } catch (ceph::buffer::error &e) {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::nullopt);
- }
- if (header.segment_nonce != nonce) {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::nullopt);
- }
- if (header.mdlength > (extent_len_t)block_size) {
- if (start.offset + header.mdlength >
- (int64_t)segment_manager.get_segment_size()) {
- return crimson::ct_error::input_output_error::make();
- }
- return segment_manager.read(
- {start.segment, start.offset + (segment_off_t)block_size},
- header.mdlength - block_size).safe_then(
- [header=std::move(header), bl=std::move(bl)](
- auto &&bptail) mutable {
- bl.push_back(bptail);
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::make_pair(std::move(header), std::move(bl)));
- });
- } else {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::make_pair(std::move(header), std::move(bl))
- );
- }
- }).safe_then([=](auto p) {
- if (p && validate_metadata(p->second)) {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::move(*p)
- );
- } else {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::nullopt);
- }
- });
-}
-
-std::optional<std::vector<extent_info_t>>
-Scanner::try_decode_extent_infos(
- record_header_t header,
- const bufferlist &bl)
-{
- auto bliter = bl.cbegin();
- bliter += ceph::encoded_sizeof_bounded<record_header_t>();
- bliter += sizeof(checksum_t) /* crc */;
- logger().debug("{}: decoding {} extents", __func__, header.extents);
- std::vector<extent_info_t> extent_infos(header.extents);
- for (auto &&i : extent_infos) {
- try {
- decode(i, bliter);
- } catch (ceph::buffer::error &e) {
- return std::nullopt;
- }
- }
- return extent_infos;
-}
-
-Scanner::read_validate_data_ret
-Scanner::read_validate_data(
- paddr_t record_base,
- const record_header_t &header)
-{
- return segment_manager.read(
- record_base.add_offset(header.mdlength),
- header.dlength
- ).safe_then([=, &header](auto bptr) {
- bufferlist bl;
- bl.append(bptr);
- return bl.crc32c(-1) == header.data_crc;
- });
-}
-
-bool Scanner::validate_metadata(const bufferlist &bl)
-{
- auto bliter = bl.cbegin();
- auto test_crc = bliter.crc32c(
- ceph::encoded_sizeof_bounded<record_header_t>(),
- -1);
- ceph_le32 recorded_crc_le;
- decode(recorded_crc_le, bliter);
- uint32_t recorded_crc = recorded_crc_le;
- test_crc = bliter.crc32c(
- bliter.get_remaining(),
- test_crc);
- return test_crc == recorded_crc;
-}
-
-} // namespace crimson::os::seastore
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
-// vim: ts=8 sw=2 smarttab expandtab
-
-#pragma once
-
-#include "crimson/common/errorator.h"
-#include "crimson/os/seastore/seastore_types.h"
-#include "crimson/os/seastore/segment_manager.h"
-
-namespace crimson::os::seastore {
-
-class SegmentCleaner;
-
-class Scanner {
-public:
- using read_ertr = crimson::errorator<
- crimson::ct_error::input_output_error,
- crimson::ct_error::invarg,
- crimson::ct_error::enoent,
- crimson::ct_error::erange>;
-
- Scanner(SegmentManager& segment_manager)
- : segment_manager(segment_manager) {}
- using read_segment_header_ertr = crimson::errorator<
- crimson::ct_error::enoent,
- crimson::ct_error::enodata,
- crimson::ct_error::input_output_error
- >;
- using read_segment_header_ret = read_segment_header_ertr::future<
- segment_header_t>;
- read_segment_header_ret read_segment_header(segment_id_t segment);
-
- /**
- * scan_extents
- *
- * Scans records beginning at addr until the first record boundary after
- * addr + bytes_to_read.
- *
- * Returns list<extent, extent_info>
- * cursor.is_complete() will be true when no further extents exist in segment.
- */
- using scan_extents_cursor = scan_valid_records_cursor;
- using scan_extents_ertr = read_ertr::extend<crimson::ct_error::enodata>;
- using scan_extents_ret_bare = std::list<std::pair<paddr_t, extent_info_t>>;
- using scan_extents_ret = scan_extents_ertr::future<scan_extents_ret_bare>;
- scan_extents_ret scan_extents(
- scan_extents_cursor &cursor,
- extent_len_t bytes_to_read
- );
-
- using scan_valid_records_ertr = read_ertr::extend<crimson::ct_error::enodata>;
- using scan_valid_records_ret = scan_valid_records_ertr::future<
- size_t>;
- using found_record_handler_t = std::function<
- scan_valid_records_ertr::future<>(
- paddr_t record_block_base,
- // callee may assume header and bl will remain valid until
- // returned future resolves
- const record_header_t &header,
- const bufferlist &bl)>;
- scan_valid_records_ret scan_valid_records(
- scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
- segment_nonce_t nonce, ///< [in] nonce for segment
- size_t budget, ///< [in] max budget to use
- found_record_handler_t &handler ///< [in] handler for records
- ); ///< @return used budget
-
-private:
- SegmentManager& segment_manager;
-
- /// read record metadata for record starting at start
- using read_validate_record_metadata_ertr = read_ertr;
- using read_validate_record_metadata_ret =
- read_validate_record_metadata_ertr::future<
- std::optional<std::pair<record_header_t, bufferlist>>
- >;
- read_validate_record_metadata_ret read_validate_record_metadata(
- paddr_t start,
- segment_nonce_t nonce);
-
- /// attempts to decode extent infos from bl, return nullopt if unsuccessful
- std::optional<std::vector<extent_info_t>> try_decode_extent_infos(
- record_header_t header,
- const bufferlist &bl);
-
- /// read and validate data
- using read_validate_data_ertr = read_ertr;
- using read_validate_data_ret = read_validate_data_ertr::future<bool>;
- read_validate_data_ret read_validate_data(
- paddr_t record_base,
- const record_header_t &header ///< caller must ensure lifetime through
- /// future resolution
- );
-
- /// validate embedded metadata checksum
- static bool validate_metadata(const bufferlist &bl);
-
-};
-
-using ScannerRef = std::unique_ptr<Scanner>;
-
-} // namespace crimson::os::seastore
segment_manager::block::BlockSegmentManager
>(device + "/block");
- auto scanner = std::make_unique<Scanner>(*sm);
+ auto scanner = std::make_unique<ExtentReader>();
auto& scanner_ref = *scanner.get();
auto segment_cleaner = std::make_unique<SegmentCleaner>(
SegmentCleaner::config_t::get_default(),
false /* detailed */);
auto journal = std::make_unique<Journal>(*sm, scanner_ref);
- auto cache = std::make_unique<Cache>(*sm);
+ auto cache = std::make_unique<Cache>(scanner_ref, sm->get_block_size());
auto lba_manager = lba_manager::create_lba_manager(*sm, *cache);
auto epm = std::make_unique<ExtentPlacementManager>(*cache, *lba_manager);
SegmentCleaner::SegmentCleaner(
config_t config,
- ScannerRef&& scr,
+ ExtentReaderRef&& scr,
bool detailed)
: detailed(detailed),
config(config),
}
next.offset = 0;
scan_cursor =
- std::make_unique<Scanner::scan_extents_cursor>(
+ std::make_unique<ExtentReader::scan_extents_cursor>(
next);
logger().debug(
"SegmentCleaner::do_gc: starting gc on segment {}",
return scanner->read_segment_header(segment_id)
.safe_then([&segments, segment_id, this](auto header) {
if (header.out_of_line) {
- logger().debug("Scanner::init_segments: out-of-line segment {}", segment_id);
+ logger().debug("ExtentReader::init_segments: out-of-line segment {}", segment_id);
init_mark_segment_closed(
segment_id,
header.journal_segment_seq,
true);
} else {
- logger().debug("Scanner::init_segments: journal segment {}", segment_id);
+ logger().debug("ExtentReader::init_segments: journal segment {}", segment_id);
segments.emplace_back(std::make_pair(segment_id, std::move(header)));
}
return seastar::now();
size_t segment_size = 0;
size_t block_size = 0;
- ScannerRef scanner;
+ ExtentReaderRef scanner;
SpaceTrackerIRef space_tracker;
std::vector<segment_info_t> segments;
public:
SegmentCleaner(
config_t config,
- ScannerRef&& scanner,
+ ExtentReaderRef&& scanner,
bool detailed = false);
void mount(SegmentManager &sm) {
// GC status helpers
std::unique_ptr<
- Scanner::scan_extents_cursor
+ ExtentReader::scan_extents_cursor
> scan_cursor;
/**
} gc_process;
using gc_ertr = work_ertr::extend_ertr<
- Scanner::scan_extents_ertr
+ ExtentReader::scan_extents_ertr
>;
gc_cycle_ret do_gc_cycle();
void TMDriver::init()
{
- auto scanner = std::make_unique<Scanner>(*segment_manager);
+ auto scanner = std::make_unique<ExtentReader>();
+ scanner->add_segment_manager(segment_manager.get());
auto& scanner_ref = *scanner.get();
auto segment_cleaner = std::make_unique<SegmentCleaner>(
SegmentCleaner::config_t::get_default(),
false /* detailed */);
segment_cleaner->mount(*segment_manager);
auto journal = std::make_unique<Journal>(*segment_manager, scanner_ref);
- auto cache = std::make_unique<Cache>(*segment_manager);
+ auto cache = std::make_unique<Cache>(scanner_ref, segment_manager->get_block_size());
auto lba_manager = lba_manager::create_lba_manager(*segment_manager, *cache);
auto epm = std::make_unique<ExtentPlacementManager>(*cache, *lba_manager);
struct btree_lba_manager_test :
public seastar_test_suite_t, SegmentProvider {
segment_manager::EphemeralSegmentManagerRef segment_manager;
- ScannerRef scanner;
+ ExtentReaderRef scanner;
Journal journal;
Cache cache;
BtreeLBAManagerRef lba_manager;
// Fixture constructor: builds the ephemeral segment manager, the extent
// reader, the journal, the cache and the LBA manager, then wires them together.
btree_lba_manager_test()
: segment_manager(segment_manager::create_test_ephemeral()),
- scanner(new Scanner(*segment_manager)),
+ scanner(new ExtentReader()),
journal(*segment_manager, *scanner),
- cache(*segment_manager),
+ cache(*scanner, segment_manager->get_block_size()),
lba_manager(new BtreeLBAManager(*segment_manager, cache)),
block_size(segment_manager->get_block_size())
{
// The reader is default-constructed, so the segment manager has to be
// registered with it explicitly once the members exist.
+ scanner->add_segment_manager(segment_manager.get());
// The fixture itself is handed to the journal as its segment provider.
journal.set_segment_provider(this);
journal.set_write_pipeline(&pipeline);
}
struct rbm_test_t : public seastar_test_suite_t,
TMTestState {
segment_manager::EphemeralSegmentManagerRef segment_manager; // Need to be deleted, just for Cache
+ ExtentReaderRef reader;
Cache cache;
std::unique_ptr<NVMeManager> rbm_manager;
nvme_device::NVMeBlockDevice *device;
rbm_test_t() :
segment_manager(segment_manager::create_test_ephemeral()),
- cache(*segment_manager)
+ reader(new ExtentReader()),
+ cache(*reader, segment_manager->get_block_size())
{
device = new nvme_device::TestMemory(DEFAULT_TEST_SIZE);
rbm_manager.reset(new NVMeManager(device, std::string()));
struct cache_test_t : public seastar_test_suite_t {
segment_manager::EphemeralSegmentManagerRef segment_manager;
+ ExtentReaderRef reader;
Cache cache;
paddr_t current{0, 0};
journal_seq_t seq;
cache_test_t()
: segment_manager(segment_manager::create_test_ephemeral()),
- cache(*segment_manager) {}
+ // Create the reader with std::make_unique, as the non-test call sites in
+ // this change do, instead of a naked new.
+ reader(std::make_unique<ExtentReader>()),
+ cache(*reader, segment_manager->get_block_size()) {
+ // The reader is default-constructed; register the ephemeral segment
+ // manager with it so the cache's reads have a device to go to.
+ reader->add_segment_manager(segment_manager.get());
+ }
seastar::future<paddr_t> submit_transaction(
TransactionRef t) {
const segment_off_t block_size;
- ScannerRef scanner;
+ ExtentReaderRef scanner;
journal_test_t()
: segment_manager(segment_manager::create_test_ephemeral()),
block_size(segment_manager->get_block_size()),
- scanner(std::make_unique<Scanner>(*segment_manager))
- {}
+ // Keep std::make_unique, as the replaced line did, rather than
+ // regressing to a naked new.
+ scanner(std::make_unique<ExtentReader>())
+ {
+ // The reader no longer takes the segment manager at construction;
+ // register it explicitly instead.
+ scanner->add_segment_manager(segment_manager.get());
+ }
segment_id_t next = 0;
get_segment_ret get_segment() final {
auto get_transaction_manager(
SegmentManager &segment_manager) {
- auto scanner = std::make_unique<Scanner>(segment_manager);
+ auto scanner = std::make_unique<ExtentReader>();
+ scanner->add_segment_manager(&segment_manager);
auto& scanner_ref = *scanner.get();
auto segment_cleaner = std::make_unique<SegmentCleaner>(
SegmentCleaner::config_t::get_default(),
std::move(scanner),
true);
auto journal = std::make_unique<Journal>(segment_manager, scanner_ref);
- auto cache = std::make_unique<Cache>(segment_manager);
+ auto cache = std::make_unique<Cache>(scanner_ref, segment_manager.get_block_size());
auto lba_manager = lba_manager::create_lba_manager(segment_manager, *cache);
auto epm = std::make_unique<ExtentPlacementManager>(*cache, *lba_manager);