git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore: change Scanner to ExtentReader
author: Xuehan Xu <xxhdx1985126@gmail.com>
Thu, 23 Sep 2021 06:53:39 +0000 (14:53 +0800)
committer: Xuehan Xu <xxhdx1985126@gmail.com>
Sun, 10 Oct 2021 06:21:07 +0000 (14:21 +0800)
This commit turns Scanner into an extent reader that routes read requests to the corresponding
backing devices according to the device ids encapsulated in the segment ids.

Signed-off-by: Xuehan Xu <xxhdx1985126@gmail.com>
18 files changed:
src/crimson/os/seastore/CMakeLists.txt
src/crimson/os/seastore/cache.cc
src/crimson/os/seastore/cache.h
src/crimson/os/seastore/extent_reader.cc [new file with mode: 0644]
src/crimson/os/seastore/extent_reader.h [new file with mode: 0644]
src/crimson/os/seastore/journal.cc
src/crimson/os/seastore/journal.h
src/crimson/os/seastore/scanner.cc [deleted file]
src/crimson/os/seastore/scanner.h [deleted file]
src/crimson/os/seastore/seastore.cc
src/crimson/os/seastore/segment_cleaner.cc
src/crimson/os/seastore/segment_cleaner.h
src/crimson/tools/store_nbd/tm_driver.cc
src/test/crimson/seastore/test_btree_lba_manager.cc
src/test/crimson/seastore/test_randomblock_manager.cc
src/test/crimson/seastore/test_seastore_cache.cc
src/test/crimson/seastore/test_seastore_journal.cc
src/test/crimson/seastore/transaction_manager_test_state.h

