git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os/seastore/journal: add CircularJournalSpace
author myoungwon oh <ohmyoungwon@gmail.com>
Thu, 6 Apr 2023 01:56:10 +0000 (01:56 +0000)
committer Matan Breizman <mbreizma@redhat.com>
Sun, 21 May 2023 09:31:47 +0000 (09:31 +0000)
Signed-off-by: Myoungwon Oh <myoungwon.oh@samsung.com>
(cherry picked from commit 59cb8c1050b08bd3fa002c0c401605b977631be9)

src/crimson/os/seastore/CMakeLists.txt
src/crimson/os/seastore/journal/circular_bounded_journal.cc
src/crimson/os/seastore/journal/circular_bounded_journal.h
src/crimson/os/seastore/journal/circular_journal_space.cc [new file with mode: 0644]
src/crimson/os/seastore/journal/circular_journal_space.h [new file with mode: 0644]

diff --git a/src/crimson/os/seastore/CMakeLists.txt b/src/crimson/os/seastore/CMakeLists.txt
index d26ae813afe68b117b2221e18dcf5aefd1df3381..6dd19a1563a525027f24af817ea4aa889abcc161 100644 (file)
@@ -44,6 +44,7 @@ set(crimson_seastore_srcs
   journal/segmented_journal.cc
   journal/segment_allocator.cc
   journal/record_submitter.cc
+  journal/circular_journal_space.cc
   journal.cc
   device.cc
   segment_manager_group.cc
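diff --git a/src/crimson/os/seastore/journal/circular_bounded_journal.cc b/src/crimson/os/seastore/journal/circular_bounded_journal.cc
index 1f9762bdbb214d4e5e4edb6f9a2cebab49f59d1c..4a493b22cebe7975cc2789761670a06eff247487 100644 (file)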
 #include "crimson/os/seastore/async_cleaner.h"
 #include "crimson/os/seastore/journal/circular_bounded_journal.h"
 #include "crimson/os/seastore/logging.h"
+#include "crimson/os/seastore/journal/circular_journal_space.h"
 
 SET_SUBSYS(seastore_journal);
 
 namespace crimson::os::seastore::journal {
 
-std::ostream &operator<<(std::ostream &out,
-    const CircularBoundedJournal::cbj_header_t &header)
-{
-  return out << "cbj_header_t(" 
-            << ", dirty_tail=" << header.dirty_tail
-            << ", alloc_tail=" << header.alloc_tail
-             << ")";
-}
-
 CircularBoundedJournal::CircularBoundedJournal(
     JournalTrimmer &trimmer,
     RBMDevice* device,
     const std::string &path)
-  : trimmer(trimmer), device(device), path(path) {}
-
-ceph::bufferlist CircularBoundedJournal::encode_header()
-{
-  bufferlist bl;
-  encode(header, bl);
-  auto header_crc_filler = bl.append_hole(sizeof(checksum_t));
-  auto bliter = bl.cbegin();
-  auto header_crc = bliter.crc32c(
-    ceph::encoded_sizeof_bounded<cbj_header_t>(),
-    -1);
-  ceph_le32 header_crc_le;
-  header_crc_le = header_crc;
-  header_crc_filler.copy_in(
-    sizeof(checksum_t),
-    reinterpret_cast<const char *>(&header_crc_le));
-  return bl;
-}
+  : trimmer(trimmer), path(path),
+  cjs(device),
+  record_submitter(crimson::common::get_conf<uint64_t>(
+      "seastore_journal_iodepth_limit"),
+    crimson::common::get_conf<uint64_t>(
+      "seastore_journal_batch_capacity"),
+    crimson::common::get_conf<Option::size_t>(
+      "seastore_journal_batch_flush_size"),
+    crimson::common::get_conf<double>(
+      "seastore_journal_batch_preferred_fullness"),
+    cjs)
+  {}
 
 CircularBoundedJournal::open_for_mkfs_ret
 CircularBoundedJournal::open_for_mkfs()
 {
-  LOG_PREFIX(CircularBoundedJournal::open_for_mkfs);
-  assert(device);
-  ceph::bufferlist bl;
-  CircularBoundedJournal::cbj_header_t head;
-  assert(device->get_journal_size());
-  head.dirty_tail =
-    journal_seq_t{0,
-      convert_abs_addr_to_paddr(
-       get_records_start(),
-       device->get_device_id())};
-  head.alloc_tail = head.dirty_tail;
-  encode(head, bl);
-  header = head;
-  set_written_to(head.dirty_tail);
-  initialized = true;
-  DEBUG(
-    "initialize header block in CircularBoundedJournal, length {}",
-    bl.length());
-  return write_header(
-  ).safe_then([this]() {
-    return open_for_mount();
-  }).handle_error(
-    open_for_mkfs_ertr::pass_further{},
-    crimson::ct_error::assert_all{
-      "Invalid error write_header"
-    }
-  );
+  return record_submitter.open(true
+  ).safe_then([this](auto ret) {
+    record_submitter.update_committed_to(get_written_to());
+    return open_for_mkfs_ret(
+      open_for_mkfs_ertr::ready_future_marker{},
+      get_written_to());
+  });
 }
 
 CircularBoundedJournal::open_for_mount_ret
 CircularBoundedJournal::open_for_mount()
 {
-  ceph_assert(initialized);
-  if (written_to.segment_seq == NULL_SEG_SEQ) {
-    written_to.segment_seq = 0;
-  }
-  return open_for_mount_ret(
-    open_for_mount_ertr::ready_future_marker{},
-    get_written_to());
+  return record_submitter.open(false
+  ).safe_then([this](auto ret) {
+    record_submitter.update_committed_to(get_written_to());
+    return open_for_mount_ret(
+      open_for_mount_ertr::ready_future_marker{},
+      get_written_to());
+  });
 }
 
 CircularBoundedJournal::close_ertr::future<> CircularBoundedJournal::close()
 {
-  return write_header(
-  ).safe_then([this]() -> close_ertr::future<> {
-    initialized = false;
-    return close_ertr::now();
-  }).handle_error(
-    open_for_mount_ertr::pass_further{},
-    crimson::ct_error::assert_all{
-      "Invalid error write_header"
-    }
-  );
+  return record_submitter.close();
 }
 
-CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record(
-  record_t &&record,
-  OrderingHandle &handle)
+CircularBoundedJournal::submit_record_ret
+CircularBoundedJournal::submit_record(
+    record_t &&record,
+    OrderingHandle &handle)
 {
   LOG_PREFIX(CircularBoundedJournal::submit_record);
+  DEBUG("H{} {} start ...", (void*)&handle, record);
   assert(write_pipeline);
-  assert(written_to.segment_seq != NULL_SEG_SEQ);
-  auto r_size = record_group_size_t(record.size, get_block_size());
-  auto encoded_size = r_size.get_encoded_length();
-  if (encoded_size > get_records_available_size()) {
-    ERROR("record size {}, but available size {}",
-          encoded_size, get_records_available_size());
-    return crimson::ct_error::erange::make();
-  }
-  if (encoded_size + get_rbm_addr(get_written_to()) > get_journal_end()) {
-    DEBUG("roll");
-    paddr_t paddr = convert_abs_addr_to_paddr(
-      get_records_start(),
-      get_device_id());
-    set_written_to(
-      journal_seq_t{++written_to.segment_seq, paddr});
-    if (encoded_size > get_records_available_size()) {
-      ERROR("rolled, record size {}, but available size {}",
-            encoded_size, get_records_available_size());
-      return crimson::ct_error::erange::make();
-    }
-  }
-
-  journal_seq_t j_seq = get_written_to();
-  ceph::bufferlist to_write = encode_record(
-    std::move(record), device->get_block_size(),
-    j_seq, 0);
-  assert(to_write.length() == encoded_size);
-  auto target = get_rbm_addr(get_written_to());
-  auto new_written_to = target + encoded_size;
-  if (new_written_to >= get_journal_end()) {
-    assert(new_written_to == get_journal_end());
-    DEBUG("roll");
-    paddr_t paddr = convert_abs_addr_to_paddr(
-      get_records_start(),
-      get_device_id());
-    set_written_to(
-      journal_seq_t{++written_to.segment_seq, paddr});
-  } else {
-    paddr_t paddr = convert_abs_addr_to_paddr(
-      new_written_to,
-      get_device_id());
-    set_written_to(
-      journal_seq_t{written_to.segment_seq, paddr});
-  }
-  DEBUG("{}, target {}", r_size, target);
-
-  auto write_result = write_result_t{
-    j_seq,
-    encoded_size
-  };
-  auto write_fut = device_write_bl(target, to_write);
-  return handle.enter(write_pipeline->device_submission
-  ).then([write_fut = std::move(write_fut)]() mutable {
-    return std::move(write_fut);
-  }).safe_then([this, &handle] {
-    return handle.enter(write_pipeline->finalize);
-  }).safe_then([this, target,
-    length=encoded_size,
-    write_result,
-    r_size,
-    FNAME] {
-    DEBUG("commit target {} used_size {} written length {}",
-          target, get_records_used_size(), length);
-
-    paddr_t paddr = convert_abs_addr_to_paddr(
-      target + r_size.get_mdlength(),
-      get_device_id());
-    auto submit_result = record_locator_t{
-      paddr,
-      write_result
-    };
-    trimmer.set_journal_head(write_result.start_seq);
-    return submit_result;
-  });
+  return do_submit_record(std::move(record), handle);
 }
 
-CircularBoundedJournal::write_ertr::future<> CircularBoundedJournal::device_write_bl(
-    rbm_abs_addr offset, bufferlist &bl)
+CircularBoundedJournal::submit_record_ret
+CircularBoundedJournal::do_submit_record(
+  record_t &&record,
+  OrderingHandle &handle)
 {
-  LOG_PREFIX(CircularBoundedJournal::device_write_bl);
-  auto length = bl.length();
-  if (offset + length > get_journal_end()) {
-    return crimson::ct_error::erange::make();
+  LOG_PREFIX(CircularBoundedJournal::do_submit_record);
+  if (!record_submitter.is_available()) {
+    DEBUG("H{} wait ...", (void*)&handle);
+    return record_submitter.wait_available(
+    ).safe_then([this, record=std::move(record), &handle]() mutable {
+      return do_submit_record(std::move(record), handle);
+    });
+  }
+  auto action = record_submitter.check_action(record.size);
+  if (action == RecordSubmitter::action_t::ROLL) {
+    return record_submitter.roll_segment(
+    ).safe_then([this, record=std::move(record), &handle]() mutable {
+      return do_submit_record(std::move(record), handle);
+    });
   }
-  DEBUG(
-    "overwrite in CircularBoundedJournal, offset {}, length {}",
-    offset,
-    length);
-  return device->writev(offset, bl
-  ).handle_error(
-    write_ertr::pass_further{},
-    crimson::ct_error::assert_all{ "Invalid error device->write" }
-  );
-}
 
-CircularBoundedJournal::read_header_ret
-CircularBoundedJournal::read_header()
-{
-  LOG_PREFIX(CircularBoundedJournal::read_header);
-  assert(device);
-  auto bptr = bufferptr(ceph::buffer::create_page_aligned(
-                       device->get_block_size()));
-  DEBUG("reading {}", device->get_journal_start());
-  return device->read(device->get_journal_start(), bptr
-  ).safe_then([bptr, FNAME]() mutable
-    -> read_header_ret {
-    bufferlist bl;
-    bl.append(bptr);
-    auto bp = bl.cbegin();
-    cbj_header_t cbj_header;
-    try {
-      decode(cbj_header, bp);
-    } catch (ceph::buffer::error &e) {
-      ERROR("unable to read header block");
-      return crimson::ct_error::enoent::make();
-    }
-    auto bliter = bl.cbegin();
-    auto test_crc = bliter.crc32c(
-      ceph::encoded_sizeof_bounded<cbj_header_t>(),
-      -1);
-    ceph_le32 recorded_crc_le;
-    decode(recorded_crc_le, bliter);
-    uint32_t recorded_crc = recorded_crc_le;
-    if (test_crc != recorded_crc) {
-      ERROR("error, header crc mismatch.");
-      return read_header_ret(
-       read_header_ertr::ready_future_marker{},
-       std::nullopt);
-    }
-    return read_header_ret(
-      read_header_ertr::ready_future_marker{},
-      std::make_pair(cbj_header, bl)
-    );
+  DEBUG("H{} submit {} ...",
+       (void*)&handle,
+       action == RecordSubmitter::action_t::SUBMIT_FULL ?
+       "FULL" : "NOT_FULL");
+  auto submit_fut = record_submitter.submit(std::move(record));
+  return handle.enter(write_pipeline->device_submission
+  ).then([submit_fut=std::move(submit_fut)]() mutable {
+    return std::move(submit_fut);
+  }).safe_then([FNAME, this, &handle](record_locator_t result) {
+    return handle.enter(write_pipeline->finalize
+    ).then([FNAME, this, result, &handle] {
+      DEBUG("H{} finish with {}", (void*)&handle, result);
+      auto new_committed_to = result.write_result.get_end_seq();
+      record_submitter.update_committed_to(new_committed_to);
+      return result;
+    });
   });
 }
 
@@ -356,7 +222,7 @@ Journal::replay_ret CircularBoundedJournal::replay(
    * read records from last applied record prior to written_to, and replay
    */
   LOG_PREFIX(CircularBoundedJournal::replay);
-  return read_header(
+  return cjs.read_header(
   ).handle_error(
     open_for_mount_ertr::pass_further{},
     crimson::ct_error::assert_all{
@@ -364,9 +230,9 @@ Journal::replay_ret CircularBoundedJournal::replay(
   }).safe_then([this, FNAME, delta_handler=std::move(delta_handler)](auto p)
     mutable {
     auto &[head, bl] = *p;
-    header = head;
-    DEBUG("header : {}", header);
-    initialized = true;
+    cjs.set_cbj_header(head);
+    DEBUG("header : {}", cjs.get_cbj_header());
+    cjs.set_initialized(true);
     return seastar::do_with(
       std::move(delta_handler),
       std::map<paddr_t, journal_seq_t>(),
@@ -387,7 +253,6 @@ Journal::replay_ret CircularBoundedJournal::replay(
        }
        return replay_ertr::make_ready_future<bool>(true);
       };
-      written_to.segment_seq = NULL_SEG_SEQ;
       auto tail = get_dirty_tail() <= get_alloc_tail() ?
        get_dirty_tail() : get_alloc_tail();
       set_written_to(tail);
@@ -405,8 +270,8 @@ Journal::replay_ret CircularBoundedJournal::replay(
            return d_handler(
              offsets,
              e,
-             header.dirty_tail,
-             header.alloc_tail,
+             get_dirty_tail(),
+             get_alloc_tail(),
              modify_time
            );
          }
@@ -416,9 +281,10 @@ Journal::replay_ret CircularBoundedJournal::replay(
        return scan_valid_record_delta(std::move(call_d_handler_if_valid), tail);
       });
     }).safe_then([this]() {
+      record_submitter.update_committed_to(get_written_to());
       trimmer.update_journal_tails(
-       header.dirty_tail,
-       header.alloc_tail);
+       get_dirty_tail(),
+       get_alloc_tail());
     });
   });
 }
@@ -454,7 +320,7 @@ CircularBoundedJournal::read_record(paddr_t off, segment_seq_t expected_seq)
   assert(addr + read_length <= get_journal_end());
   DEBUG("reading record from abs addr {} read length {}", addr, read_length);
   auto bptr = bufferptr(ceph::buffer::create_page_aligned(read_length));
-  return device->read(addr, bptr
+  return cjs.read(addr, bptr
   ).safe_then([this, addr, bptr, expected_seq, FNAME]() mutable
     -> read_record_ret {
     record_group_header_t h;
@@ -486,7 +352,7 @@ CircularBoundedJournal::read_record(paddr_t off, segment_seq_t expected_seq)
       auto next_bptr = bufferptr(ceph::buffer::create_page_aligned(next_length));
       DEBUG("reading record part 2 from abs addr {} read length {}",
             next_addr, next_length);
-      return device->read(next_addr, next_bptr
+      return cjs.read(next_addr, next_bptr
       ).safe_then([this, h, next_bptr=std::move(next_bptr), bl=std::move(bl)]() mutable {
         bl.append(next_bptr);
         return return_record(h, bl);
@@ -498,26 +364,6 @@ CircularBoundedJournal::read_record(paddr_t off, segment_seq_t expected_seq)
   });
 }
 
-CircularBoundedJournal::write_ertr::future<>
-CircularBoundedJournal::write_header()
-{
-  LOG_PREFIX(CircularBoundedJournal::write_header);
-  ceph::bufferlist bl = encode_header();
-  ceph_assert(bl.length() <= get_block_size());
-  DEBUG(
-    "sync header of CircularBoundedJournal, length {}",
-    bl.length());
-  assert(device);
-  auto iter = bl.begin();
-  assert(bl.length() < get_block_size());
-  bufferptr bp = bufferptr(ceph::buffer::create_page_aligned(get_block_size()));
-  iter.copy(bl.length(), bp.c_str());
-  return device->write(device->get_journal_start(), std::move(bp)
-  ).handle_error(
-    write_ertr::pass_further{},
-    crimson::ct_error::assert_all{ "Invalid error device->write" }
-  );
-}
 seastar::future<> CircularBoundedJournal::finish_commit(transaction_type_t type) {
   if (is_trim_transaction(type)) {
     return update_journal_tail(
@@ -527,5 +373,4 @@ seastar::future<> CircularBoundedJournal::finish_commit(transaction_type_t type)
   return seastar::now();
 }
 
-
 }
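diff --git a/src/crimson/os/seastore/journal/circular_bounded_journal.h b/src/crimson/os/seastore/journal/circular_bounded_journal.h
index b7117df5043694040345ac964c53f50e43a62949..58bfce4873e5d9f3f0e0643e592bd89ef1894a58 100644 (file)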
@@ -19,6 +19,8 @@
 #include "crimson/os/seastore/random_block_manager.h"
 #include "crimson/os/seastore/random_block_manager/rbm_device.h"
 #include <list>
+#include "crimson/os/seastore/journal/record_submitter.h"
+#include "crimson/os/seastore/journal/circular_journal_space.h"
 
 namespace crimson::os::seastore::journal {
 
@@ -87,16 +89,32 @@ public:
 
   replay_ret replay(delta_handler_t &&delta_handler) final;
 
-  struct cbj_header_t;
-  using write_ertr = submit_record_ertr;
-  /*
-   * device_write_bl
+  rbm_abs_addr get_rbm_addr(journal_seq_t seq) const {
+    return convert_paddr_to_abs_addr(seq.offset);
+  }
+
+  /**
    *
-   * @param device address to write
-   * @param bufferlist to write
+   * CircularBoundedJournal write
+   *
+   * NVMe will support large block writes (< 512KB) with the atomic write unit command.
+   * With this command, we expect that most incoming data can be stored
+   * with a single write call, which has lower overhead than the existing
+   * approach that combines system calls such as write() and sync().
    *
    */
-  write_ertr::future<> device_write_bl(rbm_abs_addr offset, ceph::bufferlist &bl);
+
+  seastar::future<> update_journal_tail(
+    journal_seq_t dirty,
+    journal_seq_t alloc) {
+    return cjs.update_journal_tail(dirty, alloc);
+  }
+  journal_seq_t get_dirty_tail() const {
+    return cjs.get_dirty_tail();
+  }
+  journal_seq_t get_alloc_tail() const {
+    return cjs.get_alloc_tail();
+  }
 
   using read_ertr = crimson::errorator<
     crimson::ct_error::input_output_error,
@@ -107,10 +125,6 @@ public:
   using read_record_ret = read_record_ertr::future<
        std::optional<std::pair<record_group_header_t, bufferlist>>
        >;
-  using read_header_ertr = read_ertr;
-  using read_header_ret = read_header_ertr::future<
-       std::optional<std::pair<cbj_header_t, bufferlist>>
-       >;
   /*
    * read_record
    *
@@ -121,84 +135,6 @@ public:
    *
    */
   read_record_ret read_record(paddr_t offset, segment_seq_t expected_seq);
-  /*
-   * read_header
-   *
-   * read header block from given absolute address
-   *
-   * @param absolute address
-   *
-   */
-  read_header_ret read_header();
-
-  ceph::bufferlist encode_header();
-
-
-  /**
-   * CircularBoundedJournal structure
-   *
-   * +-------------------------------------------------------+
-   * |   header    | record | record | record | record | ... |
-   * +-------------------------------------------------------+
-   *               ^-----------block aligned-----------------^
-   * <----fixed---->
-   */
-
-
-  /**
-   *
-   * CircularBoundedJournal write
-   *
-   * NVMe will support a large block write (< 512KB) with atomic write unit command.
-   * With this command, we expect that the most of incoming data can be stored
-   * as a single write call, which has lower overhead than existing
-   * way that uses a combination of system calls such as write() and sync().
-   *
-   */
-
-  struct cbj_header_t {
-    // start offset of CircularBoundedJournal in the device
-    journal_seq_t dirty_tail;
-    journal_seq_t alloc_tail;
-
-    DENC(cbj_header_t, v, p) {
-      DENC_START(1, 1, p);
-      denc(v.dirty_tail, p);
-      denc(v.alloc_tail, p);
-      DENC_FINISH(p);
-    }
-  };
-
-  /**
-   *
-   * Write position for CircularBoundedJournal
-   *
-   * | written to rbm |    written length to CircularBoundedJournal    | new write |
-   * ----------------->------------------------------------------------>
-   *                  ^                                               ^
-   *            applied_to                                        written_to
-   *
-   */
-
-  seastar::future<> update_journal_tail(
-    journal_seq_t dirty,
-    journal_seq_t alloc) {
-    header.dirty_tail = dirty;
-    header.alloc_tail = alloc;
-    return write_header(
-    ).handle_error(
-      crimson::ct_error::assert_all{
-      "encountered invalid error in update_journal_tail"
-    });
-  }
-  journal_seq_t get_dirty_tail() const {
-    return header.dirty_tail;
-  }
-  journal_seq_t get_alloc_tail() const {
-    return header.alloc_tail;
-  }
-
-  write_ertr::future<> write_header();
 
   read_record_ret return_record(record_group_header_t& header, bufferlist bl);
 
@@ -206,74 +142,29 @@ public:
     write_pipeline = _write_pipeline;
   }
 
-  journal_seq_t get_written_to() const {
-    return written_to;
-  }
-  rbm_abs_addr get_rbm_addr(journal_seq_t seq) const {
-    return convert_paddr_to_abs_addr(seq.offset);
-  }
-  void set_written_to(journal_seq_t seq) {
-    rbm_abs_addr addr = convert_paddr_to_abs_addr(seq.offset);
-    assert(addr >= get_records_start());
-    assert(addr < get_journal_end());
-    written_to = seq;
-  }
   device_id_t get_device_id() const {
-    return device->get_device_id();
+    return cjs.get_device_id();
   }
   extent_len_t get_block_size() const {
-    assert(device);
-    return device->get_block_size();
+    return cjs.get_block_size();
   }
 
-  /* 
-    Size-related interfaces
-     +---------------------------------------------------------+
-     |   header      | record | record | record | record | ... | 
-     +---------------------------------------------------------+
-     ^               ^                                         ^
-     |               |                                         |
-   get_journal_start |                                     get_journal_end
-              get_records_start
-                     <-- get_records_total_size + block_size -->
-     <--------------- get_journal_size ------------------------>
-  */
-
-  size_t get_records_used_size() const {
-    auto rbm_written_to = get_rbm_addr(get_written_to());
-    auto rbm_tail = get_rbm_addr(get_dirty_tail());
-    return rbm_written_to >= rbm_tail ?
-      rbm_written_to - rbm_tail :
-      rbm_written_to + get_records_total_size() + get_block_size()
-      - rbm_tail;
-  }
-  size_t get_records_total_size() const {
-    assert(device);
-    // a block is for header and a block is reserved to denote the end
-    return device->get_journal_size() - (2 * get_block_size());
-  }
-  rbm_abs_addr get_records_start() const {
-    assert(device);
-    return device->get_journal_start() + get_block_size();
+  rbm_abs_addr get_journal_end() const {
+    return cjs.get_journal_end();
   }
-  size_t get_records_available_size() const {
-    return get_records_total_size() - get_records_used_size();
+
+  void set_written_to(journal_seq_t seq) {
+    cjs.set_written_to(seq);
   }
-  bool is_available_size(uint64_t size) {
-    auto rbm_written_to = get_rbm_addr(get_written_to());
-    auto rbm_tail = get_rbm_addr(get_dirty_tail());
-    if (rbm_written_to > rbm_tail && 
-       (get_journal_end() - rbm_written_to) < size &&
-       size > (get_records_used_size() - 
-       (get_journal_end() - rbm_written_to))) {
-      return false;
-    } 
-    return get_records_available_size() >= size;
+
+  journal_seq_t get_written_to() {
+    return cjs.get_written_to();
   }
-  rbm_abs_addr get_journal_end() const {
-    assert(device);
-    return device->get_journal_start() + device->get_journal_size();
+
+  rbm_abs_addr get_records_start() const {
+    return cjs.get_records_start();
   }
+
   seastar::future<> finish_commit(transaction_type_t type) final;
 
   using cbj_delta_handler_t = std::function<
@@ -286,10 +177,10 @@ public:
     cbj_delta_handler_t &&delta_handler,
     journal_seq_t tail);
 
+  submit_record_ret do_submit_record(record_t &&record, OrderingHandle &handle);
+
 private:
-  cbj_header_t header;
   JournalTrimmer &trimmer;
-  RBMDevice* device;
   std::string path;
   WritePipeline *write_pipeline = nullptr;
   /**
@@ -304,15 +195,9 @@ private:
   // should be in range [get_records_start(), get_journal_end())
   // written_to.segment_seq is circulation seq to track 
   // the sequence to written records
-  journal_seq_t written_to;
+  CircularJournalSpace cjs;
+  RecordSubmitter record_submitter; 
 };
 
-std::ostream &operator<<(std::ostream &out, const CircularBoundedJournal::cbj_header_t &header);
-
 }
 
-WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::journal::CircularBoundedJournal::cbj_header_t)
-
-#if FMT_VERSION >= 90000
-template <> struct fmt::formatter<crimson::os::seastore::journal::CircularBoundedJournal::cbj_header_t> : fmt::ostream_formatter {};
-#endif
diff --git a/src/crimson/os/seastore/journal/circular_journal_space.cc b/src/crimson/os/seastore/journal/circular_journal_space.cc
new file mode 100644 (file)
index 0000000..7565c28
--- /dev/null
@@ -0,0 +1,232 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#include "circular_journal_space.h"
+
+#include <fmt/format.h>
+#include <fmt/os.h>
+
+#include "crimson/os/seastore/logging.h"
+#include "crimson/os/seastore/async_cleaner.h"
+#include "crimson/os/seastore/journal/circular_bounded_journal.h"
+
+SET_SUBSYS(seastore_journal);
+
+namespace crimson::os::seastore::journal {
+
+std::ostream &operator<<(std::ostream &out,
+    const CircularJournalSpace::cbj_header_t &header)
+{
+  return out << "cbj_header_t(" 
+            << ", dirty_tail=" << header.dirty_tail
+            << ", alloc_tail=" << header.alloc_tail
+             << ")";
+}
+
+CircularJournalSpace::CircularJournalSpace(RBMDevice * device) : device(device) {}
+  
+bool CircularJournalSpace::needs_roll(std::size_t length) const {
+  if (length + get_rbm_addr(get_written_to()) > get_journal_end()) {
+    return true;
+  }
+  return false;
+}
+
+extent_len_t CircularJournalSpace::get_block_size() const {
+  return device->get_block_size();
+}
+
+CircularJournalSpace::roll_ertr::future<> CircularJournalSpace::roll() {
+  paddr_t paddr = convert_abs_addr_to_paddr(
+    get_records_start(),
+    get_device_id());
+  auto seq = get_written_to();
+  set_written_to(
+    journal_seq_t{++seq.segment_seq, paddr});
+  return roll_ertr::now();
+}
+
+CircularJournalSpace::write_ret
+CircularJournalSpace::write(ceph::bufferlist&& to_write) {
+  LOG_PREFIX(CircularJournalSpace::write);
+  assert(get_written_to().segment_seq != NULL_SEG_SEQ);
+  auto encoded_size = to_write.length();
+  if (encoded_size > get_records_available_size()) {
+    ceph_abort("should be impossible with EPM reservation");
+  }
+  assert(encoded_size + get_rbm_addr(get_written_to())
+        < get_journal_end());
+
+  journal_seq_t j_seq = get_written_to();
+  auto target = get_rbm_addr(get_written_to());
+  auto new_written_to = target + encoded_size;
+  assert(new_written_to < get_journal_end());
+  paddr_t paddr = convert_abs_addr_to_paddr(
+    new_written_to,
+    get_device_id());
+  set_written_to(
+    journal_seq_t{get_written_to().segment_seq, paddr});
+  DEBUG("{}, target {}", to_write.length(), target);
+
+  auto write_result = write_result_t{
+    j_seq,
+    encoded_size
+  };
+  return device_write_bl(target, to_write
+  ).safe_then([this, target,
+    length=encoded_size,
+    write_result,
+    FNAME] {
+    DEBUG("commit target {} used_size {} written length {}",
+          target, get_records_used_size(), length);
+    return write_result;
+  }).handle_error(
+    base_ertr::pass_further{},
+    crimson::ct_error::assert_all{ "Invalid error" }
+  );
+}
+
+CircularJournalSpace::open_ret CircularJournalSpace::open(bool is_mkfs) {
+  std::ostringstream oss;
+  oss << device_id_printer_t{get_device_id()};
+  print_name = oss.str();
+
+  if (is_mkfs) {
+    LOG_PREFIX(CircularJournalSpace::open);
+    assert(device);
+    ceph::bufferlist bl;
+    CircularJournalSpace::cbj_header_t head;
+    assert(device->get_journal_size());
+    head.dirty_tail =
+      journal_seq_t{0,
+       convert_abs_addr_to_paddr(
+         get_records_start(),
+         device->get_device_id())};
+    head.alloc_tail = head.dirty_tail;
+    encode(head, bl);
+    header = head;
+    set_written_to(head.dirty_tail);
+    initialized = true;
+    DEBUG(
+      "initialize header block in CircularJournalSpace length {}",
+      bl.length());
+    return write_header(
+    ).safe_then([this]() {
+      return open_ret(
+       open_ertr::ready_future_marker{},
+       get_written_to());
+    }).handle_error(
+      open_ertr::pass_further{},
+      crimson::ct_error::assert_all{
+       "Invalid error write_header"
+      }
+    );
+  }
+  ceph_assert(initialized);
+  if (written_to.segment_seq == NULL_SEG_SEQ) {
+    written_to.segment_seq = 0;
+  }
+  return open_ret(
+    open_ertr::ready_future_marker{},
+    get_written_to());
+}
+
+ceph::bufferlist CircularJournalSpace::encode_header()
+{
+  bufferlist bl;
+  encode(header, bl);
+  auto header_crc_filler = bl.append_hole(sizeof(checksum_t));
+  auto bliter = bl.cbegin();
+  auto header_crc = bliter.crc32c(
+    ceph::encoded_sizeof_bounded<cbj_header_t>(),
+    -1);
+  ceph_le32 header_crc_le;
+  header_crc_le = header_crc;
+  header_crc_filler.copy_in(
+    sizeof(checksum_t),
+    reinterpret_cast<const char *>(&header_crc_le));
+  return bl;
+}
+
+CircularJournalSpace::write_ertr::future<> CircularJournalSpace::device_write_bl(
+    rbm_abs_addr offset, bufferlist &bl)
+{
+  LOG_PREFIX(CircularJournalSpace::device_write_bl);
+  auto length = bl.length();
+  if (offset + length > get_journal_end()) {
+    return crimson::ct_error::erange::make();
+  }
+  DEBUG(
+    "overwrite in CircularJournalSpace, offset {}, length {}",
+    offset,
+    length);
+  return device->writev(offset, bl
+  ).handle_error(
+    write_ertr::pass_further{},
+    crimson::ct_error::assert_all{ "Invalid error device->write" }
+  );
+}
+
+CircularJournalSpace::read_header_ret
+CircularJournalSpace::read_header()
+{
+  LOG_PREFIX(CircularJournalSpace::read_header);
+  assert(device);
+  auto bptr = bufferptr(ceph::buffer::create_page_aligned(
+                       device->get_block_size()));
+  DEBUG("reading {}", device->get_journal_start());
+  return device->read(device->get_journal_start(), bptr
+  ).safe_then([bptr, FNAME]() mutable
+    -> read_header_ret {
+    bufferlist bl;
+    bl.append(bptr);
+    auto bp = bl.cbegin();
+    cbj_header_t cbj_header;
+    try {
+      decode(cbj_header, bp);
+    } catch (ceph::buffer::error &e) {
+      ERROR("unable to read header block");
+      return crimson::ct_error::enoent::make();
+    }
+    auto bliter = bl.cbegin();
+    auto test_crc = bliter.crc32c(
+      ceph::encoded_sizeof_bounded<cbj_header_t>(),
+      -1);
+    ceph_le32 recorded_crc_le;
+    decode(recorded_crc_le, bliter);
+    uint32_t recorded_crc = recorded_crc_le;
+    if (test_crc != recorded_crc) {
+      ERROR("error, header crc mismatch.");
+      return read_header_ret(
+       read_header_ertr::ready_future_marker{},
+       std::nullopt);
+    }
+    return read_header_ret(
+      read_header_ertr::ready_future_marker{},
+      std::make_pair(cbj_header, bl)
+    );
+  });
+}
+
+CircularJournalSpace::write_ertr::future<>
+CircularJournalSpace::write_header()
+{
+  LOG_PREFIX(CircularJournalSpace::write_header);
+  ceph::bufferlist bl = encode_header();
+  ceph_assert(bl.length() <= get_block_size());
+  DEBUG(
+    "sync header of CircularJournalSpace, length {}",
+    bl.length());
+  assert(device);
+  auto iter = bl.begin();
+  assert(bl.length() < get_block_size());
+  bufferptr bp = bufferptr(ceph::buffer::create_page_aligned(get_block_size()));
+  iter.copy(bl.length(), bp.c_str());
+  return device->write(device->get_journal_start(), std::move(bp)
+  ).handle_error(
+    write_ertr::pass_further{},
+    crimson::ct_error::assert_all{ "Invalid error device->write" }
+  );
+}
+
+}
diff --git a/src/crimson/os/seastore/journal/circular_journal_space.h b/src/crimson/os/seastore/journal/circular_journal_space.h
new file mode 100644 (file)
index 0000000..1e97f4e
--- /dev/null
@@ -0,0 +1,259 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include <optional>
+#include <seastar/core/circular_buffer.hh>
+#include <seastar/core/metrics.hh>
+#include <seastar/core/shared_future.hh>
+
+#include "include/buffer.h"
+
+#include "crimson/common/errorator.h"
+#include "crimson/os/seastore/journal.h"
+#include "crimson/os/seastore/random_block_manager.h"
+#include "crimson/os/seastore/random_block_manager/rbm_device.h"
+#include "crimson/os/seastore/journal/record_submitter.h"
+#include "crimson/os/seastore/async_cleaner.h"
+
+namespace crimson::os::seastore {
+  class SegmentProvider;
+  class JournalTrimmer;
+}
+
+namespace crimson::os::seastore::journal {
+
+class CircularBoundedJournal;
+/**
+ * CircularJournalSpace
+ *
+ * JournalAllocator implementation that keeps journal records on an
+ * RBMDevice. It owns the in-memory copy of the journal header
+ * (cbj_header_t), the current write position (written_to) and the
+ * size/offset helpers derived from the device's journal area.
+ */
+class CircularJournalSpace : public JournalAllocator {
+
+ public:
+  // returns the print_name member
+  const std::string& get_name() const final {
+    return print_name;
+  }
+
+  extent_len_t get_block_size() const final;
+
+  // writable only while a device is attached
+  bool can_write() const final {
+    return (device != nullptr);
+  }
+
+  // this implementation always reports a nonce of 0
+  segment_nonce_t get_nonce() const final {
+    return 0;
+  }
+
+  bool needs_roll(std::size_t length) const final;
+
+  roll_ertr::future<> roll() final;
+
+  write_ret write(ceph::bufferlist&& to_write) final;
+
+  // no-op in this implementation
+  void update_modify_time(record_t& record) final {}
+
+  /*
+   * close
+   *
+   * Persists the header through write_header() and, once that write
+   * has succeeded, clears the initialized flag.
+   *
+   * NOTE(review): errors are passed further via
+   * Journal::open_for_mount_ertr rather than close_ertr; presumably the
+   * two error sets are compatible -- confirm.
+   */
+  close_ertr::future<> close() final {
+    return write_header(
+    ).safe_then([this]() -> close_ertr::future<> {
+      initialized = false;
+      return close_ertr::now();
+    }).handle_error(
+      Journal::open_for_mount_ertr::pass_further{},
+      crimson::ct_error::assert_all{
+        "Invalid error write_header"
+      }
+    );
+  }
+
+  open_ret open(bool is_mkfs) final;
+
+ public:
+  CircularJournalSpace(RBMDevice * device);
+
+  struct cbj_header_t;
+  using write_ertr = Journal::submit_record_ertr;
+  /*
+   * device_write_bl
+   *
+   * @param device address to write
+   * @param bufferlist to write
+   *
+   */
+  write_ertr::future<> device_write_bl(rbm_abs_addr offset, ceph::bufferlist &bl);
+
+  using read_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error,
+    crimson::ct_error::invarg,
+    crimson::ct_error::enoent,
+    crimson::ct_error::erange>;
+  using read_header_ertr = read_ertr;
+  using read_header_ret = read_header_ertr::future<
+    std::optional<std::pair<cbj_header_t, bufferlist>>
+    >;
+  /*
+   * read_header
+   *
+   * read the header block of the journal; takes no arguments
+   *
+   * @return the decoded header paired with a bufferlist, or
+   *         std::nullopt inside the future when no header is returned
+   *
+   */
+  read_header_ret read_header();
+
+  // encodes the in-memory header into a bufferlist
+  ceph::bufferlist encode_header();
+
+  // writes the encoded header to the device
+  write_ertr::future<> write_header();
+
+
+  /**
+   * CircularBoundedJournal structure
+   *
+   * +-------------------------------------------------------+
+   * |   header    | record | record | record | record | ... |
+   * +-------------------------------------------------------+
+   *               ^-----------block aligned-----------------^
+   * <----fixed---->
+   */
+
+  struct cbj_header_t {
+    // journal tails kept in the header; exposed through
+    // get_dirty_tail()/get_alloc_tail() and replaced by
+    // update_journal_tail()
+    journal_seq_t dirty_tail;
+    journal_seq_t alloc_tail;
+
+    DENC(cbj_header_t, v, p) {
+      DENC_START(1, 1, p);
+      denc(v.dirty_tail, p);
+      denc(v.alloc_tail, p);
+      DENC_FINISH(p);
+    }
+  };
+
+  /**
+   *
+   * Write position for CircularBoundedJournal
+   *
+   * | written to rbm |    written length to CircularBoundedJournal    | new write |
+   * ----------------->------------------------------------------------>
+   *                  ^                                               ^
+   *            applied_to                                        written_to
+   *
+   */
+
+  journal_seq_t get_written_to() const {
+    return written_to;
+  }
+  // absolute device address of the offset carried by seq
+  rbm_abs_addr get_rbm_addr(journal_seq_t seq) const {
+    return convert_paddr_to_abs_addr(seq.offset);
+  }
+  // the new position must lie in [get_records_start(), get_journal_end())
+  void set_written_to(journal_seq_t seq) {
+    rbm_abs_addr addr = convert_paddr_to_abs_addr(seq.offset);
+    assert(addr >= get_records_start());
+    assert(addr < get_journal_end());
+    written_to = seq;
+  }
+  device_id_t get_device_id() const {
+    return device->get_device_id();
+  }
+
+  journal_seq_t get_dirty_tail() const {
+    return header.dirty_tail;
+  }
+  journal_seq_t get_alloc_tail() const {
+    return header.alloc_tail;
+  }
+
+  /*
+    Size-related interfaces
+     +---------------------------------------------------------+
+     |   header      | record | record | record | record | ... |
+     +---------------------------------------------------------+
+     ^               ^                                         ^
+     |               |                                         |
+   get_journal_start |                                     get_journal_end
+              get_records_start
+                     <-- get_records_total_size + block_size -->
+     <--------------- get_journal_size ------------------------>
+  */
+
+  // distance from the dirty tail to written_to; when written_to is
+  // behind the tail the distance is taken around the end of the
+  // records area (get_records_total_size() + get_block_size())
+  size_t get_records_used_size() const {
+    auto rbm_written_to = get_rbm_addr(get_written_to());
+    auto rbm_tail = get_rbm_addr(get_dirty_tail());
+    return rbm_written_to >= rbm_tail ?
+      rbm_written_to - rbm_tail :
+      rbm_written_to + get_records_total_size() + get_block_size()
+      - rbm_tail;
+  }
+  size_t get_records_total_size() const {
+    assert(device);
+    // a block is for header and a block is reserved to denote the end
+    return device->get_journal_size() - (2 * get_block_size());
+  }
+  // records begin one block after the journal start (the header block)
+  rbm_abs_addr get_records_start() const {
+    assert(device);
+    return device->get_journal_start() + get_block_size();
+  }
+  size_t get_records_available_size() const {
+    return get_records_total_size() - get_records_used_size();
+  }
+  /*
+   * is_available_size
+   *
+   * Returns false when written_to is ahead of the dirty tail, the room
+   * left before get_journal_end() is smaller than size, and size is
+   * also larger than (used size - room left before the end).
+   * Otherwise returns whether get_records_available_size() covers size.
+   *
+   * NOTE(review): the third condition is kept exactly as written;
+   * confirm it is the intended bound for the space in front of the
+   * dirty tail.
+   *
+   * @param size number of bytes to check for
+   */
+  bool is_available_size(uint64_t size) const {
+    auto rbm_written_to = get_rbm_addr(get_written_to());
+    auto rbm_tail = get_rbm_addr(get_dirty_tail());
+    if (rbm_written_to > rbm_tail &&
+        (get_journal_end() - rbm_written_to) < size &&
+        size > (get_records_used_size() -
+                (get_journal_end() - rbm_written_to))) {
+      return false;
+    }
+    return get_records_available_size() >= size;
+  }
+  rbm_abs_addr get_journal_end() const {
+    assert(device);
+    return device->get_journal_start() + device->get_journal_size();
+  }
+
+  // forwards to device->read(offset, bptr)
+  read_ertr::future<> read(
+    uint64_t offset,
+    bufferptr &bptr) {
+    assert(device);
+    return device->read(offset, bptr);
+  }
+
+  /*
+   * update_journal_tail
+   *
+   * Stores both tails in the in-memory header and persists the header
+   * with write_header(). Every error from write_header() goes to
+   * assert_all, so the returned future carries no error.
+   */
+  seastar::future<> update_journal_tail(
+    journal_seq_t dirty,
+    journal_seq_t alloc) {
+    header.dirty_tail = dirty;
+    header.alloc_tail = alloc;
+    return write_header(
+    ).handle_error(
+      crimson::ct_error::assert_all{
+      "encountered invalid error in update_journal_tail"
+    });
+  }
+
+  void set_initialized(bool init) {
+    initialized = init;
+  }
+
+  // replaces the in-memory header with a copy of head; taken by const
+  // reference since the argument is only read
+  void set_cbj_header(const cbj_header_t& head) {
+    header = head;
+  }
+
+  // returns a copy of the in-memory header
+  cbj_header_t get_cbj_header() const {
+    return header;
+  }
+
+ private:
+  std::string print_name;
+  // in-memory copy of the journal header
+  cbj_header_t header;
+  // backing device; can_write() reports whether it is set
+  RBMDevice* device;
+  // current write position, see set_written_to()
+  journal_seq_t written_to;
+  // cleared by close(), set through set_initialized()
+  bool initialized = false;
+};
+
+// Stream-output operator for cbj_header_t (declaration only; the
+// definition is not part of this header).
+std::ostream &operator<<(std::ostream &out, const CircularJournalSpace::cbj_header_t &header);
+
+}
+
+// NOTE(review): presumably registers cbj_header_t's DENC body as a
+// bounded-size encoding -- confirm against the macro definition.
+WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::journal::CircularJournalSpace::cbj_header_t)
+
+// With fmt 9 or newer, format cbj_header_t through fmt::ostream_formatter.
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::os::seastore::journal::CircularJournalSpace::cbj_header_t> : fmt::ostream_formatter {};
+#endif