]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os/seastore: introduce generalized scan_valid_records for RBM
authormyoungwon oh <ohmyoungwon@gmail.com>
Sat, 22 Jul 2023 06:03:04 +0000 (06:03 +0000)
committermyoungwon oh <ohmyoungwon@gmail.com>
Wed, 16 Aug 2023 16:02:46 +0000 (16:02 +0000)
Signed-off-by: Myoungwon Oh <myoungwon.oh@samsung.com>
src/crimson/os/seastore/journal/circular_bounded_journal.cc
src/crimson/os/seastore/journal/circular_bounded_journal.h
src/crimson/os/seastore/record_scanner.cc
src/crimson/os/seastore/record_scanner.h
src/crimson/os/seastore/seastore_types.h
src/crimson/os/seastore/segment_manager_group.cc
src/crimson/os/seastore/segment_manager_group.h

index 8d0de4e18ecd4247e7887bb1ea457094856a6c4d..a92b9ecbd6b75da4a2f4a6312829ea8e8c94fd30 100644 (file)
@@ -111,115 +111,175 @@ CircularBoundedJournal::do_submit_record(
   });
 }
 
+RecordScanner::read_validate_record_metadata_ret CircularBoundedJournal::read_validate_record_metadata(
+  scan_valid_records_cursor &cursor,
+  segment_nonce_t nonce)
+{
+  LOG_PREFIX(CircularBoundedJournal::read_validate_record_metadata);
+  paddr_t start = cursor.seq.offset;
+  return read_record(start, 0
+  ).safe_then([FNAME, &cursor](auto ret) {
+    if (!ret.has_value()) {
+      return read_validate_record_metadata_ret(
+       read_validate_record_metadata_ertr::ready_future_marker{},
+       std::nullopt);
+    }
+    auto [r_header, bl] = *ret;
+    if ((cursor.last_committed != JOURNAL_SEQ_NULL &&
+       cursor.last_committed > r_header.committed_to) ||
+       r_header.committed_to.segment_seq != cursor.seq.segment_seq) {
+      DEBUG("invalid header: {}", r_header);
+      return read_validate_record_metadata_ret(
+       read_validate_record_metadata_ertr::ready_future_marker{},
+       std::nullopt);
+    }
+
+    bufferlist mdbuf;
+    mdbuf.substr_of(bl, 0, r_header.mdlength);
+    DEBUG("header: {}", r_header);
+    return read_validate_record_metadata_ret(
+      read_validate_record_metadata_ertr::ready_future_marker{},
+      std::make_pair(std::move(r_header), std::move(mdbuf)));
+  });
+}
+
+RecordScanner::read_validate_data_ret CircularBoundedJournal::read_validate_data(
+  paddr_t record_base,
+  const record_group_header_t &header)
+{
+  return read_record(record_base, header.segment_nonce
+  ).safe_then([](auto ret) {
+    // read_record would return non-empty value if the record is valid
+    if (!ret.has_value()) {
+      return read_validate_data_ret(
+        read_validate_data_ertr::ready_future_marker{},
+        false);
+    }
+    return read_validate_data_ertr::make_ready_future<bool>(true);
+  });
+}
+
+Journal::replay_ret CircularBoundedJournal::replay_segment(
+   cbj_delta_handler_t &handler, scan_valid_records_cursor& cursor)
+{
+  LOG_PREFIX(Journal::replay_segment);
+  return seastar::do_with(
+    RecordScanner::found_record_handler_t(
+      [this, &handler, FNAME](
+      record_locator_t locator,
+      const record_group_header_t& r_header,
+      const bufferlist& mdbuf)
+      -> RecordScanner::scan_valid_records_ertr::future<>
+    {
+      auto maybe_record_deltas_list = try_decode_deltas(
+        r_header, mdbuf, locator.record_block_base);
+      if (!maybe_record_deltas_list) {
+        // This should be impossible, we did check the crc on the mdbuf
+        ERROR("unable to decode deltas for record {} at {}",
+              r_header, locator.record_block_base);
+        return crimson::ct_error::input_output_error::make();
+      }
+      auto cursor_addr = convert_paddr_to_abs_addr(r_header.committed_to.offset);
+      DEBUG("{} at {}", r_header, cursor_addr);
+      auto write_result = write_result_t{
+        r_header.committed_to,
+        r_header.mdlength + r_header.dlength
+      };
+      auto expected_seq = r_header.committed_to.segment_seq;
+      cursor_addr += (r_header.mdlength + r_header.dlength);
+      if (cursor_addr >= get_journal_end()) {
+        cursor_addr = get_records_start();
+        ++expected_seq;
+        paddr_t addr = convert_abs_addr_to_paddr(
+          cursor_addr,
+          get_device_id());
+        write_result.start_seq.offset = addr;
+        write_result.start_seq.segment_seq = expected_seq;
+      }
+      paddr_t addr = convert_abs_addr_to_paddr(
+        cursor_addr,
+        get_device_id());
+      set_written_to(
+        journal_seq_t{expected_seq, addr});
+      return seastar::do_with(
+        std::move(*maybe_record_deltas_list),
+        [write_result,
+        &handler,
+        FNAME](auto& record_deltas_list) {
+        return crimson::do_for_each(
+          record_deltas_list,
+          [write_result,
+          &handler, FNAME](record_deltas_t& record_deltas) {
+          auto locator = record_locator_t{
+            record_deltas.record_block_base,
+            write_result
+          };
+          DEBUG("processing {} deltas at block_base {}",
+              record_deltas.deltas.size(),
+              locator);
+          return crimson::do_for_each(
+            record_deltas.deltas,
+            [locator,
+            &handler](auto& p) {
+            auto& modify_time = p.first;
+            auto& delta = p.second;
+            return handler(
+              locator,
+              delta,
+              modify_time).discard_result();
+          });
+        });
+      });
+    }),
+    [=, this, &cursor](auto &dhandler) {
+      return scan_valid_records(
+        cursor,
+       0,
+        std::numeric_limits<size_t>::max(),
+        dhandler).safe_then([](auto){}
+      ).handle_error(
+        replay_ertr::pass_further{},
+        crimson::ct_error::assert_all{
+          "shouldn't meet with any other error other replay_ertr"
+        }
+      );
+    }
+  );
+}
+
+
 Journal::replay_ret CircularBoundedJournal::scan_valid_record_delta(
-   cbj_delta_handler_t &&delta_handler, journal_seq_t tail)
+   cbj_delta_handler_t &&handler, journal_seq_t tail)
 {
-  LOG_PREFIX(CircularBoundedJournal::scan_valid_record_delta);
+  LOG_PREFIX(Journal::scan_valid_record_delta);
+  INFO("starting at {} ", tail);
   return seastar::do_with(
+    scan_valid_records_cursor(tail),
+    std::move(handler),
     bool(false),
-    rbm_abs_addr(get_rbm_addr(tail)),
-    std::move(delta_handler),
-    segment_seq_t(NULL_SEG_SEQ),
-    [this, FNAME](auto &is_rolled, auto &cursor_addr, auto &d_handler, auto &expected_seq) {
-    return crimson::repeat(
-      [this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME]() mutable
-      -> replay_ertr::future<seastar::stop_iteration> {
-      paddr_t record_paddr = convert_abs_addr_to_paddr(
-       cursor_addr,
-       get_device_id());
-      return read_record(record_paddr, expected_seq
-      ).safe_then([this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME](auto ret)
-         -> replay_ertr::future<seastar::stop_iteration> {
-       if (!ret.has_value()) {
-         if (expected_seq == NULL_SEG_SEQ || is_rolled) {
-           DEBUG("no more records, stop replaying");
-           return replay_ertr::make_ready_future<
-             seastar::stop_iteration>(seastar::stop_iteration::yes);
-         } else {
-           cursor_addr = get_records_start();
-           ++expected_seq;
-           is_rolled = true;
-           return replay_ertr::make_ready_future<
-             seastar::stop_iteration>(seastar::stop_iteration::no);
-         }
-       }
-       auto [r_header, bl] = *ret;
-       bufferlist mdbuf;
-       mdbuf.substr_of(bl, 0, r_header.mdlength);
-       paddr_t record_block_base = paddr_t::make_blk_paddr(
-         get_device_id(), cursor_addr + r_header.mdlength);
-       auto maybe_record_deltas_list = try_decode_deltas(
-         r_header, mdbuf, record_block_base);
-       if (!maybe_record_deltas_list) {
-         // This should be impossible, we did check the crc on the mdbuf
-         ERROR("unable to decode deltas for record {} at {}",
-               r_header, record_block_base);
-         return crimson::ct_error::input_output_error::make();
-       }
-       DEBUG("{} at {}", r_header, cursor_addr);
-       auto write_result = write_result_t{
-         r_header.committed_to,
-         bl.length()
-       };
-       if (expected_seq == NULL_SEG_SEQ) {
-         expected_seq = r_header.committed_to.segment_seq;
-       } else {
-         assert(expected_seq == r_header.committed_to.segment_seq);
-       }
-       cursor_addr += bl.length();
-       if (cursor_addr >= get_journal_end()) {
-         assert(cursor_addr == get_journal_end());
-         cursor_addr = get_records_start();
-         ++expected_seq;
-         paddr_t addr = convert_abs_addr_to_paddr(
-           cursor_addr,
-           get_device_id());
-         write_result.start_seq.offset = addr;
-         write_result.start_seq.segment_seq = expected_seq;
-         is_rolled = true;
-       }
-       paddr_t addr = convert_abs_addr_to_paddr(
-         cursor_addr,
-         get_device_id());
-       set_written_to(
-         journal_seq_t{expected_seq, addr});
-       return seastar::do_with(
-         std::move(*maybe_record_deltas_list),
-         [write_result,
-         &d_handler,
-         FNAME](auto& record_deltas_list) {
-         return crimson::do_for_each(
-           record_deltas_list,
-           [write_result,
-           &d_handler, FNAME](record_deltas_t& record_deltas) {
-           auto locator = record_locator_t{
-             record_deltas.record_block_base,
-             write_result
-           };
-           DEBUG("processing {} deltas at block_base {}",
-               record_deltas.deltas.size(),
-               locator);
-           return crimson::do_for_each(
-             record_deltas.deltas,
-             [locator,
-             &d_handler](auto& p) {
-             auto& modify_time = p.first;
-             auto& delta = p.second;
-             return d_handler(
-               locator,
-               delta,
-               modify_time).discard_result();
-           });
-         }).safe_then([]() {
-           return replay_ertr::make_ready_future<
-             seastar::stop_iteration>(seastar::stop_iteration::no);
-         });
-       });
+    [this] (auto &cursor, auto &handler, auto &rolled) {
+    return crimson::repeat([this, &handler, &cursor, &rolled]()
+    -> replay_ertr::future<seastar::stop_iteration>
+    {
+      return replay_segment(handler, cursor
+      ).safe_then([this, &cursor, &rolled] {
+        if (!rolled) {
+          cursor.last_valid_header_found = false;
+        }
+        if (!cursor.is_complete()) {
+          try_read_rolled_header(cursor);
+         rolled = true;
+          return replay_ertr::make_ready_future<
+            seastar::stop_iteration>(seastar::stop_iteration::no);
+        }
+        return replay_ertr::make_ready_future<
+          seastar::stop_iteration>(seastar::stop_iteration::yes);
       });
     });
   });
 }
 
+
 Journal::replay_ret CircularBoundedJournal::replay(
     delta_handler_t &&delta_handler)
 {
index bb3e2a860659c67ff76fe54bb2f148a64101db47..3c696f99704cc93f59736110469aec7025b41b00 100644 (file)
@@ -21,6 +21,7 @@
 #include <list>
 #include "crimson/os/seastore/journal/record_submitter.h"
 #include "crimson/os/seastore/journal/circular_journal_space.h"
+#include "crimson/os/seastore/record_scanner.h"
 
 namespace crimson::os::seastore::journal {
 
@@ -55,7 +56,7 @@ using RBMDevice = random_block_device::RBMDevice;
 
 constexpr uint64_t DEFAULT_BLOCK_SIZE = 4096;
 
-class CircularBoundedJournal : public Journal {
+class CircularBoundedJournal : public Journal, RecordScanner {
 public:
   CircularBoundedJournal(
       JournalTrimmer &trimmer, RBMDevice* device, const std::string &path);
@@ -179,6 +180,27 @@ public:
 
   submit_record_ret do_submit_record(record_t &&record, OrderingHandle &handle);
 
+  void try_read_rolled_header(scan_valid_records_cursor &cursor) {
+    paddr_t addr = convert_abs_addr_to_paddr(
+      get_records_start(),
+      get_device_id());
+    cursor.seq.offset = addr;
+    cursor.seq.segment_seq += 1;
+  }
+
+  void initialize_cursor(scan_valid_records_cursor& cursor) final {};
+
+  Journal::replay_ret replay_segment(
+    cbj_delta_handler_t &handler, scan_valid_records_cursor& cursor);
+
+  read_validate_record_metadata_ret read_validate_record_metadata(
+    scan_valid_records_cursor &cursor,
+    segment_nonce_t nonce) final;
+
+  read_validate_data_ret read_validate_data(
+    paddr_t record_base,
+    const record_group_header_t &header) final;
+
   // Test interfaces
   
   CircularJournalSpace& get_cjs() {
index f3ed54d01644e17725b8a782b1bcccd343e4f7a8..74bfdeb7cfae21c7539d70a09d345b7ae275b8a1 100644 (file)
@@ -26,7 +26,7 @@ RecordScanner::scan_valid_records(
     -> scan_valid_records_ertr::future<seastar::stop_iteration> {
       return [=, &handler, &cursor, &budget_used, this] {
        if (!cursor.last_valid_header_found) {
-         return read_validate_record_metadata(cursor.seq.offset, nonce
+         return read_validate_record_metadata(cursor, nonce
          ).safe_then([=, &cursor](auto md) {
            if (!md) {
              cursor.last_valid_header_found = true;
index c8486f5901395e39f6ff688077666119b32c174b..10569ef4e5dca9f2df5e63e39dccbf994d9bfeb7 100644 (file)
@@ -38,7 +38,7 @@ protected:
       std::optional<std::pair<record_group_header_t, bufferlist>>
     >;
   virtual read_validate_record_metadata_ret read_validate_record_metadata(
-    paddr_t start,
+    scan_valid_records_cursor &cursor,
     segment_nonce_t nonce) = 0;
 
   /// read and validate data
index 2dc038dfa7bc05bb3ddbd0d011cad5896f6c3684..9fd008f4bec6c6676843b4e246c4a603ebde1a69 100644 (file)
@@ -2083,9 +2083,7 @@ struct scan_valid_records_cursor {
   }
 
   void increment_seq(segment_off_t off) {
-    auto& seg_addr = seq.offset.as_seg_paddr();
-    seg_addr.set_segment_off(
-      seg_addr.get_segment_off() + off);
+    seq.offset = seq.offset.add_offset(off);
   }
 
   void emplace_record_group(const record_group_header_t&, ceph::bufferlist&&);
index 6fe56501c8a0aad8bb43df84a4495f240705ceae..efbbb0c888cd74a17b80855e30cb1fd3773062b0 100644 (file)
@@ -106,10 +106,11 @@ void SegmentManagerGroup::initialize_cursor(
 
 SegmentManagerGroup::read_validate_record_metadata_ret
 SegmentManagerGroup::read_validate_record_metadata(
-  paddr_t start,
+  scan_valid_records_cursor &cursor,
   segment_nonce_t nonce)
 {
   LOG_PREFIX(SegmentManagerGroup::read_validate_record_metadata);
+  paddr_t start = cursor.seq.offset;
   auto& seg_addr = start.as_seg_paddr();
   assert(has_device(seg_addr.get_segment_id().device_id()));
   auto& segment_manager = *segment_managers[seg_addr.get_segment_id().device_id()];
index 826ab61a7e7a51ef9363b8e496fa601abf31b027..10d8d4f292c39f6698129987f65d8c1767d2127e 100644 (file)
@@ -129,7 +129,7 @@ private:
   void initialize_cursor(scan_valid_records_cursor &cursor) final;
 
   read_validate_record_metadata_ret read_validate_record_metadata(
-    paddr_t start,
+    scan_valid_records_cursor &cursor,
     segment_nonce_t nonce) final;
 
   read_validate_data_ret read_validate_data(