index 146880a65998971e5cdd1688d246278d42f611f2..6a03e8faa873d95ad82b4443c32cff4f0e702d06 100644 (file)
@@ -7,7 +7,7 @@ add_library(crimson-seastore STATIC
   transaction.cc
   journal.cc
   cache.cc
-  scanner.cc
+  extent_reader.cc
   lba_manager.cc
   segment_cleaner.cc
   lba_manager/btree/btree_lba_manager.cc
index 4f810003b90e51682d78ab8fd5433bb670a8db9e..20a8c2c168228647e6d3f24ed30c09310df6b278 100644 (file)
@@ -20,8 +20,11 @@ using std::string_view;
 
 namespace crimson::os::seastore {
 
-Cache::Cache(SegmentManager &segment_manager) :
-  segment_manager(segment_manager)
+Cache::Cache(
+  ExtentReader &reader,
+  segment_off_t block_size)
+  : reader(reader),
+    block_size(block_size)
 {
   register_metrics();
 }
@@ -942,7 +945,7 @@ record_t Cache::prepare_record(Transaction &t)
   efforts.num_ool_records += ool_stats.num_records;
   efforts.ool_record_overhead_bytes += ool_stats.overhead_bytes;
   auto record_size = get_encoded_record_length(
-      record, segment_manager.get_block_size());
+      record, block_size);
   auto inline_overhead =
       record_size.mdlength + record_size.dlength - record.get_raw_data_size();
   efforts.inline_record_overhead_bytes += inline_overhead;
index f1449adb22119682ff1494a79d460676d9e8e3fc..d7d59948628bfca87952701dc1254713f33479f4 100644 (file)
@@ -89,7 +89,7 @@ public:
     crimson::ct_error::input_output_error>;
   using base_iertr = trans_iertr<base_ertr>;
 
-  Cache(SegmentManager &segment_manager);
+  Cache(ExtentReader &reader, segment_off_t block_size);
   ~Cache();
 
   /// Creates empty transaction by source
@@ -612,7 +612,9 @@ public:
   void dump_contents();
 
 private:
-  SegmentManager &segment_manager; ///< ref to segment_manager
+  ExtentReader &reader;                   ///< ref to extent reader
+  segment_off_t block_size;       ///< block size of the segment
+                                   ///< manager holding journal records
   RootBlockRef root;               ///< ref to current root
   ExtentIndex extents;             ///< set of live extents
 
@@ -779,7 +781,7 @@ private:
     TCachedExtentRef<T>&& extent
   ) {
     extent->set_io_wait();
-    return segment_manager.read(
+    return reader.read(
       extent->get_paddr(),
       extent->get_length(),
       extent->get_bptr()
diff --git a/src/crimson/os/seastore/extent_reader.cc b/src/crimson/os/seastore/extent_reader.cc
new file mode 100644 (file)
index 0000000..c3b21ae
--- /dev/null
@@ -0,0 +1,336 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#include "crimson/os/seastore/segment_manager.h"
+#include "crimson/os/seastore/extent_reader.h"
+#include "crimson/common/log.h"
+
+namespace {
+  seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_seastore);
+  }
+}
+
+namespace crimson::os::seastore {
+
+ExtentReader::read_segment_header_ret
+ExtentReader::read_segment_header(segment_id_t segment)
+{
+  return segment_manager.read(
+    paddr_t{segment, 0},
+    segment_manager.get_block_size()
+  ).handle_error(
+    read_segment_header_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error in ExtentReader::read_segment_header"
+    }
+  ).safe_then([=](bufferptr bptr) -> read_segment_header_ret {
+    logger().debug("segment {} bptr size {}", segment, bptr.length());
+
+    segment_header_t header;
+    bufferlist bl;
+    bl.push_back(bptr);
+
+    logger().debug(
+      "ExtentReader::read_segment_header: segment {} block crc {}",
+      segment,
+      bl.begin().crc32c(segment_manager.get_block_size(), 0));
+
+    auto bp = bl.cbegin();
+    try {
+      decode(header, bp);
+    } catch (ceph::buffer::error &e) {
+      logger().debug(
+       "ExtentReader::read_segment_header: segment {} unable to decode "
+       "header, skipping",
+       segment);
+      return crimson::ct_error::enodata::make();
+    }
+    logger().debug(
+      "ExtentReader::read_segment_header: segment {} header {}",
+      segment,
+      header);
+    return read_segment_header_ret(
+      read_segment_header_ertr::ready_future_marker{},
+      header);
+  });
+}
+ExtentReader::scan_extents_ret ExtentReader::scan_extents(
+  scan_extents_cursor &cursor,
+  extent_len_t bytes_to_read)
+{
+  auto ret = std::make_unique<scan_extents_ret_bare>();
+  auto* extents = ret.get();
+  return read_segment_header(cursor.get_offset().segment
+  ).handle_error(
+    scan_extents_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error in ExtentReader::scan_extents"
+    }
+  ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
+    auto segment_nonce = segment_header.segment_nonce;
+    return seastar::do_with(
+      found_record_handler_t(
+       [extents, this](
+         paddr_t base,
+         const record_header_t &header,
+         const bufferlist &mdbuf) mutable {
+
+         auto infos = try_decode_extent_infos(
+           header,
+           mdbuf);
+         if (!infos) {
+           // This should be impossible, we did check the crc on the mdbuf
+           logger().error(
+             "ExtentReader::scan_extents unable to decode extents for record {}",
+             base);
+           assert(infos);
+         }
+
+         paddr_t extent_offset = base.add_offset(header.mdlength);
+         for (const auto &i : *infos) {
+           extents->emplace_back(extent_offset, i);
+           extent_offset.offset += i.len;
+         }
+         return scan_extents_ertr::now();
+       }),
+      [=, &cursor](auto &dhandler) {
+       return scan_valid_records(
+         cursor,
+         segment_nonce,
+         bytes_to_read,
+         dhandler).discard_result();
+      });
+  }).safe_then([ret=std::move(ret)] {
+    return std::move(*ret);
+  });
+}
+
+ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
+  scan_valid_records_cursor &cursor,
+  segment_nonce_t nonce,
+  size_t budget,
+  found_record_handler_t &handler)
+{
+  if (cursor.offset.offset == 0) {
+    cursor.offset.offset = segment_manager.get_block_size();
+  }
+  auto retref = std::make_unique<size_t>(0);
+  auto &budget_used = *retref;
+  return crimson::repeat(
+    [=, &cursor, &budget_used, &handler]() mutable
+    -> scan_valid_records_ertr::future<seastar::stop_iteration> {
+      return [=, &handler, &cursor, &budget_used] {
+       if (!cursor.last_valid_header_found) {
+         return read_validate_record_metadata(cursor.offset, nonce
+         ).safe_then([=, &cursor](auto md) {
+           logger().debug(
+             "ExtentReader::scan_valid_records: read complete {}",
+             cursor.offset);
+           if (!md) {
+             logger().debug(
+               "ExtentReader::scan_valid_records: found invalid header at {}, presumably at end",
+               cursor.offset);
+             cursor.last_valid_header_found = true;
+             return scan_valid_records_ertr::now();
+           } else {
+             logger().debug(
+               "ExtentReader::scan_valid_records: valid record read at {}",
+               cursor.offset);
+             cursor.last_committed = paddr_t{
+               cursor.offset.segment,
+               md->first.committed_to};
+             cursor.pending_records.emplace_back(
+               cursor.offset,
+               md->first,
+               md->second);
+             cursor.offset.offset +=
+               md->first.dlength + md->first.mdlength;
+             return scan_valid_records_ertr::now();
+           }
+         }).safe_then([=, &cursor, &budget_used, &handler] {
+           return crimson::repeat(
+             [=, &budget_used, &cursor, &handler] {
+               logger().debug(
+                 "ExtentReader::scan_valid_records: valid record read, processing queue");
+               if (cursor.pending_records.empty()) {
+                 /* This is only possible if the segment is empty.
+                  * A record's last_commited must be prior to its own
+                  * location since it itself cannot yet have been committed
+                  * at its own time of submission.  Thus, the most recently
+                  * read record must always fall after cursor.last_committed */
+                 return scan_valid_records_ertr::make_ready_future<
+                   seastar::stop_iteration>(seastar::stop_iteration::yes);
+               }
+               auto &next = cursor.pending_records.front();
+               if (next.offset > cursor.last_committed) {
+                 return scan_valid_records_ertr::make_ready_future<
+                   seastar::stop_iteration>(seastar::stop_iteration::yes);
+               }
+               budget_used +=
+                 next.header.dlength + next.header.mdlength;
+               return handler(
+                 next.offset,
+                 next.header,
+                 next.mdbuffer
+               ).safe_then([&cursor] {
+                 cursor.pending_records.pop_front();
+                 return scan_valid_records_ertr::make_ready_future<
+                   seastar::stop_iteration>(seastar::stop_iteration::no);
+               });
+             });
+         });
+       } else {
+         assert(!cursor.pending_records.empty());
+         auto &next = cursor.pending_records.front();
+         return read_validate_data(next.offset, next.header
+         ).safe_then([=, &budget_used, &next, &cursor, &handler](auto valid) {
+           if (!valid) {
+             cursor.pending_records.clear();
+             return scan_valid_records_ertr::now();
+           }
+           budget_used +=
+             next.header.dlength + next.header.mdlength;
+           return handler(
+             next.offset,
+             next.header,
+             next.mdbuffer
+           ).safe_then([&cursor] {
+             cursor.pending_records.pop_front();
+             return scan_valid_records_ertr::now();
+           });
+         });
+       }
+      }().safe_then([=, &budget_used, &cursor] {
+       if (cursor.is_complete() || budget_used >= budget) {
+         return seastar::stop_iteration::yes;
+       } else {
+         return seastar::stop_iteration::no;
+       }
+      });
+    }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret {
+      return scan_valid_records_ret(
+       scan_valid_records_ertr::ready_future_marker{},
+       std::move(*retref));
+    });
+}
+
+ExtentReader::read_validate_record_metadata_ret
+ExtentReader::read_validate_record_metadata(
+  paddr_t start,
+  segment_nonce_t nonce)
+{
+  auto block_size = segment_manager.get_block_size();
+  if (start.offset + block_size > (int64_t)segment_manager.get_segment_size()) {
+    return read_validate_record_metadata_ret(
+      read_validate_record_metadata_ertr::ready_future_marker{},
+      std::nullopt);
+  }
+  return segment_manager.read(start, block_size
+  ).safe_then(
+    [=](bufferptr bptr) mutable
+    -> read_validate_record_metadata_ret {
+      logger().debug("read_validate_record_metadata: reading {}", start);
+      auto block_size = segment_manager.get_block_size();
+      bufferlist bl;
+      bl.append(bptr);
+      auto bp = bl.cbegin();
+      record_header_t header;
+      try {
+       decode(header, bp);
+      } catch (ceph::buffer::error &e) {
+       return read_validate_record_metadata_ret(
+         read_validate_record_metadata_ertr::ready_future_marker{},
+         std::nullopt);
+      }
+      if (header.segment_nonce != nonce) {
+       return read_validate_record_metadata_ret(
+         read_validate_record_metadata_ertr::ready_future_marker{},
+         std::nullopt);
+      }
+      if (header.mdlength > (extent_len_t)block_size) {
+       if (start.offset + header.mdlength >
+           (int64_t)segment_manager.get_segment_size()) {
+         return crimson::ct_error::input_output_error::make();
+       }
+       return segment_manager.read(
+         {start.segment, start.offset + (segment_off_t)block_size},
+         header.mdlength - block_size).safe_then(
+           [header=std::move(header), bl=std::move(bl)](
+             auto &&bptail) mutable {
+             bl.push_back(bptail);
+             return read_validate_record_metadata_ret(
+               read_validate_record_metadata_ertr::ready_future_marker{},
+               std::make_pair(std::move(header), std::move(bl)));
+           });
+      } else {
+       return read_validate_record_metadata_ret(
+         read_validate_record_metadata_ertr::ready_future_marker{},
+         std::make_pair(std::move(header), std::move(bl))
+       );
+      }
+    }).safe_then([=](auto p) {
+      if (p && validate_metadata(p->second)) {
+       return read_validate_record_metadata_ret(
+         read_validate_record_metadata_ertr::ready_future_marker{},
+         std::move(*p)
+       );
+      } else {
+       return read_validate_record_metadata_ret(
+         read_validate_record_metadata_ertr::ready_future_marker{},
+         std::nullopt);
+      }
+    });
+}
+
+std::optional<std::vector<extent_info_t>>
+ExtentReader::try_decode_extent_infos(
+  record_header_t header,
+  const bufferlist &bl)
+{
+  auto bliter = bl.cbegin();
+  bliter += ceph::encoded_sizeof_bounded<record_header_t>();
+  bliter += sizeof(checksum_t) /* crc */;
+  logger().debug("{}: decoding {} extents", __func__, header.extents);
+  std::vector<extent_info_t> extent_infos(header.extents);
+  for (auto &&i : extent_infos) {
+    try {
+      decode(i, bliter);
+    } catch (ceph::buffer::error &e) {
+      return std::nullopt;
+    }
+  }
+  return extent_infos;
+}
+
+ExtentReader::read_validate_data_ret
+ExtentReader::read_validate_data(
+  paddr_t record_base,
+  const record_header_t &header)
+{
+  return segment_manager.read(
+    record_base.add_offset(header.mdlength),
+    header.dlength
+  ).safe_then([=, &header](auto bptr) {
+    bufferlist bl;
+    bl.append(bptr);
+    return bl.crc32c(-1) == header.data_crc;
+  });
+}
+
+bool ExtentReader::validate_metadata(const bufferlist &bl)
+{
+  auto bliter = bl.cbegin();
+  auto test_crc = bliter.crc32c(
+    ceph::encoded_sizeof_bounded<record_header_t>(),
+    -1);
+  ceph_le32 recorded_crc_le;
+  decode(recorded_crc_le, bliter);
+  uint32_t recorded_crc = recorded_crc_le;
+  test_crc = bliter.crc32c(
+    bliter.get_remaining(),
+    test_crc);
+  return test_crc == recorded_crc;
+}
+
+} // namespace crimson::os::seastore
diff --git a/src/crimson/os/seastore/extent_reader.h b/src/crimson/os/seastore/extent_reader.h
new file mode 100644 (file)
index 0000000..7f0d1ea
--- /dev/null
@@ -0,0 +1,112 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include "crimson/common/errorator.h"
+#include "crimson/os/seastore/seastore_types.h"
+#include "crimson/os/seastore/segment_manager.h"
+
+namespace crimson::os::seastore {
+
+class SegmentCleaner;
+
+class ExtentReader {
+public:
+  using read_ertr = SegmentManager::read_ertr;
+  ExtentReader() {
+    segment_managers.resize(max_devices, nullptr);
+  }
+  using read_segment_header_ertr = crimson::errorator<
+    crimson::ct_error::enoent,
+    crimson::ct_error::enodata,
+    crimson::ct_error::input_output_error
+    >;
+  using read_segment_header_ret = read_segment_header_ertr::future<
+    segment_header_t>;
+  read_segment_header_ret read_segment_header(segment_id_t segment);
+
+  /**
+   * scan_extents
+   *
+   * Scans records beginning at addr until the first record boundary after
+   * addr + bytes_to_read.
+   *
+   * Returns list<extent, extent_info>
+   * cursor.is_complete() will be true when no further extents exist in segment.
+   */
+  using scan_extents_cursor = scan_valid_records_cursor;
+  using scan_extents_ertr = read_ertr::extend<crimson::ct_error::enodata>;
+  using scan_extents_ret_bare = std::list<std::pair<paddr_t, extent_info_t>>;
+  using scan_extents_ret = scan_extents_ertr::future<scan_extents_ret_bare>;
+  scan_extents_ret scan_extents(
+    scan_extents_cursor &cursor,
+    extent_len_t bytes_to_read
+  );
+
+  using scan_valid_records_ertr = read_ertr::extend<crimson::ct_error::enodata>;
+  using scan_valid_records_ret = scan_valid_records_ertr::future<
+    size_t>;
+  using found_record_handler_t = std::function<
+    scan_valid_records_ertr::future<>(
+      paddr_t record_block_base,
+      // callee may assume header and bl will remain valid until
+      // returned future resolves
+      const record_header_t &header,
+      const bufferlist &bl)>;
+  scan_valid_records_ret scan_valid_records(
+    scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
+    segment_nonce_t nonce,             ///< [in] nonce for segment
+    size_t budget,                     ///< [in] max budget to use
+    found_record_handler_t &handler    ///< [in] handler for records
+  ); ///< @return used budget
+
+  void add_segment_manager(SegmentManager* segment_manager) {
+    assert(!segment_managers[segment_manager->get_device_id()] ||
+      segment_manager == segment_managers[segment_manager->get_device_id()]);
+    segment_managers[segment_manager->get_device_id()] = segment_manager;
+  }
+
+  read_ertr::future<> read(
+    paddr_t addr,
+    size_t len,
+    ceph::bufferptr &out) {
+    assert(segment_managers[addr.segment.device_id()]);
+    return segment_managers[addr.segment.device_id()]->read(addr, len, out);
+  }
+
+private:
+  SegmentManager& segment_manager;
+
+  /// read record metadata for record starting at start
+  using read_validate_record_metadata_ertr = read_ertr;
+  using read_validate_record_metadata_ret =
+    read_validate_record_metadata_ertr::future<
+      std::optional<std::pair<record_header_t, bufferlist>>
+    >;
+  read_validate_record_metadata_ret read_validate_record_metadata(
+    paddr_t start,
+    segment_nonce_t nonce);
+
+  /// attempts to decode extent infos from bl, return nullopt if unsuccessful
+  std::optional<std::vector<extent_info_t>> try_decode_extent_infos(
+    record_header_t header,
+    const bufferlist &bl);
+
+  /// read and validate data
+  using read_validate_data_ertr = read_ertr;
+  using read_validate_data_ret = read_validate_data_ertr::future<bool>;
+  read_validate_data_ret read_validate_data(
+    paddr_t record_base,
+    const record_header_t &header  ///< caller must ensure lifetime through
+                                   ///  future resolution
+  );
+
+  /// validate embedded metadata checksum
+  static bool validate_metadata(const bufferlist &bl);
+
+};
+
+using ExtentReaderRef = std::unique_ptr<ExtentReader>;
+
+} // namespace crimson::os::seastore
index 8c9a81c027b67ebdd7269a4f59fe124e4b02685f..db59b7d1373a14906b046b6593ea5c715922e951 100644 (file)
@@ -50,7 +50,7 @@ segment_nonce_t generate_nonce(
     sizeof(meta.seastore_id.uuid));
 }
 
