From 84f64ab494a376e93b6acc2d2ca7265d91541250 Mon Sep 17 00:00:00 2001 From: myoungwon oh Date: Thu, 6 Apr 2023 01:56:10 +0000 Subject: [PATCH] crimson/os/seastore/journal: add CircularJournalSpace Signed-off-by: Myoungwon Oh (cherry picked from commit 59cb8c1050b08bd3fa002c0c401605b977631be9) --- src/crimson/os/seastore/CMakeLists.txt | 1 + .../journal/circular_bounded_journal.cc | 313 +++++------------- .../journal/circular_bounded_journal.h | 201 +++-------- .../journal/circular_journal_space.cc | 232 +++++++++++++ .../seastore/journal/circular_journal_space.h | 259 +++++++++++++++ 5 files changed, 614 insertions(+), 392 deletions(-) create mode 100644 src/crimson/os/seastore/journal/circular_journal_space.cc create mode 100644 src/crimson/os/seastore/journal/circular_journal_space.h diff --git a/src/crimson/os/seastore/CMakeLists.txt b/src/crimson/os/seastore/CMakeLists.txt index d26ae813afe..6dd19a1563a 100644 --- a/src/crimson/os/seastore/CMakeLists.txt +++ b/src/crimson/os/seastore/CMakeLists.txt @@ -44,6 +44,7 @@ set(crimson_seastore_srcs journal/segmented_journal.cc journal/segment_allocator.cc journal/record_submitter.cc + journal/circular_journal_space.cc journal.cc device.cc segment_manager_group.cc diff --git a/src/crimson/os/seastore/journal/circular_bounded_journal.cc b/src/crimson/os/seastore/journal/circular_bounded_journal.cc index 1f9762bdbb2..4a493b22ceb 100644 --- a/src/crimson/os/seastore/journal/circular_bounded_journal.cc +++ b/src/crimson/os/seastore/journal/circular_bounded_journal.cc @@ -8,240 +8,106 @@ #include "crimson/os/seastore/async_cleaner.h" #include "crimson/os/seastore/journal/circular_bounded_journal.h" #include "crimson/os/seastore/logging.h" +#include "crimson/os/seastore/journal/circular_journal_space.h" SET_SUBSYS(seastore_journal); namespace crimson::os::seastore::journal { -std::ostream &operator<<(std::ostream &out, - const CircularBoundedJournal::cbj_header_t &header) -{ - return out << "cbj_header_t(" - << ", dirty_tail=" << header.dirty_tail - << ", alloc_tail=" << header.alloc_tail - << ")"; -} - CircularBoundedJournal::CircularBoundedJournal( JournalTrimmer &trimmer, RBMDevice* device, const std::string &path) - : trimmer(trimmer), device(device), path(path) {} - -ceph::bufferlist CircularBoundedJournal::encode_header() -{ - bufferlist bl; - encode(header, bl); - auto header_crc_filler = bl.append_hole(sizeof(checksum_t)); - auto bliter = bl.cbegin(); - auto header_crc = bliter.crc32c( - ceph::encoded_sizeof_bounded(), - -1); - ceph_le32 header_crc_le; - header_crc_le = header_crc; - header_crc_filler.copy_in( - sizeof(checksum_t), - reinterpret_cast(&header_crc_le)); - return bl; -} + : trimmer(trimmer), path(path), + cjs(device), + record_submitter(crimson::common::get_conf( + "seastore_journal_iodepth_limit"), + crimson::common::get_conf( + "seastore_journal_batch_capacity"), + crimson::common::get_conf( + "seastore_journal_batch_flush_size"), + crimson::common::get_conf( + "seastore_journal_batch_preferred_fullness"), + cjs) + {} CircularBoundedJournal::open_for_mkfs_ret CircularBoundedJournal::open_for_mkfs() { - LOG_PREFIX(CircularBoundedJournal::open_for_mkfs); - assert(device); - ceph::bufferlist bl; - CircularBoundedJournal::cbj_header_t head; - assert(device->get_journal_size()); - head.dirty_tail = - journal_seq_t{0, - convert_abs_addr_to_paddr( - get_records_start(), - device->get_device_id())}; - head.alloc_tail = head.dirty_tail; - encode(head, bl); - header = head; - set_written_to(head.dirty_tail); - initialized = true; - DEBUG( - "initialize header block in CircularBoundedJournal, length {}", - bl.length()); - return write_header( - ).safe_then([this]() { - return open_for_mount(); - }).handle_error( - open_for_mkfs_ertr::pass_further{}, - crimson::ct_error::assert_all{ - "Invalid error write_header" - } - ); + return record_submitter.open(true + ).safe_then([this](auto ret) { + record_submitter.update_committed_to(get_written_to()); + return open_for_mkfs_ret( + open_for_mkfs_ertr::ready_future_marker{}, + get_written_to()); + }); } CircularBoundedJournal::open_for_mount_ret CircularBoundedJournal::open_for_mount() { - ceph_assert(initialized); - if (written_to.segment_seq == NULL_SEG_SEQ) { - written_to.segment_seq = 0; - } - return open_for_mount_ret( - open_for_mount_ertr::ready_future_marker{}, - get_written_to()); + return record_submitter.open(false + ).safe_then([this](auto ret) { + record_submitter.update_committed_to(get_written_to()); + return open_for_mount_ret( + open_for_mount_ertr::ready_future_marker{}, + get_written_to()); + }); } CircularBoundedJournal::close_ertr::future<> CircularBoundedJournal::close() { - return write_header( - ).safe_then([this]() -> close_ertr::future<> { - initialized = false; - return close_ertr::now(); - }).handle_error( - open_for_mount_ertr::pass_further{}, - crimson::ct_error::assert_all{ - "Invalid error write_header" - } - ); + return record_submitter.close(); } -CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record( - record_t &&record, - OrderingHandle &handle) +CircularBoundedJournal::submit_record_ret +CircularBoundedJournal::submit_record( + record_t &&record, + OrderingHandle &handle) { LOG_PREFIX(CircularBoundedJournal::submit_record); + DEBUG("H{} {} start ...", (void*)&handle, record); assert(write_pipeline); - assert(written_to.segment_seq != NULL_SEG_SEQ); - auto r_size = record_group_size_t(record.size, get_block_size()); - auto encoded_size = r_size.get_encoded_length(); - if (encoded_size > get_records_available_size()) { - ERROR("record size {}, but available size {}", - encoded_size, get_records_available_size()); - return crimson::ct_error::erange::make(); - } - if (encoded_size + get_rbm_addr(get_written_to()) > get_journal_end()) { - DEBUG("roll"); - paddr_t paddr = convert_abs_addr_to_paddr( - get_records_start(), - get_device_id()); - set_written_to( - journal_seq_t{++written_to.segment_seq, paddr}); - if (encoded_size > get_records_available_size()) { - ERROR("rolled, record size {}, but available size {}", - encoded_size, get_records_available_size()); - return crimson::ct_error::erange::make(); - } - } - - journal_seq_t j_seq = get_written_to(); - ceph::bufferlist to_write = encode_record( - std::move(record), device->get_block_size(), - j_seq, 0); - assert(to_write.length() == encoded_size); - auto target = get_rbm_addr(get_written_to()); - auto new_written_to = target + encoded_size; - if (new_written_to >= get_journal_end()) { - assert(new_written_to == get_journal_end()); - DEBUG("roll"); - paddr_t paddr = convert_abs_addr_to_paddr( - get_records_start(), - get_device_id()); - set_written_to( - journal_seq_t{++written_to.segment_seq, paddr}); - } else { - paddr_t paddr = convert_abs_addr_to_paddr( - new_written_to, - get_device_id()); - set_written_to( - journal_seq_t{written_to.segment_seq, paddr}); - } - DEBUG("{}, target {}", r_size, target); - - auto write_result = write_result_t{ - j_seq, - encoded_size - }; - auto write_fut = device_write_bl(target, to_write); - return handle.enter(write_pipeline->device_submission - ).then([write_fut = std::move(write_fut)]() mutable { - return std::move(write_fut); - }).safe_then([this, &handle] { - return handle.enter(write_pipeline->finalize); - }).safe_then([this, target, - length=encoded_size, - write_result, - r_size, - FNAME] { - DEBUG("commit target {} used_size {} written length {}", - target, get_records_used_size(), length); - - paddr_t paddr = convert_abs_addr_to_paddr( - target + r_size.get_mdlength(), - get_device_id()); - auto submit_result = record_locator_t{ - paddr, - write_result - }; - trimmer.set_journal_head(write_result.start_seq); - return submit_result; - }); + return do_submit_record(std::move(record), handle); } -CircularBoundedJournal::write_ertr::future<> CircularBoundedJournal::device_write_bl( - rbm_abs_addr offset, bufferlist &bl) +CircularBoundedJournal::submit_record_ret +CircularBoundedJournal::do_submit_record( + record_t &&record, + OrderingHandle &handle) { - LOG_PREFIX(CircularBoundedJournal::device_write_bl); - auto length = bl.length(); - if (offset + length > get_journal_end()) { - return crimson::ct_error::erange::make(); + LOG_PREFIX(CircularBoundedJournal::do_submit_record); + if (!record_submitter.is_available()) { + DEBUG("H{} wait ...", (void*)&handle); + return record_submitter.wait_available( + ).safe_then([this, record=std::move(record), &handle]() mutable { + return do_submit_record(std::move(record), handle); + }); + } + auto action = record_submitter.check_action(record.size); + if (action == RecordSubmitter::action_t::ROLL) { + return record_submitter.roll_segment( + ).safe_then([this, record=std::move(record), &handle]() mutable { + return do_submit_record(std::move(record), handle); + }); } - DEBUG( - "overwrite in CircularBoundedJournal, offset {}, length {}", - offset, - length); - return device->writev(offset, bl - ).handle_error( - write_ertr::pass_further{}, - crimson::ct_error::assert_all{ "Invalid error device->write" } - ); -} -CircularBoundedJournal::read_header_ret -CircularBoundedJournal::read_header() -{ - LOG_PREFIX(CircularBoundedJournal::read_header); - assert(device); - auto bptr = bufferptr(ceph::buffer::create_page_aligned( - device->get_block_size())); - DEBUG("reading {}", device->get_journal_start()); - return device->read(device->get_journal_start(), bptr - ).safe_then([bptr, FNAME]() mutable - -> read_header_ret { - bufferlist bl; - bl.append(bptr); - auto bp = bl.cbegin(); - cbj_header_t cbj_header; - try { - decode(cbj_header, bp); - } catch (ceph::buffer::error &e) { - ERROR("unable to read header block"); - return crimson::ct_error::enoent::make(); - } - auto bliter = bl.cbegin(); - auto test_crc = bliter.crc32c( - ceph::encoded_sizeof_bounded(), - -1); - ceph_le32 recorded_crc_le; - decode(recorded_crc_le, bliter); - uint32_t recorded_crc = recorded_crc_le; - if (test_crc != recorded_crc) { - ERROR("error, header crc mismatch."); - return read_header_ret( - read_header_ertr::ready_future_marker{}, - std::nullopt); - } - return read_header_ret( - read_header_ertr::ready_future_marker{}, - std::make_pair(cbj_header, bl) - ); + DEBUG("H{} submit {} ...", + (void*)&handle, + action == RecordSubmitter::action_t::SUBMIT_FULL ? + "FULL" : "NOT_FULL"); + auto submit_fut = record_submitter.submit(std::move(record)); + return handle.enter(write_pipeline->device_submission + ).then([submit_fut=std::move(submit_fut)]() mutable { + return std::move(submit_fut); + }).safe_then([FNAME, this, &handle](record_locator_t result) { + return handle.enter(write_pipeline->finalize + ).then([FNAME, this, result, &handle] { + DEBUG("H{} finish with {}", (void*)&handle, result); + auto new_committed_to = result.write_result.get_end_seq(); + record_submitter.update_committed_to(new_committed_to); + return result; + }); }); } @@ -356,7 +222,7 @@ Journal::replay_ret CircularBoundedJournal::replay( * read records from last applied record prior to written_to, and replay */ LOG_PREFIX(CircularBoundedJournal::replay); - return read_header( + return cjs.read_header( ).handle_error( open_for_mount_ertr::pass_further{}, crimson::ct_error::assert_all{ @@ -364,9 +230,9 @@ Journal::replay_ret CircularBoundedJournal::replay( }).safe_then([this, FNAME, delta_handler=std::move(delta_handler)](auto p) mutable { auto &[head, bl] = *p; - header = head; - DEBUG("header : {}", header); - initialized = true; + cjs.set_cbj_header(head); + DEBUG("header : {}", cjs.get_cbj_header()); + cjs.set_initialized(true); return seastar::do_with( std::move(delta_handler), std::map(), @@ -387,7 +253,6 @@ Journal::replay_ret CircularBoundedJournal::replay( } return replay_ertr::make_ready_future(true); }; - written_to.segment_seq = NULL_SEG_SEQ; auto tail = get_dirty_tail() <= get_alloc_tail() ? get_dirty_tail() : get_alloc_tail(); set_written_to(tail); @@ -405,8 +270,8 @@ Journal::replay_ret CircularBoundedJournal::replay( return d_handler( offsets, e, - header.dirty_tail, - header.alloc_tail, + get_dirty_tail(), + get_alloc_tail(), modify_time ); } @@ -416,9 +281,10 @@ Journal::replay_ret CircularBoundedJournal::replay( return scan_valid_record_delta(std::move(call_d_handler_if_valid), tail); }); }).safe_then([this]() { + record_submitter.update_committed_to(get_written_to()); trimmer.update_journal_tails( - header.dirty_tail, - header.alloc_tail); + get_dirty_tail(), + get_alloc_tail()); }); }); } @@ -454,7 +320,7 @@ CircularBoundedJournal::read_record(paddr_t off, segment_seq_t expected_seq) assert(addr + read_length <= get_journal_end()); DEBUG("reading record from abs addr {} read length {}", addr, read_length); auto bptr = bufferptr(ceph::buffer::create_page_aligned(read_length)); - return device->read(addr, bptr + return cjs.read(addr, bptr ).safe_then([this, addr, bptr, expected_seq, FNAME]() mutable -> read_record_ret { record_group_header_t h; @@ -486,7 +352,7 @@ CircularBoundedJournal::read_record(paddr_t off, segment_seq_t expected_seq) auto next_bptr = bufferptr(ceph::buffer::create_page_aligned(next_length)); DEBUG("reading record part 2 from abs addr {} read length {}", next_addr, next_length); - return device->read(next_addr, next_bptr + return cjs.read(next_addr, next_bptr ).safe_then([this, h, next_bptr=std::move(next_bptr), bl=std::move(bl)]() mutable { bl.append(next_bptr); return return_record(h, bl); @@ -498,26 +364,6 @@ CircularBoundedJournal::read_record(paddr_t off, segment_seq_t expected_seq) }); } -CircularBoundedJournal::write_ertr::future<> -CircularBoundedJournal::write_header() -{ - LOG_PREFIX(CircularBoundedJournal::write_header); - ceph::bufferlist bl = encode_header(); - ceph_assert(bl.length() <= get_block_size()); - DEBUG( - "sync header of CircularBoundedJournal, length {}", - bl.length()); - assert(device); - auto iter = bl.begin(); - assert(bl.length() < get_block_size()); - bufferptr bp = bufferptr(ceph::buffer::create_page_aligned(get_block_size())); - iter.copy(bl.length(), bp.c_str()); - return device->write(device->get_journal_start(), std::move(bp) - ).handle_error( - write_ertr::pass_further{}, - crimson::ct_error::assert_all{ "Invalid error device->write" } - ); -} seastar::future<> CircularBoundedJournal::finish_commit(transaction_type_t type) { if (is_trim_transaction(type)) { return update_journal_tail( @@ -527,5 +373,4 @@ seastar::future<> CircularBoundedJournal::finish_commit(transaction_type_t type) return seastar::now(); } - } diff --git a/src/crimson/os/seastore/journal/circular_bounded_journal.h b/src/crimson/os/seastore/journal/circular_bounded_journal.h index b7117df5043..58bfce4873e 100644 --- a/src/crimson/os/seastore/journal/circular_bounded_journal.h +++ b/src/crimson/os/seastore/journal/circular_bounded_journal.h @@ -19,6 +19,8 @@ #include "crimson/os/seastore/random_block_manager.h" #include "crimson/os/seastore/random_block_manager/rbm_device.h" #include +#include "crimson/os/seastore/journal/record_submitter.h" +#include "crimson/os/seastore/journal/circular_journal_space.h" namespace crimson::os::seastore::journal { @@ -87,16 +89,32 @@ public: replay_ret replay(delta_handler_t &&delta_handler) final; - struct cbj_header_t; - using write_ertr = submit_record_ertr; - /* - * device_write_bl + rbm_abs_addr get_rbm_addr(journal_seq_t seq) const { + return convert_paddr_to_abs_addr(seq.offset); + } + + /** * - * @param device address to write - * @param bufferlist to write + * CircularBoundedJournal write + * + * NVMe will support a large block write (< 512KB) with atomic write unit command. + * With this command, we expect that the most of incoming data can be stored + * as a single write call, which has lower overhead than existing + * way that uses a combination of system calls such as write() and sync(). * */ - write_ertr::future<> device_write_bl(rbm_abs_addr offset, ceph::bufferlist &bl); + + seastar::future<> update_journal_tail( + journal_seq_t dirty, + journal_seq_t alloc) { + return cjs.update_journal_tail(dirty, alloc); + } + journal_seq_t get_dirty_tail() const { + return cjs.get_dirty_tail(); + } + journal_seq_t get_alloc_tail() const { + return cjs.get_alloc_tail(); + } using read_ertr = crimson::errorator< crimson::ct_error::input_output_error, @@ -107,10 +125,6 @@ public: using read_record_ret = read_record_ertr::future< std::optional> >; - using read_header_ertr = read_ertr; - using read_header_ret = read_header_ertr::future< - std::optional> - >; /* * read_record * @@ -121,84 +135,6 @@ public: * */ read_record_ret read_record(paddr_t offset, segment_seq_t expected_seq); - /* - * read_header - * - * read header block from given absolute address - * - * @param absolute address - * - */ - read_header_ret read_header(); - - ceph::bufferlist encode_header(); - - - /** - * CircularBoundedJournal structure - * - * +-------------------------------------------------------+ - * | header | record | record | record | record | ... | - * +-------------------------------------------------------+ - * ^-----------block aligned-----------------^ - * <----fixed----> - */ - - - /** - * - * CircularBoundedJournal write - * - * NVMe will support a large block write (< 512KB) with atomic write unit command. - * With this command, we expect that the most of incoming data can be stored - * as a single write call, which has lower overhead than existing - * way that uses a combination of system calls such as write() and sync(). - * - */ - - struct cbj_header_t { - // start offset of CircularBoundedJournal in the device - journal_seq_t dirty_tail; - journal_seq_t alloc_tail; - - DENC(cbj_header_t, v, p) { - DENC_START(1, 1, p); - denc(v.dirty_tail, p); - denc(v.alloc_tail, p); - DENC_FINISH(p); - } - }; - - /** - * - * Write position for CircularBoundedJournal - * - * | written to rbm | written length to CircularBoundedJournal | new write | - * ----------------->------------------------------------------------> - * ^ ^ - * applied_to written_to - * - */ - - seastar::future<> update_journal_tail( - journal_seq_t dirty, - journal_seq_t alloc) { - header.dirty_tail = dirty; - header.alloc_tail = alloc; - return write_header( - ).handle_error( - crimson::ct_error::assert_all{ - "encountered invalid error in update_journal_tail" - }); - } - journal_seq_t get_dirty_tail() const { - return header.dirty_tail; - } - journal_seq_t get_alloc_tail() const { - return header.alloc_tail; - } - - write_ertr::future<> write_header(); read_record_ret return_record(record_group_header_t& header, bufferlist bl); @@ -206,74 +142,29 @@ public: write_pipeline = _write_pipeline; } - journal_seq_t get_written_to() const { - return written_to; - } - rbm_abs_addr get_rbm_addr(journal_seq_t seq) const { - return convert_paddr_to_abs_addr(seq.offset); - } - void set_written_to(journal_seq_t seq) { - rbm_abs_addr addr = convert_paddr_to_abs_addr(seq.offset); - assert(addr >= get_records_start()); - assert(addr < get_journal_end()); - written_to = seq; - } device_id_t get_device_id() const { - return device->get_device_id(); + return cjs.get_device_id(); } extent_len_t get_block_size() const { - assert(device); - return device->get_block_size(); + return cjs.get_block_size(); } - /* - Size-related interfaces - +---------------------------------------------------------+ - | header | record | record | record | record | ... | - +---------------------------------------------------------+ - ^ ^ ^ - | | | - get_journal_start | get_journal_end - get_records_start - <-- get_records_total_size + block_size --> - <--------------- get_journal_size ------------------------> - */ - - size_t get_records_used_size() const { - auto rbm_written_to = get_rbm_addr(get_written_to()); - auto rbm_tail = get_rbm_addr(get_dirty_tail()); - return rbm_written_to >= rbm_tail ? - rbm_written_to - rbm_tail : - rbm_written_to + get_records_total_size() + get_block_size() - - rbm_tail; - } - size_t get_records_total_size() const { - assert(device); - // a block is for header and a block is reserved to denote the end - return device->get_journal_size() - (2 * get_block_size()); - } - rbm_abs_addr get_records_start() const { - assert(device); - return device->get_journal_start() + get_block_size(); + rbm_abs_addr get_journal_end() const { + return cjs.get_journal_end(); } - size_t get_records_available_size() const { - return get_records_total_size() - get_records_used_size(); + + void set_written_to(journal_seq_t seq) { + cjs.set_written_to(seq); } - bool is_available_size(uint64_t size) { - auto rbm_written_to = get_rbm_addr(get_written_to()); - auto rbm_tail = get_rbm_addr(get_dirty_tail()); - if (rbm_written_to > rbm_tail && - (get_journal_end() - rbm_written_to) < size && - size > (get_records_used_size() - - (get_journal_end() - rbm_written_to))) { - return false; - } - return get_records_available_size() >= size; + + journal_seq_t get_written_to() { + return cjs.get_written_to(); } - rbm_abs_addr get_journal_end() const { - assert(device); - return device->get_journal_start() + device->get_journal_size(); + + rbm_abs_addr get_records_start() const { + return cjs.get_records_start(); } + seastar::future<> finish_commit(transaction_type_t type) final; using cbj_delta_handler_t = std::function< @@ -286,10 +177,10 @@ public: cbj_delta_handler_t &&delta_handler, journal_seq_t tail); + submit_record_ret do_submit_record(record_t &&record, OrderingHandle &handle); + private: - cbj_header_t header; JournalTrimmer &trimmer; - RBMDevice* device; std::string path; WritePipeline *write_pipeline = nullptr; /** @@ -304,15 +195,9 @@ private: // should be in range [get_records_start(), get_journal_end()) // written_to.segment_seq is circulation seq to track // the sequence to written records - journal_seq_t written_to; + CircularJournalSpace cjs; + RecordSubmitter record_submitter; }; -std::ostream &operator<<(std::ostream &out, const CircularBoundedJournal::cbj_header_t &header); - } -WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::journal::CircularBoundedJournal::cbj_header_t) - -#if FMT_VERSION >= 90000 -template <> struct fmt::formatter : fmt::ostream_formatter {}; -#endif diff --git a/src/crimson/os/seastore/journal/circular_journal_space.cc b/src/crimson/os/seastore/journal/circular_journal_space.cc new file mode 100644 index 00000000000..7565c281557 --- /dev/null +++ b/src/crimson/os/seastore/journal/circular_journal_space.cc @@ -0,0 +1,232 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#include "circular_journal_space.h" + +#include +#include + +#include "crimson/os/seastore/logging.h" +#include "crimson/os/seastore/async_cleaner.h" +#include "crimson/os/seastore/journal/circular_bounded_journal.h" + +SET_SUBSYS(seastore_journal); + +namespace crimson::os::seastore::journal { + +std::ostream &operator<<(std::ostream &out, + const CircularJournalSpace::cbj_header_t &header) +{ + return out << "cbj_header_t(" + << ", dirty_tail=" << header.dirty_tail + << ", alloc_tail=" << header.alloc_tail + << ")"; +} + +CircularJournalSpace::CircularJournalSpace(RBMDevice * device) : device(device) {} + +bool CircularJournalSpace::needs_roll(std::size_t length) const { + if (length + get_rbm_addr(get_written_to()) > get_journal_end()) { + return true; + } + return false; +} + +extent_len_t CircularJournalSpace::get_block_size() const { + return device->get_block_size(); +} + +CircularJournalSpace::roll_ertr::future<> CircularJournalSpace::roll() { + paddr_t paddr = convert_abs_addr_to_paddr( + get_records_start(), + get_device_id()); + auto seq = get_written_to(); + set_written_to( + journal_seq_t{++seq.segment_seq, paddr}); + return roll_ertr::now(); +} + +CircularJournalSpace::write_ret +CircularJournalSpace::write(ceph::bufferlist&& to_write) { + LOG_PREFIX(CircularJournalSpace::write); + assert(get_written_to().segment_seq != NULL_SEG_SEQ); + auto encoded_size = to_write.length(); + if (encoded_size > get_records_available_size()) { + ceph_abort("should be impossible with EPM reservation"); + } + assert(encoded_size + get_rbm_addr(get_written_to()) + < get_journal_end()); + + journal_seq_t j_seq = get_written_to(); + auto target = get_rbm_addr(get_written_to()); + auto new_written_to = target + encoded_size; + assert(new_written_to < get_journal_end()); + paddr_t paddr = convert_abs_addr_to_paddr( + new_written_to, + get_device_id()); + set_written_to( + journal_seq_t{get_written_to().segment_seq, paddr}); + DEBUG("{}, target {}", to_write.length(), target); + + auto write_result = write_result_t{ + j_seq, + encoded_size + }; + return device_write_bl(target, to_write + ).safe_then([this, target, + length=encoded_size, + write_result, + FNAME] { + DEBUG("commit target {} used_size {} written length {}", + target, get_records_used_size(), length); + return write_result; + }).handle_error( + base_ertr::pass_further{}, + crimson::ct_error::assert_all{ "Invalid error" } + ); +} + +CircularJournalSpace::open_ret CircularJournalSpace::open(bool is_mkfs) { + std::ostringstream oss; + oss << device_id_printer_t{get_device_id()}; + print_name = oss.str(); + + if (is_mkfs) { + LOG_PREFIX(CircularJournalSpace::open); + assert(device); + ceph::bufferlist bl; + CircularJournalSpace::cbj_header_t head; + assert(device->get_journal_size()); + head.dirty_tail = + journal_seq_t{0, + convert_abs_addr_to_paddr( + get_records_start(), + device->get_device_id())}; + head.alloc_tail = head.dirty_tail; + encode(head, bl); + header = head; + set_written_to(head.dirty_tail); + initialized = true; + DEBUG( + "initialize header block in CircularJournalSpace length {}", + bl.length()); + return write_header( + ).safe_then([this]() { + return open_ret( + open_ertr::ready_future_marker{}, + get_written_to()); + }).handle_error( + open_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error write_header" + } + ); + } + ceph_assert(initialized); + if (written_to.segment_seq == NULL_SEG_SEQ) { + written_to.segment_seq = 0; + } + return open_ret( + open_ertr::ready_future_marker{}, + get_written_to()); +} + +ceph::bufferlist CircularJournalSpace::encode_header() +{ + bufferlist bl; + encode(header, bl); + auto header_crc_filler = bl.append_hole(sizeof(checksum_t)); + auto bliter = bl.cbegin(); + auto header_crc = bliter.crc32c( + ceph::encoded_sizeof_bounded(), + -1); + ceph_le32 header_crc_le; + header_crc_le = header_crc; + header_crc_filler.copy_in( + sizeof(checksum_t), + reinterpret_cast(&header_crc_le)); + return bl; +} + +CircularJournalSpace::write_ertr::future<> CircularJournalSpace::device_write_bl( + rbm_abs_addr offset, bufferlist &bl) +{ + LOG_PREFIX(CircularJournalSpace::device_write_bl); + auto length = bl.length(); + if (offset + length > get_journal_end()) { + return crimson::ct_error::erange::make(); + } + DEBUG( + "overwrite in CircularJournalSpace, offset {}, length {}", + offset, + length); + return device->writev(offset, bl + ).handle_error( + write_ertr::pass_further{}, + crimson::ct_error::assert_all{ "Invalid error device->write" } + ); +} + +CircularJournalSpace::read_header_ret +CircularJournalSpace::read_header() +{ + LOG_PREFIX(CircularJournalSpace::read_header); + assert(device); + auto bptr = bufferptr(ceph::buffer::create_page_aligned( + device->get_block_size())); + DEBUG("reading {}", device->get_journal_start()); + return device->read(device->get_journal_start(), bptr + ).safe_then([bptr, FNAME]() mutable + -> read_header_ret { + bufferlist bl; + bl.append(bptr); + auto bp = bl.cbegin(); + cbj_header_t cbj_header; + try { + decode(cbj_header, bp); + } catch (ceph::buffer::error &e) { + ERROR("unable to read header block"); + return crimson::ct_error::enoent::make(); + } + auto bliter = bl.cbegin(); + auto test_crc = bliter.crc32c( + ceph::encoded_sizeof_bounded(), + -1); + ceph_le32 recorded_crc_le; + decode(recorded_crc_le, bliter); + uint32_t recorded_crc = recorded_crc_le; + if (test_crc != recorded_crc) { + ERROR("error, header crc mismatch."); + return read_header_ret( + read_header_ertr::ready_future_marker{}, + std::nullopt); + } + return read_header_ret( + read_header_ertr::ready_future_marker{}, + std::make_pair(cbj_header, bl) + ); + }); +} + +CircularJournalSpace::write_ertr::future<> +CircularJournalSpace::write_header() +{ + LOG_PREFIX(CircularJournalSpace::write_header); + ceph::bufferlist bl = encode_header(); + ceph_assert(bl.length() <= get_block_size()); + DEBUG( + "sync header of CircularJournalSpace, length {}", + bl.length()); + assert(device); + auto iter = bl.begin(); + assert(bl.length() < get_block_size()); + bufferptr bp = bufferptr(ceph::buffer::create_page_aligned(get_block_size())); + iter.copy(bl.length(), bp.c_str()); + return device->write(device->get_journal_start(), std::move(bp) + ).handle_error( + write_ertr::pass_further{}, + crimson::ct_error::assert_all{ "Invalid error device->write" } + ); +} + +} diff --git a/src/crimson/os/seastore/journal/circular_journal_space.h b/src/crimson/os/seastore/journal/circular_journal_space.h new file mode 100644 index 00000000000..1e97f4efedc --- /dev/null +++ b/src/crimson/os/seastore/journal/circular_journal_space.h @@ -0,0 +1,259 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#pragma once + +#include +#include +#include +#include + +#include "include/buffer.h" + +#include "crimson/common/errorator.h" +#include "crimson/os/seastore/journal.h" +#include "crimson/os/seastore/random_block_manager.h" +#include "crimson/os/seastore/random_block_manager/rbm_device.h" +#include "crimson/os/seastore/journal/record_submitter.h" +#include "crimson/os/seastore/async_cleaner.h" + +namespace crimson::os::seastore { + class SegmentProvider; + class JournalTrimmer; +} + +namespace crimson::os::seastore::journal { + +class CircularBoundedJournal; +class CircularJournalSpace : public JournalAllocator { + + public: + const std::string& get_name() const final { + return print_name; + } + + extent_len_t get_block_size() const final; + + bool can_write() const final { + return (device != nullptr); + } + + segment_nonce_t get_nonce() const final { + return 0; + } + + bool needs_roll(std::size_t length) const final; + + roll_ertr::future<> roll() final; + + write_ret write(ceph::bufferlist&& to_write) final; + + void update_modify_time(record_t& record) final {} + + close_ertr::future<> close() final { + return write_header( + ).safe_then([this]() -> close_ertr::future<> { + initialized = false; + return close_ertr::now(); + }).handle_error( + Journal::open_for_mount_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error write_header" + } + ); + } + + open_ret open(bool is_mkfs) final; + + public: + CircularJournalSpace(RBMDevice * device); + + struct cbj_header_t; + using write_ertr = Journal::submit_record_ertr; + /* + * device_write_bl + * + * @param device address to write + * @param bufferlist to write + * + */ + write_ertr::future<> device_write_bl(rbm_abs_addr offset, ceph::bufferlist &bl); + + using read_ertr = crimson::errorator< + crimson::ct_error::input_output_error, + crimson::ct_error::invarg, + crimson::ct_error::enoent, + crimson::ct_error::erange>; + using read_header_ertr = read_ertr; + using read_header_ret = read_header_ertr::future< + std::optional> + >; + /* + * read_header + * + * read header block from given absolute address + * + * @param absolute address + * + */ + read_header_ret read_header(); + + ceph::bufferlist encode_header(); + + write_ertr::future<> write_header(); + + + /** + * CircularBoundedJournal structure + * + * +-------------------------------------------------------+ + * | header | record | record | record | record | ... | + * +-------------------------------------------------------+ + * ^-----------block aligned-----------------^ + * <----fixed----> + */ + + struct cbj_header_t { + // start offset of CircularBoundedJournal in the device + journal_seq_t dirty_tail; + journal_seq_t alloc_tail; + + DENC(cbj_header_t, v, p) { + DENC_START(1, 1, p); + denc(v.dirty_tail, p); + denc(v.alloc_tail, p); + DENC_FINISH(p); + } + }; + + /** + * + * Write position for CircularBoundedJournal + * + * | written to rbm | written length to CircularBoundedJournal | new write | + * ----------------->------------------------------------------------> + * ^ ^ + * applied_to written_to + * + */ + + journal_seq_t get_written_to() const { + return written_to; + } + rbm_abs_addr get_rbm_addr(journal_seq_t seq) const { + return convert_paddr_to_abs_addr(seq.offset); + } + void set_written_to(journal_seq_t seq) { + rbm_abs_addr addr = convert_paddr_to_abs_addr(seq.offset); + assert(addr >= get_records_start()); + assert(addr < get_journal_end()); + written_to = seq; + } + device_id_t get_device_id() const { + return device->get_device_id(); + } + + journal_seq_t get_dirty_tail() const { + return header.dirty_tail; + } + journal_seq_t get_alloc_tail() const { + return header.alloc_tail; + } + + /* + Size-related interfaces + +---------------------------------------------------------+ + | header | record | record | record | record | ... | + +---------------------------------------------------------+ + ^ ^ ^ + | | | + get_journal_start | get_journal_end + get_records_start + <-- get_records_total_size + block_size --> + <--------------- get_journal_size ------------------------> + */ + + size_t get_records_used_size() const { + auto rbm_written_to = get_rbm_addr(get_written_to()); + auto rbm_tail = get_rbm_addr(get_dirty_tail()); + return rbm_written_to >= rbm_tail ? + rbm_written_to - rbm_tail : + rbm_written_to + get_records_total_size() + get_block_size() + - rbm_tail; + } + size_t get_records_total_size() const { + assert(device); + // a block is for header and a block is reserved to denote the end + return device->get_journal_size() - (2 * get_block_size()); + } + rbm_abs_addr get_records_start() const { + assert(device); + return device->get_journal_start() + get_block_size(); + } + size_t get_records_available_size() const { + return get_records_total_size() - get_records_used_size(); + } + bool is_available_size(uint64_t size) { + auto rbm_written_to = get_rbm_addr(get_written_to()); + auto rbm_tail = get_rbm_addr(get_dirty_tail()); + if (rbm_written_to > rbm_tail && + (get_journal_end() - rbm_written_to) < size && + size > (get_records_used_size() - + (get_journal_end() - rbm_written_to))) { + return false; + } + return get_records_available_size() >= size; + } + rbm_abs_addr get_journal_end() const { + assert(device); + return device->get_journal_start() + device->get_journal_size(); + } + + read_ertr::future<> read( + uint64_t offset, + bufferptr &bptr) { + assert(device); + return device->read(offset, bptr); + } + + seastar::future<> update_journal_tail( + journal_seq_t dirty, + journal_seq_t alloc) { + header.dirty_tail = dirty; + header.alloc_tail = alloc; + return write_header( + ).handle_error( + crimson::ct_error::assert_all{ + "encountered invalid error in update_journal_tail" + }); + } + + void set_initialized(bool init) { + initialized = init; + } + + void set_cbj_header(cbj_header_t& head) { + header = head; + } + + cbj_header_t get_cbj_header() { + return header; + } + + private: + std::string print_name; + cbj_header_t header; + RBMDevice* device; + journal_seq_t written_to; + bool initialized = false; +}; + +std::ostream &operator<<(std::ostream &out, const CircularJournalSpace::cbj_header_t &header); + +} + +WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::journal::CircularJournalSpace::cbj_header_t) + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter : fmt::ostream_formatter {}; +#endif -- 2.39.5