]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore: add Scanner to scan extents
authorXuehan Xu <xxhdx1985126@gmail.com>
Mon, 19 Jul 2021 07:38:12 +0000 (15:38 +0800)
committerXuehan Xu <xxhdx1985126@gmail.com>
Wed, 8 Sep 2021 03:03:00 +0000 (11:03 +0800)
There will be two kinds of segments to scan: those created by the journal
and those created by the extent placement manager. We therefore need a common
module that can scan the extents of both kinds of segments.

Signed-off-by: Xuehan Xu <xxhdx1985126@gmail.com>
14 files changed:
src/crimson/os/seastore/CMakeLists.txt
src/crimson/os/seastore/journal.cc
src/crimson/os/seastore/journal.h
src/crimson/os/seastore/scanner.cc [new file with mode: 0644]
src/crimson/os/seastore/scanner.h [new file with mode: 0644]
src/crimson/os/seastore/seastore.cc
src/crimson/os/seastore/seastore_types.h
src/crimson/os/seastore/segment_cleaner.cc
src/crimson/os/seastore/segment_cleaner.h
src/crimson/os/seastore/transaction_manager.cc
src/crimson/os/seastore/transaction_manager.h
src/test/crimson/seastore/test_btree_lba_manager.cc
src/test/crimson/seastore/test_seastore_journal.cc
src/test/crimson/seastore/transaction_manager_test_state.h

index cdd88ea28a92ad79cc6c1fd1f34bbccdf16286d4..146880a65998971e5cdd1688d246278d42f611f2 100644 (file)
@@ -7,6 +7,7 @@ add_library(crimson-seastore STATIC
   transaction.cc
   journal.cc
   cache.cc
+  scanner.cc
   lba_manager.cc
   segment_cleaner.cc
   lba_manager/btree/btree_lba_manager.cc
index f4f4e09f7ee3a4c38d4ab319223960ecafc62c16..e8cface690390fd8c5166d67d7ad731e6d45fd84 100644 (file)
@@ -50,8 +50,8 @@ segment_nonce_t generate_nonce(
     sizeof(meta.seastore_id.uuid));
 }
 
-Journal::Journal(SegmentManager &segment_manager)
-  : segment_manager(segment_manager) {}
+Journal::Journal(SegmentManager &segment_manager, Scanner& scanner)
+  : segment_manager(segment_manager), scanner(scanner) {}
 
 
 Journal::initialize_segment_ertr::future<segment_seq_t>
@@ -99,35 +99,6 @@ Journal::initialize_segment(Segment &segment)
     });
 }
 
-bool Journal::validate_metadata(const bufferlist &bl)
-{
-  auto bliter = bl.cbegin();
-  auto test_crc = bliter.crc32c(
-    ceph::encoded_sizeof_bounded<record_header_t>(),
-    -1);
-  ceph_le32 recorded_crc_le;
-  decode(recorded_crc_le, bliter);
-  uint32_t recorded_crc = recorded_crc_le;
-  test_crc = bliter.crc32c(
-    bliter.get_remaining(),
-    test_crc);
-  return test_crc == recorded_crc;
-}
-
-Journal::read_validate_data_ret Journal::read_validate_data(
-  paddr_t record_base,
-  const record_header_t &header)
-{
-  return segment_manager.read(
-    record_base.add_offset(header.mdlength),
-    header.dlength
-  ).safe_then([=, &header](auto bptr) {
-    bufferlist bl;
-    bl.append(bptr);
-    return bl.crc32c(-1) == header.data_crc;
-  });
-}
-
 Journal::write_record_ret Journal::write_record(
   record_size_t rsize,
   record_t &&record,
@@ -215,49 +186,6 @@ Journal::roll_journal_segment()
     );
 }
 
-Journal::read_segment_header_ret
-Journal::read_segment_header(segment_id_t segment)
-{
-  return segment_manager.read(
-    paddr_t{segment, 0},
-    segment_manager.get_block_size()
-  ).handle_error(
-    read_segment_header_ertr::pass_further{},
-    crimson::ct_error::assert_all{
-      "Invalid error in Journal::read_segment_header"
-    }
-  ).safe_then([=](bufferptr bptr) -> read_segment_header_ret {
-    logger().debug("segment {} bptr size {}", segment, bptr.length());
-
-    segment_header_t header;
-    bufferlist bl;
-    bl.push_back(bptr);
-
-    logger().debug(
-      "Journal::read_segment_header: segment {} block crc {}",
-      segment,
-      bl.begin().crc32c(segment_manager.get_block_size(), 0));
-
-    auto bp = bl.cbegin();
-    try {
-      decode(header, bp);
-    } catch (ceph::buffer::error &e) {
-      logger().debug(
-       "Journal::read_segment_header: segment {} unable to decode "
-       "header, skipping",
-       segment);
-      return crimson::ct_error::enodata::make();
-    }
-    logger().debug(
-      "Journal::read_segment_header: segment {} header {}",
-      segment,
-      header);
-    return read_segment_header_ret(
-      read_segment_header_ertr::ready_future_marker{},
-      header);
-  });
-}
-
 Journal::open_for_write_ret Journal::open_for_write()
 {
   return roll_journal_segment().safe_then([this](auto seq) {
@@ -272,187 +200,79 @@ Journal::open_for_write_ret Journal::open_for_write()
   });
 }
 
