--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <boost/iterator/counting_iterator.hpp>
+
+#include "crimson/common/errorator-loop.h"
+#include "include/intarith.h"
+#include "crimson/os/seastore/journal/circular_bounded_journal.h"
+#include "crimson/os/seastore/logging.h"
+
+SET_SUBSYS(seastore_journal);
+
+namespace crimson::os::seastore::journal {
+
+std::ostream &operator<<(std::ostream &out,
+ const CircularBoundedJournal::cbj_header_t &header)
+{
+ return out << "cbj_header_t(magin=" << header.magic
+ << ", uuid=" << header.uuid
+ << ", block_size=" << header.block_size
+ << ", size=" << header.size
+ << ", used_size=" << header.used_size
+ << ", error=" << header.error
+ << ", start_offset=" << header.start_offset
+ << ", applied_to="<< header.applied_to
+ << ", last_committed_record_base="<< header.last_committed_record_base
+ << ", written_to=" << header.written_to
+ << ", flsg=" << header.flag
+ << ", csum_type=" << header.csum_type
+ << ", csum=" << header.csum
+ << ", start=" << header.start
+ << ", end=" << header.end
+ << ")";
+}
+
+
+CircularBoundedJournal::CircularBoundedJournal(NVMeBlockDevice* device,
+ const std::string path)
+ : device(device), path(path) {}
+
+CircularBoundedJournal::mkfs_ret
+CircularBoundedJournal::mkfs(mkfs_config_t& config)
+{
+ LOG_PREFIX(CircularBoundedJournal::mkfs);
+ return _open_device(path
+ ).safe_then([this, config, FNAME]() mutable -> mkfs_ret {
+ rbm_abs_addr start_addr = convert_paddr_to_abs_addr(
+ config.start);
+ assert(config.block_size == device->get_block_size());
+ ceph::bufferlist bl;
+ CircularBoundedJournal::cbj_header_t head;
+ head.magic = CBJOURNAL_MAGIC;
+ head.uuid = uuid_d(); // TODO
+ head.block_size = config.block_size;
+ rbm_abs_addr end_addr = convert_paddr_to_abs_addr(
+ config.end);
+ head.size = end_addr - start_addr
+ - device->get_block_size();
+ head.used_size = 0;
+ head.error = 0;
+ head.start_offset = device->get_block_size();
+ head.last_committed_record_base = 0;
+ head.written_to = head.start_offset;
+ head.applied_to = head.start_offset;
+ head.flag = 0;
+ head.csum_type = 0;
+ head.csum = 0;
+ head.cur_segment_seq = 0;
+ head.start = start_addr;
+ head.end = end_addr;
+ head.device_id = config.device_id;
+ encode(head, bl);
+ header = head;
+ DEBUG(
+ "initialize header block in CircularBoundedJournal, length {}",
+ bl.length());
+ return device_write_bl(start_addr, bl
+ ).handle_error(
+ mkfs_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error device_write during CircularBoundedJournal::mkfs"
+ }).safe_then([]() {
+ return mkfs_ertr::now();
+ });
+ }).handle_error(
+ mkfs_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error _open_device in CircularBoundedJournal::mkfs"
+ }).finally([this] {
+ if (device) {
+ return device->close();
+ } else {
+ return seastar::now();
+ }
+ });
+}
+
+CircularBoundedJournal::open_for_write_ertr::future<>
+CircularBoundedJournal::_open_device(const std::string path)
+{
+ ceph_assert(device);
+ return device->open(path, seastar::open_flags::rw
+ ).handle_error(
+ open_for_write_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error device->open"
+ }
+ );
+}
+
+ceph::bufferlist CircularBoundedJournal::encode_super()
+{
+ bufferlist bl;
+ encode(header, bl);
+ return bl;
+}
+
+CircularBoundedJournal::open_for_write_ret CircularBoundedJournal::open_for_write()
+{
+ return open_for_write(CBJOURNAL_START_ADDRESS);
+}
+
+CircularBoundedJournal::close_ertr::future<> CircularBoundedJournal::close()
+{
+ return write_super(
+ ).safe_then([this]() -> close_ertr::future<> {
+ init = false;
+ return device->close();
+ }).handle_error(
+ open_for_write_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error write_super"
+ }
+ );
+}
+
+CircularBoundedJournal::open_for_write_ret
+CircularBoundedJournal::open_for_write(rbm_abs_addr start)
+{
+ LOG_PREFIX(CircularBoundedJournal::open_for_write);
+ if (init) {
+ paddr_t paddr = convert_abs_addr_to_paddr(
+ get_written_to(),
+ header.device_id);
+ return open_for_write_ret(
+ open_for_write_ertr::ready_future_marker{},
+ journal_seq_t{
+ header.cur_segment_seq,
+ paddr
+ });
+ }
+ return _open_device(path
+ ).safe_then([this, start, FNAME]() {
+ return read_super(start
+ ).handle_error(
+ open_for_write_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error read_super"
+ }).safe_then([this, FNAME](auto p) mutable {
+ auto &[head, bl] = *p;
+ header = head;
+ DEBUG("super : {}", header);
+ paddr_t paddr = convert_abs_addr_to_paddr(
+ get_written_to(),
+ header.device_id);
+ init = true;
+ return open_for_write_ret(
+ open_for_write_ertr::ready_future_marker{},
+ journal_seq_t{
+ header.cur_segment_seq,
+ paddr
+ });
+ });
+ }).handle_error(
+ open_for_write_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error _open_device"
+ });
+}
+
+CircularBoundedJournal::write_ertr::future<> CircularBoundedJournal::append_record(
+ ceph::bufferlist bl,
+ rbm_abs_addr addr)
+{
+ LOG_PREFIX(CircularBoundedJournal::append_record);
+ std::vector<std::pair<rbm_abs_addr, bufferlist>> writes;
+ if (addr + bl.length() <= header.end) {
+ writes.push_back(std::make_pair(addr, bl));
+ } else {
+ // The write would cross the end of CircularBoundedJournal, so split the
+ // data into two parts: the first fills the space up to header.end and the
+ // second wraps around to the start address of the journal.
+ bufferlist first_write, next_write;
+ first_write.substr_of(bl, 0, header.end - addr);
+ writes.push_back(std::make_pair(addr, first_write));
+ next_write.substr_of(
+ bl, first_write.length(), bl.length() - first_write.length());
+ writes.push_back(std::make_pair(get_start_addr(), next_write));
+ }
+
+ return seastar::do_with(
+ std::move(bl),
+ [this, writes=std::move(writes), FNAME](auto& bl) mutable
+ {
+ DEBUG("original bl length {}", bl.length());
+ return write_ertr::parallel_for_each(
+ writes,
+ [this, FNAME](auto& p) mutable
+ {
+ DEBUG(
+ "append_record: offset {}, length {}",
+ p.first,
+ p.second.length());
+ return device_write_bl(p.first, p.second
+ ).handle_error(
+ write_ertr::pass_further{},
+ crimson::ct_error::assert_all{ "Invalid error device->write" }
+ ).safe_then([]() {
+ return write_ertr::now();
+ });
+ });
+ });
+}
+
+CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record(
+ record_t &&record,
+ OrderingHandle &handle)
+{
+ LOG_PREFIX(CircularBoundedJournal::submit_record);
+ assert(write_pipeline);
+ auto r_size = record_group_size_t(record.size, get_block_size());
+ auto encoded_size = r_size.get_encoded_length();
+ if (get_written_to() +
+ ceph::encoded_sizeof_bounded<record_group_header_t>() > header.end) {
+ // There is not enough space between written_to and the end of the journal
+ // to hold a record_group_header_t, so account the remaining tail
+ // (written_to up to header.end) as used and wrap written_to back to the
+ // start of the journal.
+ set_used_size(get_used_size() + (header.end - get_written_to()));
+ set_written_to(get_start_addr());
+ }
+ if (encoded_size > get_available_size()) {
+ ERROR(
+ "CircularBoundedJournal::submit_record: record size {}, but available size {}",
+ encoded_size,
+ get_available_size()
+ );
+ return crimson::ct_error::erange::make();
+ }
+
+ journal_seq_t j_seq {
+ header.cur_segment_seq++,
+ convert_abs_addr_to_paddr(
+ get_written_to(),
+ header.device_id)};
+ ceph::bufferlist to_write = encode_record(
+ std::move(record), device->get_block_size(),
+ j_seq, 0);
+ auto target = get_written_to();
+ if (get_written_to() + to_write.length() >= header.end) {
+ set_written_to(get_start_addr() +
+ (to_write.length() - (header.end - get_written_to())));
+ } else {
+ set_written_to(get_written_to() + to_write.length());
+ }
+ DEBUG(
+ "submit_record: mdlength {}, dlength {}, target {}",
+ r_size.get_mdlength(),
+ r_size.dlength,
+ target);
+
+ auto write_result = write_result_t{
+ j_seq,
+ (seastore_off_t)to_write.length()
+ };
+ auto write_fut = append_record(to_write, target);
+ return handle.enter(write_pipeline->device_submission
+ ).then([write_fut = std::move(write_fut)]() mutable {
+ return std::move(write_fut
+ ).handle_error(
+ write_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error in CircularBoundedJournal::append_record"
+ }
+ );
+ }).safe_then([this, &handle] {
+ return handle.enter(write_pipeline->finalize);
+ }).safe_then([this, target,
+ length=to_write.length(),
+ write_result,
+ r_size,
+ FNAME] {
+ DEBUG(
+ "append_record: commit target {} used_size {} written length {}",
+ target, get_used_size(), length);
+
+ set_last_committed_record_base(target);
+ set_used_size(get_used_size() + length);
+ paddr_t paddr = convert_abs_addr_to_paddr(
+ target + r_size.get_mdlength(),
+ header.device_id);
+ auto submit_result = record_locator_t{
+ paddr,
+ write_result
+ };
+ return submit_result;
+ });
+}
+
+CircularBoundedJournal::write_ertr::future<> CircularBoundedJournal::device_write_bl(
+ rbm_abs_addr offset, bufferlist &bl)
+{
+ LOG_PREFIX(CircularBoundedJournal::device_write_bl);
+ auto length = bl.length();
+ if (offset + length > header.end) {
+ return crimson::ct_error::erange::make();
+ }
+ bl.rebuild_aligned(get_block_size());
+ DEBUG(
+ "overwrite in CircularBoundedJournal, offset {}, length {}",
+ offset,
+ length);
+ auto write_length = length < get_block_size() ? get_block_size() : length;
+ auto bptr = bufferptr(ceph::buffer::create_page_aligned(write_length));
+ auto iter = bl.cbegin();
+ iter.copy(bl.length(), bptr.c_str());
+ return device->write(offset, bptr
+ ).handle_error(
+ write_ertr::pass_further{},
+ crimson::ct_error::assert_all{ "Invalid error device->write" }
+ ).safe_then([] {
+ return write_ertr::now();
+ });
+}
+
+CircularBoundedJournal::read_super_ret
+CircularBoundedJournal::read_super(rbm_abs_addr start)
+{
+ LOG_PREFIX(CircularBoundedJournal::read_super);
+ auto bptr = bufferptr(ceph::buffer::create_page_aligned(
+ device->get_block_size()));
+ return device->read(start, bptr
+ ).safe_then([start, bptr, FNAME]() mutable
+ -> read_super_ret {
+ DEBUG("read_super: reading {}", start);
+ bufferlist bl;
+ bl.append(bptr);
+ auto bp = bl.cbegin();
+ cbj_header_t cbj_header;
+ try {
+ decode(cbj_header, bp);
+ } catch (ceph::buffer::error &e) {
+ ERROR("read_super: unable to read super block");
+ return crimson::ct_error::enoent::make();
+ }
+ return read_super_ret(
+ read_super_ertr::ready_future_marker{},
+ std::make_pair(cbj_header, bl)
+ );
+ });
+}
+
+Journal::replay_ret CircularBoundedJournal::replay(
+ delta_handler_t &&delta_handler)
+{
+ /*
+ * read records from the last applied position up to written_to and replay them
+ */
+ LOG_PREFIX(CircularBoundedJournal::replay);
+ auto fut = open_for_write(CBJOURNAL_START_ADDRESS);
+ return fut.safe_then([this, FNAME, delta_handler=std::move(delta_handler)] (auto addr) {
+ if (get_used_size() == 0) {
+ return replay_ertr::now();
+ }
+ return seastar::do_with(
+ rbm_abs_addr(get_applied_to()),
+ std::move(delta_handler),
+ [this, FNAME](auto &cursor_addr, auto &d_handler) {
+ return crimson::repeat(
+ [this, &cursor_addr, &d_handler, FNAME]() mutable
+ -> replay_ertr::future<seastar::stop_iteration> {
+ paddr_t cursor_paddr = convert_abs_addr_to_paddr(
+ cursor_addr,
+ header.device_id);
+ return read_record(cursor_paddr
+ ).safe_then([this, &cursor_addr, &d_handler, FNAME](auto ret) {
+ auto [r_header, bl] = *ret;
+ bufferlist mdbuf;
+ mdbuf.substr_of(bl, 0, r_header.mdlength);
+ paddr_t record_block_base = paddr_t::make_blk_paddr(
+ header.device_id, cursor_addr + r_header.mdlength);
+ auto maybe_record_deltas_list = try_decode_deltas(
+ r_header, mdbuf, record_block_base);
+ if (!maybe_record_deltas_list) {
+ DEBUG("unable to decode deltas for record {} at {}",
+ r_header, record_block_base);
+ return replay_ertr::make_ready_future<
+ seastar::stop_iteration>(seastar::stop_iteration::yes);
+ }
+ DEBUG(" record_group_header_t: {}, cursor_addr: {} ",
+ r_header, cursor_addr);
+ auto write_result = write_result_t{
+ r_header.committed_to,
+ (seastore_off_t)bl.length()
+ };
+ cursor_addr += bl.length();
+ return seastar::do_with(
+ std::move(*maybe_record_deltas_list),
+ [write_result,
+ this,
+ &d_handler,
+ &cursor_addr,
+ FNAME](auto& record_deltas_list) {
+ return crimson::do_for_each(
+ record_deltas_list,
+ [write_result,
+ &d_handler, FNAME](record_deltas_t& record_deltas) {
+ auto locator = record_locator_t{
+ record_deltas.record_block_base,
+ write_result
+ };
+ DEBUG("processing {} deltas at block_base {}",
+ record_deltas.deltas.size(),
+ locator);
+ return crimson::do_for_each(
+ record_deltas.deltas,
+ [locator,
+ &d_handler](auto& p) {
+ auto& commit_time = p.first;
+ auto& delta = p.second;
+ return d_handler(locator,
+ delta,
+ seastar::lowres_system_clock::time_point(
+ seastar::lowres_system_clock::duration(commit_time))
+ );
+ });
+ }).safe_then([this, &cursor_addr]() {
+ if (cursor_addr >= header.end) {
+ cursor_addr = (cursor_addr - header.end) + get_start_addr();
+ }
+ if (get_written_to() +
+ ceph::encoded_sizeof_bounded<record_group_header_t>() > header.end) {
+ cursor_addr = get_start_addr();
+ }
+ if (cursor_addr == get_written_to()) {
+ return replay_ertr::make_ready_future<
+ seastar::stop_iteration>(seastar::stop_iteration::yes);
+ }
+ return replay_ertr::make_ready_future<
+ seastar::stop_iteration>(seastar::stop_iteration::no);
+ });
+ });
+ });
+ });
+ });
+ });
+}
+
+CircularBoundedJournal::read_record_ret
+CircularBoundedJournal::return_record(record_group_header_t& header, bufferlist bl)
+{
+ LOG_PREFIX(CircularBoundedJournal::return_record);
+ bufferlist md_bl, data_bl;
+ md_bl.substr_of(bl, 0, get_block_size());
+ data_bl.substr_of(bl, header.mdlength, header.dlength);
+ if (validate_records_metadata(md_bl) &&
+ validate_records_data(header, data_bl)) {
+ return read_record_ret(
+ read_record_ertr::ready_future_marker{},
+ std::make_pair(header, std::move(bl)));
+ } else {
+ DEBUG("invalid matadata");
+ return read_record_ret(
+ read_record_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+}
+
+CircularBoundedJournal::read_record_ret CircularBoundedJournal::read_record(paddr_t off)
+{
+ LOG_PREFIX(CircularBoundedJournal::read_record);
+ rbm_abs_addr offset = convert_paddr_to_abs_addr(
+ off);
+ rbm_abs_addr addr = offset;
+ auto read_length = get_block_size();
+ if (addr + get_block_size() > header.end) {
+ addr = get_start_addr();
+ read_length = header.end - offset;
+ }
+ DEBUG("read_record: reading record from abs addr {} read length {}",
+ addr, read_length);
+ auto bptr = bufferptr(ceph::buffer::create_page_aligned(read_length));
+ bptr.zero();
+ return device->read(addr, bptr
+ ).safe_then([this, addr, read_length, bptr, FNAME]() mutable
+ -> read_record_ret {
+ record_group_header_t h;
+ bufferlist bl;
+ bl.append(bptr);
+ auto bp = bl.cbegin();
+ try {
+ decode(h, bp);
+ } catch (ceph::buffer::error &e) {
+ return read_record_ret(
+ read_record_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+ /*
+ * If the record is longer than read_length and its data wraps across the
+ * end of the journal, up to three reads are needed: one for the block
+ * holding the record header, one for the remaining data up to the end of
+ * the journal, and one for the rest of the data starting from the
+ * beginning of the journal.
+ *
+ * | ... | record header | record data ...   <- up to header.end
+ * | record data (remaining) | ...           <- continues at start_offset
+ */
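+ /*
+ * Illustrative example (made-up sizes): with a 4 KiB block size and a
+ * record of mdlength + dlength = 12 KiB whose header block starts 8 KiB
+ * before header.end, the first read returns the 4 KiB header block, the
+ * second read covers the remaining 4 KiB up to header.end, and the third
+ * read fetches the last 4 KiB from get_start_addr().
+ */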
+ if (h.mdlength + h.dlength > read_length) {
+ rbm_abs_addr next_read_addr = addr + read_length;
+ auto next_read = h.mdlength + h.dlength - read_length;
+ DEBUG(" next_read_addr {}, next_read_length {} ",
+ next_read_addr, next_read);
+ if (header.end < next_read_addr + next_read) {
+ // In this case, two more reads are needed: the first reads the remaining
+ // bytes up to the end of the journal, and the second reads the data from
+ // the beginning of the journal.
+ next_read = header.end - (addr + read_length);
+ }
+ DEBUG("read_entry: additional reading addr {} length {}",
+ next_read_addr,
+ next_read);
+ auto next_bptr = bufferptr(ceph::buffer::create_page_aligned(next_read));
+ next_bptr.zero();
+ return device->read(
+ next_read_addr,
+ next_bptr
+ ).safe_then([this, h=h, next_bptr=std::move(next_bptr), bl=std::move(bl),
+ FNAME]() mutable {
+ bl.append(next_bptr);
+ if (h.mdlength + h.dlength == bl.length()) {
+ DEBUG("read_record: record length {} done", bl.length());
+ return return_record(h, bl);
+ }
+ // need one more read
+ auto next_read_addr = get_start_addr();
+ auto last_bptr = bufferptr(ceph::buffer::create_page_aligned(
+ h.mdlength + h.dlength - bl.length()));
+ DEBUG("read_record: last additional reading addr {} length {}",
+ next_read_addr,
+ h.mdlength + h.dlength - bl.length());
+ return device->read(
+ next_read_addr,
+ last_bptr
+ ).safe_then([this, h=h, last_bptr=std::move(last_bptr),
+ bl=std::move(bl), FNAME]() mutable {
+ bl.append(last_bptr);
+ DEBUG("read_record: complte size {}", bl.length());
+ return return_record(h, bl);
+ });
+ });
+ } else {
+ DEBUG("read_record: complte size {}", bl.length());
+ return return_record(h, bl);
+ }
+ });
+}
+
+CircularBoundedJournal::write_ertr::future<>
+CircularBoundedJournal::write_super()
+{
+ LOG_PREFIX(CircularBoundedJournal::write_super);
+ ceph::bufferlist bl;
+ try {
+ bl = encode_super();
+ } catch (ceph::buffer::error &e) {
+ DEBUG("unable to encode super block from underlying deivce");
+ return crimson::ct_error::input_output_error::make();
+ }
+ DEBUG(
+ "sync header of CircularBoundedJournal, length {}",
+ bl.length());
+ return device_write_bl(header.start, bl);
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/common/log.h"
+
+#include <boost/intrusive_ptr.hpp>
+
+#include <seastar/core/future.hh>
+
+#include "include/ceph_assert.h"
+#include "include/buffer.h"
+#include "include/denc.h"
+
+#include "crimson/osd/exceptions.h"
+#include "crimson/os/seastore/journal.h"
+#include "include/uuid.h"
+#include "crimson/os/seastore/random_block_manager.h"
+#include "crimson/os/seastore/random_block_manager/nvmedevice.h"
+#include <list>
+
+
+namespace crimson::os::seastore::journal {
+
+constexpr rbm_abs_addr CBJOURNAL_START_ADDRESS = 0;
+constexpr uint64_t CBJOURNAL_MAGIC = 0xCCCC;
+using NVMeBlockDevice = nvme_device::NVMeBlockDevice;
+
+/**
+ * CircularBoundedJournal
+ *
+ * TODO: move record from CircularBoundedJournal to RandomBlockManager
+ *
+ */
+
+constexpr uint64_t DEFAULT_SIZE = 1 << 26;
+constexpr uint64_t DEFAULT_BLOCK_SIZE = 4096;
+
+class CircularBoundedJournal : public Journal {
+public:
+ struct mkfs_config_t {
+ std::string path;
+ paddr_t start;
+ paddr_t end;
+ size_t block_size = 0;
+ size_t total_size = 0;
+ device_id_t device_id = 0;
+ seastore_meta_t meta;
+ static mkfs_config_t get_default() {
+ device_id_t d_id = 1 << (std::numeric_limits<device_id_t>::digits - 1);
+ return mkfs_config_t {
+ "",
+ paddr_t::make_blk_paddr(d_id, 0),
+ paddr_t::make_blk_paddr(d_id, DEFAULT_SIZE),
+ DEFAULT_BLOCK_SIZE,
+ DEFAULT_SIZE,
+ d_id,
+ seastore_meta_t {}
+ };
+ }
+ };
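+
+ /*
+ * Typical lifecycle, as a sketch (illustrative only; `dev` is assumed to
+ * be an already-created NVMeBlockDevice* and `path` its device path):
+ *
+ *   CircularBoundedJournal cbj(dev, path);
+ *   auto config = mkfs_config_t::get_default();
+ *   // 1. cbj.mkfs(config)       -- lay out and persist the header block
+ *   // 2. cbj.open_for_write()   -- read the header back, get the start seq
+ *   // 3. cbj.submit_record(...) -- append records through the write pipeline
+ *   // 4. cbj.close()            -- persist the header and close the device
+ */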
+
+ CircularBoundedJournal(NVMeBlockDevice* device, const std::string path);
+ ~CircularBoundedJournal() {}
+
+ open_for_write_ret open_for_write() final;
+ open_for_write_ret open_for_write(rbm_abs_addr start);
+ close_ertr::future<> close() final;
+
+ journal_type get_type() final {
+ return journal_type::CIRCULARBOUNDED_JOURNAL;
+ }
+
+ submit_record_ret submit_record(
+ record_t &&record,
+ OrderingHandle &handle
+ ) final;
+
+ seastar::future<> flush(
+ OrderingHandle &handle
+ ) final {
+ // TODO
+ return seastar::now();
+ }
+
+ replay_ret replay(delta_handler_t &&delta_handler);
+
+ open_for_write_ertr::future<> _open_device(const std::string path);
+
+ struct cbj_header_t;
+ using write_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error,
+ crimson::ct_error::erange>;
+ /*
+ * append_record
+ *
+ * append data at the given address of CircularBoundedJournal
+ *
+ * @param bl bufferlist to write
+ * @param addr rbm_abs_addr at which the data is written
+ *
+ */
+ write_ertr::future<> append_record(ceph::bufferlist bl, rbm_abs_addr addr);
+ /*
+ * device_write_bl
+ *
+ * write the given bufferlist to the device at the given address
+ *
+ * @param offset device address to write to
+ * @param bl bufferlist to write
+ *
+ */
+ write_ertr::future<> device_write_bl(rbm_abs_addr offset, ceph::bufferlist &bl);
+
+ using read_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error,
+ crimson::ct_error::invarg,
+ crimson::ct_error::enoent,
+ crimson::ct_error::erange>;
+ using read_record_ertr = read_ertr;
+ using read_record_ret = read_record_ertr::future<
+ std::optional<std::pair<record_group_header_t, bufferlist>>
+ >;
+ using read_super_ertr = read_ertr;
+ using read_super_ret = read_super_ertr::future<
+ std::optional<std::pair<cbj_header_t, bufferlist>>
+ >;
+ /*
+ * read_record
+ *
+ * read a record from the given address
+ *
+ * @param offset paddr_t to read from
+ *
+ */
+ read_record_ret read_record(paddr_t offset);
+ /*
+ * read_super
+ *
+ * read the super block from the given absolute address
+ *
+ * @param start absolute address of the super block
+ *
+ */
+ read_super_ret read_super(rbm_abs_addr start);
+
+ ceph::bufferlist encode_super();
+
+ using mkfs_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error,
+ crimson::ct_error::invarg
+ >;
+ using mkfs_ret = mkfs_ertr::future<>;
+
+ /*
+ * mkfs
+ *
+ * make a new journal layout even if old journal exists
+ *
+ * @param mkfs_config_t
+ *
+ */
+ mkfs_ret mkfs(mkfs_config_t& config);
+
+
+ /**
+ * CircularBoundedJournal structure
+ *
+ * +--------+--------+--------+--------+--------+-----+
+ * | header | record | record | record | record | ... |
+ * +--------+--------+--------+--------+--------+-----+
+ * <------------------block aligned------------------->
+ * <-fixed-->
+ */
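+
+ /**
+ * For example, with the defaults above (DEFAULT_SIZE = 64 MiB and
+ * DEFAULT_BLOCK_SIZE = 4 KiB), mkfs() keeps the header in the first 4 KiB
+ * block, sets start_offset to 4 KiB, and leaves 64 MiB - 4 KiB of
+ * block-aligned space for records.
+ */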
+
+
+ /**
+ *
+ * CircularBoundedJournal write
+ *
+ * NVMe will support a large block write (< 512 KB) with the atomic write unit
+ * command. With this command, we expect that most of the incoming data can be
+ * stored with a single write call, which has lower overhead than the existing
+ * approach that uses a combination of system calls such as write() and sync().
+ *
+ */
+
+ struct cbj_header_t {
+ uint64_t magic;
+ uuid_d uuid;
+ uint64_t block_size; // block size of the journal device
+ uint64_t size; // max length of journal
+ uint64_t used_size; // current used_size of journal
+ uint32_t error; // reserved
+
+ rbm_abs_addr start_offset; // start offset of CircularBoundedJournal
+ rbm_abs_addr last_committed_record_base;
+ rbm_abs_addr written_to;
+ rbm_abs_addr applied_to;
+
+ uint64_t flag; // represent features (reserved)
+ uint8_t csum_type; // type of checksum algorithm used in cbj_header_t
+ uint64_t csum; // checksum of entire cbj_header_t
+ uint32_t cur_segment_seq;
+
+ rbm_abs_addr start; // start address of the journal on the device
+ rbm_abs_addr end; // end address of the journal on the device
+ device_id_t device_id;
+
+ DENC(cbj_header_t, v, p) {
+ DENC_START(1, 1, p);
+ denc(v.magic, p);
+ denc(v.uuid, p);
+ denc(v.block_size, p);
+ denc(v.size, p);
+ denc(v.used_size, p);
+ denc(v.error, p);
+
+ denc(v.start_offset, p);
+
+ denc(v.last_committed_record_base, p);
+ denc(v.written_to, p);
+ denc(v.applied_to, p);
+
+ denc(v.flag, p);
+ denc(v.csum_type, p);
+ denc(v.csum, p);
+ denc(v.cur_segment_seq, p);
+ denc(v.start, p);
+ denc(v.end, p);
+ denc(v.device_id, p);
+
+ DENC_FINISH(p);
+ }
+ };
+
+ /**
+ * Write position for CircularBoundedJournal
+ *
+ * | written to rbm | written length to CircularBoundedJournal | new write |
+ * ----------------->------------------------------------------->----------->
+ * ^                 ^                                           ^
+ * applied_to        last_committed_record_base                  written_to
+ */
+
+ size_t get_used_size() const {
+ return header.used_size;
+ }
+ void set_used_size(size_t size) {
+ header.used_size = size;
+ }
+ size_t get_total_size() const {
+ return header.size;
+ }
+ rbm_abs_addr get_start_addr() const {
+ return header.start_offset;
+ }
+ size_t get_available_size() const {
+ return get_total_size() - get_used_size();
+ }
+
+ void update_applied_to(rbm_abs_addr addr, uint32_t len) {
+ rbm_abs_addr new_applied_to = addr;
+ set_used_size(
+ get_last_committed_record_base() >= new_applied_to ?
+ get_written_to() - (new_applied_to + len) :
+ get_written_to() + get_total_size() - (new_applied_to + len));
+ set_applied_to(new_applied_to + len);
+ }
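+ /*
+ * Illustrative example for update_applied_to() above (made-up numbers):
+ * with written_to = 700, total_size = 1000, and
+ * update_applied_to(addr = 100, len = 50), applied_to becomes 150. If
+ * last_committed_record_base >= 100, the write front has not wrapped past
+ * the applied point, so used_size becomes 700 - 150 = 550; otherwise
+ * get_total_size() is added first so the circular distance stays
+ * non-negative.
+ */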
+
+ write_ertr::future<> write_super();
+
+ read_record_ret return_record(record_group_header_t& header, bufferlist bl);
+
+ void set_write_pipeline(WritePipeline *_write_pipeline) final {
+ write_pipeline = _write_pipeline;
+ }
+
+ rbm_abs_addr get_written_to() const {
+ return header.written_to;
+ }
+ void set_written_to(rbm_abs_addr addr) {
+ header.written_to = addr;
+ }
+ rbm_abs_addr get_last_committed_record_base() const {
+ return header.last_committed_record_base;
+ }
+ void set_last_committed_record_base(rbm_abs_addr addr) {
+ header.last_committed_record_base = addr;
+ }
+ rbm_abs_addr get_applied_to() const {
+ return header.applied_to;
+ }
+ void set_applied_to(rbm_abs_addr addr) {
+ header.applied_to = addr;
+ }
+ device_id_t get_device_id() const {
+ return header.device_id;
+ }
+ size_t get_block_size() const {
+ return header.block_size;
+ }
+private:
+ cbj_header_t header;
+ NVMeBlockDevice* device;
+ std::string path;
+ WritePipeline *write_pipeline = nullptr;
+ bool init = false;
+};
+
+std::ostream &operator<<(std::ostream &out, const CircularBoundedJournal::cbj_header_t &header);
+}
+
+WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::journal::CircularBoundedJournal::cbj_header_t)