]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
seastore: add CircularBoundedJournal
authormyoungwon oh <ohmyoungwon@gmail.com>
Sun, 15 Aug 2021 12:33:27 +0000 (21:33 +0900)
committermyoungwon oh <ohmyoungwon@gmail.com>
Thu, 19 May 2022 00:29:25 +0000 (09:29 +0900)
Signed-off-by: Myoungwon Oh <myoungwon.oh@samsung.com>
src/crimson/os/seastore/CMakeLists.txt
src/crimson/os/seastore/journal.cc
src/crimson/os/seastore/journal.h
src/crimson/os/seastore/journal/circular_bounded_journal.cc [new file with mode: 0644]
src/crimson/os/seastore/journal/circular_bounded_journal.h [new file with mode: 0644]
src/crimson/os/seastore/journal/segmented_journal.h
src/crimson/os/seastore/seastore_types.cc

index cd5b8f94fb9475e6f523aa83115128d6f4e550fa..55d2168cebb1c9bb2837d410afb310a9adcbfe64 100644 (file)
@@ -43,6 +43,7 @@ set(crimson_seastore_srcs
   journal.cc
   device.cc
   segment_manager_group.cc
+  journal/circular_bounded_journal.cc
   ../../../test/crimson/seastore/test_block.cc
   ${PROJECT_SOURCE_DIR}/src/os/Transaction.cc
        )
index c9117a52a00b431fcc31d8136068dab7d82fca35..314d62d89f5b669882ea77165804114c3bcb9118 100644 (file)
@@ -3,6 +3,7 @@
 
 #include "journal.h"
 #include "journal/segmented_journal.h"
+#include "journal/circular_bounded_journal.h"
 
 namespace crimson::os::seastore::journal {
 
@@ -11,4 +12,11 @@ JournalRef make_segmented(SegmentProvider &provider)
   return std::make_unique<SegmentedJournal>(provider);
 }
 
+// Factory for the circular-bounded journal backend: forwards `device` and
+// `path` to the CircularBoundedJournal constructor and returns the new
+// instance as a JournalRef.
+JournalRef make_circularbounded(
+  crimson::os::seastore::nvme_device::NVMeBlockDevice* device,
+  std::string path)
+{
+  return std::make_unique<CircularBoundedJournal>(device, path);
+}
+
 }
index a33a5468684bd6c5257b0bd2f0adc5f6baa14fb2..9ee5e9e34be22c874d55f900c62f11c702362032 100644 (file)
@@ -18,6 +18,11 @@ class NVMeBlockDevice;
 class SegmentManagerGroup;
 class SegmentProvider;
 
