git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore/circular_bounded_journal: increment seq when roll
author Yingxin Cheng <yingxin.cheng@intel.com>
Tue, 5 Jul 2022 14:32:23 +0000 (22:32 +0800)
committer Yingxin Cheng <yingxin.cheng@intel.com>
Wed, 6 Jul 2022 03:06:39 +0000 (11:06 +0800)
Similar to segmented_journal, only increment the seq when
circular_bounded_journal rolls.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/journal/circular_bounded_journal.cc
src/crimson/os/seastore/journal/circular_bounded_journal.h

index 03facde435f7c31ed21f1626085469242568e592..38134462fd03d38f308995d0e28c3ba14101a1a2 100644 (file)
@@ -104,10 +104,13 @@ CircularBoundedJournal::open_for_write_ret CircularBoundedJournal::open_for_writ
   paddr_t paddr = convert_abs_addr_to_paddr(
     get_written_to(),
     header.device_id);
+  if (circulation_seq == NULL_SEG_SEQ) {
+    circulation_seq = 0;
+  }
   return open_for_write_ret(
     open_for_write_ertr::ready_future_marker{},
     journal_seq_t{
-      cur_segment_seq,
+      circulation_seq,
       paddr
   });
 }
@@ -149,7 +152,7 @@ CircularBoundedJournal::open_device_read_header()
       return open_for_write_ret(
        open_for_write_ertr::ready_future_marker{},
        journal_seq_t{
-         cur_segment_seq,
+         circulation_seq,
          paddr
        });
     });
