]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os/seastore/circular_bounded_journal: do not split records
authorYingxin Cheng <yingxin.cheng@intel.com>
Tue, 5 Jul 2022 08:31:34 +0000 (16:31 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 6 Jul 2022 03:06:30 +0000 (11:06 +0800)
* no split record due to relative paddr resolution
* fix md_bl.substr_of(bl, 0, header.mdlength)
* maintain written_to in range [get_start_addr(), get_journal_end())

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/journal/circular_bounded_journal.cc
src/crimson/os/seastore/journal/circular_bounded_journal.h
src/test/crimson/seastore/test_cbjournal.cc

index d038b215d8da96c85a2c772974df72a9674b4d08..03facde435f7c31ed21f1626085469242568e592 100644 (file)
@@ -45,7 +45,7 @@ CircularBoundedJournal::mkfs(const mkfs_config_t& config)
     head.device_id = config.device_id;
     encode(head, bl);
     header = head;
-    written_to = head.journal_tail;
+    set_written_to(head.journal_tail);
     DEBUG(
       "initialize header block in CircularBoundedJournal, length {}",
       bl.length());
@@ -160,50 +160,6 @@ CircularBoundedJournal::open_device_read_header()
   });
 }
 
-CircularBoundedJournal::write_ertr::future<> CircularBoundedJournal::append_record(
-  ceph::bufferlist bl,
-  rbm_abs_addr addr)
-{
-  LOG_PREFIX(CircularBoundedJournal::append_record);
-  std::vector<std::pair<rbm_abs_addr, bufferlist>> writes;
-  if (addr + bl.length() <= get_journal_end()) {
-    writes.push_back(std::make_pair(addr, bl));
-  } else {
-    // write remaining data---in this case,
-    // data is splited into two parts before due to the end of CircularBoundedJournal.
-    // the following code is to write the second part
-    bufferlist first_write, next_write;
-    first_write.substr_of(bl, 0, get_journal_end() - addr);
-    writes.push_back(std::make_pair(addr, first_write));
-    next_write.substr_of(
-      bl, first_write.length(), bl.length() - first_write.length());
-    writes.push_back(std::make_pair(get_start_addr(), next_write));
-  }
-
-  return seastar::do_with(
-    std::move(bl),
-    [this, writes=move(writes), FNAME](auto& bl) mutable
-  {
-    DEBUG("original bl length {}", bl.length());
-    return write_ertr::parallel_for_each(
-      writes,
-      [this, FNAME](auto& p) mutable
-    {
-      DEBUG(
-       "append_record: offset {}, length {}",
-       p.first,
-       p.second.length());
-      return device_write_bl(p.first, p.second 
-      ).handle_error(
-       write_ertr::pass_further{},
-       crimson::ct_error::assert_all{ "Invalid error device->write" }
-      ).safe_then([]() {
-       return write_ertr::now();
-      });
-    });
-  });
-}
-
 CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record(
   record_t &&record,
   OrderingHandle &handle)