+// Identifies which Journal implementation an instance is; reported by
+// Journal::get_type().
+enum class journal_type {
+  SEGMENT_JOURNAL = 0,
+  CIRCULARBOUNDED_JOURNAL
+};
+
 class Journal {
 public:
   /**
@@ -87,6 +92,8 @@ public:
     delta_handler_t &&delta_handler) = 0;
 
   virtual ~Journal() {}
+
+  virtual journal_type get_type() = 0;
 };
 using JournalRef = std::unique_ptr<Journal>;
 
@@ -94,6 +101,10 @@ namespace journal {
 
 JournalRef make_segmented(SegmentProvider &provider);
 
+JournalRef make_circularbounded(
+  crimson::os::seastore::nvme_device::NVMeBlockDevice* device,
+  std::string path);
+
 }
 
 }
diff --git a/src/crimson/os/seastore/journal/circular_bounded_journal.cc b/src/crimson/os/seastore/journal/circular_bounded_journal.cc
new file mode 100644 (file)
index 0000000..c01122d
--- /dev/null
@@ -0,0 +1,590 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <boost/iterator/counting_iterator.hpp>
+
+#include "crimson/common/errorator-loop.h"
+#include "include/intarith.h"
+#include "crimson/os/seastore/journal/circular_bounded_journal.h"
+#include "crimson/os/seastore/logging.h"
+
+SET_SUBSYS(seastore_journal);
+
+namespace crimson::os::seastore::journal {
+
+std::ostream &operator<<(std::ostream &out,
+    const CircularBoundedJournal::cbj_header_t &header)
+{
+  return out << "cbj_header_t(magin=" << header.magic
+            << ", uuid=" << header.uuid
+            << ", block_size=" << header.block_size
+            << ", size=" << header.size
+            << ", used_size=" << header.used_size
+            << ", error=" << header.error
+            << ", start_offset=" << header.start_offset
+            << ", applied_to="<< header.applied_to
+            << ", last_committed_record_base="<< header.last_committed_record_base
+            << ", written_to=" << header.written_to
+            << ", flsg=" << header.flag
+            << ", csum_type=" << header.csum_type
+            << ", csum=" << header.csum
+            << ", start=" << header.start
+            << ", end=" << header.end
+             << ")";
+}
+
+
+// Stores the device pointer and the device path; the device is not opened
+// here (see _open_device()).
+CircularBoundedJournal::CircularBoundedJournal(NVMeBlockDevice* device,
+    const std::string path)
+  : device(device), path(path) {}
+
+/**
+ * mkfs
+ *
+ * Opens the device at `path`, builds a fresh cbj_header_t from `config`,
+ * writes the encoded header at config.start and finally closes the device
+ * (on success and on error). The in-memory `header` member is replaced by
+ * the newly built header.
+ *
+ * @param config journal layout (start/end paddr, block size, device id)
+ */
+CircularBoundedJournal::mkfs_ret
+CircularBoundedJournal::mkfs(mkfs_config_t& config)
+{
+  LOG_PREFIX(CircularBoundedJournal::mkfs);
+  return _open_device(path
+  ).safe_then([this, config, FNAME]() mutable -> mkfs_ret {
+    rbm_abs_addr start_addr = convert_paddr_to_abs_addr(
+      config.start);
+    assert(config.block_size == device->get_block_size());
+    ceph::bufferlist bl;
+    CircularBoundedJournal::cbj_header_t head;
+    head.magic = CBJOURNAL_MAGIC;
+    head.uuid = uuid_d(); // TODO
+    head.block_size = config.block_size;
+    rbm_abs_addr end_addr = convert_paddr_to_abs_addr(
+      config.end);
+    // usable size is the [start, end) range minus one device block; the
+    // header itself is written at start_addr below
+    head.size = end_addr - start_addr
+      - device->get_block_size();
+    head.used_size = 0;
+    head.error = 0;
+    // NOTE(review): start_offset is one block from address 0, not from
+    // start_addr -- presumably start_addr is expected to be 0; confirm
+    head.start_offset = device->get_block_size();
+    head.last_committed_record_base = 0;
+    head.written_to = head.start_offset;
+    head.applied_to = head.start_offset;
+    head.flag = 0;
+    head.csum_type = 0;
+    head.csum = 0;
+    head.cur_segment_seq = 0;
+    head.start = start_addr;
+    head.end = end_addr;
+    head.device_id = config.device_id;
+    encode(head, bl);
+    // must happen before device_write_bl(): it reads header.end and
+    // header.block_size (via get_block_size())
+    header = head;
+    DEBUG(
+      "initialize header block in CircularBoundedJournal, length {}",
+      bl.length());
+    return device_write_bl(start_addr, bl
+    ).handle_error(
+      mkfs_ertr::pass_further{},
+      crimson::ct_error::assert_all{
+      "Invalid error device_write during CircularBoundedJournal::mkfs"
+    }).safe_then([]() {
+      return mkfs_ertr::now();
+    });
+  }).handle_error(
+    mkfs_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+    "Invalid error _open_device in CircularBoundedJournal::mkfs"
+  }).finally([this] {
+    // always close the device again, whether or not mkfs succeeded
+    if (device) {
+      return device->close();
+    } else {
+      return seastar::now();
+    }
+  });
+}
+
+// Opens `device` at `path` in read-write mode. Errors that belong to
+// open_for_write_ertr are passed to the caller; any other error asserts.
+// `device` must be non-null.
+CircularBoundedJournal::open_for_write_ertr::future<>
+CircularBoundedJournal::_open_device(const std::string path)
+{
+  ceph_assert(device);
+  return device->open(path, seastar::open_flags::rw
+  ).handle_error(
+    open_for_write_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error device->open"
+    }
+  );
+}
+
+// Serialize the in-memory journal header into a new bufferlist.
+ceph::bufferlist CircularBoundedJournal::encode_super()
+{
+  ceph::bufferlist encoded_header;
+  encode(header, encoded_header);
+  return encoded_header;
+}
+
+// Journal interface entry point: opens the journal whose super block is
+// located at CBJOURNAL_START_ADDRESS.
+CircularBoundedJournal::open_for_write_ret CircularBoundedJournal::open_for_write()
+{
+  return open_for_write(CBJOURNAL_START_ADDRESS);
+}
+
+// Persists the in-memory header via write_super(), then marks the journal
+// as not initialized and closes the device.
+CircularBoundedJournal::close_ertr::future<> CircularBoundedJournal::close()
+{
+  return write_super(
+  ).safe_then([this]() -> close_ertr::future<> {
+    init = false;
+    return device->close();
+  }).handle_error(
+    // NOTE(review): this passes errors through open_for_write_ertr although
+    // the function returns close_ertr -- confirm the two error sets match
+    open_for_write_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error write_super"
+    }
+  );
+}
+
+/**
+ * open_for_write
+ *
+ * If the journal is already initialized, immediately returns the current
+ * journal_seq_t (cur_segment_seq + written_to as paddr). Otherwise opens
+ * the device, reads the super block at `start` into `header`, marks the
+ * journal initialized and returns the journal_seq_t derived from it.
+ *
+ * @param start absolute address of the super block
+ */
+CircularBoundedJournal::open_for_write_ret
+CircularBoundedJournal::open_for_write(rbm_abs_addr start)
+{
+  LOG_PREFIX(CircularBoundedJournal::open_for_write);
+  if (init) {
+    // already opened: answer from the in-memory header, no device I/O
+    paddr_t paddr = convert_abs_addr_to_paddr(
+      get_written_to(),
+      header.device_id);
+    return open_for_write_ret(
+      open_for_write_ertr::ready_future_marker{},
+      journal_seq_t{
+       header.cur_segment_seq,
+       paddr
+      });
+  }
+  return _open_device(path
+  ).safe_then([this, start, FNAME]() {
+    return read_super(start
+    ).handle_error(
+      open_for_write_ertr::pass_further{},
+      crimson::ct_error::assert_all{
+       "Invalid error read_super"
+    }).safe_then([this, FNAME](auto p) mutable {
+      // p is an optional; read_super() in this file only ever resolves it
+      // with a value (failures are reported as errors instead)
+      auto &[head, bl] = *p;
+      header = head;
+      DEBUG("super : {}", header);
+      paddr_t paddr = convert_abs_addr_to_paddr(
+       get_written_to(),
+       header.device_id);
+      init = true;
+      return open_for_write_ret(
+       open_for_write_ertr::ready_future_marker{},
+       journal_seq_t{
+         header.cur_segment_seq,
+         paddr
+       });
+    });
+  }).handle_error(
+    open_for_write_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error _open_device"
+  });
+}
+
+CircularBoundedJournal::write_ertr::future<> CircularBoundedJournal::append_record(
+  ceph::bufferlist bl,
+  rbm_abs_addr addr)
+{
+  LOG_PREFIX(CircularBoundedJournal::append_record);
+  std::vector<std::pair<rbm_abs_addr, bufferlist>> writes;
+  if (addr + bl.length() <= header.end) {
+    writes.push_back(std::make_pair(addr, bl));
+  } else {
+    // write remaining data---in this case,
+    // data is splited into two parts before due to the end of CircularBoundedJournal.
+    // the following code is to write the second part
+    bufferlist first_write, next_write;
+    first_write.substr_of(bl, 0, header.end - addr);
+    writes.push_back(std::make_pair(addr, first_write));
+    next_write.substr_of(
+      bl, first_write.length(), bl.length() - first_write.length());
+    writes.push_back(std::make_pair(get_start_addr(), next_write));
+  }
+
+  return seastar::do_with(
+    std::move(bl),
+    [this, writes=move(writes), FNAME](auto& bl) mutable
+  {
+    DEBUG("original bl length {}", bl.length());
+    return write_ertr::parallel_for_each(
+      writes,
+      [this, FNAME](auto& p) mutable
+    {
+      DEBUG(
+       "append_record: offset {}, length {}",
+       p.first,
+       p.second.length());
+      return device_write_bl(p.first, p.second 
+      ).handle_error(
+       write_ertr::pass_further{},
+       crimson::ct_error::assert_all{ "Invalid error device->write" }
+      ).safe_then([]() {
+       return write_ertr::now();
+      });
+    });
+  });
+}
+
+/**
+ * submit_record
+ *
+ * Encodes `record` and appends it at written_to, wrapping to the start of
+ * the journal when needed. Returns erange if the encoded size exceeds
+ * get_available_size(). written_to and cur_segment_seq are advanced
+ * immediately; last_committed_record_base and used_size are only updated
+ * in the final continuation, after the write has completed and the
+ * pipeline's finalize stage has been entered.
+ *
+ * @param record record to submit
+ * @param handle ordering handle used to enter the write pipeline stages
+ * @return record_locator_t pointing at the record's data (target + mdlength)
+ */
+CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record(
+  record_t &&record,
+  OrderingHandle &handle)
+{
+  LOG_PREFIX(CircularBoundedJournal::submit_record);
+  assert(write_pipeline);
+  auto r_size = record_group_size_t(record.size, get_block_size());
+  auto encoded_size = r_size.get_encoded_length();
+  if (get_written_to() +
+      ceph::encoded_sizeof_bounded<record_group_header_t>() > header.end) {
+    // not enough space between written_to and the end of journal,
+    // so that update used size to increase the amount of the remaing space
+    // |        cbjournal      |
+    //                   v            v
+    //      written_to <-> the end of journal
+    set_used_size(get_used_size() + (header.end - get_written_to()));
+    set_written_to(get_start_addr());
+  }
+  if (encoded_size > get_available_size()) {
+    ERROR(
+      "CircularBoundedJournal::submit_record: record size {}, but available size {}",
+      encoded_size,
+      get_available_size()
+      );
+    return crimson::ct_error::erange::make();
+  }
+
+  // the sequence number is consumed here, before the write is issued
+  journal_seq_t j_seq {
+    header.cur_segment_seq++,
+    convert_abs_addr_to_paddr(
+      get_written_to(),
+      header.device_id)};
+  ceph::bufferlist to_write = encode_record(
+    std::move(record), device->get_block_size(),
+    j_seq, 0);
+  auto target = get_written_to();
+  // advance written_to past this record, wrapping around header.end
+  if (get_written_to() + to_write.length() >= header.end) {
+    set_written_to(get_start_addr() +
+      (to_write.length() - (header.end - get_written_to())));
+  } else {
+    set_written_to(get_written_to() + to_write.length());
+  }
+  DEBUG(
+    "submit_record: mdlength {}, dlength {}, target {}",
+    r_size.get_mdlength(),
+    r_size.dlength,
+    target);
+
+  auto write_result = write_result_t{
+    j_seq,
+    (seastore_off_t)to_write.length()
+  };
+  // the device write is started here, before the device_submission stage
+  // is entered; the resulting future is only awaited inside the stage
+  auto write_fut = append_record(to_write, target);
+  return handle.enter(write_pipeline->device_submission
+  ).then([write_fut = std::move(write_fut)]() mutable {
+    return std::move(write_fut
+    ).handle_error(
+      write_ertr::pass_further{},
+      crimson::ct_error::assert_all{
+        "Invalid error in CircularBoundedJournal::append_record"
+      }
+    );
+  }).safe_then([this, &handle] {
+    return handle.enter(write_pipeline->finalize);
+  }).safe_then([this, target,
+    length=to_write.length(),
+    write_result,
+    r_size,
+    FNAME] {
+    DEBUG(
+      "append_record: commit target {} used_size {} written length {}",
+      target, get_used_size(), length);
+
+    set_last_committed_record_base(target);
+    set_used_size(get_used_size() + length);
+    // the locator points past the record metadata, at the data blocks
+    paddr_t paddr = convert_abs_addr_to_paddr(
+      target + r_size.get_mdlength(),
+      header.device_id);
+    auto submit_result = record_locator_t{
+      paddr,
+      write_result
+    };
+    return submit_result;
+  });
+}
+
+CircularBoundedJournal::write_ertr::future<> CircularBoundedJournal::device_write_bl(
+    rbm_abs_addr offset, bufferlist &bl)
+{
+  LOG_PREFIX(CircularBoundedJournal::device_write_bl);
+  auto length = bl.length();
+  if (offset + length > header.end) {
+    return crimson::ct_error::erange::make();
+  }
+  bl.rebuild_aligned(get_block_size());
+  DEBUG(
+    "overwrite in CircularBoundedJournal, offset {}, length {}",
+    offset,
+    length);
+  auto write_length = length < get_block_size() ? get_block_size() : length;
+  auto bptr = bufferptr(ceph::buffer::create_page_aligned(write_length));
+  auto iter = bl.cbegin();
+  iter.copy(bl.length(), bptr.c_str());
+  return device->write(offset, bptr
+  ).handle_error(
+    write_ertr::pass_further{},
+    crimson::ct_error::assert_all{ "Invalid error device->write" }
+  ).safe_then([] {
+    return write_ertr::now();
+  });
+}
+
+CircularBoundedJournal::read_super_ret
+CircularBoundedJournal::read_super(rbm_abs_addr start)
+{
+  LOG_PREFIX(CircularBoundedJournal::read_super);
+  auto bptr = bufferptr(ceph::buffer::create_page_aligned(
+                       device->get_block_size()));
+  return device->read(start, bptr
+  ).safe_then([start, bptr, FNAME]() mutable
+    -> read_super_ret {
+    DEBUG("read_super: reading {}", start);
+    bufferlist bl;
+    bl.append(bptr);
+    auto bp = bl.cbegin();
+    cbj_header_t cbj_header;
+    try {
+      decode(cbj_header, bp);
+    } catch (ceph::buffer::error &e) {
+      ERROR("read_super: unable to read super block");
+      return crimson::ct_error::enoent::make();
+    }
+    return read_super_ret(
+      read_super_ertr::ready_future_marker{},
+      std::make_pair(cbj_header, bl)
+    );
+  });
+}
+
+Journal::replay_ret CircularBoundedJournal::replay(
+    delta_handler_t &&delta_handler)
+{
+  /*
+   * read records from last applied record prior to written_to, and replay
+   */
+  LOG_PREFIX(CircularBoundedJournal::replay);
+  auto fut = open_for_write(CBJOURNAL_START_ADDRESS);
+  return fut.safe_then([this, FNAME, delta_handler=std::move(delta_handler)] (auto addr) {
+    if (get_used_size() == 0) {
+      return replay_ertr::now();
+    }
+    return seastar::do_with(
+      rbm_abs_addr(get_applied_to()),
+      std::move(delta_handler),
+      [this, FNAME](auto &cursor_addr, auto &d_handler) {
+      return crimson::repeat(
+       [this, &cursor_addr, &d_handler, FNAME]() mutable
+       -> replay_ertr::future<seastar::stop_iteration> {
+       paddr_t cursor_paddr = convert_abs_addr_to_paddr(
+         cursor_addr,
+         header.device_id);
+       return read_record(cursor_paddr
+       ).safe_then([this, &cursor_addr, &d_handler, FNAME](auto ret) {
+         auto [r_header, bl] = *ret;
+         bufferlist mdbuf;
+         mdbuf.substr_of(bl, 0, r_header.mdlength);
+         paddr_t record_block_base = paddr_t::make_blk_paddr(
+           header.device_id, cursor_addr + r_header.mdlength);
+         auto maybe_record_deltas_list = try_decode_deltas(
+           r_header, mdbuf, record_block_base);
+         if (!maybe_record_deltas_list) {
+           DEBUG("unable to decode deltas for record {} at {}",
+             r_header, record_block_base);
+           return replay_ertr::make_ready_future<
+             seastar::stop_iteration>(seastar::stop_iteration::yes);
+         }
+         DEBUG(" record_group_header_t: {}, cursor_addr: {} ",
+           r_header, cursor_addr);
+         auto write_result = write_result_t{
+           r_header.committed_to,
+           (seastore_off_t)bl.length()
+         };
+         cursor_addr += bl.length();
+         return seastar::do_with(
+           std::move(*maybe_record_deltas_list),
+           [write_result,
+           this,
+           &d_handler,
+           &cursor_addr,
+           FNAME](auto& record_deltas_list) {
+           return crimson::do_for_each(
+             record_deltas_list,
+             [write_result,
+             &d_handler, FNAME](record_deltas_t& record_deltas) {
+             auto locator = record_locator_t{
+               record_deltas.record_block_base,
+               write_result
+             };
+             DEBUG("processing {} deltas at block_base {}",
+                 record_deltas.deltas.size(),
+                 locator);
+             return crimson::do_for_each(
+               record_deltas.deltas,
+               [locator,
+               &d_handler](auto& p) {
+               auto& commit_time = p.first;
+               auto& delta = p.second;
+               return d_handler(locator,
+                 delta,
+                 seastar::lowres_system_clock::time_point(
+                   seastar::lowres_system_clock::duration(commit_time))
+                 );
+             });
+           }).safe_then([this, &cursor_addr]() {
+             if (cursor_addr >= header.end) {
+               cursor_addr = (cursor_addr - header.end) + get_start_addr();
+             }
+             if (get_written_to() +
+                 ceph::encoded_sizeof_bounded<record_group_header_t>() > header.end) {
+               cursor_addr = get_start_addr();
+             }
+             if (cursor_addr == get_written_to()) {
+               return replay_ertr::make_ready_future<
+                 seastar::stop_iteration>(seastar::stop_iteration::yes);
+             }
+             return replay_ertr::make_ready_future<
+               seastar::stop_iteration>(seastar::stop_iteration::no);
+           });
+         });
+       });
+      });
+    });
+  });
+}
+
+/**
+ * return_record
+ *
+ * Validates a fully read record and wraps it into a ready read_record_ret.
+ *
+ * @param header decoded record group header (note: shadows the journal's
+ *               `header` member inside this function)
+ * @param bl     complete record: mdlength bytes of metadata followed by
+ *               dlength bytes of data
+ * @return the (header, bl) pair if both metadata and data validate,
+ *         std::nullopt otherwise
+ */
+CircularBoundedJournal::read_record_ret
+CircularBoundedJournal::return_record(record_group_header_t& header, bufferlist bl)
+{
+  LOG_PREFIX(CircularBoundedJournal::return_record);
+  bufferlist md_bl, data_bl;
+  // the metadata spans [0, mdlength); data starts right after it. Using the
+  // block size here would truncate metadata larger than a single block.
+  md_bl.substr_of(bl, 0, header.mdlength);
+  data_bl.substr_of(bl, header.mdlength, header.dlength);
+  if (validate_records_metadata(md_bl) &&
+      validate_records_data(header, data_bl)) {
+    return read_record_ret(
+      read_record_ertr::ready_future_marker{},
+      std::make_pair(header, std::move(bl)));
+  } else {
+    DEBUG("invalid metadata");
+    return read_record_ret(
+      read_record_ertr::ready_future_marker{},
+      std::nullopt);
+  }
+}
+
+/**
+ * read_record
+ *
+ * Reads the record located at `off`. The first read fetches one block and
+ * decodes the record_group_header_t from it; if the record is longer than
+ * that, up to two further reads fetch the rest (the remainder up to
+ * header.end and, if the record wraps, the tail from get_start_addr()).
+ * The assembled buffer is validated by return_record().
+ *
+ * @param off paddr of the record
+ * @return (header, record bytes), or std::nullopt if the header cannot be
+ *         decoded or validation fails
+ */
+CircularBoundedJournal::read_record_ret CircularBoundedJournal::read_record(paddr_t off)
+{
+  LOG_PREFIX(CircularBoundedJournal::read_record);
+  rbm_abs_addr offset = convert_paddr_to_abs_addr(
+    off);
+  rbm_abs_addr addr = offset;
+  auto read_length = get_block_size();
+  // NOTE(review): when less than one block remains before header.end the
+  // read is issued at get_start_addr() with length header.end - offset;
+  // confirm this is the intended wrap handling
+  if (addr + get_block_size() > header.end) {
+    addr = get_start_addr();
+    read_length = header.end - offset;
+  }
+  DEBUG("read_record: reading record from abs addr {} read length {}",
+      addr, read_length);
+  auto bptr = bufferptr(ceph::buffer::create_page_aligned(read_length));
+  bptr.zero();
+  return device->read(addr, bptr
+  ).safe_then([this, addr, read_length, bptr, FNAME]() mutable
+    -> read_record_ret {
+    record_group_header_t h;
+    bufferlist bl;
+    bl.append(bptr);
+    auto bp = bl.cbegin();
+    try {
+      decode(h, bp);
+    } catch (ceph::buffer::error &e) {
+      return read_record_ret(
+       read_record_ertr::ready_future_marker{},
+       std::nullopt);
+    }
+    /*
+     * |          journal          |
+     *        | record 1 header |  | record 1 data
+     *  record 1 data  (remaining) |
+     *
+     *        <---- 1 block ----><--
+     * -- 2 block --->
+     *
+     *  If the record is longer than read_length and its data is located
+     *  across the end of journal and the beginning of journal, we need
+     *  three reads---reads of header, other remaining data before the end,
+     *  and the other remaining data from the beginning.
+     *
+     */
+    if (h.mdlength + h.dlength > read_length) {
+      rbm_abs_addr next_read_addr = addr + read_length;
+      auto next_read = h.mdlength + h.dlength - read_length;
+      DEBUG(" next_read_addr {}, next_read_length {} ",
+         next_read_addr, next_read);
+      if (header.end < next_read_addr + next_read) {
+       // In this case, need two more reads.
+       // The first is to read the remaining bytes up to the end of cbjournal
+       // The second is to read the data at the beginning of cbjournal
+       next_read = header.end - (addr + read_length);
+      }
+      DEBUG("read_entry: additional reading addr {} length {}",
+           next_read_addr,
+           next_read);
+      auto next_bptr = bufferptr(ceph::buffer::create_page_aligned(next_read));
+      next_bptr.zero();
+      return device->read(
+       next_read_addr,
+       next_bptr
+      ).safe_then([this, h=h, next_bptr=std::move(next_bptr), bl=std::move(bl),
+        FNAME]() mutable {
+       bl.append(next_bptr);
+       if (h.mdlength + h.dlength == bl.length()) {
+         DEBUG("read_record: record length {} done", bl.length());
+         return return_record(h, bl);
+       }
+       // need one more read
+       auto next_read_addr = get_start_addr();
+       auto last_bptr = bufferptr(ceph::buffer::create_page_aligned(
+             h.mdlength + h.dlength - bl.length()));
+       DEBUG("read_record: last additional reading addr {} length {}",
+             next_read_addr,
+             h.mdlength + h.dlength - bl.length());
+       return device->read(
+         next_read_addr,
+         last_bptr
+       ).safe_then([this, h=h, last_bptr=std::move(last_bptr),
+         bl=std::move(bl), FNAME]() mutable {
+         bl.append(last_bptr);
+         DEBUG("read_record: complte size {}", bl.length());
+         return return_record(h, bl);
+       });
+      });
+    } else {
+      DEBUG("read_record: complte size {}", bl.length());
+      return return_record(h, bl);
+    }
+  });
+}
+
+// Encodes the in-memory header and writes it to the device at header.start.
+// Returns input_output_error if encoding throws a buffer error.
+CircularBoundedJournal::write_ertr::future<>
+CircularBoundedJournal::write_super()
+{
+  LOG_PREFIX(CircularBoundedJournal::write_super);
+  ceph::bufferlist bl;
+  try {
+    bl = encode_super();
+  } catch (ceph::buffer::error &e) {
+    DEBUG("unable to encode super block from underlying deivce");
+    return crimson::ct_error::input_output_error::make();
+  }
+  DEBUG(
+    "sync header of CircularBoundedJournal, length {}",
+    bl.length());
+  return device_write_bl(header.start, bl);
+}
+
+}
diff --git a/src/crimson/os/seastore/journal/circular_bounded_journal.h b/src/crimson/os/seastore/journal/circular_bounded_journal.h
new file mode 100644 (file)
index 0000000..2537df1
--- /dev/null
@@ -0,0 +1,315 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/common/log.h"
+
+#include <boost/intrusive_ptr.hpp>
+
+#include <seastar/core/future.hh>
+
+#include "include/ceph_assert.h"
+#include "include/buffer.h"
+#include "include/denc.h"
+
+#include "crimson/osd/exceptions.h"
+#include "crimson/os/seastore/journal.h"
+#include "include/uuid.h"
+#include "crimson/os/seastore/random_block_manager.h"
+#include "crimson/os/seastore/random_block_manager/nvmedevice.h"
+#include <list>
+
+
+namespace crimson::os::seastore::journal {
+
+constexpr rbm_abs_addr CBJOURNAL_START_ADDRESS = 0;
+constexpr uint64_t CBJOURNAL_MAGIC = 0xCCCC;
+using NVMeBlockDevice = nvme_device::NVMeBlockDevice;
+
+/**
+ * CircularBoundedJournal
+ *
+ * TODO: move record from CircularBoundedJournal to RandomBlockManager
+ *
+ */
+
+constexpr uint64_t DEFAULT_SIZE = 1 << 26;
+constexpr uint64_t DEFAULT_BLOCK_SIZE = 4096;
+
+class CircularBoundedJournal : public Journal {
+public:
+  // Parameters for mkfs(): the [start, end) range of the journal as paddrs,
+  // the device block size, total size and the device id.
+  struct mkfs_config_t {
+    std::string path;
+    paddr_t start;
+    paddr_t end;
+    size_t block_size = 0;
+    size_t total_size = 0;
+    device_id_t device_id = 0;
+    seastore_meta_t meta;
+    // Default layout: an empty path, range [0, DEFAULT_SIZE) with
+    // DEFAULT_BLOCK_SIZE blocks, on the device id whose highest value bit
+    // is set.
+    static mkfs_config_t get_default() {
+      device_id_t d_id = 1 << (std::numeric_limits<device_id_t>::digits - 1);
+      return mkfs_config_t {
+       "",
+       paddr_t::make_blk_paddr(d_id, 0),
+       paddr_t::make_blk_paddr(d_id, DEFAULT_SIZE),
+       DEFAULT_BLOCK_SIZE,
+       DEFAULT_SIZE,
+       d_id,
+       seastore_meta_t {}
+      };
+    }
+  };
+
+  CircularBoundedJournal(NVMeBlockDevice* device, const std::string path);
+  ~CircularBoundedJournal() {}
+
+  open_for_write_ret open_for_write() final;
+  open_for_write_ret open_for_write(rbm_abs_addr start);
+  close_ertr::future<> close() final;
+
+  journal_type get_type() final {
+    return journal_type::CIRCULARBOUNDED_JOURNAL;
+  }
+
+  submit_record_ret submit_record(
+    record_t &&record,
+    OrderingHandle &handle
+  ) final;
+
+  seastar::future<> flush(
+    OrderingHandle &handle
+  ) final {
+    // TODO
+    return seastar::now();
+  }
+
+  replay_ret replay(delta_handler_t &&delta_handler);
+
+  open_for_write_ertr::future<> _open_device(const std::string path);
+
+  struct cbj_header_t;
+  using write_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error,
+    crimson::ct_error::erange>;
+  /*
+   * append_record
+   *
+   * append data to current write position of CircularBoundedJournal
+   *
+   * @param bufferlist to write
+   * @param rbm_abs_addr where data is written
+   *
+   */
+  write_ertr::future<> append_record(ceph::bufferlist bl, rbm_abs_addr addr);
+  /*
+   * device_write_bl
+   *
+   * @param device address to write
+   * @param bufferlist to write
+   *
+   */
+  write_ertr::future<> device_write_bl(rbm_abs_addr offset, ceph::bufferlist &bl);
+
+  using read_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error,
+    crimson::ct_error::invarg,
+    crimson::ct_error::enoent,
+    crimson::ct_error::erange>;
+  using read_record_ertr = read_ertr;
+  using read_record_ret = read_record_ertr::future<
+       std::optional<std::pair<record_group_header_t, bufferlist>>
+       >;
+  using read_super_ertr = read_ertr;
+  using read_super_ret = read_super_ertr::future<
+       std::optional<std::pair<cbj_header_t, bufferlist>>
+       >;
+  /*
+   * read_record
+   *
+   * read record from given address
+   *
+   * @param paddr_t to read
+   *
+   */
+  read_record_ret read_record(paddr_t offset);
+  /*
+   * read_super
+   *
+   * read super block from given absolute address
+   *
+   * @param absolute address
+   *
+   */
+  read_super_ret read_super(rbm_abs_addr start);
+
+  ceph::bufferlist encode_super();
+
+  using mkfs_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error,
+    crimson::ct_error::invarg
+  >;
+  using mkfs_ret = mkfs_ertr::future<>;
+
+  /*
+   * mkfs
+   *
+   * make a new journal layout even if old journal exists
+   *
+   * @param mkfs_config_t
+   *
+   */
+  mkfs_ret mkfs(mkfs_config_t& config);
+
+
+  /**
+   * CircularBoundedJournal structure
+   *
+   * +-------------------------------------------------------+
+   * |   header    | record | record | record | record | ... |
+   * +-------------------------------------------------------+
+   *               ^-----------block aligned-----------------^
+   * <----fixed---->
+   */
+
+
+  /**
+   *
+   * CircularBoundedJournal write
+   *
+   * NVMe will support a large block write (< 512KB) with atomic write unit command.
+   * With this command, we expect that the most of incoming data can be stored
+   * as a single write call, which has lower overhead than existing
+   * way that uses a combination of system calls such as write() and sync().
+   *
+   */
+
+  struct cbj_header_t {
+    uint64_t magic;
+    uuid_d uuid;
+    uint64_t block_size; // aligned with block_size
+    uint64_t size;   // max length of journal
+    uint64_t used_size;  // current used_size of journal
+    uint32_t error;      // reserved
+
+    rbm_abs_addr start_offset;      // start offset of CircularBoundedJournal
+    rbm_abs_addr last_committed_record_base;
+    rbm_abs_addr written_to;
+    rbm_abs_addr applied_to;
+
+    uint64_t flag;       // represent features (reserved)
+    uint8_t csum_type;   // type of checksum algoritghm used in cbj_header_t
+    uint64_t csum;       // checksum of entire cbj_header_t
+    uint32_t cur_segment_seq;
+
+    rbm_abs_addr start; // start address of the device
+    rbm_abs_addr end;   // start address of the device
+    device_id_t device_id;
+
+    DENC(cbj_header_t, v, p) {
+      DENC_START(1, 1, p);
+      denc(v.magic, p);
+      denc(v.uuid, p);
+      denc(v.block_size, p);
+      denc(v.size, p);
+      denc(v.used_size, p);
+      denc(v.error, p);
+
+      denc(v.start_offset, p);
+
+      denc(v.last_committed_record_base, p);
+      denc(v.written_to, p);
+      denc(v.applied_to, p);
+
+      denc(v.flag, p);
+      denc(v.csum_type, p);
+      denc(v.csum, p);
+      denc(v.cur_segment_seq, p);
+      denc(v.start, p);
+      denc(v.end, p);
+      denc(v.device_id, p);
+
+      DENC_FINISH(p);
+    }
+  };
+
+  /**
+   *
+   * Write position for CircularBoundedJournal
+   *
+   * | written to rbm |    written length to CircularBoundedJournal    | new write |
+   * ----------------->----------------------------------->------------>
+   *                  ^                    ^             ^
+   *            applied_to   last_committed_record_base   written_to
+   *
+   */
+
+  // Bytes of the journal currently in use (header.used_size).
+  size_t get_used_size() const {
+    return header.used_size;
+  }
+  void set_used_size(size_t size) {
+    header.used_size = size;
+  }
+  // Maximum length of the journal (header.size).
+  size_t get_total_size() const {
+    return header.size;
+  }
+  // Address where the record area starts (header.start_offset).
+  rbm_abs_addr get_start_addr() const {
+    return header.start_offset;
+  }
+  // Remaining space: total size minus used size.
+  size_t get_available_size() const {
+    return get_total_size() - get_used_size();
+  }
+
+  // Moves applied_to to addr + len and recomputes used_size as the distance
+  // from the new applied_to to written_to, adding the total size when
+  // last_committed_record_base lies before addr (the wrapped case).
+  void update_applied_to(rbm_abs_addr addr, uint32_t len) {
+    rbm_abs_addr new_applied_to = addr;
+    set_used_size(
+      get_last_committed_record_base() >= new_applied_to ?
+      get_written_to() - (new_applied_to + len) :
+      get_written_to() + get_total_size() - (new_applied_to + len));
+    set_applied_to(new_applied_to + len);
+  }
+
+  write_ertr::future<> write_super();
+
+  read_record_ret return_record(record_group_header_t& header, bufferlist bl);
+
+  void set_write_pipeline(WritePipeline *_write_pipeline) final {
+    write_pipeline = _write_pipeline;
+  }
+
+  // Plain accessors for the position fields kept in the in-memory header.
+  // written_to: address where the next record will be written.
+  rbm_abs_addr get_written_to() const {
+    return header.written_to;
+  }
+  void set_written_to(rbm_abs_addr addr) {
+    header.written_to = addr;
+  }
+  // last_committed_record_base: start address of the last committed record.
+  rbm_abs_addr get_last_committed_record_base() const {
+    return header.last_committed_record_base;
+  }
+  void set_last_committed_record_base(rbm_abs_addr addr) {
+    header.last_committed_record_base = addr;
+  }
+  // applied_to: address from which replay() starts reading records.
+  rbm_abs_addr get_applied_to() const {
+    return header.applied_to;
+  }
+  void set_applied_to(rbm_abs_addr addr) {
+    header.applied_to = addr;
+  }
+  device_id_t get_device_id() const {
+    return header.device_id;
+  }
+  size_t get_block_size() const {
+    return header.block_size;
+  }
+private:
+  cbj_header_t header;
+  NVMeBlockDevice* device;
+  std::string path;
+  WritePipeline *write_pipeline = nullptr;
+  bool init = false;
+};
+
+std::ostream &operator<<(std::ostream &out, const CircularBoundedJournal::cbj_header_t &header);
+}
+
+WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::journal::CircularBoundedJournal::cbj_header_t)
index 2a35c3b729d3043080e70395eb526390e84e185f..558189b37433146d395a040075ae2da455b55b94 100644 (file)
@@ -43,6 +43,10 @@ public:
     write_pipeline = _write_pipeline;
   }
 
+  // Reports this implementation as the segment-based journal.
+  journal_type get_type() final {
+    return journal_type::SEGMENT_JOURNAL;
+  }
+
 private:
   submit_record_ret do_submit_record(
     record_t &&record,
index 0500bcf540e347462d9f39a25522d4dd197f642a..df693d7970f7ed1a7917b1c8eaa84a146a5e8ff0 100644 (file)
@@ -576,8 +576,14 @@ try_decode_deltas(
       }
     }
     for (auto& i: r.extent_infos) {
-      auto& seg_addr = record_block_base.as_seg_paddr();
-      seg_addr.set_segment_off(seg_addr.get_segment_off() + i.len);
+      if (record_block_base.get_addr_type() == addr_types_t::SEGMENT) {
+       auto& seg_addr = record_block_base.as_seg_paddr();
+       seg_addr.set_segment_off(seg_addr.get_segment_off() + i.len);
+      } else if (record_block_base.get_addr_type() ==
+         addr_types_t::RANDOM_BLOCK) {
+       auto& blk_addr = record_block_base.as_blk_paddr();
+       blk_addr.set_block_off(blk_addr.get_block_off() + i.len);
+      }
     }
     ++result_iter;
   }