@@ -166,6 +169,7 @@ CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record(
 {
   LOG_PREFIX(CircularBoundedJournal::submit_record);
   assert(write_pipeline);
+  assert(circulation_seq != NULL_SEG_SEQ);
   auto r_size = record_group_size_t(record.size, get_block_size());
   auto encoded_size = r_size.get_encoded_length();
   if (encoded_size > get_available_size()) {
@@ -179,6 +183,7 @@ CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record(
   if (encoded_size + get_written_to() > get_journal_end()) {
     DEBUG("roll");
     set_written_to(get_start_addr());
+    ++circulation_seq;
     if (encoded_size > get_available_size()) {
       ERROR("rolled, record size {}, but available size {}",
             encoded_size, get_available_size());
@@ -187,7 +192,7 @@ CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record(
   }
 
   journal_seq_t j_seq {
-    cur_segment_seq++,
+    circulation_seq,
     convert_abs_addr_to_paddr(
       get_written_to(),
       header.device_id)};
@@ -200,6 +205,7 @@ CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record(
     assert(new_written_to == get_journal_end());
     DEBUG("roll");
     set_written_to(get_start_addr());
+    ++circulation_seq;
   } else {
     set_written_to(new_written_to);
   }
@@ -309,29 +315,31 @@ Journal::replay_ret CircularBoundedJournal::replay(
   LOG_PREFIX(CircularBoundedJournal::replay);
   return open_device_read_header(
   ).safe_then([this, FNAME, delta_handler=std::move(delta_handler)] (auto) {
+    circulation_seq = NULL_SEG_SEQ;
     set_written_to(get_journal_tail());
     return seastar::do_with(
       bool(false),
       rbm_abs_addr(get_journal_tail()),
       std::move(delta_handler),
-      segment_seq_t(),
-      [this, FNAME](auto &is_rolled, auto &cursor_addr, auto &d_handler, auto &last_seq) {
+      segment_seq_t(NULL_SEG_SEQ),
+      [this, FNAME](auto &is_rolled, auto &cursor_addr, auto &d_handler, auto &expected_seq) {
       return crimson::repeat(
-       [this, &is_rolled, &cursor_addr, &d_handler, &last_seq, FNAME]() mutable
+       [this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME]() mutable
        -> replay_ertr::future<seastar::stop_iteration> {
        paddr_t record_paddr = convert_abs_addr_to_paddr(
          cursor_addr,
          header.device_id);
-       return read_record(record_paddr, last_seq
-       ).safe_then([this, &is_rolled, &cursor_addr, &d_handler, &last_seq, FNAME](auto ret)
+       return read_record(record_paddr, expected_seq
+       ).safe_then([this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME](auto ret)
            -> replay_ertr::future<seastar::stop_iteration> {
          if (!ret.has_value()) {
-           if (is_rolled) {
+           if (expected_seq == NULL_SEG_SEQ || is_rolled) {
              DEBUG("no more records, stop replaying");
              return replay_ertr::make_ready_future<
                seastar::stop_iteration>(seastar::stop_iteration::yes);
            } else {
              cursor_addr = get_start_addr();
+             ++expected_seq;
              is_rolled = true;
              return replay_ertr::make_ready_future<
                seastar::stop_iteration>(seastar::stop_iteration::no);
@@ -355,15 +363,20 @@ Journal::replay_ret CircularBoundedJournal::replay(
            r_header.committed_to,
            (seastore_off_t)bl.length()
          };
-         cur_segment_seq = r_header.committed_to.segment_seq + 1;
+         if (expected_seq == NULL_SEG_SEQ) {
+           expected_seq = r_header.committed_to.segment_seq;
+         } else {
+           assert(expected_seq == r_header.committed_to.segment_seq);
+         }
          cursor_addr += bl.length();
          if (cursor_addr >= get_journal_end()) {
            assert(cursor_addr == get_journal_end());
            cursor_addr = get_start_addr();
+           ++expected_seq;
            is_rolled = true;
          }
          set_written_to(cursor_addr);
-         last_seq = r_header.committed_to.segment_seq;
+         circulation_seq = expected_seq;
          return seastar::do_with(
            std::move(*maybe_record_deltas_list),
            [write_result,
@@ -425,7 +438,7 @@ CircularBoundedJournal::return_record(record_group_header_t& header, bufferlist
 }
 
 CircularBoundedJournal::read_record_ret
-CircularBoundedJournal::read_record(paddr_t off, segment_seq_t last_seq)
+CircularBoundedJournal::read_record(paddr_t off, segment_seq_t expected_seq)
 {
   LOG_PREFIX(CircularBoundedJournal::read_record);
   rbm_abs_addr addr = convert_paddr_to_abs_addr(off);
@@ -435,7 +448,7 @@ CircularBoundedJournal::read_record(paddr_t off, segment_seq_t last_seq)
       addr, read_length);
   auto bptr = bufferptr(ceph::buffer::create_page_aligned(read_length));
   return device->read(addr, bptr
-  ).safe_then([this, addr, bptr, last_seq, FNAME]() mutable
+  ).safe_then([this, addr, bptr, expected_seq, FNAME]() mutable
     -> read_record_ret {
     record_group_header_t h;
     bufferlist bl;
@@ -453,7 +466,8 @@ CircularBoundedJournal::read_record(paddr_t off, segment_seq_t last_seq)
         h.dlength % get_block_size() != 0 ||
         addr + h.mdlength + h.dlength > get_journal_end() ||
         h.committed_to.segment_seq == NULL_SEG_SEQ ||
-        h.committed_to.segment_seq < last_seq) {
+        (expected_seq != NULL_SEG_SEQ &&
+         h.committed_to.segment_seq != expected_seq)) {
       return read_record_ret(
         read_record_ertr::ready_future_marker{},
         std::nullopt);
index 265fea984266cd6354af8c2ad587ef8631ed3849..4d869f017223abe377c5cae16ceb789b49abc683 100644 (file)
@@ -136,10 +136,10 @@ public:
    * read record from given address
    *
    * @param paddr_t to read
-   * @param last_seq
+   * @param expected_seq
    *
    */
-  read_record_ret read_record(paddr_t offset, segment_seq_t last_seq);
+  read_record_ret read_record(paddr_t offset, segment_seq_t expected_seq);
   /*
    * read_header
    *
@@ -291,7 +291,10 @@ private:
    * Indicates that device is open and in-memory header is valid.
    */
   bool initialized = false;
-  segment_seq_t cur_segment_seq = 0; // segment seq to track the sequence to written records
+
+  // circulation seq to track the sequence to written records
+  segment_seq_t circulation_seq = NULL_SEG_SEQ;
+
   // start address where the newest record will be written
   // should be in range [get_start_addr(), get_journal_end())
   rbm_abs_addr written_to = 0;