@@ -212,16 +168,6 @@ CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record(
   assert(write_pipeline);
   auto r_size = record_group_size_t(record.size, get_block_size());
   auto encoded_size = r_size.get_encoded_length();
-  if (get_written_to() +
-      ceph::encoded_sizeof_bounded<record_group_header_t>() > get_journal_end()) {
-    // not enough space between written_to and the end of journal,
-    // so that set written_to to the beginning of cbjournal to append 
-    // the record at the start address of cbjournal
-    // |        cbjournal      |
-    //                   v            v
-    //      written_to <-> the end of journal
-    set_written_to(get_start_addr());
-  }
   if (encoded_size > get_available_size()) {
     ERROR(
       "CircularBoundedJournal::submit_record: record size {}, but available size {}",
@@ -230,6 +176,15 @@ CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record(
       );
     return crimson::ct_error::erange::make();
   }
+  if (encoded_size + get_written_to() > get_journal_end()) {
+    DEBUG("roll");
+    set_written_to(get_start_addr());
+    if (encoded_size > get_available_size()) {
+      ERROR("rolled, record size {}, but available size {}",
+            encoded_size, get_available_size());
+      return crimson::ct_error::erange::make();
+    }
+  }
 
   journal_seq_t j_seq {
     cur_segment_seq++,
@@ -240,11 +195,13 @@ CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record(
     std::move(record), device->get_block_size(),
     j_seq, 0);
   auto target = get_written_to();
-  if (get_written_to() + to_write.length() >= get_journal_end()) {
-    set_written_to(get_start_addr() +
-      (to_write.length() - (get_journal_end() - get_written_to())));
+  auto new_written_to = target + to_write.length();
+  if (new_written_to >= get_journal_end()) {
+    assert(new_written_to == get_journal_end());
+    DEBUG("roll");
+    set_written_to(get_start_addr());
   } else {
-    set_written_to(get_written_to() + to_write.length());
+    set_written_to(new_written_to);
   }
   DEBUG(
     "submit_record: mdlength {}, dlength {}, target {}",
@@ -256,16 +213,10 @@ CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record(
     j_seq,
     (seastore_off_t)to_write.length()
   };
-  auto write_fut = append_record(to_write, target);
+  auto write_fut = device_write_bl(target, to_write);
   return handle.enter(write_pipeline->device_submission
   ).then([write_fut = std::move(write_fut)]() mutable {
-    return std::move(write_fut
-    ).handle_error(
-      write_ertr::pass_further{},
-      crimson::ct_error::assert_all{
-        "Invalid error in CircularBoundedJournal::append_record"
-      }
-    );
+    return std::move(write_fut);
   }).safe_then([this, &handle] {
     return handle.enter(write_pipeline->finalize);
   }).safe_then([this, target,
@@ -356,32 +307,37 @@ Journal::replay_ret CircularBoundedJournal::replay(
    * read records from last applied record prior to written_to, and replay
    */
   LOG_PREFIX(CircularBoundedJournal::replay);
-  auto fut = open_device_read_header();
-  return fut.safe_then([this, FNAME, delta_handler=std::move(delta_handler)] (auto addr) {
+  return open_device_read_header(
+  ).safe_then([this, FNAME, delta_handler=std::move(delta_handler)] (auto) {
+    set_written_to(get_journal_tail());
     return seastar::do_with(
+      bool(false),
       rbm_abs_addr(get_journal_tail()),
       std::move(delta_handler),
       segment_seq_t(),
-      [this, FNAME](auto &cursor_addr, auto &d_handler, auto &last_seq) {
+      [this, FNAME](auto &is_rolled, auto &cursor_addr, auto &d_handler, auto &last_seq) {
       return crimson::repeat(
-       [this, &cursor_addr, &d_handler, &last_seq, FNAME]() mutable
+       [this, &is_rolled, &cursor_addr, &d_handler, &last_seq, FNAME]() mutable
        -> replay_ertr::future<seastar::stop_iteration> {
-       paddr_t cursor_paddr = convert_abs_addr_to_paddr(
+       paddr_t record_paddr = convert_abs_addr_to_paddr(
          cursor_addr,
          header.device_id);
-       return read_record(cursor_paddr
-       ).safe_then([this, &cursor_addr, &d_handler, &last_seq, FNAME](auto ret) {
+       return read_record(record_paddr, last_seq
+       ).safe_then([this, &is_rolled, &cursor_addr, &d_handler, &last_seq, FNAME](auto ret)
+           -> replay_ertr::future<seastar::stop_iteration> {
          if (!ret.has_value()) {
-           DEBUG("no more records");
-           return replay_ertr::make_ready_future<
-             seastar::stop_iteration>(seastar::stop_iteration::yes);
+           if (is_rolled) {
+             DEBUG("no more records, stop replaying");
+             return replay_ertr::make_ready_future<
+               seastar::stop_iteration>(seastar::stop_iteration::yes);
+           } else {
+             cursor_addr = get_start_addr();
+             is_rolled = true;
+             return replay_ertr::make_ready_future<
+               seastar::stop_iteration>(seastar::stop_iteration::no);
+           }
          }
          auto [r_header, bl] = *ret;
-         if (last_seq > r_header.committed_to.segment_seq) {
-           DEBUG("found invalide record. stop replaying");
-           return replay_ertr::make_ready_future<
-             seastar::stop_iteration>(seastar::stop_iteration::yes);
-         }
          bufferlist mdbuf;
          mdbuf.substr_of(bl, 0, r_header.mdlength);
          paddr_t record_block_base = paddr_t::make_blk_paddr(
@@ -389,27 +345,29 @@ Journal::replay_ret CircularBoundedJournal::replay(
          auto maybe_record_deltas_list = try_decode_deltas(
            r_header, mdbuf, record_block_base);
          if (!maybe_record_deltas_list) {
-           DEBUG("unable to decode deltas for record {} at {}",
-             r_header, record_block_base);
-           return replay_ertr::make_ready_future<
-             seastar::stop_iteration>(seastar::stop_iteration::yes);
+           // This should be impossible, we did check the crc on the mdbuf
+           ERROR("unable to decode deltas for record {} at {}",
+                 r_header, record_block_base);
+           return crimson::ct_error::input_output_error::make();
          }
-         DEBUG(" record_group_header_t: {}, cursor_addr: {} ",
-           r_header, cursor_addr);
+         DEBUG("{} at {}", r_header, cursor_addr);
          auto write_result = write_result_t{
            r_header.committed_to,
            (seastore_off_t)bl.length()
          };
          cur_segment_seq = r_header.committed_to.segment_seq + 1;
          cursor_addr += bl.length();
+         if (cursor_addr >= get_journal_end()) {
+           assert(cursor_addr == get_journal_end());
+           cursor_addr = get_start_addr();
+           is_rolled = true;
+         }
          set_written_to(cursor_addr);
          last_seq = r_header.committed_to.segment_seq;
          return seastar::do_with(
            std::move(*maybe_record_deltas_list),
            [write_result,
-           this,
            &d_handler,
-           &cursor_addr,
            FNAME](auto& record_deltas_list) {
            return crimson::do_for_each(
              record_deltas_list,
@@ -433,14 +391,7 @@ Journal::replay_ret CircularBoundedJournal::replay(
                  locator.write_result.start_seq,
                  modify_time);
              });
-           }).safe_then([this, &cursor_addr]() {
-             if (cursor_addr >= get_journal_end()) {
-               cursor_addr = (cursor_addr - get_journal_end()) + get_start_addr();
-             }
-             if (get_written_to() +
-                 ceph::encoded_sizeof_bounded<record_group_header_t>() > get_journal_end()) {
-               cursor_addr = get_start_addr();
-             }
+           }).safe_then([]() {
              return replay_ertr::make_ready_future<
                seastar::stop_iteration>(seastar::stop_iteration::no);
            });
@@ -455,8 +406,10 @@ CircularBoundedJournal::read_record_ret
 CircularBoundedJournal::return_record(record_group_header_t& header, bufferlist bl)
 {
   LOG_PREFIX(CircularBoundedJournal::return_record);
+  DEBUG("record size {}", bl.length());
+  assert(bl.length() == header.mdlength + header.dlength);
   bufferlist md_bl, data_bl;
-  md_bl.substr_of(bl, 0, get_block_size());
+  md_bl.substr_of(bl, 0, header.mdlength);
   data_bl.substr_of(bl, header.mdlength, header.dlength);
   if (validate_records_metadata(md_bl) &&
       validate_records_data(header, data_bl)) {
@@ -471,23 +424,18 @@ CircularBoundedJournal::return_record(record_group_header_t& header, bufferlist
   }
 }
 
-CircularBoundedJournal::read_record_ret CircularBoundedJournal::read_record(paddr_t off)
+CircularBoundedJournal::read_record_ret
+CircularBoundedJournal::read_record(paddr_t off, segment_seq_t last_seq)
 {
   LOG_PREFIX(CircularBoundedJournal::read_record);
-  rbm_abs_addr offset = convert_paddr_to_abs_addr(
-    off);
-  rbm_abs_addr addr = offset;
+  rbm_abs_addr addr = convert_paddr_to_abs_addr(off);
   auto read_length = get_block_size();
-  if (addr + get_block_size() > get_journal_end()) {
-    addr = get_start_addr();
-    read_length = get_journal_end() - offset;
-  }
+  assert(addr + read_length <= get_journal_end());
   DEBUG("read_record: reading record from abs addr {} read length {}",
       addr, read_length);
   auto bptr = bufferptr(ceph::buffer::create_page_aligned(read_length));
-  bptr.zero();
   return device->read(addr, bptr
-  ).safe_then([this, addr, read_length, bptr, FNAME]() mutable
+  ).safe_then([this, addr, bptr, last_seq, FNAME]() mutable
     -> read_record_ret {
     record_group_header_t h;
     bufferlist bl;
@@ -500,65 +448,30 @@ CircularBoundedJournal::read_record_ret CircularBoundedJournal::read_record(padd
        read_record_ertr::ready_future_marker{},
        std::nullopt);
     }
-    /*
-     * |          journal          |
-     *        | record 1 header |  | record 1 data
-     *  record 1 data  (remaining) |
-     *
-     *        <---- 1 block ----><--
-     * -- 2 block --->
-     *
-     *  If record has logner than read_length and its data is located across
-     *  the end of journal and the begining of journal, we need three reads
-     *  ---reads of header, other remaining data before the end, and
-     *  the other remaining data from the begining.
-     *
-     */
-    if (h.mdlength + h.dlength > read_length) {
-      rbm_abs_addr next_read_addr = addr + read_length;
-      auto next_read = h.mdlength + h.dlength - read_length;
-      DEBUG(" next_read_addr {}, next_read_length {} ",
-         next_read_addr, next_read);
-      if (get_journal_end() < next_read_addr + next_read) {
-       // In this case, need two more reads.
-       // The first is to read remain bytes to the end of cbjournal
-       // The second is to read the data at the begining of cbjournal
-       next_read = get_journal_end() - (addr + read_length);
-      }
-      DEBUG("read_entry: additional reading addr {} length {}",
-           next_read_addr,
-           next_read);
-      auto next_bptr = bufferptr(ceph::buffer::create_page_aligned(next_read));
-      next_bptr.zero();
-      return device->read(
-       next_read_addr,
-       next_bptr
-      ).safe_then([this, h=h, next_bptr=std::move(next_bptr), bl=std::move(bl),
-        FNAME]() mutable {
-       bl.append(next_bptr);
-       if (h.mdlength + h.dlength == bl.length()) {
-         DEBUG("read_record: record length {} done", bl.length());
-         return return_record(h, bl);
-       }
-       // need one more read
-       auto next_read_addr = get_start_addr();
-       auto last_bptr = bufferptr(ceph::buffer::create_page_aligned(
-             h.mdlength + h.dlength - bl.length()));
-       DEBUG("read_record: last additional reading addr {} length {}",
-             next_read_addr,
-             h.mdlength + h.dlength - bl.length());
-       return device->read(
-         next_read_addr,
-         last_bptr
-       ).safe_then([this, h=h, last_bptr=std::move(last_bptr),
-         bl=std::move(bl), FNAME]() mutable {
-         bl.append(last_bptr);
-         DEBUG("read_record: complte size {}", bl.length());
-         return return_record(h, bl);
-       });
+    if (h.mdlength < get_block_size() ||
+        h.mdlength % get_block_size() != 0 ||
+        h.dlength % get_block_size() != 0 ||
+        addr + h.mdlength + h.dlength > get_journal_end() ||
+        h.committed_to.segment_seq == NULL_SEG_SEQ ||
+        h.committed_to.segment_seq < last_seq) {
+      return read_record_ret(
+        read_record_ertr::ready_future_marker{},
+        std::nullopt);
+    }
+    auto record_size = h.mdlength + h.dlength;
+    if (record_size > get_block_size()) {
+      auto next_addr = addr + get_block_size();
+      auto next_length = record_size - get_block_size();
+      auto next_bptr = bufferptr(ceph::buffer::create_page_aligned(next_length));
+      DEBUG("reading record part 2 from abs addr {} read length {}",
+            next_addr, next_length);
+      return device->read(next_addr, next_bptr
+      ).safe_then([this, h, next_bptr=std::move(next_bptr), bl=std::move(bl)]() mutable {
+        bl.append(next_bptr);
+        return return_record(h, bl);
       });
     } else {
-      DEBUG("read_record: complte size {}", bl.length());
+      assert(record_size == get_block_size());
       return return_record(h, bl);
     }
   });
index 230d3fdd12295d3a8eec9471e5651a1daa9a1f3a..265fea984266cd6354af8c2ad587ef8631ed3849 100644 (file)
@@ -108,16 +108,6 @@ public:
   using write_ertr = crimson::errorator<
     crimson::ct_error::input_output_error,
     crimson::ct_error::erange>;
-  /*
-   * append_record
-   *
-   * append data to current write position of CircularBoundedJournal
-   *
-   * @param bufferlist to write
-   * @param rbm_abs_addr where data is written
-   *
-   */
-  write_ertr::future<> append_record(ceph::bufferlist bl, rbm_abs_addr addr);
   /*
    * device_write_bl
    *
@@ -146,9 +136,10 @@ public:
    * read record from given address
    *
    * @param paddr_t to read
+   * @param last_seq
    *
    */
-  read_record_ret read_record(paddr_t offset);
+  read_record_ret read_record(paddr_t offset, segment_seq_t last_seq);
   /*
    * read_header
    *
@@ -240,7 +231,7 @@ public:
   size_t get_used_size() const {
     return get_written_to() >= get_journal_tail() ?
       get_written_to() - get_journal_tail() :
-      get_written_to() + get_total_size() - get_journal_tail();
+      get_written_to() + header.size + get_block_size() - get_journal_tail();
   }
   size_t get_total_size() const {
     return header.size;
@@ -272,6 +263,8 @@ public:
     return written_to;
   }
   void set_written_to(rbm_abs_addr addr) {
+    assert(addr >= get_start_addr());
+    assert(addr < get_journal_end());
     written_to = addr;
   }
   device_id_t get_device_id() const {
@@ -281,7 +274,7 @@ public:
     return header.block_size;
   }
   rbm_abs_addr get_journal_end() const {
-    return get_start_addr() + header.size; // journal size + header length
+    return get_start_addr() + header.size + get_block_size(); // journal size + header length
   }
   void add_device(NVMeBlockDevice* dev) {
     device = dev;
@@ -300,6 +293,7 @@ private:
   bool initialized = false;
   segment_seq_t cur_segment_seq = 0; // segment seq to track the sequence to written records
   // start address where the newest record will be written
+  // should be in range [get_start_addr(), get_journal_end())
   rbm_abs_addr written_to = 0;
 };
 
index a389301b281feb398f8ee285a823370931cfcb1e..1161f7dfc32fe401ae402b48cbc961c4a3eb9d23 100644 (file)
@@ -101,7 +101,7 @@ struct entry_validator_t {
       paddr_t paddr = convert_abs_addr_to_paddr(
        addr + offset,
        cbj.get_device_id());
-      auto [header, buf] = *(cbj.read_record(paddr).unsafe_get0());
+      auto [header, buf] = *(cbj.read_record(paddr, NULL_SEG_SEQ).unsafe_get0());
       auto record = decode_record(buf);
       validate(*record);
       offset += header.mdlength + header.dlength;
@@ -138,7 +138,7 @@ struct cbjournal_test_t : public seastar_test_suite_t
       epm(new ExtentPlacementManager(true)),
       cache(*epm)
   {
-    device = new nvme_device::TestMemory(CBTEST_DEFAULT_TEST_SIZE);
+    device = new nvme_device::TestMemory(CBTEST_DEFAULT_TEST_SIZE + CBTEST_DEFAULT_BLOCK_SIZE);
     cbj.reset(new CircularBoundedJournal(device, std::string()));
     device_id_t d_id = 1 << (std::numeric_limits<device_id_t>::digits - 1);
     config.block_size = CBTEST_DEFAULT_BLOCK_SIZE;
@@ -232,8 +232,9 @@ struct cbjournal_test_t : public seastar_test_suite_t
   auto mkfs() {
     return cbj->mkfs(config).unsafe_get0();
   }
-  auto open() {
-    return cbj->open_device_read_header().unsafe_get0();
+  void open() {
+    cbj->open_device_read_header().unsafe_get0();
+    cbj->open_for_write().unsafe_get0();
   }
   auto get_available_size() {
     return cbj->get_available_size();