-Journal::find_replay_segments_fut Journal::find_replay_segments()
+Journal::prep_replay_segments_fut
+Journal::prep_replay_segments(
+  std::vector<std::pair<segment_id_t, segment_header_t>> segments)
 {
-  return seastar::do_with(
-    std::vector<std::pair<segment_id_t, segment_header_t>>(),
-    [this](auto &&segments) mutable {
-      return crimson::do_for_each(
-       boost::make_counting_iterator(segment_id_t{0}),
-       boost::make_counting_iterator(segment_manager.get_num_segments()),
-       [this, &segments](auto i) {
-         return read_segment_header(i
-         ).safe_then([this, &segments, i](auto header) mutable {
-           if (generate_nonce(
-                 header.journal_segment_seq,
-                 segment_manager.get_meta()) != header.segment_nonce) {
-             logger().debug(
-               "find_replay_segments: nonce mismatch segment {} header {}",
-               i,
-               header);
-             assert(0 == "impossible");
-             return find_replay_segments_ertr::now();
-           }
-
-           segments.emplace_back(i, std::move(header));
-           return find_replay_segments_ertr::now();
-         }).handle_error(
-           crimson::ct_error::enoent::handle([i](auto) {
-             logger().debug(
-               "find_replay_segments: segment {} not available for read",
-               i);
-             return find_replay_segments_ertr::now();
-           }),
-           crimson::ct_error::enodata::handle([i](auto) {
-             logger().debug(
-               "find_replay_segments: segment {} header undecodable",
-               i);
-             return find_replay_segments_ertr::now();
-           }),
-           find_replay_segments_ertr::pass_further{},
-           crimson::ct_error::assert_all{
-             "Invalid error in Journal::find_replay_segments"
-            }
-         );
-       }).safe_then([this, &segments]() mutable -> find_replay_segments_fut {
-         logger().debug(
-           "find_replay_segments: have {} segments",
-           segments.size());
-         if (segments.empty()) {
-           return crimson::ct_error::input_output_error::make();
-         }
-         std::sort(
-           segments.begin(),
-           segments.end(),
-           [](const auto &lt, const auto &rt) {
-             return lt.second.journal_segment_seq <
-               rt.second.journal_segment_seq;
-           });
-
-         next_journal_segment_seq =
-           segments.rbegin()->second.journal_segment_seq + 1;
-         std::for_each(
-           segments.begin(),
-           segments.end(),
-           [this](auto &seg) {
-             segment_provider->init_mark_segment_closed(
-               seg.first,
-               seg.second.journal_segment_seq);
-           });
+  logger().debug(
+    "prep_replay_segments: have {} segments",
+    segments.size());
+  if (segments.empty()) {
+    return crimson::ct_error::input_output_error::make();
+  }
+  std::sort(
+    segments.begin(),
+    segments.end(),
+    [](const auto &lt, const auto &rt) {
+      return lt.second.journal_segment_seq <
+       rt.second.journal_segment_seq;
+    });
 
-         auto journal_tail = segments.rbegin()->second.journal_tail;
-         segment_provider->update_journal_tail_committed(journal_tail);
-         auto replay_from = journal_tail.offset;
-         logger().debug(
-           "Journal::find_replay_segments: journal_tail={}",
-           journal_tail);
-         auto from = segments.begin();
-         if (replay_from != P_ADDR_NULL) {
-           from = std::find_if(
-             segments.begin(),
-             segments.end(),
-             [&replay_from](const auto &seg) -> bool {
-               return seg.first == replay_from.segment;
-             });
-           if (from->second.journal_segment_seq != journal_tail.segment_seq) {
-             logger().error(
-               "find_replay_segments: journal_tail {} does not match {}",
-               journal_tail,
-               from->second);
-             assert(0 == "invalid");
-           }
-         } else {
-           replay_from = paddr_t{
-             from->first,
-             (segment_off_t)segment_manager.get_block_size()};
-         }
-         auto ret = replay_segments_t(segments.end() - from);
-         std::transform(
-           from, segments.end(), ret.begin(),
-           [this](const auto &p) {
-             auto ret = journal_seq_t{
-               p.second.journal_segment_seq,
-               paddr_t{
-                 p.first,
-                 (segment_off_t)segment_manager.get_block_size()}};
-             logger().debug(
-               "Journal::find_replay_segments: replaying from  {}",
-               ret);
-             return std::make_pair(ret, p.second);
-           });
-         ret[0].first.offset = replay_from;
-         return find_replay_segments_fut(
-           find_replay_segments_ertr::ready_future_marker{},
-           std::move(ret));
-       });
+  next_journal_segment_seq =
+    segments.rbegin()->second.journal_segment_seq + 1;
+  std::for_each(
+    segments.begin(),
+    segments.end(),
+    [this](auto &seg) {
+      segment_provider->init_mark_segment_closed(
+       seg.first,
+       seg.second.journal_segment_seq);
     });
-}
 
-Journal::read_validate_record_metadata_ret Journal::read_validate_record_metadata(
-  paddr_t start,
-  segment_nonce_t nonce)
-{
-  auto block_size = segment_manager.get_block_size();
-  if (start.offset + block_size > (int64_t)segment_manager.get_segment_size()) {
-    return read_validate_record_metadata_ret(
-      read_validate_record_metadata_ertr::ready_future_marker{},
-      std::nullopt);
+  auto journal_tail = segments.rbegin()->second.journal_tail;
+  segment_provider->update_journal_tail_committed(journal_tail);
+  auto replay_from = journal_tail.offset;
+  logger().debug(
+    "Journal::prep_replay_segments: journal_tail={}",
+    journal_tail);
+  auto from = segments.begin();
+  if (replay_from != P_ADDR_NULL) {
+    from = std::find_if(
+      segments.begin(),
+      segments.end(),
+      [&replay_from](const auto &seg) -> bool {
+       return seg.first == replay_from.segment;
+      });
+    if (from->second.journal_segment_seq != journal_tail.segment_seq) {
+      logger().error(
+       "prep_replay_segments: journal_tail {} does not match {}",
+       journal_tail,
+       from->second);
+      assert(0 == "invalid");
+    }
+  } else {
+    replay_from = paddr_t{
+      from->first,
+      (segment_off_t)segment_manager.get_block_size()};
   }
-  return segment_manager.read(start, block_size
-  ).safe_then(
-    [=](bufferptr bptr) mutable
-    -> read_validate_record_metadata_ret {
-      logger().debug("read_validate_record_metadata: reading {}", start);
-      auto block_size = segment_manager.get_block_size();
-      bufferlist bl;
-      bl.append(bptr);
-      auto bp = bl.cbegin();
-      record_header_t header;
-      try {
-       decode(header, bp);
-      } catch (ceph::buffer::error &e) {
-       return read_validate_record_metadata_ret(
-         read_validate_record_metadata_ertr::ready_future_marker{},
-         std::nullopt);
-      }
-      if (header.segment_nonce != nonce) {
-       return read_validate_record_metadata_ret(
-         read_validate_record_metadata_ertr::ready_future_marker{},
-         std::nullopt);
-      }
-      if (header.mdlength > (extent_len_t)block_size) {
-       if (start.offset + header.mdlength >
-           (int64_t)segment_manager.get_segment_size()) {
-         return crimson::ct_error::input_output_error::make();
-       }
-       return segment_manager.read(
-         {start.segment, start.offset + (segment_off_t)block_size},
-         header.mdlength - block_size).safe_then(
-           [header=std::move(header), bl=std::move(bl)](
-             auto &&bptail) mutable {
-             bl.push_back(bptail);
-             return read_validate_record_metadata_ret(
-               read_validate_record_metadata_ertr::ready_future_marker{},
-               std::make_pair(std::move(header), std::move(bl)));
-           });
-      } else {
-       return read_validate_record_metadata_ret(
-         read_validate_record_metadata_ertr::ready_future_marker{},
-         std::make_pair(std::move(header), std::move(bl))
-       );
-      }
-    }).safe_then([=](auto p) {
-      if (p && validate_metadata(p->second)) {
-       return read_validate_record_metadata_ret(
-         read_validate_record_metadata_ertr::ready_future_marker{},
-         std::move(*p)
-       );
-      } else {
-       return read_validate_record_metadata_ret(
-         read_validate_record_metadata_ertr::ready_future_marker{},
-         std::nullopt);
-      }
+  auto ret = replay_segments_t(segments.end() - from);
+  std::transform(
+    from, segments.end(), ret.begin(),
+    [this](const auto &p) {
+      auto ret = journal_seq_t{
+       p.second.journal_segment_seq,
+       paddr_t{
+         p.first,
+         (segment_off_t)segment_manager.get_block_size()}};
+      logger().debug(
+       "Journal::prep_replay_segments: replaying from  {}",
+       ret);
+      return std::make_pair(ret, p.second);
     });
+  ret[0].first.offset = replay_from;
+  return prep_replay_segments_fut(
+    prep_replay_segments_ertr::ready_future_marker{},
+    std::move(ret));
 }
 
 std::optional<std::vector<delta_info_t>> Journal::try_decode_deltas(
@@ -475,25 +295,6 @@ std::optional<std::vector<delta_info_t>> Journal::try_decode_deltas(
   return deltas;
 }
 
-std::optional<std::vector<extent_info_t>> Journal::try_decode_extent_infos(
-  record_header_t header,
-  const bufferlist &bl)
-{
-  auto bliter = bl.cbegin();
-  bliter += ceph::encoded_sizeof_bounded<record_header_t>();
-  bliter += sizeof(checksum_t) /* crc */;
-  logger().debug("{}: decoding {} extents", __func__, header.extents);
-  std::vector<extent_info_t> extent_infos(header.extents);
-  for (auto &&i : extent_infos) {
-    try {
-      decode(i, bliter);
-    } catch (ceph::buffer::error &e) {
-      return std::nullopt;
-    }
-  }
-  return extent_infos;
-}
-
 Journal::replay_ertr::future<>
 Journal::replay_segment(
   journal_seq_t seq,
@@ -503,7 +304,7 @@ Journal::replay_segment(
   logger().debug("replay_segment: starting at {}", seq);
   return seastar::do_with(
     scan_valid_records_cursor(seq.offset),
-    found_record_handler_t(
+    Scanner::found_record_handler_t(
       [=, &handler](paddr_t base,
                    const record_header_t &header,
                    const bufferlist &mdbuf) {
@@ -548,20 +349,29 @@ Journal::replay_segment(
          });
       }),
     [=](auto &cursor, auto &dhandler) {
-      return scan_valid_records(
+      return scanner.scan_valid_records(
        cursor,
        header.segment_nonce,
        std::numeric_limits<size_t>::max(),
-       dhandler).safe_then([](auto){});
+       dhandler).safe_then([](auto){}
+      ).handle_error(
+       replay_ertr::pass_further{},
+       crimson::ct_error::assert_all{
+         "shouldn't meet with any other error other replay_ertr"
+       }
+      );;
     });
 }
 
-Journal::replay_ret Journal::replay(delta_handler_t &&delta_handler)
+Journal::replay_ret Journal::replay(
+  std::vector<std::pair<segment_id_t, segment_header_t>>&& segment_headers,
+  delta_handler_t &&delta_handler)
 {
   return seastar::do_with(
     std::move(delta_handler), replay_segments_t(),
-    [this](auto &handler, auto &segments) mutable -> replay_ret {
-      return find_replay_segments().safe_then(
+    [this, segment_headers=std::move(segment_headers)]
+    (auto &handler, auto &segments) mutable -> replay_ret {
+      return prep_replay_segments(std::move(segment_headers)).safe_then(
         [this, &handler, &segments](auto replay_segs) mutable {
           logger().debug("replay: found {} segments", replay_segs.size());
           segments = std::move(replay_segs);
@@ -572,165 +382,4 @@ Journal::replay_ret Journal::replay(delta_handler_t &&delta_handler)
     });
 }
 
-Journal::scan_extents_ret Journal::scan_extents(
-  scan_extents_cursor &cursor,
-  extent_len_t bytes_to_read)
-{
-  auto ret = std::make_unique<scan_extents_ret_bare>();
-  auto* extents = ret.get();
-  return read_segment_header(cursor.get_offset().segment
-  ).handle_error(
-    scan_extents_ertr::pass_further{},
-    crimson::ct_error::assert_all{
-      "Invalid error in Journal::scan_extents"
-    }
-  ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
-    auto segment_nonce = segment_header.segment_nonce;
-    return seastar::do_with(
-      found_record_handler_t(
-       [extents, this](
-         paddr_t base,
-         const record_header_t &header,
-         const bufferlist &mdbuf) mutable {
-
-         auto infos = try_decode_extent_infos(
-           header,
-           mdbuf);
-         if (!infos) {
-           // This should be impossible, we did check the crc on the mdbuf
-           logger().error(
-             "Journal::scan_extents unable to decode extents for record {}",
-             base);
-           assert(infos);
-         }
-
-         paddr_t extent_offset = base.add_offset(header.mdlength);
-         for (const auto &i : *infos) {
-           extents->emplace_back(extent_offset, i);
-           extent_offset.offset += i.len;
-         }
-         return scan_extents_ertr::now();
-       }),
-      [=, &cursor](auto &dhandler) {
-       return scan_valid_records(
-         cursor,
-         segment_nonce,
-         bytes_to_read,
-         dhandler).discard_result();
-      });
-  }).safe_then([ret=std::move(ret)] {
-    return std::move(*ret);
-  });
-}
-
-Journal::scan_valid_records_ret Journal::scan_valid_records(
-    scan_valid_records_cursor &cursor,
-    segment_nonce_t nonce,
-    size_t budget,
-    found_record_handler_t &handler)
-{
-  if (cursor.offset.offset == 0) {
-    cursor.offset.offset = segment_manager.get_block_size();
-  }
-  auto retref = std::make_unique<size_t>(0);
-  auto budget_used = *retref;
-  return crimson::repeat(
-    [=, &cursor, &budget_used, &handler]() mutable
-    -> scan_valid_records_ertr::future<seastar::stop_iteration> {
-      return [=, &handler, &cursor, &budget_used] {
-       if (!cursor.last_valid_header_found) {
-         return read_validate_record_metadata(cursor.offset, nonce
-         ).safe_then([=, &cursor](auto md) {
-           logger().debug(
-             "Journal::scan_valid_records: read complete {}",
-             cursor.offset);
-           if (!md) {
-             logger().debug(
-               "Journal::scan_valid_records: found invalid header at {}, presumably at end",
-               cursor.offset);
-             cursor.last_valid_header_found = true;
-             return scan_valid_records_ertr::now();
-           } else {
-             logger().debug(
-               "Journal::scan_valid_records: valid record read at {}",
-               cursor.offset);
-             cursor.last_committed = paddr_t{
-               cursor.offset.segment,
-               md->first.committed_to};
-             cursor.pending_records.emplace_back(
-               cursor.offset,
-               md->first,
-               md->second);
-             cursor.offset.offset +=
-               md->first.dlength + md->first.mdlength;
-             return scan_valid_records_ertr::now();
-           }
-         }).safe_then([=, &cursor, &budget_used, &handler] {
-           return crimson::repeat(
-             [=, &budget_used, &cursor, &handler] {
-               logger().debug(
-                 "Journal::scan_valid_records: valid record read, processing queue");
-               if (cursor.pending_records.empty()) {
-                 /* This is only possible if the segment is empty.
-                  * A record's last_commited must be prior to its own
-                  * location since it itself cannot yet have been committed
-                  * at its own time of submission.  Thus, the most recently
-                  * read record must always fall after cursor.last_committed */
-                 return scan_valid_records_ertr::make_ready_future<
-                   seastar::stop_iteration>(seastar::stop_iteration::yes);
-               }
-               auto &next = cursor.pending_records.front();
-               if (next.offset > cursor.last_committed) {
-                 return scan_valid_records_ertr::make_ready_future<
-                   seastar::stop_iteration>(seastar::stop_iteration::yes);
-               }
-               budget_used +=
-                 next.header.dlength + next.header.mdlength;
-               return handler(
-                 next.offset,
-                 next.header,
-                 next.mdbuffer
-               ).safe_then([&cursor] {
-                 cursor.pending_records.pop_front();
-                 return scan_valid_records_ertr::make_ready_future<
-                   seastar::stop_iteration>(seastar::stop_iteration::no);
-               });
-             });
-         });
-       } else {
-         assert(!cursor.pending_records.empty());
-         auto &next = cursor.pending_records.front();
-         return read_validate_data(next.offset, next.header
-         ).safe_then([=, &budget_used, &next, &cursor, &handler](auto valid) {
-           if (!valid) {
-             cursor.pending_records.clear();
-             return scan_valid_records_ertr::now();
-           }
-           budget_used +=
-             next.header.dlength + next.header.mdlength;
-           return handler(
-             next.offset,
-             next.header,
-             next.mdbuffer
-           ).safe_then([&cursor] {
-             cursor.pending_records.pop_front();
-             return scan_valid_records_ertr::now();
-           });
-         });
-       }
-      }().safe_then([=, &budget_used, &cursor] {
-       if (cursor.is_complete() || budget_used >= budget) {
-         return seastar::stop_iteration::yes;
-       } else {
-         return seastar::stop_iteration::no;
-       }
-      });
-    }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret {
-      return scan_valid_records_ret(
-       scan_valid_records_ertr::ready_future_marker{},
-       std::move(*retref));
-    });
-}
-
-
 }
index e7552bd13161fefb11c69a489fcc8a8c2033f214..625a102347e0d1cbc668d9de84f1cf957543387b 100644 (file)
@@ -14,6 +14,7 @@
 #include "include/denc.h"
 
 #include "crimson/common/log.h"
+#include "crimson/os/seastore/scanner.h"
 #include "crimson/os/seastore/segment_manager.h"
 #include "crimson/os/seastore/ordering_handle.h"
 #include "crimson/os/seastore/seastore_types.h"
@@ -29,7 +30,7 @@ class SegmentedAllocator;
  */
 class Journal {
 public:
-  Journal(SegmentManager &segment_manager);
+  Journal(SegmentManager &segment_manager, Scanner& scanner);
 
   /**
    * Sets the SegmentProvider.
@@ -136,26 +137,9 @@ public:
     replay_ret(journal_seq_t seq,
               paddr_t record_block_base,
               const delta_info_t&)>;
-  replay_ret replay(delta_handler_t &&delta_handler);
-
-  /**
-   * scan_extents
-   *
-   * Scans records beginning at addr until the first record boundary after
-   * addr + bytes_to_read.
-   *
-   * Returns list<extent, extent_info>
-   * cursor.is_complete() will be true when no further extents exist in segment.
-   */
-  class scan_valid_records_cursor;
-  using scan_extents_cursor = scan_valid_records_cursor;
-  using scan_extents_ertr = SegmentManager::read_ertr;
-  using scan_extents_ret_bare = std::list<std::pair<paddr_t, extent_info_t>>;
-  using scan_extents_ret = scan_extents_ertr::future<scan_extents_ret_bare>;
-  scan_extents_ret scan_extents(
-    scan_extents_cursor &cursor,
-    extent_len_t bytes_to_read
-  );
+  replay_ret replay(
+    std::vector<std::pair<segment_id_t, segment_header_t>>&& segment_headers,
+    delta_handler_t &&delta_handler);
 
   void set_write_pipeline(WritePipeline *_write_pipeline) {
     write_pipeline = _write_pipeline;
@@ -172,6 +156,7 @@ private:
   segment_off_t written_to = 0;
   segment_off_t committed_to = 0;
 
+  Scanner& scanner;
   WritePipeline *write_pipeline = nullptr;
 
   void reset_soft_state() {
@@ -187,18 +172,6 @@ private:
   initialize_segment_ertr::future<segment_seq_t> initialize_segment(
     Segment &segment);
 
-  /// validate embedded metadata checksum
-  static bool validate_metadata(const bufferlist &bl);
-
-  /// read and validate data
-  using read_validate_data_ertr = SegmentManager::read_ertr;
-  using read_validate_data_ret = read_validate_data_ertr::future<bool>;
-  read_validate_data_ret read_validate_data(
-    paddr_t record_base,
-    const record_header_t &header  ///< caller must ensure lifetime through
-                                   ///  future resolution
-  );
-
 
   /// do record write
   using write_record_ertr = crimson::errorator<
@@ -217,96 +190,24 @@ private:
   /// returns true iff current segment has insufficient space
   bool needs_roll(segment_off_t length) const;
 
-  using read_segment_header_ertr = crimson::errorator<
-    crimson::ct_error::enoent,
-    crimson::ct_error::enodata,
-    crimson::ct_error::input_output_error
-    >;
-  using read_segment_header_ret = read_segment_header_ertr::future<
-    segment_header_t>;
-  read_segment_header_ret read_segment_header(segment_id_t segment);
-
   /// return ordered vector of segments to replay
   using replay_segments_t = std::vector<
     std::pair<journal_seq_t, segment_header_t>>;
-  using find_replay_segments_ertr = crimson::errorator<
+  using prep_replay_segments_ertr = crimson::errorator<
     crimson::ct_error::input_output_error
     >;
-  using find_replay_segments_fut = find_replay_segments_ertr::future<
+  using prep_replay_segments_fut = prep_replay_segments_ertr::future<
     replay_segments_t>;
-  find_replay_segments_fut find_replay_segments();
+  prep_replay_segments_fut prep_replay_segments(
+    std::vector<std::pair<segment_id_t, segment_header_t>> segments);
 
   /// attempts to decode deltas from bl, return nullopt if unsuccessful
   std::optional<std::vector<delta_info_t>> try_decode_deltas(
     record_header_t header,
     const bufferlist &bl);
 
-  /// attempts to decode extent infos from bl, return nullopt if unsuccessful
-  std::optional<std::vector<extent_info_t>> try_decode_extent_infos(
-    record_header_t header,
-    const bufferlist &bl);
-
-  /// read record metadata for record starting at start
-  using read_validate_record_metadata_ertr = replay_ertr;
-  using read_validate_record_metadata_ret =
-    read_validate_record_metadata_ertr::future<
-      std::optional<std::pair<record_header_t, bufferlist>>
-    >;
-  read_validate_record_metadata_ret read_validate_record_metadata(
-    paddr_t start,
-    segment_nonce_t nonce);
-
-public:
-  /// scan segment for end incrementally
-  struct scan_valid_records_cursor {
-    bool last_valid_header_found = false;
-    paddr_t offset;
-    paddr_t last_committed;
-
-    struct found_record_t {
-      paddr_t offset;
-      record_header_t header;
-      bufferlist mdbuffer;
-
-      found_record_t(
-       paddr_t offset,
-       const record_header_t &header,
-       const bufferlist &mdbuffer)
-       : offset(offset), header(header), mdbuffer(mdbuffer) {}
-    };
-    std::deque<found_record_t> pending_records;
-
-    bool is_complete() const {
-      return last_valid_header_found && pending_records.empty();
-    }
-
-    paddr_t get_offset() const {
-      return offset;
-    }
-
-    scan_valid_records_cursor(
-      paddr_t offset)
-      : offset(offset) {}
-  };
 private:
 
-  using scan_valid_records_ertr = SegmentManager::read_ertr;
-  using scan_valid_records_ret = scan_valid_records_ertr::future<
-    size_t>;
-  using found_record_handler_t = std::function<
-    scan_valid_records_ertr::future<>(
-      paddr_t record_block_base,
-      // callee may assume header and bl will remain valid until
-      // returned future resolves
-      const record_header_t &header,
-      const bufferlist &bl)>;
-  scan_valid_records_ret scan_valid_records(
-    scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
-    segment_nonce_t nonce,             ///< [in] nonce for segment
-    size_t budget,                     ///< [in] max budget to use
-    found_record_handler_t &handler    ///< [in] handler for records
-  ); ///< @return used budget
-
   /// replays records starting at start through end of segment
   replay_ertr::future<>
   replay_segment(
diff --git a/src/crimson/os/seastore/scanner.cc b/src/crimson/os/seastore/scanner.cc
new file mode 100644 (file)
index 0000000..df3394c
--- /dev/null
@@ -0,0 +1,336 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#include "crimson/os/seastore/segment_manager.h"
+#include "crimson/os/seastore/scanner.h"
+#include "crimson/common/log.h"
+
+namespace {
+  seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_seastore);
+  }
+}
+
+namespace crimson::os::seastore {
+
+Scanner::read_segment_header_ret
+Scanner::read_segment_header(segment_id_t segment) // read one block at offset 0 of `segment` and decode it as a segment_header_t
+{
+  return segment_manager.read(
+    paddr_t{segment, 0}, // the header is read from the very start of the segment
+    segment_manager.get_block_size()
+  ).handle_error(
+    read_segment_header_ertr::pass_further{}, // errors that are part of read_segment_header_ertr go to the caller
+    crimson::ct_error::assert_all{ // any other error from the read aborts with the message below
+      "Invalid error in Scanner::read_segment_header"
+    }
+  ).safe_then([=](bufferptr bptr) -> read_segment_header_ret {
+    logger().debug("segment {} bptr size {}", segment, bptr.length());
+
+    segment_header_t header;
+    bufferlist bl;
+    bl.push_back(bptr); // wrap the raw block in a bufferlist so it can be checksummed and decoded
+
+    logger().debug(
+      "Scanner::read_segment_header: segment {} block crc {}",
+      segment,
+      bl.begin().crc32c(segment_manager.get_block_size(), 0)); // crc is only logged here, not verified
+
+    auto bp = bl.cbegin();
+    try {
+      decode(header, bp);
+    } catch (ceph::buffer::error &e) { // an undecodable block is reported as enodata, not as an I/O error
+      logger().debug(
+       "Scanner::read_segment_header: segment {} unable to decode "
+       "header, skipping",
+       segment);
+      return crimson::ct_error::enodata::make();
+    }
+    logger().debug(
+      "Scanner::read_segment_header: segment {} header {}",
+      segment,
+      header);
+    return read_segment_header_ret(
+      read_segment_header_ertr::ready_future_marker{},
+      header); // success: resolve with the decoded header
+  });
+}
+Scanner::scan_extents_ret Scanner::scan_extents(
+  scan_extents_cursor &cursor, // [in, out] scan position; advanced by scan_valid_records
+  extent_len_t bytes_to_read)  // passed to scan_valid_records as its budget
+{ // Collects (address, extent_info_t) for every extent of the records visited from cursor.
+  auto ret = std::make_unique<scan_extents_ret_bare>(); // heap-allocated so the list outlives this stack frame
+  auto* extents = ret.get(); // raw alias used by the handler; ownership moves into the last continuation
+  return read_segment_header(cursor.get_offset().segment // the header supplies the nonce used to validate records
+  ).handle_error(
+    scan_extents_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error in Scanner::scan_extents"
+    }
+  ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
+    auto segment_nonce = segment_header.segment_nonce;
+    return seastar::do_with( // do_with holds the handler while scan_valid_records uses it by reference
+      found_record_handler_t(
+       [extents, this](
+         paddr_t base,
+         const record_header_t &header,
+         const bufferlist &mdbuf) mutable {
+
+         auto infos = try_decode_extent_infos(
+           header,
+           mdbuf);
+         if (!infos) {
+           // This should be impossible, we did check the crc on the mdbuf
+           logger().error(
+             "Scanner::scan_extents unable to decode extents for record {}",
+             base);
+           assert(infos); // NOTE(review): compiled out under NDEBUG; *infos below would then read an empty optional
+         }
+
+         paddr_t extent_offset = base.add_offset(header.mdlength); // first extent starts right after the record metadata
+         for (const auto &i : *infos) {
+           extents->emplace_back(extent_offset, i);
+           extent_offset.offset += i.len; // extents are addressed back to back, each i.len bytes long
+         }
+         return scan_extents_ertr::now();
+       }),
+      [=, &cursor](auto &dhandler) {
+       return scan_valid_records(
+         cursor,
+         segment_nonce,
+         bytes_to_read,
+         dhandler).discard_result(); // the used-budget value is not needed here
+      });
+  }).safe_then([ret=std::move(ret)] {
+    return std::move(*ret); // hand the collected list to the caller
+  });
+}
+
+Scanner::scan_valid_records_ret Scanner::scan_valid_records(
+  scan_valid_records_cursor &cursor, // [in, out] scan state; must stay alive until the returned future resolves
+  segment_nonce_t nonce,             // [in] nonce every record header must carry
+  size_t budget,                     // [in] stop once this many record bytes were handed to handler
+  found_record_handler_t &handler)   // [in] invoked per record; must stay alive until the future resolves
+{ // Reads records from cursor.offset, passes valid ones to handler, resolves to the bytes (mdlength + dlength) handled.
+  if (cursor.offset.offset == 0) { // offset 0 is where read_segment_header reads the header; records start one block in
+    cursor.offset.offset = segment_manager.get_block_size();
+  }
+  auto retref = std::make_unique<size_t>(0); // heap-allocated counter: outlives this frame, moved into the last continuation
+  auto &budget_used = *retref; // must alias *retref: lambdas capture it by reference and *retref is the returned value
+  return crimson::repeat(
+    [=, &cursor, &budget_used, &handler]() mutable
+    -> scan_valid_records_ertr::future<seastar::stop_iteration> {
+      return [=, &handler, &cursor, &budget_used] {
+       if (!cursor.last_valid_header_found) { // phase 1: still reading record headers
+         return read_validate_record_metadata(cursor.offset, nonce
+         ).safe_then([=, &cursor](auto md) {
+           logger().debug(
+             "Scanner::scan_valid_records: read complete {}",
+             cursor.offset);
+           if (!md) {
+             logger().debug(
+               "Scanner::scan_valid_records: found invalid header at {}, presumably at end",
+               cursor.offset);
+             cursor.last_valid_header_found = true; // switches later iterations to phase 2
+             return scan_valid_records_ertr::now();
+           } else {
+             logger().debug(
+               "Scanner::scan_valid_records: valid record read at {}",
+               cursor.offset);
+             cursor.last_committed = paddr_t{
+               cursor.offset.segment,
+               md->first.committed_to};
+             cursor.pending_records.emplace_back( // queue it; it is delivered once known to be committed
+               cursor.offset,
+               md->first,
+               md->second);
+             cursor.offset.offset +=
+               md->first.dlength + md->first.mdlength; // advance to the next record
+             return scan_valid_records_ertr::now();
+           }
+         }).safe_then([=, &cursor, &budget_used, &handler] {
+           return crimson::repeat( // drain every queued record at or before last_committed
+             [=, &budget_used, &cursor, &handler] {
+               logger().debug(
+                 "Scanner::scan_valid_records: valid record read, processing queue");
+               if (cursor.pending_records.empty()) {
+                 /* This is only possible if the segment is empty.
+                  * A record's last_committed must be prior to its own
+                  * location since it itself cannot yet have been committed
+                  * at its own time of submission.  Thus, the most recently
+                  * read record must always fall after cursor.last_committed */
+                 return scan_valid_records_ertr::make_ready_future<
+                   seastar::stop_iteration>(seastar::stop_iteration::yes);
+               }
+               auto &next = cursor.pending_records.front();
+               if (next.offset > cursor.last_committed) { // not yet covered by a committed_to; keep it queued
+                 return scan_valid_records_ertr::make_ready_future<
+                   seastar::stop_iteration>(seastar::stop_iteration::yes);
+               }
+               budget_used +=
+                 next.header.dlength + next.header.mdlength;
+               return handler(
+                 next.offset,
+                 next.header,
+                 next.mdbuffer
+               ).safe_then([&cursor] {
+                 cursor.pending_records.pop_front(); // popped only after the handler resolved; it holds references into it
+                 return scan_valid_records_ertr::make_ready_future<
+                   seastar::stop_iteration>(seastar::stop_iteration::no);
+               });
+             });
+         });
+       } else { // phase 2: no more headers; remaining queued records need their data checksum verified
+         assert(!cursor.pending_records.empty());
+         auto &next = cursor.pending_records.front();
+         return read_validate_data(next.offset, next.header
+         ).safe_then([=, &budget_used, &next, &cursor, &handler](auto valid) {
+           if (!valid) { // bad data: this record and every later queued record are dropped
+             cursor.pending_records.clear();
+             return scan_valid_records_ertr::now();
+           }
+           budget_used +=
+             next.header.dlength + next.header.mdlength;
+           return handler(
+             next.offset,
+             next.header,
+             next.mdbuffer
+           ).safe_then([&cursor] {
+             cursor.pending_records.pop_front();
+             return scan_valid_records_ertr::now();
+           });
+         });
+       }
+      }().safe_then([=, &budget_used, &cursor] {
+       if (cursor.is_complete() || budget_used >= budget) { // done with the segment, or budget consumed
+         return seastar::stop_iteration::yes;
+       } else {
+         return seastar::stop_iteration::no;
+       }
+      });
+    }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret {
+      return scan_valid_records_ret(
+       scan_valid_records_ertr::ready_future_marker{},
+       std::move(*retref)); // the counter updated through budget_used
+    });
+}
+
+Scanner::read_validate_record_metadata_ret
+Scanner::read_validate_record_metadata( // read the record metadata at `start`; nullopt means no valid record there
+  paddr_t start,
+  segment_nonce_t nonce) // nonce the record header must carry to be accepted
+{
+  auto block_size = segment_manager.get_block_size();
+  if (start.offset + block_size > (int64_t)segment_manager.get_segment_size()) { // a header block would not fit in the segment
+    return read_validate_record_metadata_ret(
+      read_validate_record_metadata_ertr::ready_future_marker{},
+      std::nullopt);
+  }
+  return segment_manager.read(start, block_size // the first block holds the encoded record_header_t
+  ).safe_then(
+    [=](bufferptr bptr) mutable
+    -> read_validate_record_metadata_ret {
+      logger().debug("read_validate_record_metadata: reading {}", start);
+      auto block_size = segment_manager.get_block_size(); // shadows the captured outer block_size
+      bufferlist bl;
+      bl.append(bptr);
+      auto bp = bl.cbegin();
+      record_header_t header;
+      try {
+       decode(header, bp);
+      } catch (ceph::buffer::error &e) { // undecodable header: treated as "no record", not as an error
+       return read_validate_record_metadata_ret(
+         read_validate_record_metadata_ertr::ready_future_marker{},
+         std::nullopt);
+      }
+      if (header.segment_nonce != nonce) { // nonce mismatch: also reported as "no record"
+       return read_validate_record_metadata_ret(
+         read_validate_record_metadata_ertr::ready_future_marker{},
+         std::nullopt);
+      }
+      if (header.mdlength > (extent_len_t)block_size) { // metadata extends past the first block
+       if (start.offset + header.mdlength >
+           (int64_t)segment_manager.get_segment_size()) { // claimed metadata would run past the segment end
+         return crimson::ct_error::input_output_error::make();
+       }
+       return segment_manager.read( // fetch the rest of the metadata and append it to bl
+         {start.segment, start.offset + (segment_off_t)block_size},
+         header.mdlength - block_size).safe_then(
+           [header=std::move(header), bl=std::move(bl)](
+             auto &&bptail) mutable {
+             bl.push_back(bptail);
+             return read_validate_record_metadata_ret(
+               read_validate_record_metadata_ertr::ready_future_marker{},
+               std::make_pair(std::move(header), std::move(bl)));
+           });
+      } else {
+       return read_validate_record_metadata_ret(
+         read_validate_record_metadata_ertr::ready_future_marker{},
+         std::make_pair(std::move(header), std::move(bl))
+       );
+      }
+    }).safe_then([=](auto p) {
+      if (p && validate_metadata(p->second)) { // accept only when the metadata checksum matches
+       return read_validate_record_metadata_ret(
+         read_validate_record_metadata_ertr::ready_future_marker{},
+         std::move(*p)
+       );
+      } else {
+       return read_validate_record_metadata_ret(
+         read_validate_record_metadata_ertr::ready_future_marker{},
+         std::nullopt);
+      }
+    });
+}
+
+std::optional<std::vector<extent_info_t>>
+Scanner::try_decode_extent_infos( // decode header.extents extent_info_t entries from record metadata `bl`
+  record_header_t header,
+  const bufferlist &bl)
+{
+  auto bliter = bl.cbegin();
+  bliter += ceph::encoded_sizeof_bounded<record_header_t>(); // skip the encoded record header
+  bliter += sizeof(checksum_t) /* crc */;
+  logger().debug("{}: decoding {} extents", __func__, header.extents);
+  std::vector<extent_info_t> extent_infos(header.extents); // pre-sized; each element is filled by decode below
+  for (auto &&i : extent_infos) {
+    try {
+      decode(i, bliter);
+    } catch (ceph::buffer::error &e) { // any decode failure makes the whole result nullopt
+      return std::nullopt;
+    }
+  }
+  return extent_infos;
+}
+
+Scanner::read_validate_data_ret
+Scanner::read_validate_data( // read a record's data and resolve to whether its crc matches header.data_crc
+  paddr_t record_base,
+  const record_header_t &header) // captured by reference below; must stay valid until the future resolves
+{
+  return segment_manager.read(
+    record_base.add_offset(header.mdlength), // data starts right after the record metadata
+    header.dlength
+  ).safe_then([=, &header](auto bptr) {
+    bufferlist bl;
+    bl.append(bptr);
+    return bl.crc32c(-1) == header.data_crc; // crc32c over the data, seeded with -1
+  });
+}
+
+bool Scanner::validate_metadata(const bufferlist &bl) // true when the crc stored in `bl` matches the crc computed over it
+{
+  auto bliter = bl.cbegin();
+  auto test_crc = bliter.crc32c( // checksum of the encoded record header, seeded with -1
+    ceph::encoded_sizeof_bounded<record_header_t>(),
+    -1);
+  ceph_le32 recorded_crc_le;
+  decode(recorded_crc_le, bliter); // stored crc; presumably bliter now sits just past the header - TODO confirm
+  uint32_t recorded_crc = recorded_crc_le;
+  test_crc = bliter.crc32c( // continue the checksum over everything left after the stored crc
+    bliter.get_remaining(),
+    test_crc);
+  return test_crc == recorded_crc;
+}
+
+} // namespace crimson::os::seastore
diff --git a/src/crimson/os/seastore/scanner.h b/src/crimson/os/seastore/scanner.h
new file mode 100644 (file)
index 0000000..36bec08
--- /dev/null
@@ -0,0 +1,102 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include "crimson/common/errorator.h"
+#include "crimson/os/seastore/seastore_types.h"
+#include "crimson/os/seastore/segment_manager.h"
+
+namespace crimson::os::seastore {
+
+class SegmentCleaner;
+
+class Scanner { // reads segment headers, records and extent infos from segments through a SegmentManager
+public:
+  using read_ertr = crimson::errorator< // base error set shared by the record-reading helpers below
+    crimson::ct_error::input_output_error,
+    crimson::ct_error::invarg,
+    crimson::ct_error::enoent,
+    crimson::ct_error::erange>;
+
+  Scanner(SegmentManager& segment_manager) // keeps a reference only; the SegmentManager is not owned
+    : segment_manager(segment_manager) {}
+  using read_segment_header_ertr = crimson::errorator<
+    crimson::ct_error::enoent,
+    crimson::ct_error::enodata,
+    crimson::ct_error::input_output_error
+    >;
+  using read_segment_header_ret = read_segment_header_ertr::future<
+    segment_header_t>;
+  read_segment_header_ret read_segment_header(segment_id_t segment); // reads and decodes the header block of `segment`
+
+  /**
+   * scan_extents
+   *
+   * Scans records beginning at cursor.get_offset() until the first record
+   * boundary after bytes_to_read bytes of records have been processed.
+   *
+   * Returns list<pair<paddr_t, extent_info_t>> (extent address, extent info);
+   * cursor.is_complete() will be true when no further extents exist in segment.
+   */
+  using scan_extents_cursor = scan_valid_records_cursor;
+  using scan_extents_ertr = read_ertr::extend<crimson::ct_error::enodata>;
+  using scan_extents_ret_bare = std::list<std::pair<paddr_t, extent_info_t>>;
+  using scan_extents_ret = scan_extents_ertr::future<scan_extents_ret_bare>;
+  scan_extents_ret scan_extents(
+    scan_extents_cursor &cursor, // [in, out] scan position, updated during the call
+    extent_len_t bytes_to_read
+  );
+
+  using scan_valid_records_ertr = read_ertr::extend<crimson::ct_error::enodata>;
+  using scan_valid_records_ret = scan_valid_records_ertr::future<
+    size_t>;
+  using found_record_handler_t = std::function<
+    scan_valid_records_ertr::future<>(
+      paddr_t record_block_base,
+      // callee may assume header and bl will remain valid until
+      // returned future resolves
+      const record_header_t &header,
+      const bufferlist &bl)>;
+  scan_valid_records_ret scan_valid_records(
+    scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
+    segment_nonce_t nonce,             ///< [in] nonce for segment
+    size_t budget,                     ///< [in] max budget to use
+    found_record_handler_t &handler    ///< [in] handler for records
+  ); ///< @return used budget
+
+private:
+  SegmentManager& segment_manager; // non-owning reference to the device all reads go through
+
+  /// read record metadata for record starting at start
+  using read_validate_record_metadata_ertr = read_ertr;
+  using read_validate_record_metadata_ret =
+    read_validate_record_metadata_ertr::future<
+      std::optional<std::pair<record_header_t, bufferlist>> // nullopt: no valid record at `start`
+    >;
+  read_validate_record_metadata_ret read_validate_record_metadata(
+    paddr_t start,
+    segment_nonce_t nonce);
+
+  /// attempts to decode extent infos from bl, return nullopt if unsuccessful
+  std::optional<std::vector<extent_info_t>> try_decode_extent_infos(
+    record_header_t header,
+    const bufferlist &bl);
+
+  /// read and validate data
+  using read_validate_data_ertr = read_ertr;
+  using read_validate_data_ret = read_validate_data_ertr::future<bool>; // true when the data checksum matches
+  read_validate_data_ret read_validate_data(
+    paddr_t record_base,
+    const record_header_t &header  ///< caller must ensure lifetime through
+                                   ///  future resolution
+  );
+
+  /// validate embedded metadata checksum
+  static bool validate_metadata(const bufferlist &bl);
+
+};
+
+using ScannerRef = std::unique_ptr<Scanner>;
+
+} // namespace crimson::os::seastore
index 3973da181b2b53657e6d2931c7928ca8d401acd2..2aa4e6c3156c495ec8b094bdb137ce813a01d2ee 100644 (file)
@@ -1173,11 +1173,14 @@ std::unique_ptr<SeaStore> make_seastore(
     segment_manager::block::BlockSegmentManager
     >(device + "/block");
 
+  auto scanner = std::make_unique<Scanner>(*sm);
+  auto& scanner_ref = *scanner.get();
   auto segment_cleaner = std::make_unique<SegmentCleaner>(
     SegmentCleaner::config_t::get_default(),
+    std::move(scanner),
     false /* detailed */);
 
-  auto journal = std::make_unique<Journal>(*sm);
+  auto journal = std::make_unique<Journal>(*sm, scanner_ref);
   auto cache = std::make_unique<Cache>(*sm);
   auto lba_manager = lba_manager::create_lba_manager(*sm, *cache);
 
index 704f2bcea2fd74fa5fbd06c8fe266b5c6c72722f..465cd2dfb4794b32f9a0fda0d9d13c9765fdbfff 100644 (file)
@@ -840,6 +840,38 @@ ceph::bufferlist encode_record(
   segment_off_t committed_to,
   segment_nonce_t current_segment_nonce = 0);
 
+/// scan segment for end incrementally
+struct scan_valid_records_cursor {
+  bool last_valid_header_found = false; // becomes true once no further record header will be read
+  paddr_t offset;         // position of the next record header to read
+  paddr_t last_committed; // NOTE(review): not set by the constructor; relies on paddr_t's default construction
+
+  struct found_record_t { // a record whose metadata was read but which has not been consumed yet
+    paddr_t offset;
+    record_header_t header;
+    bufferlist mdbuffer;
+
+    found_record_t(
+      paddr_t offset,
+      const record_header_t &header,
+      const bufferlist &mdbuffer)
+      : offset(offset), header(header), mdbuffer(mdbuffer) {}
+  };
+  std::deque<found_record_t> pending_records; // queue of found records, oldest at the front
+
+  bool is_complete() const { // nothing left to read and nothing left queued
+    return last_valid_header_found && pending_records.empty();
+  }
+
+  paddr_t get_offset() const {
+    return offset;
+  }
+
+  scan_valid_records_cursor(
+    paddr_t offset) // address at which scanning starts
+    : offset(offset) {}
+};
+
 }
 
 WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::seastore_meta_t)
index 283b7eece8953e6881087cc1bb9347e067e827bc..82bf2d22fc4b0a22e2ed04d0c689bd18b155d159 100644 (file)
@@ -144,9 +144,13 @@ void SpaceTrackerDetailed::dump_usage(segment_id_t id) const
   segment_usage[id].dump_usage(block_size);
 }
 
-SegmentCleaner::SegmentCleaner(config_t config, bool detailed)
+SegmentCleaner::SegmentCleaner(
+  config_t config,
+  ScannerRef&& scr,
+  bool detailed)
   : detailed(detailed),
     config(config),
+    scanner(std::move(scr)),
     gc_process(*this)
 {
   register_metrics();
@@ -304,7 +308,7 @@ SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space()
     }
     next.offset = 0;
     scan_cursor =
-      std::make_unique<ExtentCallbackInterface::scan_extents_cursor>(
+      std::make_unique<Scanner::scan_extents_cursor>(
        next);
     logger().debug(
       "SegmentCleaner::do_gc: starting gc on segment {}",
@@ -313,7 +317,7 @@ SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space()
     ceph_assert(!scan_cursor->is_complete());
   }
 
-  return ecb->scan_extents(
+  return scanner->scan_extents(
     *scan_cursor,
     config.reclaim_bytes_stride
   ).safe_then([this](auto &&_extents) {
@@ -372,4 +376,41 @@ SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space()
   });
 }
 
+SegmentCleaner::init_segments_ret SegmentCleaner::init_segments() { // reads every segment header; resolves to the journal segments
+  return seastar::do_with( // do_with keeps the result vector alive across the asynchronous loop
+    std::vector<std::pair<segment_id_t, segment_header_t>>(),
+    [this](auto& segments) {
+    return crimson::do_for_each( // visit segment ids 0 .. num_segments-1 in order
+      boost::make_counting_iterator(segment_id_t{0}),
+      boost::make_counting_iterator(segment_id_t{num_segments}),
+      [this, &segments](auto segment_id) {
+      return scanner->read_segment_header(segment_id)
+      .safe_then([&segments, segment_id, this](auto header) {
+       if (header.out_of_line) { // out-of-line segments are marked closed here and not returned
+         logger().debug("SegmentCleaner::init_segments: out-of-line segment {}", segment_id);
+         init_mark_segment_closed(
+           segment_id,
+           header.journal_segment_seq);
+       } else { // journal segments are collected for the caller
+         logger().debug("SegmentCleaner::init_segments: journal segment {}", segment_id);
+         segments.emplace_back(std::make_pair(segment_id, std::move(header)));
+       }
+       return seastar::now();
+      }).handle_error(
+       crimson::ct_error::enoent::handle([](auto) { // enoent for a segment: skip it
+         return init_segments_ertr::now();
+       }),
+       crimson::ct_error::enodata::handle([](auto) { // header could not be decoded: skip it
+         return init_segments_ertr::now();
+       }),
+       crimson::ct_error::input_output_error::pass_further{} // I/O errors abort the whole scan
+      );
+    }).safe_then([&segments] {
+      return seastar::make_ready_future<
+       std::vector<std::pair<segment_id_t, segment_header_t>>>(
+         std::move(segments)); // move the result out before do_with destroys it
+    });
+  });
+}
+
 }
index a94a133301f4b60b4461a46939c9ef13e21c9c6a..f79dee67751aba7771d9a23e40d9ec26fb340efc 100644 (file)
@@ -108,7 +108,7 @@ class SpaceTrackerSimple : public SpaceTrackerI {
     return live_bytes_by_segment[segment];
   }
 public:
-  SpaceTrackerSimple(size_t num_segments)
+  SpaceTrackerSimple(segment_id_t num_segments)
     : live_bytes_by_segment(num_segments, 0) {}
 
   int64_t allocate(
@@ -190,7 +190,7 @@ class SpaceTrackerDetailed : public SpaceTrackerI {
   std::vector<SegmentMap> segment_usage;
 
 public:
-  SpaceTrackerDetailed(size_t num_segments, size_t segment_size, size_t block_size)
+  SpaceTrackerDetailed(segment_id_t num_segments, size_t segment_size, size_t block_size)
     : block_size(block_size),
       segment_size(segment_size),
       segment_usage(num_segments, segment_size / block_size) {}
@@ -345,18 +345,6 @@ public:
       laddr_t laddr,
       segment_off_t len) = 0;
 
-    /**
-     * scan_extents
-     *
-     * Interface shim for Journal::scan_extents
-     */
-    using scan_extents_cursor = Journal::scan_valid_records_cursor;
-    using scan_extents_ertr = Journal::scan_extents_ertr;
-    using scan_extents_ret = Journal::scan_extents_ret;
-    virtual scan_extents_ret scan_extents(
-      scan_extents_cursor &cursor,
-      extent_len_t bytes_to_read) = 0;
-
     /**
      * release_segment
      *
@@ -386,10 +374,12 @@ private:
   const bool detailed;
   const config_t config;
 
-  size_t num_segments = 0;
+  segment_id_t num_segments = 0;
   size_t segment_size = 0;
   size_t block_size = 0;
 
+  ScannerRef scanner;
+
   SpaceTrackerIRef space_tracker;
   std::vector<segment_info_t> segments;
   size_t empty_segments;
@@ -417,7 +407,10 @@ private:
   std::optional<seastar::promise<>> blocked_io_wake;
 
 public:
-  SegmentCleaner(config_t config, bool detailed = false);
+  SegmentCleaner(
+    config_t config,
+    ScannerRef&& scanner,
+    bool detailed = false);
 
   void mount(SegmentManager &sm) {
     init_complete = false;
@@ -444,6 +437,13 @@ public:
     empty_segments = num_segments;
   }
 
+  using init_segments_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error>;
+  using init_segments_ret_bare =
+    std::vector<std::pair<segment_id_t, segment_header_t>>;
+  using init_segments_ret = init_segments_ertr::future<init_segments_ret_bare>;
+  init_segments_ret init_segments();
+
   get_segment_ret get_segment() final;
 
   void close_segment(segment_id_t segment) final;
@@ -633,7 +633,7 @@ private:
 
   // GC status helpers
   std::unique_ptr<
-    ExtentCallbackInterface::scan_extents_cursor
+    Scanner::scan_extents_cursor
     > scan_cursor;
 
   /**
@@ -711,7 +711,7 @@ private:
   } gc_process;
 
   using gc_ertr = work_ertr::extend_ertr<
-    ExtentCallbackInterface::scan_extents_ertr
+    Scanner::scan_extents_ertr
     >;
 
   gc_cycle_ret do_gc_cycle();
index 43e923ca0b4d8ecc44ce49a835342aee65a2970b..3ef0f5af59b589bbe0887b6c6db819004bd2a4bb 100644 (file)
@@ -63,8 +63,13 @@ TransactionManager::mount_ertr::future<> TransactionManager::mount()
   LOG_PREFIX(TransactionManager::mount);
   cache->init();
   segment_cleaner->mount(segment_manager);
-  return journal->replay([this](auto seq, auto paddr, const auto &e) {
-    return cache->replay_delta(seq, paddr, e);
+  return segment_cleaner->init_segments().safe_then(
+    [this](auto&& segments) {
+    return journal->replay(
+      std::move(segments),
+      [this](auto seq, auto paddr, const auto &e) {
+      return cache->replay_delta(seq, paddr, e);
+    });
   }).safe_then([this] {
     return journal->open_for_write();
   }).safe_then([this, FNAME](auto addr) {
index 03a0d32a8753dedd3dbce69da2fcd98fd826fc87..f12aac95ff0a52988eac17b3e5bb931c63e18a4b 100644 (file)
@@ -373,18 +373,6 @@ public:
     laddr_t laddr,
     segment_off_t len) final;
 
-  using scan_extents_cursor =
-    SegmentCleaner::ExtentCallbackInterface::scan_extents_cursor;
-  using scan_extents_ertr =
-    SegmentCleaner::ExtentCallbackInterface::scan_extents_ertr;
-  using scan_extents_ret =
-    SegmentCleaner::ExtentCallbackInterface::scan_extents_ret;
-  scan_extents_ret scan_extents(
-    scan_extents_cursor &cursor,
-    extent_len_t bytes_to_read) final {
-    return journal->scan_extents(cursor, bytes_to_read);
-  }
-
   using release_segment_ret =
     SegmentCleaner::ExtentCallbackInterface::release_segment_ret;
   release_segment_ret release_segment(
index 3c94f5226e522c5dca2a0a6a00e4c1e143dece9b..84809db5aa42286cc9dedbc6f558e83b73c0f613 100644 (file)
@@ -27,6 +27,7 @@ using namespace crimson::os::seastore::lba_manager::btree;
 struct btree_lba_manager_test :
   public seastar_test_suite_t, SegmentProvider {
   segment_manager::EphemeralSegmentManagerRef segment_manager;
+  ScannerRef scanner;
   Journal journal;
   Cache cache;
   BtreeLBAManagerRef lba_manager;
@@ -37,7 +38,8 @@ struct btree_lba_manager_test :
 
   btree_lba_manager_test()
     : segment_manager(segment_manager::create_test_ephemeral()),
-      journal(*segment_manager),
+      scanner(new Scanner(*segment_manager)),
+      journal(*segment_manager, *scanner),
       cache(*segment_manager),
       lba_manager(new BtreeLBAManager(*segment_manager, cache)),
       block_size(segment_manager->get_block_size())
index 2cc2439c7deaad3316b00606be7098a1436e2b44..2be3cf9496687aa3a2644e904ceeddfaa184d64b 100644 (file)
@@ -74,11 +74,13 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider {
 
   const segment_off_t block_size;
 
+  ScannerRef scanner;
+
   journal_test_t()
     : segment_manager(segment_manager::create_test_ephemeral()),
-      block_size(segment_manager->get_block_size())
-  {
-  }
+      block_size(segment_manager->get_block_size()),
+      scanner(std::make_unique<Scanner>(*segment_manager))
+  {}
 
   segment_id_t next = 0;
   get_segment_ret get_segment() final {
@@ -91,7 +93,7 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider {
   void update_journal_tail_committed(journal_seq_t paddr) final {}
 
   seastar::future<> set_up_fut() final {
-    journal.reset(new Journal(*segment_manager));
+    journal.reset(new Journal(*segment_manager, *scanner));
     journal->set_segment_provider(this);
     journal->set_write_pipeline(&pipeline);
     return segment_manager->init(
@@ -108,12 +110,43 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider {
   auto replay(T &&f) {
     return journal->close(
     ).safe_then([this, f=std::move(f)]() mutable {
-      journal.reset(new Journal(*segment_manager));
+      journal.reset(new Journal(*segment_manager, *scanner));
       journal->set_segment_provider(this);
       journal->set_write_pipeline(&pipeline);
-      return journal->replay(std::forward<T>(std::move(f)));
-    }).safe_then([this] {
-      return journal->open_for_write();
+      return seastar::do_with(
+       std::vector<std::pair<segment_id_t, segment_header_t>>(),
+       [this](auto& segments) {
+       return crimson::do_for_each(
+         boost::make_counting_iterator(segment_id_t{0}),
+         boost::make_counting_iterator(segment_manager->get_num_segments()),
+         [this, &segments](auto segment_id) {
+         return scanner->read_segment_header(segment_id)
+         .safe_then([&segments, segment_id](auto header) {
+           if (!header.out_of_line) {
+             segments.emplace_back(std::make_pair(segment_id, std::move(header)));
+           }
+           return seastar::now();
+         }).handle_error(
+           crimson::ct_error::enoent::handle([](auto) {
+             return SegmentCleaner::init_segments_ertr::now();
+           }),
+           crimson::ct_error::enodata::handle([](auto) {
+             return SegmentCleaner::init_segments_ertr::now();
+           }),
+           crimson::ct_error::input_output_error::pass_further{}
+         );
+       }).safe_then([&segments] {
+         return seastar::make_ready_future<
+           std::vector<std::pair<segment_id_t, segment_header_t>>>(
+             std::move(segments));
+       });
+      }).safe_then([this, f=std::move(f)](auto&& segments) mutable {
+       return journal->replay(
+         std::move(segments),
+         std::forward<T>(std::move(f)));
+      }).safe_then([this] {
+       return journal->open_for_write();
+      });
     });
   }
 
index a4abee896014a82df3b869129716b3c50496a79e..4d9eab791820c308c98aef66ce3f422fd6bb7de4 100644 (file)
@@ -70,10 +70,13 @@ protected:
 
 auto get_transaction_manager(
   SegmentManager &segment_manager) {
+  auto scanner = std::make_unique<Scanner>(segment_manager);
+  auto& scanner_ref = *scanner.get();
   auto segment_cleaner = std::make_unique<SegmentCleaner>(
     SegmentCleaner::config_t::get_default(),
+    std::move(scanner),
     true);
-  auto journal = std::make_unique<Journal>(segment_manager);
+  auto journal = std::make_unique<Journal>(segment_manager, scanner_ref);
   auto cache = std::make_unique<Cache>(segment_manager);
   auto lba_manager = lba_manager::create_lba_manager(segment_manager, *cache);