]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore/cbj: add deallocation map during replay to filter out out-dated... 48773/head
authormyoungwon oh <ohmyoungwon@gmail.com>
Wed, 2 Nov 2022 07:10:31 +0000 (16:10 +0900)
committermyoungwon oh <ohmyoungwon@gmail.com>
Mon, 21 Nov 2022 07:04:33 +0000 (16:04 +0900)
Signed-off-by: Myoungwon Oh <myoungwon.oh@samsung.com>
src/crimson/os/seastore/cache.cc
src/crimson/os/seastore/journal/circular_bounded_journal.cc
src/crimson/os/seastore/journal/circular_bounded_journal.h

index 80d09889b1b2e8ba98b0e351826162720b5095e7..4cedc835311eb00622509b108ea33cdf4d757fc8 100644 (file)
@@ -1760,17 +1760,7 @@ Cache::replay_delta(
       DEBUG("replay extent delta at {} {} ... -- {}, prv_extent={}",
             journal_seq, record_base, delta, *extent);
 
-      if (extent->last_committed_crc != delta.prev_crc) {
-        // FIXME: we can't rely on crc to detect whether is delta is
-        // out-of-date.
-        ERROR("identified delta crc {} doesn't match the extent at {} {}, "
-              "probably is out-dated -- {}",
-              delta, journal_seq, record_base, *extent);
-        ceph_assert(epm.get_journal_type() == journal_type_t::RANDOM_BLOCK);
-        remove_extent(extent);
-        return replay_delta_ertr::make_ready_future<bool>(false);
-      }
-
+      assert(extent->last_committed_crc == delta.prev_crc);
       assert(extent->version == delta.pversion);
       extent->apply_delta_and_adjust_crc(record_base, delta.bl);
       extent->set_modify_time(modify_time);
index 003d6546bce25d20a18152b99f75a54721928723..a4e234a8a90d53869d1c897692169f4d645d7dfc 100644 (file)
@@ -245,6 +245,110 @@ CircularBoundedJournal::read_header()
   });
 }
 