-Journal::Journal(SegmentManager &segment_manager, Scanner& scanner)
+Journal::Journal(SegmentManager &segment_manager, ExtentReader& scanner)
   : segment_manager(segment_manager), scanner(scanner) {}
 
 
@@ -306,7 +306,7 @@ Journal::replay_segment(
   logger().debug("replay_segment: starting at {}", seq);
   return seastar::do_with(
     scan_valid_records_cursor(seq.offset),
-    Scanner::found_record_handler_t(
+    ExtentReader::found_record_handler_t(
       [=, &handler](paddr_t base,
                    const record_header_t &header,
                    const bufferlist &mdbuf) {
index 625a102347e0d1cbc668d9de84f1cf957543387b..a1917f3770f6255face0669682abd4609d4bbb6e 100644 (file)
@@ -14,7 +14,7 @@
 #include "include/denc.h"
 
 #include "crimson/common/log.h"
-#include "crimson/os/seastore/scanner.h"
+#include "crimson/os/seastore/extent_reader.h"
 #include "crimson/os/seastore/segment_manager.h"
 #include "crimson/os/seastore/ordering_handle.h"
 #include "crimson/os/seastore/seastore_types.h"
@@ -30,7 +30,7 @@ class SegmentedAllocator;
  */
 class Journal {
 public:
-  Journal(SegmentManager &segment_manager, Scanner& scanner);
+  Journal(SegmentManager &segment_manager, ExtentReader& scanner);
 
   /**
    * Sets the SegmentProvider.
@@ -156,7 +156,7 @@ private:
   segment_off_t written_to = 0;
   segment_off_t committed_to = 0;
 
-  Scanner& scanner;
+  ExtentReader& scanner;
   WritePipeline *write_pipeline = nullptr;
 
   void reset_soft_state() {
diff --git a/src/crimson/os/seastore/scanner.cc b/src/crimson/os/seastore/scanner.cc
deleted file mode 100644 (file)
index 9970ee3..0000000
+++ /dev/null
@@ -1,336 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
-// vim: ts=8 sw=2 smarttab expandtab
-
-#include "crimson/os/seastore/segment_manager.h"
-#include "crimson/os/seastore/scanner.h"
-#include "crimson/common/log.h"
-
-namespace {
-  seastar::logger& logger() {
-    return crimson::get_logger(ceph_subsys_seastore);
-  }
-}
-
-namespace crimson::os::seastore {
-
-Scanner::read_segment_header_ret
-Scanner::read_segment_header(segment_id_t segment)
-{
-  return segment_manager.read(
-    paddr_t{segment, 0},
-    segment_manager.get_block_size()
-  ).handle_error(
-    read_segment_header_ertr::pass_further{},
-    crimson::ct_error::assert_all{
-      "Invalid error in Scanner::read_segment_header"
-    }
-  ).safe_then([=](bufferptr bptr) -> read_segment_header_ret {
-    logger().debug("segment {} bptr size {}", segment, bptr.length());
-
-    segment_header_t header;
-    bufferlist bl;
-    bl.push_back(bptr);
-
-    logger().debug(
-      "Scanner::read_segment_header: segment {} block crc {}",
-      segment,
-      bl.begin().crc32c(segment_manager.get_block_size(), 0));
-
-    auto bp = bl.cbegin();
-    try {
-      decode(header, bp);
-    } catch (ceph::buffer::error &e) {
-      logger().debug(
-       "Scanner::read_segment_header: segment {} unable to decode "
-       "header, skipping",
-       segment);
-      return crimson::ct_error::enodata::make();
-    }
-    logger().debug(
-      "Scanner::read_segment_header: segment {} header {}",
-      segment,
-      header);
-    return read_segment_header_ret(
-      read_segment_header_ertr::ready_future_marker{},
-      header);
-  });
-}
-Scanner::scan_extents_ret Scanner::scan_extents(
-  scan_extents_cursor &cursor,
-  extent_len_t bytes_to_read)
-{
-  auto ret = std::make_unique<scan_extents_ret_bare>();
-  auto* extents = ret.get();
-  return read_segment_header(cursor.get_offset().segment
-  ).handle_error(
-    scan_extents_ertr::pass_further{},
-    crimson::ct_error::assert_all{
-      "Invalid error in Scanner::scan_extents"
-    }
-  ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
-    auto segment_nonce = segment_header.segment_nonce;
-    return seastar::do_with(
-      found_record_handler_t(
-       [extents, this](
-         paddr_t base,
-         const record_header_t &header,
-         const bufferlist &mdbuf) mutable {
-
-         auto infos = try_decode_extent_infos(
-           header,
-           mdbuf);
-         if (!infos) {
-           // This should be impossible, we did check the crc on the mdbuf
-           logger().error(
-             "Scanner::scan_extents unable to decode extents for record {}",
-             base);
-           assert(infos);
-         }
-
-         paddr_t extent_offset = base.add_offset(header.mdlength);
-         for (const auto &i : *infos) {
-           extents->emplace_back(extent_offset, i);
-           extent_offset.offset += i.len;
-         }
-         return scan_extents_ertr::now();
-       }),
-      [=, &cursor](auto &dhandler) {
-       return scan_valid_records(
-         cursor,
-         segment_nonce,
-         bytes_to_read,
-         dhandler).discard_result();
-      });
-  }).safe_then([ret=std::move(ret)] {
-    return std::move(*ret);
-  });
-}
-
-Scanner::scan_valid_records_ret Scanner::scan_valid_records(
-  scan_valid_records_cursor &cursor,
-  segment_nonce_t nonce,
-  size_t budget,
-  found_record_handler_t &handler)
-{
-  if (cursor.offset.offset == 0) {
-    cursor.offset.offset = segment_manager.get_block_size();
-  }
-  auto retref = std::make_unique<size_t>(0);
-  auto &budget_used = *retref;
-  return crimson::repeat(
-    [=, &cursor, &budget_used, &handler]() mutable
-    -> scan_valid_records_ertr::future<seastar::stop_iteration> {
-      return [=, &handler, &cursor, &budget_used] {
-       if (!cursor.last_valid_header_found) {
-         return read_validate_record_metadata(cursor.offset, nonce
-         ).safe_then([=, &cursor](auto md) {
-           logger().debug(
-             "Scanner::scan_valid_records: read complete {}",
-             cursor.offset);
-           if (!md) {
-             logger().debug(
-               "Scanner::scan_valid_records: found invalid header at {}, presumably at end",
-               cursor.offset);
-             cursor.last_valid_header_found = true;
-             return scan_valid_records_ertr::now();
-           } else {
-             logger().debug(
-               "Scanner::scan_valid_records: valid record read at {}",
-               cursor.offset);
-             cursor.last_committed = paddr_t{
-               cursor.offset.segment,
-               md->first.committed_to};
-             cursor.pending_records.emplace_back(
-               cursor.offset,
-               md->first,
-               md->second);
-             cursor.offset.offset +=
-               md->first.dlength + md->first.mdlength;
-             return scan_valid_records_ertr::now();
-           }
-         }).safe_then([=, &cursor, &budget_used, &handler] {
-           return crimson::repeat(
-             [=, &budget_used, &cursor, &handler] {
-               logger().debug(
-                 "Scanner::scan_valid_records: valid record read, processing queue");
-               if (cursor.pending_records.empty()) {
-                 /* This is only possible if the segment is empty.
-                  * A record's last_commited must be prior to its own
-                  * location since it itself cannot yet have been committed
-                  * at its own time of submission.  Thus, the most recently
-                  * read record must always fall after cursor.last_committed */
-                 return scan_valid_records_ertr::make_ready_future<
-                   seastar::stop_iteration>(seastar::stop_iteration::yes);
-               }
-               auto &next = cursor.pending_records.front();
-               if (next.offset > cursor.last_committed) {
-                 return scan_valid_records_ertr::make_ready_future<
-                   seastar::stop_iteration>(seastar::stop_iteration::yes);
-               }
-               budget_used +=
-                 next.header.dlength + next.header.mdlength;
-               return handler(
-                 next.offset,
-                 next.header,
-                 next.mdbuffer
-               ).safe_then([&cursor] {
-                 cursor.pending_records.pop_front();
-                 return scan_valid_records_ertr::make_ready_future<
-                   seastar::stop_iteration>(seastar::stop_iteration::no);
-               });
-             });
-         });
-       } else {
-         assert(!cursor.pending_records.empty());
-         auto &next = cursor.pending_records.front();
-         return read_validate_data(next.offset, next.header
-         ).safe_then([=, &budget_used, &next, &cursor, &handler](auto valid) {
-           if (!valid) {
-             cursor.pending_records.clear();
-             return scan_valid_records_ertr::now();
-           }
-           budget_used +=
-             next.header.dlength + next.header.mdlength;
-           return handler(
-             next.offset,
-             next.header,
-             next.mdbuffer
-           ).safe_then([&cursor] {
-             cursor.pending_records.pop_front();
-             return scan_valid_records_ertr::now();
-           });
-         });
-       }
-      }().safe_then([=, &budget_used, &cursor] {
-       if (cursor.is_complete() || budget_used >= budget) {
-         return seastar::stop_iteration::yes;
-       } else {
-         return seastar::stop_iteration::no;
-       }
-      });
-    }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret {
-      return scan_valid_records_ret(
-       scan_valid_records_ertr::ready_future_marker{},
-       std::move(*retref));
-    });
-}
-
-Scanner::read_validate_record_metadata_ret
-Scanner::read_validate_record_metadata(
-  paddr_t start,
-  segment_nonce_t nonce)
-{
-  auto block_size = segment_manager.get_block_size();
-  if (start.offset + block_size > (int64_t)segment_manager.get_segment_size()) {
-    return read_validate_record_metadata_ret(
-      read_validate_record_metadata_ertr::ready_future_marker{},
-      std::nullopt);
-  }
-  return segment_manager.read(start, block_size
-  ).safe_then(
-    [=](bufferptr bptr) mutable
-    -> read_validate_record_metadata_ret {
-      logger().debug("read_validate_record_metadata: reading {}", start);
-      auto block_size = segment_manager.get_block_size();
-      bufferlist bl;
-      bl.append(bptr);
-      auto bp = bl.cbegin();
-      record_header_t header;
-      try {
-       decode(header, bp);
-      } catch (ceph::buffer::error &e) {
-       return read_validate_record_metadata_ret(
-         read_validate_record_metadata_ertr::ready_future_marker{},
-         std::nullopt);
-      }
-      if (header.segment_nonce != nonce) {
-       return read_validate_record_metadata_ret(
-         read_validate_record_metadata_ertr::ready_future_marker{},
-         std::nullopt);
-      }
-      if (header.mdlength > (extent_len_t)block_size) {
-       if (start.offset + header.mdlength >
-           (int64_t)segment_manager.get_segment_size()) {
-         return crimson::ct_error::input_output_error::make();
-       }
-       return segment_manager.read(
-         {start.segment, start.offset + (segment_off_t)block_size},
-         header.mdlength - block_size).safe_then(
-           [header=std::move(header), bl=std::move(bl)](
-             auto &&bptail) mutable {
-             bl.push_back(bptail);
-             return read_validate_record_metadata_ret(
-               read_validate_record_metadata_ertr::ready_future_marker{},
-               std::make_pair(std::move(header), std::move(bl)));
-           });
-      } else {
-       return read_validate_record_metadata_ret(
-         read_validate_record_metadata_ertr::ready_future_marker{},
-         std::make_pair(std::move(header), std::move(bl))
-       );
-      }
-    }).safe_then([=](auto p) {
-      if (p && validate_metadata(p->second)) {
-       return read_validate_record_metadata_ret(
-         read_validate_record_metadata_ertr::ready_future_marker{},
-         std::move(*p)
-       );
-      } else {
-       return read_validate_record_metadata_ret(
-         read_validate_record_metadata_ertr::ready_future_marker{},
-         std::nullopt);
-      }
-    });
-}
-
-std::optional<std::vector<extent_info_t>>
-Scanner::try_decode_extent_infos(
-  record_header_t header,
-  const bufferlist &bl)
-{
-  auto bliter = bl.cbegin();
-  bliter += ceph::encoded_sizeof_bounded<record_header_t>();
-  bliter += sizeof(checksum_t) /* crc */;
-  logger().debug("{}: decoding {} extents", __func__, header.extents);
-  std::vector<extent_info_t> extent_infos(header.extents);
-  for (auto &&i : extent_infos) {
-    try {
-      decode(i, bliter);
-    } catch (ceph::buffer::error &e) {
-      return std::nullopt;
-    }
-  }
-  return extent_infos;
-}
-
-Scanner::read_validate_data_ret
-Scanner::read_validate_data(
-  paddr_t record_base,
-  const record_header_t &header)
-{
-  return segment_manager.read(
-    record_base.add_offset(header.mdlength),
-    header.dlength
-  ).safe_then([=, &header](auto bptr) {
-    bufferlist bl;
-    bl.append(bptr);
-    return bl.crc32c(-1) == header.data_crc;
-  });
-}
-
-bool Scanner::validate_metadata(const bufferlist &bl)
-{
-  auto bliter = bl.cbegin();
-  auto test_crc = bliter.crc32c(
-    ceph::encoded_sizeof_bounded<record_header_t>(),
-    -1);
-  ceph_le32 recorded_crc_le;
-  decode(recorded_crc_le, bliter);
-  uint32_t recorded_crc = recorded_crc_le;
-  test_crc = bliter.crc32c(
-    bliter.get_remaining(),
-    test_crc);
-  return test_crc == recorded_crc;
-}
-
-} // namespace crimson::os::seastore
diff --git a/src/crimson/os/seastore/scanner.h b/src/crimson/os/seastore/scanner.h
deleted file mode 100644 (file)
index 36bec08..0000000
+++ /dev/null
@@ -1,102 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
-// vim: ts=8 sw=2 smarttab expandtab
-
-#pragma once
-
-#include "crimson/common/errorator.h"
-#include "crimson/os/seastore/seastore_types.h"
-#include "crimson/os/seastore/segment_manager.h"
-
-namespace crimson::os::seastore {
-
-class SegmentCleaner;
-
-class Scanner {
-public:
-  using read_ertr = crimson::errorator<
-    crimson::ct_error::input_output_error,
-    crimson::ct_error::invarg,
-    crimson::ct_error::enoent,
-    crimson::ct_error::erange>;
-
-  Scanner(SegmentManager& segment_manager)
-    : segment_manager(segment_manager) {}
-  using read_segment_header_ertr = crimson::errorator<
-    crimson::ct_error::enoent,
-    crimson::ct_error::enodata,
-    crimson::ct_error::input_output_error
-    >;
-  using read_segment_header_ret = read_segment_header_ertr::future<
-    segment_header_t>;
-  read_segment_header_ret read_segment_header(segment_id_t segment);
-
-  /**
-   * scan_extents
-   *
-   * Scans records beginning at addr until the first record boundary after
-   * addr + bytes_to_read.
-   *
-   * Returns list<extent, extent_info>
-   * cursor.is_complete() will be true when no further extents exist in segment.
-   */
-  using scan_extents_cursor = scan_valid_records_cursor;
-  using scan_extents_ertr = read_ertr::extend<crimson::ct_error::enodata>;
-  using scan_extents_ret_bare = std::list<std::pair<paddr_t, extent_info_t>>;
-  using scan_extents_ret = scan_extents_ertr::future<scan_extents_ret_bare>;
-  scan_extents_ret scan_extents(
-    scan_extents_cursor &cursor,
-    extent_len_t bytes_to_read
-  );
-
-  using scan_valid_records_ertr = read_ertr::extend<crimson::ct_error::enodata>;
-  using scan_valid_records_ret = scan_valid_records_ertr::future<
-    size_t>;
-  using found_record_handler_t = std::function<
-    scan_valid_records_ertr::future<>(
-      paddr_t record_block_base,
-      // callee may assume header and bl will remain valid until
-      // returned future resolves
-      const record_header_t &header,
-      const bufferlist &bl)>;
-  scan_valid_records_ret scan_valid_records(
-    scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
-    segment_nonce_t nonce,             ///< [in] nonce for segment
-    size_t budget,                     ///< [in] max budget to use
-    found_record_handler_t &handler    ///< [in] handler for records
-  ); ///< @return used budget
-
-private:
-  SegmentManager& segment_manager;
-
-  /// read record metadata for record starting at start
-  using read_validate_record_metadata_ertr = read_ertr;
-  using read_validate_record_metadata_ret =
-    read_validate_record_metadata_ertr::future<
-      std::optional<std::pair<record_header_t, bufferlist>>
-    >;
-  read_validate_record_metadata_ret read_validate_record_metadata(
-    paddr_t start,
-    segment_nonce_t nonce);
-
-  /// attempts to decode extent infos from bl, return nullopt if unsuccessful
-  std::optional<std::vector<extent_info_t>> try_decode_extent_infos(
-    record_header_t header,
-    const bufferlist &bl);
-
-  /// read and validate data
-  using read_validate_data_ertr = read_ertr;
-  using read_validate_data_ret = read_validate_data_ertr::future<bool>;
-  read_validate_data_ret read_validate_data(
-    paddr_t record_base,
-    const record_header_t &header  ///< caller must ensure lifetime through
-                                   ///  future resolution
-  );
-
-  /// validate embedded metadata checksum
-  static bool validate_metadata(const bufferlist &bl);
-
-};
-
-using ScannerRef = std::unique_ptr<Scanner>;
-
-} // namespace crimson::os::seastore
index b15b5138e31ee46ed4681aa4853ce976e0ab8e7c..f06aa650d8ee401b1855de56f006106151400887 100644 (file)
@@ -1181,7 +1181,7 @@ std::unique_ptr<SeaStore> make_seastore(
     segment_manager::block::BlockSegmentManager
     >(device + "/block");
 
-  auto scanner = std::make_unique<Scanner>(*sm);
+  auto scanner = std::make_unique<ExtentReader>();
   auto& scanner_ref = *scanner.get();
   auto segment_cleaner = std::make_unique<SegmentCleaner>(
     SegmentCleaner::config_t::get_default(),
@@ -1189,7 +1189,7 @@ std::unique_ptr<SeaStore> make_seastore(
     false /* detailed */);
 
   auto journal = std::make_unique<Journal>(*sm, scanner_ref);
-  auto cache = std::make_unique<Cache>(*sm);
+  auto cache = std::make_unique<Cache>(scanner_ref, sm->get_block_size());
   auto lba_manager = lba_manager::create_lba_manager(*sm, *cache);
 
   auto epm = std::make_unique<ExtentPlacementManager>(*cache, *lba_manager);
index a912bfbb77526b9bfa7f6cbb6afba9422c37287c..d23c5407c76975d78486e276c9800bd918943ffd 100644 (file)
@@ -146,7 +146,7 @@ void SpaceTrackerDetailed::dump_usage(segment_id_t id) const
 
 SegmentCleaner::SegmentCleaner(
   config_t config,
-  ScannerRef&& scr,
+  ExtentReaderRef&& scr,
   bool detailed)
   : detailed(detailed),
     config(config),
@@ -309,7 +309,7 @@ SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space()
     }
     next.offset = 0;
     scan_cursor =
-      std::make_unique<Scanner::scan_extents_cursor>(
+      std::make_unique<ExtentReader::scan_extents_cursor>(
        next);
     logger().debug(
       "SegmentCleaner::do_gc: starting gc on segment {}",
@@ -388,13 +388,13 @@ SegmentCleaner::init_segments_ret SegmentCleaner::init_segments() {
       return scanner->read_segment_header(segment_id)
       .safe_then([&segments, segment_id, this](auto header) {
        if (header.out_of_line) {
-         logger().debug("Scanner::init_segments: out-of-line segment {}", segment_id);
+         logger().debug("SegmentCleaner::init_segments: out-of-line segment {}", segment_id);
          init_mark_segment_closed(
            segment_id,
            header.journal_segment_seq,
            true);
        } else {
-         logger().debug("Scanner::init_segments: journal segment {}", segment_id);
+         logger().debug("SegmentCleaner::init_segments: journal segment {}", segment_id);
          segments.emplace_back(std::make_pair(segment_id, std::move(header)));
        }
        return seastar::now();
index 3099fa2a84abb7533e956948bd8ddb512e03129e..470504ac44c66401c9c074d38f775826ffb30e2a 100644 (file)
@@ -383,7 +383,7 @@ private:
   size_t segment_size = 0;
   size_t block_size = 0;
 
-  ScannerRef scanner;
+  ExtentReaderRef scanner;
 
   SpaceTrackerIRef space_tracker;
   std::vector<segment_info_t> segments;
@@ -422,7 +422,7 @@ private:
 public:
   SegmentCleaner(
     config_t config,
-    ScannerRef&& scanner,
+    ExtentReaderRef&& scanner,
     bool detailed = false);
 
   void mount(SegmentManager &sm) {
@@ -652,7 +652,7 @@ private:
 
   // GC status helpers
   std::unique_ptr<
-    Scanner::scan_extents_cursor
+    ExtentReader::scan_extents_cursor
     > scan_cursor;
 
   /**
@@ -730,7 +730,7 @@ private:
   } gc_process;
 
   using gc_ertr = work_ertr::extend_ertr<
-    Scanner::scan_extents_ertr
+    ExtentReader::scan_extents_ertr
     >;
 
   gc_cycle_ret do_gc_cycle();
index c933448675e6ea7dbc82c151c0a7e0dbf1b061bf..5e97007a563abca42e314fb8dff93d4c88c408ec 100644 (file)
@@ -127,7 +127,8 @@ seastar::future<bufferlist> TMDriver::read(
 
 void TMDriver::init()
 {
-  auto scanner = std::make_unique<Scanner>(*segment_manager);
+  auto scanner = std::make_unique<ExtentReader>();
+  scanner->add_segment_manager(segment_manager.get());
   auto& scanner_ref = *scanner.get();
   auto segment_cleaner = std::make_unique<SegmentCleaner>(
     SegmentCleaner::config_t::get_default(),
@@ -135,7 +136,7 @@ void TMDriver::init()
     false /* detailed */);
   segment_cleaner->mount(*segment_manager);
   auto journal = std::make_unique<Journal>(*segment_manager, scanner_ref);
-  auto cache = std::make_unique<Cache>(*segment_manager);
+  auto cache = std::make_unique<Cache>(scanner_ref, segment_manager->get_block_size());
   auto lba_manager = lba_manager::create_lba_manager(*segment_manager, *cache);
 
   auto epm = std::make_unique<ExtentPlacementManager>(*cache, *lba_manager);
index 158fbd0f8f6c7cee13cc32a6d858e195dcf4aa40..7399bd26eac8723c4c69742714c4b0d9e934d255 100644 (file)
@@ -27,7 +27,7 @@ using namespace crimson::os::seastore::lba_manager::btree;
 struct btree_lba_manager_test :
   public seastar_test_suite_t, SegmentProvider {
   segment_manager::EphemeralSegmentManagerRef segment_manager;
-  ScannerRef scanner;
+  ExtentReaderRef scanner;
   Journal journal;
   Cache cache;
   BtreeLBAManagerRef lba_manager;
@@ -38,12 +38,13 @@ struct btree_lba_manager_test :
 
   btree_lba_manager_test()
     : segment_manager(segment_manager::create_test_ephemeral()),
-      scanner(new Scanner(*segment_manager)),
+      scanner(new ExtentReader()),
       journal(*segment_manager, *scanner),
-      cache(*segment_manager),
+      cache(*scanner, segment_manager->get_block_size()),
       lba_manager(new BtreeLBAManager(*segment_manager, cache)),
       block_size(segment_manager->get_block_size())
   {
+    scanner->add_segment_manager(segment_manager.get());
     journal.set_segment_provider(this);
     journal.set_write_pipeline(&pipeline);
   }
index 8bfa75573172ae11ff310f51beb9139add1f5273..f4c2526203fb7ca1a5a650ae6d9a6b79ad181e3f 100644 (file)
@@ -29,6 +29,7 @@ constexpr uint64_t DEFAULT_BLOCK_SIZE = 4096;
 struct rbm_test_t : public  seastar_test_suite_t,
   TMTestState {
   segment_manager::EphemeralSegmentManagerRef segment_manager; // Need to be deleted, just for Cache
+  ExtentReaderRef reader;
   Cache cache;
   std::unique_ptr<NVMeManager> rbm_manager;
   nvme_device::NVMeBlockDevice *device;
@@ -57,7 +58,8 @@ struct rbm_test_t : public  seastar_test_suite_t,
 
   rbm_test_t() :
       segment_manager(segment_manager::create_test_ephemeral()),
-      cache(*segment_manager)
+      reader(new ExtentReader()),
+      cache(*reader, segment_manager->get_block_size())
   {
     device = new nvme_device::TestMemory(DEFAULT_TEST_SIZE);
     rbm_manager.reset(new NVMeManager(device, std::string()));
index 2b08af2e83f239f22f79866857cf29c9f9cfed79..28688d9feed6e83e66641f6c82cdb6f160fddfae 100644 (file)
@@ -21,13 +21,17 @@ namespace {
 
 struct cache_test_t : public seastar_test_suite_t {
   segment_manager::EphemeralSegmentManagerRef segment_manager;
+  ExtentReaderRef reader;
   Cache cache;
   paddr_t current{0, 0};
   journal_seq_t seq;
 
   cache_test_t()
     : segment_manager(segment_manager::create_test_ephemeral()),
-      cache(*segment_manager) {}
+      reader(new ExtentReader()),
+      cache(*reader, segment_manager->get_block_size()) {
+    reader->add_segment_manager(segment_manager.get());
+  }
 
   seastar::future<paddr_t> submit_transaction(
     TransactionRef t) {
index 29227887928b59049229c27b1d017fd390317841..3d937d5b67a5e6a5ea9e50457adf3f1c18131f05 100644 (file)
@@ -74,13 +74,15 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider {
 
   const segment_off_t block_size;
 
-  ScannerRef scanner;
+  ExtentReaderRef scanner;
 
   journal_test_t()
     : segment_manager(segment_manager::create_test_ephemeral()),
       block_size(segment_manager->get_block_size()),
-      scanner(std::make_unique<Scanner>(*segment_manager))
-  {}
+      scanner(new ExtentReader())
+  {
+    scanner->add_segment_manager(segment_manager.get());
+  }
 
   segment_id_t next = 0;
   get_segment_ret get_segment() final {
index bec8cb1dcf3923b68b6e6950634c433334697cf0..a5428556eb77dc3850a21a04abed69f9e9cf6a2d 100644 (file)
@@ -70,14 +70,15 @@ protected:
 
 auto get_transaction_manager(
   SegmentManager &segment_manager) {
-  auto scanner = std::make_unique<Scanner>(segment_manager);
+  auto scanner = std::make_unique<ExtentReader>();
+  scanner->add_segment_manager(&segment_manager);
   auto& scanner_ref = *scanner.get();
   auto segment_cleaner = std::make_unique<SegmentCleaner>(
     SegmentCleaner::config_t::get_default(),
     std::move(scanner),
     true);
   auto journal = std::make_unique<Journal>(segment_manager, scanner_ref);
-  auto cache = std::make_unique<Cache>(segment_manager);
+  auto cache = std::make_unique<Cache>(scanner_ref, segment_manager.get_block_size());
   auto lba_manager = lba_manager::create_lba_manager(segment_manager, *cache);
 
   auto epm = std::make_unique<ExtentPlacementManager>(*cache, *lba_manager);