From: myoungwon oh Date: Sun, 15 Aug 2021 12:33:27 +0000 (+0900) Subject: seatore: add CircularBoundedJournal X-Git-Tag: v18.0.0~857^2~47 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=76d52a96c3c2133737d9b7f84515ca8a0d2e9f06;p=ceph.git seatore: add CircularBoundedJournal Signed-off-by: Myoungwon Oh --- diff --git a/src/crimson/os/seastore/CMakeLists.txt b/src/crimson/os/seastore/CMakeLists.txt index cd5b8f94fb94..55d2168cebb1 100644 --- a/src/crimson/os/seastore/CMakeLists.txt +++ b/src/crimson/os/seastore/CMakeLists.txt @@ -43,6 +43,7 @@ set(crimson_seastore_srcs journal.cc device.cc segment_manager_group.cc + journal/circular_bounded_journal.cc ../../../test/crimson/seastore/test_block.cc ${PROJECT_SOURCE_DIR}/src/os/Transaction.cc ) diff --git a/src/crimson/os/seastore/journal.cc b/src/crimson/os/seastore/journal.cc index c9117a52a00b..314d62d89f5b 100644 --- a/src/crimson/os/seastore/journal.cc +++ b/src/crimson/os/seastore/journal.cc @@ -3,6 +3,7 @@ #include "journal.h" #include "journal/segmented_journal.h" +#include "journal/circular_bounded_journal.h" namespace crimson::os::seastore::journal { @@ -11,4 +12,11 @@ JournalRef make_segmented(SegmentProvider &provider) return std::make_unique(provider); } +JournalRef make_circularbounded( + crimson::os::seastore::nvme_device::NVMeBlockDevice* device, + std::string path) +{ + return std::make_unique(device, path); +} + } diff --git a/src/crimson/os/seastore/journal.h b/src/crimson/os/seastore/journal.h index a33a5468684b..9ee5e9e34be2 100644 --- a/src/crimson/os/seastore/journal.h +++ b/src/crimson/os/seastore/journal.h @@ -18,6 +18,11 @@ class NVMeBlockDevice; class SegmentManagerGroup; class SegmentProvider; +enum class journal_type { + SEGMENT_JOURNAL = 0, + CIRCULARBOUNDED_JOURNAL +}; + class Journal { public: /** @@ -87,6 +92,8 @@ public: delta_handler_t &&delta_handler) = 0; virtual ~Journal() {} + + virtual journal_type get_type() = 0; }; using JournalRef = std::unique_ptr; @@ -94,6 +101,10 @@ namespace journal { JournalRef make_segmented(SegmentProvider &provider); +JournalRef make_circularbounded( + crimson::os::seastore::nvme_device::NVMeBlockDevice* device, + std::string path); + } } diff --git a/src/crimson/os/seastore/journal/circular_bounded_journal.cc b/src/crimson/os/seastore/journal/circular_bounded_journal.cc new file mode 100644 index 000000000000..c01122dca42e --- /dev/null +++ b/src/crimson/os/seastore/journal/circular_bounded_journal.cc @@ -0,0 +1,590 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include + +#include "crimson/common/errorator-loop.h" +#include "include/intarith.h" +#include "crimson/os/seastore/journal/circular_bounded_journal.h" +#include "crimson/os/seastore/logging.h" + +SET_SUBSYS(seastore_journal); + +namespace crimson::os::seastore::journal { + +std::ostream &operator<<(std::ostream &out, + const CircularBoundedJournal::cbj_header_t &header) +{ + return out << "cbj_header_t(magin=" << header.magic + << ", uuid=" << header.uuid + << ", block_size=" << header.block_size + << ", size=" << header.size + << ", used_size=" << header.used_size + << ", error=" << header.error + << ", start_offset=" << header.start_offset + << ", applied_to="<< header.applied_to + << ", last_committed_record_base="<< header.last_committed_record_base + << ", written_to=" << header.written_to + << ", flsg=" << header.flag + << ", csum_type=" << header.csum_type + << ", csum=" << header.csum + << ", start=" << header.start + << ", end=" << header.end + << ")"; +} + + +CircularBoundedJournal::CircularBoundedJournal(NVMeBlockDevice* device, + const std::string path) + : device(device), path(path) {} + +CircularBoundedJournal::mkfs_ret +CircularBoundedJournal::mkfs(mkfs_config_t& config) +{ + LOG_PREFIX(CircularBoundedJournal::mkfs); + return _open_device(path + ).safe_then([this, config, FNAME]() mutable -> mkfs_ret { + rbm_abs_addr start_addr = convert_paddr_to_abs_addr( + config.start); + assert(config.block_size == device->get_block_size()); + ceph::bufferlist bl; + CircularBoundedJournal::cbj_header_t head; + head.magic = CBJOURNAL_MAGIC; + head.uuid = uuid_d(); // TODO + head.block_size = config.block_size; + rbm_abs_addr end_addr = convert_paddr_to_abs_addr( + config.end); + head.size = end_addr - start_addr + - device->get_block_size(); + head.used_size = 0; + head.error = 0; + head.start_offset = device->get_block_size(); + head.last_committed_record_base = 0; + head.written_to = head.start_offset; + head.applied_to = head.start_offset; + head.flag = 0; + head.csum_type = 0; + head.csum = 0; + head.cur_segment_seq = 0; + head.start = start_addr; + head.end = end_addr; + head.device_id = config.device_id; + encode(head, bl); + header = head; + DEBUG( + "initialize header block in CircularBoundedJournal, length {}", + bl.length()); + return device_write_bl(start_addr, bl + ).handle_error( + mkfs_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error device_write during CircularBoundedJournal::mkfs" + }).safe_then([]() { + return mkfs_ertr::now(); + }); + }).handle_error( + mkfs_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error _open_device in CircularBoundedJournal::mkfs" + }).finally([this] { + if (device) { + return device->close(); + } else { + return seastar::now(); + } + }); +} + +CircularBoundedJournal::open_for_write_ertr::future<> +CircularBoundedJournal::_open_device(const std::string path) +{ + ceph_assert(device); + return device->open(path, seastar::open_flags::rw + ).handle_error( + open_for_write_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error device->open" + } + ); +} + +ceph::bufferlist CircularBoundedJournal::encode_super() +{ + bufferlist bl; + encode(header, bl); + return bl; +} + +CircularBoundedJournal::open_for_write_ret CircularBoundedJournal::open_for_write() +{ + return open_for_write(CBJOURNAL_START_ADDRESS); +} + +CircularBoundedJournal::close_ertr::future<> CircularBoundedJournal::close() +{ + return write_super( + ).safe_then([this]() -> close_ertr::future<> { + init = false; + return device->close(); + }).handle_error( + open_for_write_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error write_super" + } + ); +} + +CircularBoundedJournal::open_for_write_ret +CircularBoundedJournal::open_for_write(rbm_abs_addr start) +{ + LOG_PREFIX(CircularBoundedJournal::open_for_write); + if (init) { + paddr_t paddr = convert_abs_addr_to_paddr( + get_written_to(), + header.device_id); + return open_for_write_ret( + open_for_write_ertr::ready_future_marker{}, + journal_seq_t{ + header.cur_segment_seq, + paddr + }); + } + return _open_device(path + ).safe_then([this, start, FNAME]() { + return read_super(start + ).handle_error( + open_for_write_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error read_super" + }).safe_then([this, FNAME](auto p) mutable { + auto &[head, bl] = *p; + header = head; + DEBUG("super : {}", header); + paddr_t paddr = convert_abs_addr_to_paddr( + get_written_to(), + header.device_id); + init = true; + return open_for_write_ret( + open_for_write_ertr::ready_future_marker{}, + journal_seq_t{ + header.cur_segment_seq, + paddr + }); + }); + }).handle_error( + open_for_write_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error _open_device" + }); +} + +CircularBoundedJournal::write_ertr::future<> CircularBoundedJournal::append_record( + ceph::bufferlist bl, + rbm_abs_addr addr) +{ + LOG_PREFIX(CircularBoundedJournal::append_record); + std::vector> writes; + if (addr + bl.length() <= header.end) { + writes.push_back(std::make_pair(addr, bl)); + } else { + // write remaining data---in this case, + // data is splited into two parts before due to the end of CircularBoundedJournal. + // the following code is to write the second part + bufferlist first_write, next_write; + first_write.substr_of(bl, 0, header.end - addr); + writes.push_back(std::make_pair(addr, first_write)); + next_write.substr_of( + bl, first_write.length(), bl.length() - first_write.length()); + writes.push_back(std::make_pair(get_start_addr(), next_write)); + } + + return seastar::do_with( + std::move(bl), + [this, writes=move(writes), FNAME](auto& bl) mutable + { + DEBUG("original bl length {}", bl.length()); + return write_ertr::parallel_for_each( + writes, + [this, FNAME](auto& p) mutable + { + DEBUG( + "append_record: offset {}, length {}", + p.first, + p.second.length()); + return device_write_bl(p.first, p.second + ).handle_error( + write_ertr::pass_further{}, + crimson::ct_error::assert_all{ "Invalid error device->write" } + ).safe_then([]() { + return write_ertr::now(); + }); + }); + }); +} + +CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record( + record_t &&record, + OrderingHandle &handle) +{ + LOG_PREFIX(CircularBoundedJournal::submit_record); + assert(write_pipeline); + auto r_size = record_group_size_t(record.size, get_block_size()); + auto encoded_size = r_size.get_encoded_length(); + if (get_written_to() + + ceph::encoded_sizeof_bounded() > header.end) { + // not enough space between written_to and the end of journal, + // so that update used size to increase the amount of the remaing space + // | cbjournal | + // v v + // written_to <-> the end of journal + set_used_size(get_used_size() + (header.end - get_written_to())); + set_written_to(get_start_addr()); + } + if (encoded_size > get_available_size()) { + ERROR( + "CircularBoundedJournal::submit_record: record size {}, but available size {}", + encoded_size, + get_available_size() + ); + return crimson::ct_error::erange::make(); + } + + journal_seq_t j_seq { + header.cur_segment_seq++, + convert_abs_addr_to_paddr( + get_written_to(), + header.device_id)}; + ceph::bufferlist to_write = encode_record( + std::move(record), device->get_block_size(), + j_seq, 0); + auto target = get_written_to(); + if (get_written_to() + to_write.length() >= header.end) { + set_written_to(get_start_addr() + + (to_write.length() - (header.end - get_written_to()))); + } else { + set_written_to(get_written_to() + to_write.length()); + } + DEBUG( + "submit_record: mdlength {}, dlength {}, target {}", + r_size.get_mdlength(), + r_size.dlength, + target); + + auto write_result = write_result_t{ + j_seq, + (seastore_off_t)to_write.length() + }; + auto write_fut = append_record(to_write, target); + return handle.enter(write_pipeline->device_submission + ).then([write_fut = std::move(write_fut)]() mutable { + return std::move(write_fut + ).handle_error( + write_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error in CircularBoundedJournal::append_record" + } + ); + }).safe_then([this, &handle] { + return handle.enter(write_pipeline->finalize); + }).safe_then([this, target, + length=to_write.length(), + write_result, + r_size, + FNAME] { + DEBUG( + "append_record: commit target {} used_size {} written length {}", + target, get_used_size(), length); + + set_last_committed_record_base(target); + set_used_size(get_used_size() + length); + paddr_t paddr = convert_abs_addr_to_paddr( + target + r_size.get_mdlength(), + header.device_id); + auto submit_result = record_locator_t{ + paddr, + write_result + }; + return submit_result; + }); +} + +CircularBoundedJournal::write_ertr::future<> CircularBoundedJournal::device_write_bl( + rbm_abs_addr offset, bufferlist &bl) +{ + LOG_PREFIX(CircularBoundedJournal::device_write_bl); + auto length = bl.length(); + if (offset + length > header.end) { + return crimson::ct_error::erange::make(); + } + bl.rebuild_aligned(get_block_size()); + DEBUG( + "overwrite in CircularBoundedJournal, offset {}, length {}", + offset, + length); + auto write_length = length < get_block_size() ? get_block_size() : length; + auto bptr = bufferptr(ceph::buffer::create_page_aligned(write_length)); + auto iter = bl.cbegin(); + iter.copy(bl.length(), bptr.c_str()); + return device->write(offset, bptr + ).handle_error( + write_ertr::pass_further{}, + crimson::ct_error::assert_all{ "Invalid error device->write" } + ).safe_then([] { + return write_ertr::now(); + }); +} + +CircularBoundedJournal::read_super_ret +CircularBoundedJournal::read_super(rbm_abs_addr start) +{ + LOG_PREFIX(CircularBoundedJournal::read_super); + auto bptr = bufferptr(ceph::buffer::create_page_aligned( + device->get_block_size())); + return device->read(start, bptr + ).safe_then([start, bptr, FNAME]() mutable + -> read_super_ret { + DEBUG("read_super: reading {}", start); + bufferlist bl; + bl.append(bptr); + auto bp = bl.cbegin(); + cbj_header_t cbj_header; + try { + decode(cbj_header, bp); + } catch (ceph::buffer::error &e) { + ERROR("read_super: unable to read super block"); + return crimson::ct_error::enoent::make(); + } + return read_super_ret( + read_super_ertr::ready_future_marker{}, + std::make_pair(cbj_header, bl) + ); + }); +} + +Journal::replay_ret CircularBoundedJournal::replay( + delta_handler_t &&delta_handler) +{ + /* + * read records from last applied record prior to written_to, and replay + */ + LOG_PREFIX(CircularBoundedJournal::replay); + auto fut = open_for_write(CBJOURNAL_START_ADDRESS); + return fut.safe_then([this, FNAME, delta_handler=std::move(delta_handler)] (auto addr) { + if (get_used_size() == 0) { + return replay_ertr::now(); + } + return seastar::do_with( + rbm_abs_addr(get_applied_to()), + std::move(delta_handler), + [this, FNAME](auto &cursor_addr, auto &d_handler) { + return crimson::repeat( + [this, &cursor_addr, &d_handler, FNAME]() mutable + -> replay_ertr::future { + paddr_t cursor_paddr = convert_abs_addr_to_paddr( + cursor_addr, + header.device_id); + return read_record(cursor_paddr + ).safe_then([this, &cursor_addr, &d_handler, FNAME](auto ret) { + auto [r_header, bl] = *ret; + bufferlist mdbuf; + mdbuf.substr_of(bl, 0, r_header.mdlength); + paddr_t record_block_base = paddr_t::make_blk_paddr( + header.device_id, cursor_addr + r_header.mdlength); + auto maybe_record_deltas_list = try_decode_deltas( + r_header, mdbuf, record_block_base); + if (!maybe_record_deltas_list) { + DEBUG("unable to decode deltas for record {} at {}", + r_header, record_block_base); + return replay_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::yes); + } + DEBUG(" record_group_header_t: {}, cursor_addr: {} ", + r_header, cursor_addr); + auto write_result = write_result_t{ + r_header.committed_to, + (seastore_off_t)bl.length() + }; + cursor_addr += bl.length(); + return seastar::do_with( + std::move(*maybe_record_deltas_list), + [write_result, + this, + &d_handler, + &cursor_addr, + FNAME](auto& record_deltas_list) { + return crimson::do_for_each( + record_deltas_list, + [write_result, + &d_handler, FNAME](record_deltas_t& record_deltas) { + auto locator = record_locator_t{ + record_deltas.record_block_base, + write_result + }; + DEBUG("processing {} deltas at block_base {}", + record_deltas.deltas.size(), + locator); + return crimson::do_for_each( + record_deltas.deltas, + [locator, + &d_handler](auto& p) { + auto& commit_time = p.first; + auto& delta = p.second; + return d_handler(locator, + delta, + seastar::lowres_system_clock::time_point( + seastar::lowres_system_clock::duration(commit_time)) + ); + }); + }).safe_then([this, &cursor_addr]() { + if (cursor_addr >= header.end) { + cursor_addr = (cursor_addr - header.end) + get_start_addr(); + } + if (get_written_to() + + ceph::encoded_sizeof_bounded() > header.end) { + cursor_addr = get_start_addr(); + } + if (cursor_addr == get_written_to()) { + return replay_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::yes); + } + return replay_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::no); + }); + }); + }); + }); + }); + }); +} + +CircularBoundedJournal::read_record_ret +CircularBoundedJournal::return_record(record_group_header_t& header, bufferlist bl) +{ + LOG_PREFIX(CircularBoundedJournal::return_record); + bufferlist md_bl, data_bl; + md_bl.substr_of(bl, 0, get_block_size()); + data_bl.substr_of(bl, header.mdlength, header.dlength); + if (validate_records_metadata(md_bl) && + validate_records_data(header, data_bl)) { + return read_record_ret( + read_record_ertr::ready_future_marker{}, + std::make_pair(header, std::move(bl))); + } else { + DEBUG("invalid matadata"); + return read_record_ret( + read_record_ertr::ready_future_marker{}, + std::nullopt); + } +} + +CircularBoundedJournal::read_record_ret CircularBoundedJournal::read_record(paddr_t off) +{ + LOG_PREFIX(CircularBoundedJournal::read_record); + rbm_abs_addr offset = convert_paddr_to_abs_addr( + off); + rbm_abs_addr addr = offset; + auto read_length = get_block_size(); + if (addr + get_block_size() > header.end) { + addr = get_start_addr(); + read_length = header.end - offset; + } + DEBUG("read_record: reading record from abs addr {} read length {}", + addr, read_length); + auto bptr = bufferptr(ceph::buffer::create_page_aligned(read_length)); + bptr.zero(); + return device->read(addr, bptr + ).safe_then([this, addr, read_length, bptr, FNAME]() mutable + -> read_record_ret { + record_group_header_t h; + bufferlist bl; + bl.append(bptr); + auto bp = bl.cbegin(); + try { + decode(h, bp); + } catch (ceph::buffer::error &e) { + return read_record_ret( + read_record_ertr::ready_future_marker{}, + std::nullopt); + } + /* + * | journal | + * | record 1 header | | record 1 data + * record 1 data (remaining) | + * + * <---- 1 block ----><-- + * -- 2 block ---> + * + * If record has logner than read_length and its data is located across + * the end of journal and the begining of journal, we need three reads + * ---reads of header, other remaining data before the end, and + * the other remaining data from the begining. + * + */ + if (h.mdlength + h.dlength > read_length) { + rbm_abs_addr next_read_addr = addr + read_length; + auto next_read = h.mdlength + h.dlength - read_length; + DEBUG(" next_read_addr {}, next_read_length {} ", + next_read_addr, next_read); + if (header.end < next_read_addr + next_read) { + // In this case, need two more reads. + // The first is to read remain bytes to the end of cbjournal + // The second is to read the data at the begining of cbjournal + next_read = header.end - (addr + read_length); + } + DEBUG("read_entry: additional reading addr {} length {}", + next_read_addr, + next_read); + auto next_bptr = bufferptr(ceph::buffer::create_page_aligned(next_read)); + next_bptr.zero(); + return device->read( + next_read_addr, + next_bptr + ).safe_then([this, h=h, next_bptr=std::move(next_bptr), bl=std::move(bl), + FNAME]() mutable { + bl.append(next_bptr); + if (h.mdlength + h.dlength == bl.length()) { + DEBUG("read_record: record length {} done", bl.length()); + return return_record(h, bl); + } + // need one more read + auto next_read_addr = get_start_addr(); + auto last_bptr = bufferptr(ceph::buffer::create_page_aligned( + h.mdlength + h.dlength - bl.length())); + DEBUG("read_record: last additional reading addr {} length {}", + next_read_addr, + h.mdlength + h.dlength - bl.length()); + return device->read( + next_read_addr, + last_bptr + ).safe_then([this, h=h, last_bptr=std::move(last_bptr), + bl=std::move(bl), FNAME]() mutable { + bl.append(last_bptr); + DEBUG("read_record: complte size {}", bl.length()); + return return_record(h, bl); + }); + }); + } else { + DEBUG("read_record: complte size {}", bl.length()); + return return_record(h, bl); + } + }); +} + +CircularBoundedJournal::write_ertr::future<> +CircularBoundedJournal::write_super() +{ + LOG_PREFIX(CircularBoundedJournal::write_super); + ceph::bufferlist bl; + try { + bl = encode_super(); + } catch (ceph::buffer::error &e) { + DEBUG("unable to encode super block from underlying deivce"); + return crimson::ct_error::input_output_error::make(); + } + DEBUG( + "sync header of CircularBoundedJournal, length {}", + bl.length()); + return device_write_bl(header.start, bl); +} + +} diff --git a/src/crimson/os/seastore/journal/circular_bounded_journal.h b/src/crimson/os/seastore/journal/circular_bounded_journal.h new file mode 100644 index 000000000000..2537df12503e --- /dev/null +++ b/src/crimson/os/seastore/journal/circular_bounded_journal.h @@ -0,0 +1,315 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "crimson/common/log.h" + +#include + +#include + +#include "include/ceph_assert.h" +#include "include/buffer.h" +#include "include/denc.h" + +#include "crimson/osd/exceptions.h" +#include "crimson/os/seastore/journal.h" +#include "include/uuid.h" +#include "crimson/os/seastore/random_block_manager.h" +#include "crimson/os/seastore/random_block_manager/nvmedevice.h" +#include + + +namespace crimson::os::seastore::journal { + +constexpr rbm_abs_addr CBJOURNAL_START_ADDRESS = 0; +constexpr uint64_t CBJOURNAL_MAGIC = 0xCCCC; +using NVMeBlockDevice = nvme_device::NVMeBlockDevice; + +/** + * CircularBoundedJournal + * + * TODO: move record from CircularBoundedJournal to RandomBlockManager + * + */ + +constexpr uint64_t DEFAULT_SIZE = 1 << 26; +constexpr uint64_t DEFAULT_BLOCK_SIZE = 4096; + +class CircularBoundedJournal : public Journal { +public: + struct mkfs_config_t { + std::string path; + paddr_t start; + paddr_t end; + size_t block_size = 0; + size_t total_size = 0; + device_id_t device_id = 0; + seastore_meta_t meta; + static mkfs_config_t get_default() { + device_id_t d_id = 1 << (std::numeric_limits::digits - 1); + return mkfs_config_t { + "", + paddr_t::make_blk_paddr(d_id, 0), + paddr_t::make_blk_paddr(d_id, DEFAULT_SIZE), + DEFAULT_BLOCK_SIZE, + DEFAULT_SIZE, + d_id, + seastore_meta_t {} + }; + } + }; + + CircularBoundedJournal(NVMeBlockDevice* device, const std::string path); + ~CircularBoundedJournal() {} + + open_for_write_ret open_for_write() final; + open_for_write_ret open_for_write(rbm_abs_addr start); + close_ertr::future<> close() final; + + journal_type get_type() final { + return journal_type::CIRCULARBOUNDED_JOURNAL; + } + + submit_record_ret submit_record( + record_t &&record, + OrderingHandle &handle + ) final; + + seastar::future<> flush( + OrderingHandle &handle + ) final { + // TODO + return seastar::now(); + } + + replay_ret replay(delta_handler_t &&delta_handler); + + open_for_write_ertr::future<> _open_device(const std::string path); + + struct cbj_header_t; + using write_ertr = crimson::errorator< + crimson::ct_error::input_output_error, + crimson::ct_error::erange>; + /* + * append_record + * + * append data to current write position of CircularBoundedJournal + * + * @param bufferlist to write + * @param rbm_abs_addr where data is written + * + */ + write_ertr::future<> append_record(ceph::bufferlist bl, rbm_abs_addr addr); + /* + * device_write_bl + * + * @param device address to write + * @param bufferlist to write + * + */ + write_ertr::future<> device_write_bl(rbm_abs_addr offset, ceph::bufferlist &bl); + + using read_ertr = crimson::errorator< + crimson::ct_error::input_output_error, + crimson::ct_error::invarg, + crimson::ct_error::enoent, + crimson::ct_error::erange>; + using read_record_ertr = read_ertr; + using read_record_ret = read_record_ertr::future< + std::optional> + >; + using read_super_ertr = read_ertr; + using read_super_ret = read_super_ertr::future< + std::optional> + >; + /* + * read_record + * + * read record from given address + * + * @param paddr_t to read + * + */ + read_record_ret read_record(paddr_t offset); + /* + * read_super + * + * read super block from given absolute address + * + * @param absolute address + * + */ + read_super_ret read_super(rbm_abs_addr start); + + ceph::bufferlist encode_super(); + + using mkfs_ertr = crimson::errorator< + crimson::ct_error::input_output_error, + crimson::ct_error::invarg + >; + using mkfs_ret = mkfs_ertr::future<>; + + /* + * mkfs + * + * make a new journal layout even if old journal exists + * + * @param mkfs_config_t + * + */ + mkfs_ret mkfs(mkfs_config_t& config); + + + /** + * CircularBoundedJournal structure + * + * +-------------------------------------------------------+ + * | header | record | record | record | record | ... | + * +-------------------------------------------------------+ + * ^-----------block aligned-----------------^ + * <----fixed----> + */ + + + /** + * + * CircularBoundedJournal write + * + * NVMe will support a large block write (< 512KB) with atomic write unit command. + * With this command, we expect that the most of incoming data can be stored + * as a single write call, which has lower overhead than existing + * way that uses a combination of system calls such as write() and sync(). + * + */ + + struct cbj_header_t { + uint64_t magic; + uuid_d uuid; + uint64_t block_size; // aligned with block_size + uint64_t size; // max length of journal + uint64_t used_size; // current used_size of journal + uint32_t error; // reserved + + rbm_abs_addr start_offset; // start offset of CircularBoundedJournal + rbm_abs_addr last_committed_record_base; + rbm_abs_addr written_to; + rbm_abs_addr applied_to; + + uint64_t flag; // represent features (reserved) + uint8_t csum_type; // type of checksum algoritghm used in cbj_header_t + uint64_t csum; // checksum of entire cbj_header_t + uint32_t cur_segment_seq; + + rbm_abs_addr start; // start address of the device + rbm_abs_addr end; // start address of the device + device_id_t device_id; + + DENC(cbj_header_t, v, p) { + DENC_START(1, 1, p); + denc(v.magic, p); + denc(v.uuid, p); + denc(v.block_size, p); + denc(v.size, p); + denc(v.used_size, p); + denc(v.error, p); + + denc(v.start_offset, p); + + denc(v.last_committed_record_base, p); + denc(v.written_to, p); + denc(v.applied_to, p); + + denc(v.flag, p); + denc(v.csum_type, p); + denc(v.csum, p); + denc(v.cur_segment_seq, p); + denc(v.start, p); + denc(v.end, p); + denc(v.device_id, p); + + DENC_FINISH(p); + } + }; + + /** + * + * Write position for CircularBoundedJournal + * + * | written to rbm | written length to CircularBoundedJournal | new write | + * ----------------->----------------------------------->------------> + * ^ ^ ^ + * applied_to last_committed_record_base written_to + * + */ + + size_t get_used_size() const { + return header.used_size; + } + void set_used_size(size_t size) { + header.used_size = size; + } + size_t get_total_size() const { + return header.size; + } + rbm_abs_addr get_start_addr() const { + return header.start_offset; + } + size_t get_available_size() const { + return get_total_size() - get_used_size(); + } + + void update_applied_to(rbm_abs_addr addr, uint32_t len) { + rbm_abs_addr new_applied_to = addr; + set_used_size( + get_last_committed_record_base() >= new_applied_to ? + get_written_to() - (new_applied_to + len) : + get_written_to() + get_total_size() - (new_applied_to + len)); + set_applied_to(new_applied_to + len); + } + + write_ertr::future<> write_super(); + + read_record_ret return_record(record_group_header_t& header, bufferlist bl); + + void set_write_pipeline(WritePipeline *_write_pipeline) final { + write_pipeline = _write_pipeline; + } + + rbm_abs_addr get_written_to() const { + return header.written_to; + } + void set_written_to(rbm_abs_addr addr) { + header.written_to = addr; + } + rbm_abs_addr get_last_committed_record_base() const { + return header.last_committed_record_base; + } + void set_last_committed_record_base(rbm_abs_addr addr) { + header.last_committed_record_base = addr; + } + rbm_abs_addr get_applied_to() const { + return header.applied_to; + } + void set_applied_to(rbm_abs_addr addr) { + header.applied_to = addr; + } + device_id_t get_device_id() const { + return header.device_id; + } + size_t get_block_size() const { + return header.block_size; + } +private: + cbj_header_t header; + NVMeBlockDevice* device; + std::string path; + WritePipeline *write_pipeline = nullptr; + bool init = false; +}; + +std::ostream &operator<<(std::ostream &out, const CircularBoundedJournal::cbj_header_t &header); +} + +WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::journal::CircularBoundedJournal::cbj_header_t) diff --git a/src/crimson/os/seastore/journal/segmented_journal.h b/src/crimson/os/seastore/journal/segmented_journal.h index 2a35c3b729d3..558189b37433 100644 --- a/src/crimson/os/seastore/journal/segmented_journal.h +++ b/src/crimson/os/seastore/journal/segmented_journal.h @@ -43,6 +43,10 @@ public: write_pipeline = _write_pipeline; } + journal_type get_type() final { + return journal_type::SEGMENT_JOURNAL; + } + private: submit_record_ret do_submit_record( record_t &&record, diff --git a/src/crimson/os/seastore/seastore_types.cc b/src/crimson/os/seastore/seastore_types.cc index 0500bcf540e3..df693d7970f7 100644 --- a/src/crimson/os/seastore/seastore_types.cc +++ b/src/crimson/os/seastore/seastore_types.cc @@ -576,8 +576,14 @@ try_decode_deltas( } } for (auto& i: r.extent_infos) { - auto& seg_addr = record_block_base.as_seg_paddr(); - seg_addr.set_segment_off(seg_addr.get_segment_off() + i.len); + if (record_block_base.get_addr_type() == addr_types_t::SEGMENT) { + auto& seg_addr = record_block_base.as_seg_paddr(); + seg_addr.set_segment_off(seg_addr.get_segment_off() + i.len); + } else if (record_block_base.get_addr_type() == + addr_types_t::RANDOM_BLOCK) { + auto& blk_addr = record_block_base.as_blk_paddr(); + blk_addr.set_block_off(blk_addr.get_block_off() + i.len); + } } ++result_iter; }