+Journal::replay_ret CircularBoundedJournal::scan_valid_record_delta(
+   cbj_delta_handler_t &&delta_handler, journal_seq_t tail)
+{
+  LOG_PREFIX(CircularBoundedJournal::scan_valid_record_delta);
+  return seastar::do_with(
+    bool(false),
+    rbm_abs_addr(get_rbm_addr(tail)),
+    std::move(delta_handler),
+    segment_seq_t(NULL_SEG_SEQ),
+    [this, FNAME](auto &is_rolled, auto &cursor_addr, auto &d_handler, auto &expected_seq) {
+    return crimson::repeat(
+      [this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME]() mutable
+      -> replay_ertr::future<seastar::stop_iteration> {
+      paddr_t record_paddr = convert_abs_addr_to_paddr(
+       cursor_addr,
+       get_device_id());
+      return read_record(record_paddr, expected_seq
+      ).safe_then([this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME](auto ret)
+         -> replay_ertr::future<seastar::stop_iteration> {
+       if (!ret.has_value()) {
+         if (expected_seq == NULL_SEG_SEQ || is_rolled) {
+           DEBUG("no more records, stop replaying");
+           return replay_ertr::make_ready_future<
+             seastar::stop_iteration>(seastar::stop_iteration::yes);
+         } else {
+           cursor_addr = get_records_start();
+           ++expected_seq;
+           is_rolled = true;
+           return replay_ertr::make_ready_future<
+             seastar::stop_iteration>(seastar::stop_iteration::no);
+         }
+       }
+       auto [r_header, bl] = *ret;
+       bufferlist mdbuf;
+       mdbuf.substr_of(bl, 0, r_header.mdlength);
+       paddr_t record_block_base = paddr_t::make_blk_paddr(
+         get_device_id(), cursor_addr + r_header.mdlength);
+       auto maybe_record_deltas_list = try_decode_deltas(
+         r_header, mdbuf, record_block_base);
+       if (!maybe_record_deltas_list) {
+         // This should be impossible, we did check the crc on the mdbuf
+         ERROR("unable to decode deltas for record {} at {}",
+               r_header, record_block_base);
+         return crimson::ct_error::input_output_error::make();
+       }
+       DEBUG("{} at {}", r_header, cursor_addr);
+       auto write_result = write_result_t{
+         r_header.committed_to,
+         bl.length()
+       };
+       if (expected_seq == NULL_SEG_SEQ) {
+         expected_seq = r_header.committed_to.segment_seq;
+       } else {
+         assert(expected_seq == r_header.committed_to.segment_seq);
+       }
+       cursor_addr += bl.length();
+       if (cursor_addr >= get_journal_end()) {
+         assert(cursor_addr == get_journal_end());
+         cursor_addr = get_records_start();
+         ++expected_seq;
+         is_rolled = true;
+       }
+       paddr_t addr = convert_abs_addr_to_paddr(
+         cursor_addr,
+         get_device_id());
+       set_written_to(
+         journal_seq_t{expected_seq, addr});
+       return seastar::do_with(
+         std::move(*maybe_record_deltas_list),
+         [write_result,
+         &d_handler,
+         FNAME](auto& record_deltas_list) {
+         return crimson::do_for_each(
+           record_deltas_list,
+           [write_result,
+           &d_handler, FNAME](record_deltas_t& record_deltas) {
+           auto locator = record_locator_t{
+             record_deltas.record_block_base,
+             write_result
+           };
+           DEBUG("processing {} deltas at block_base {}",
+               record_deltas.deltas.size(),
+               locator);
+           return crimson::do_for_each(
+             record_deltas.deltas,
+             [locator,
+             &d_handler](auto& p) {
+             auto& modify_time = p.first;
+             auto& delta = p.second;
+             return d_handler(
+               locator,
+               delta,
+               modify_time).discard_result();
+           });
+         }).safe_then([]() {
+           return replay_ertr::make_ready_future<
+             seastar::stop_iteration>(seastar::stop_iteration::no);
+         });
+       });
+      });
+    });
+  });
+}
+
 Journal::replay_ret CircularBoundedJournal::replay(
     delta_handler_t &&delta_handler)
 {
@@ -257,116 +361,59 @@ Journal::replay_ret CircularBoundedJournal::replay(
     open_for_mount_ertr::pass_further{},
     crimson::ct_error::assert_all{
       "Invalid error read_header"
-  }).safe_then([this, FNAME, delta_handler=std::move(delta_handler)](auto p) mutable {
+  }).safe_then([this, FNAME, delta_handler=std::move(delta_handler)](auto p)
+    mutable {
     auto &[head, bl] = *p;
     header = head;
     DEBUG("header : {}", header);
     initialized = true;
-    written_to.segment_seq = NULL_SEG_SEQ;
-    auto tail = get_dirty_tail() <= get_alloc_tail() ?
-      get_dirty_tail() : get_alloc_tail();
-    set_written_to(tail);
     return seastar::do_with(
-      bool(false),
-      rbm_abs_addr(get_rbm_addr(tail)),
       std::move(delta_handler),
-      segment_seq_t(NULL_SEG_SEQ),
-      [this, FNAME](auto &is_rolled, auto &cursor_addr, auto &d_handler, auto &expected_seq) {
-      return crimson::repeat(
-       [this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME]() mutable
-       -> replay_ertr::future<seastar::stop_iteration> {
-       paddr_t record_paddr = convert_abs_addr_to_paddr(
-         cursor_addr,
-         get_device_id());
-       return read_record(record_paddr, expected_seq
-       ).safe_then([this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME](auto ret)
-           -> replay_ertr::future<seastar::stop_iteration> {
-         if (!ret.has_value()) {
-           if (expected_seq == NULL_SEG_SEQ || is_rolled) {
-             DEBUG("no more records, stop replaying");
-             return replay_ertr::make_ready_future<
-               seastar::stop_iteration>(seastar::stop_iteration::yes);
-           } else {
-             cursor_addr = get_records_start();
-             ++expected_seq;
-             is_rolled = true;
-             return replay_ertr::make_ready_future<
-               seastar::stop_iteration>(seastar::stop_iteration::no);
+      std::map<paddr_t, journal_seq_t>(),
+      [this](auto &d_handler, auto &map) {
+      auto build_paddr_seq_map = [&map](
+        const auto &offsets,
+        const auto &e,
+       sea_time_point modify_time)
+      {
+       if (e.type == extent_types_t::ALLOC_INFO) {
+         alloc_delta_t alloc_delta;
+         decode(alloc_delta, e.bl);
+         if (alloc_delta.op == alloc_delta_t::op_types_t::CLEAR) {
+           for (auto &alloc_blk : alloc_delta.alloc_blk_ranges) {
+             map[alloc_blk.paddr] = offsets.write_result.start_seq;
            }
          }
-         auto [r_header, bl] = *ret;
-         bufferlist mdbuf;
-         mdbuf.substr_of(bl, 0, r_header.mdlength);
-         paddr_t record_block_base = paddr_t::make_blk_paddr(
-           get_device_id(), cursor_addr + r_header.mdlength);
-         auto maybe_record_deltas_list = try_decode_deltas(
-           r_header, mdbuf, record_block_base);
-         if (!maybe_record_deltas_list) {
-           // This should be impossible, we did check the crc on the mdbuf
-           ERROR("unable to decode deltas for record {} at {}",
-                 r_header, record_block_base);
-           return crimson::ct_error::input_output_error::make();
+       }
+       return replay_ertr::make_ready_future<bool>(true);
+      };
+      written_to.segment_seq = NULL_SEG_SEQ;
+      auto tail = get_dirty_tail() <= get_alloc_tail() ?
+       get_dirty_tail() : get_alloc_tail();
+      set_written_to(tail);
+      // The first pass to build the paddr->journal_seq_t map 
+      // from extent allocations
+      return scan_valid_record_delta(std::move(build_paddr_seq_map), tail
+      ).safe_then([this, &map, &d_handler, tail]() {
+       auto call_d_handler_if_valid = [this, &map, &d_handler](
+         const auto &offsets,
+         const auto &e,
+         sea_time_point modify_time)
+       {
+         if (map.find(e.paddr) == map.end() ||
+             map[e.paddr] <= offsets.write_result.start_seq) {
+           return d_handler(
+             offsets,
+             e,
+             header.dirty_tail,
+             header.alloc_tail,
+             modify_time
+           );
          }
-         DEBUG("{} at {}", r_header, cursor_addr);
-         auto write_result = write_result_t{
-           r_header.committed_to,
-           bl.length()
-         };
-         if (expected_seq == NULL_SEG_SEQ) {
-           expected_seq = r_header.committed_to.segment_seq;
-         } else {
-           assert(expected_seq == r_header.committed_to.segment_seq);
-         }
-         cursor_addr += bl.length();
-         if (cursor_addr >= get_journal_end()) {
-           assert(cursor_addr == get_journal_end());
-           cursor_addr = get_records_start();
-           ++expected_seq;
-           is_rolled = true;
-         }
-         paddr_t addr = convert_abs_addr_to_paddr(
-           cursor_addr,
-           get_device_id());
-         set_written_to(
-           journal_seq_t{expected_seq, addr});
-         return seastar::do_with(
-           std::move(*maybe_record_deltas_list),
-           [this,
-           write_result,
-           &d_handler,
-           FNAME](auto& record_deltas_list) {
-           return crimson::do_for_each(
-             record_deltas_list,
-             [this,
-             write_result,
-             &d_handler, FNAME](record_deltas_t& record_deltas) {
-             auto locator = record_locator_t{
-               record_deltas.record_block_base,
-               write_result
-             };
-             DEBUG("processing {} deltas at block_base {}",
-                 record_deltas.deltas.size(),
-                 locator);
-             return crimson::do_for_each(
-               record_deltas.deltas,
-               [this,
-               locator,
-               &d_handler](auto& p) {
-               auto& modify_time = p.first;
-               auto& delta = p.second;
-               return d_handler(
-                 locator,
-                 delta,
-                 header.dirty_tail,
-                 header.alloc_tail,
-                 modify_time).discard_result();
-             });
-           }).safe_then([]() {
-             return replay_ertr::make_ready_future<
-               seastar::stop_iteration>(seastar::stop_iteration::no);
-           });
-         });
-       });
+         return replay_ertr::make_ready_future<bool>(true);
+       };
+       // The second pass to replay deltas
+       return scan_valid_record_delta(std::move(call_d_handler_if_valid), tail);
       });
     }).safe_then([this]() {
       trimmer.update_journal_tails(
index 09be167d97062d8f9a26773912c5b37b3f247dea..2977872a5e749777d872163a3b1385887778d9b0 100644 (file)
@@ -277,6 +277,16 @@ public:
   }
   seastar::future<> finish_commit(transaction_type_t type) final;
 
+  using cbj_delta_handler_t = std::function<
+  replay_ertr::future<bool>(
+    const record_locator_t&,
+    const delta_info_t&,
+    sea_time_point modify_time)>;
+
+  Journal::replay_ret scan_valid_record_delta(
+    cbj_delta_handler_t &&delta_handler,
+    journal_seq_t tail);
+
 private:
   cbj_header_t header;
   JournalTrimmer &trimmer;