From: Yingxin Cheng Date: Tue, 5 Jul 2022 08:31:34 +0000 (+0800) Subject: crimson/os/seastore/circular_bounded_journal: do not split records X-Git-Tag: v18.0.0~552^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2aedfcae4d83934f04a92d65ba65ae7ed8606f9b;p=ceph.git crimson/os/seastore/circular_bounded_journal: do not split records * no split record due to relative paddr resolution * fix md_bl.substr_of(bl, 0, header.mdlength) * maintain written_to in range [get_start_addr(), get_journal_end()) Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/os/seastore/journal/circular_bounded_journal.cc b/src/crimson/os/seastore/journal/circular_bounded_journal.cc index d038b215d8da..03facde435f7 100644 --- a/src/crimson/os/seastore/journal/circular_bounded_journal.cc +++ b/src/crimson/os/seastore/journal/circular_bounded_journal.cc @@ -45,7 +45,7 @@ CircularBoundedJournal::mkfs(const mkfs_config_t& config) head.device_id = config.device_id; encode(head, bl); header = head; - written_to = head.journal_tail; + set_written_to(head.journal_tail); DEBUG( "initialize header block in CircularBoundedJournal, length {}", bl.length()); @@ -160,50 +160,6 @@ CircularBoundedJournal::open_device_read_header() }); } -CircularBoundedJournal::write_ertr::future<> CircularBoundedJournal::append_record( - ceph::bufferlist bl, - rbm_abs_addr addr) -{ - LOG_PREFIX(CircularBoundedJournal::append_record); - std::vector> writes; - if (addr + bl.length() <= get_journal_end()) { - writes.push_back(std::make_pair(addr, bl)); - } else { - // write remaining data---in this case, - // data is splited into two parts before due to the end of CircularBoundedJournal. - // the following code is to write the second part - bufferlist first_write, next_write; - first_write.substr_of(bl, 0, get_journal_end() - addr); - writes.push_back(std::make_pair(addr, first_write)); - next_write.substr_of( - bl, first_write.length(), bl.length() - first_write.length()); - writes.push_back(std::make_pair(get_start_addr(), next_write)); - } - - return seastar::do_with( - std::move(bl), - [this, writes=move(writes), FNAME](auto& bl) mutable - { - DEBUG("original bl length {}", bl.length()); - return write_ertr::parallel_for_each( - writes, - [this, FNAME](auto& p) mutable - { - DEBUG( - "append_record: offset {}, length {}", - p.first, - p.second.length()); - return device_write_bl(p.first, p.second - ).handle_error( - write_ertr::pass_further{}, - crimson::ct_error::assert_all{ "Invalid error device->write" } - ).safe_then([]() { - return write_ertr::now(); - }); - }); - }); -} - CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record( record_t &&record, OrderingHandle &handle) @@ -212,16 +168,6 @@ CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record( assert(write_pipeline); auto r_size = record_group_size_t(record.size, get_block_size()); auto encoded_size = r_size.get_encoded_length(); - if (get_written_to() + - ceph::encoded_sizeof_bounded() > get_journal_end()) { - // not enough space between written_to and the end of journal, - // so that set written_to to the beginning of cbjournal to append - // the record at the start address of cbjournal - // | cbjournal | - // v v - // written_to <-> the end of journal - set_written_to(get_start_addr()); - } if (encoded_size > get_available_size()) { ERROR( "CircularBoundedJournal::submit_record: record size {}, but available size {}", @@ -230,6 +176,15 @@ CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record( ); return crimson::ct_error::erange::make(); } + if (encoded_size + get_written_to() > get_journal_end()) { + DEBUG("roll"); + set_written_to(get_start_addr()); + if (encoded_size > get_available_size()) { + ERROR("rolled, record size {}, but available size {}", + encoded_size, get_available_size()); + return crimson::ct_error::erange::make(); + } + } journal_seq_t j_seq { cur_segment_seq++, @@ -240,11 +195,13 @@ CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record( std::move(record), device->get_block_size(), j_seq, 0); auto target = get_written_to(); - if (get_written_to() + to_write.length() >= get_journal_end()) { - set_written_to(get_start_addr() + - (to_write.length() - (get_journal_end() - get_written_to()))); + auto new_written_to = target + to_write.length(); + if (new_written_to >= get_journal_end()) { + assert(new_written_to == get_journal_end()); + DEBUG("roll"); + set_written_to(get_start_addr()); } else { - set_written_to(get_written_to() + to_write.length()); + set_written_to(new_written_to); } DEBUG( "submit_record: mdlength {}, dlength {}, target {}", @@ -256,16 +213,10 @@ CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record( j_seq, (seastore_off_t)to_write.length() }; - auto write_fut = append_record(to_write, target); + auto write_fut = device_write_bl(target, to_write); return handle.enter(write_pipeline->device_submission ).then([write_fut = std::move(write_fut)]() mutable { - return std::move(write_fut - ).handle_error( - write_ertr::pass_further{}, - crimson::ct_error::assert_all{ - "Invalid error in CircularBoundedJournal::append_record" - } - ); + return std::move(write_fut); }).safe_then([this, &handle] { return handle.enter(write_pipeline->finalize); }).safe_then([this, target, @@ -356,32 +307,37 @@ Journal::replay_ret CircularBoundedJournal::replay( * read records from last applied record prior to written_to, and replay */ LOG_PREFIX(CircularBoundedJournal::replay); - auto fut = open_device_read_header(); - return fut.safe_then([this, FNAME, delta_handler=std::move(delta_handler)] (auto addr) { + return open_device_read_header( + ).safe_then([this, FNAME, delta_handler=std::move(delta_handler)] (auto) { + set_written_to(get_journal_tail()); return seastar::do_with( + bool(false), rbm_abs_addr(get_journal_tail()), std::move(delta_handler), segment_seq_t(), - [this, FNAME](auto &cursor_addr, auto &d_handler, auto &last_seq) { + [this, FNAME](auto &is_rolled, auto &cursor_addr, auto &d_handler, auto &last_seq) { return crimson::repeat( - [this, &cursor_addr, &d_handler, &last_seq, FNAME]() mutable + [this, &is_rolled, &cursor_addr, &d_handler, &last_seq, FNAME]() mutable -> replay_ertr::future { - paddr_t cursor_paddr = convert_abs_addr_to_paddr( + paddr_t record_paddr = convert_abs_addr_to_paddr( cursor_addr, header.device_id); - return read_record(cursor_paddr - ).safe_then([this, &cursor_addr, &d_handler, &last_seq, FNAME](auto ret) { + return read_record(record_paddr, last_seq + ).safe_then([this, &is_rolled, &cursor_addr, &d_handler, &last_seq, FNAME](auto ret) + -> replay_ertr::future { if (!ret.has_value()) { - DEBUG("no more records"); - return replay_ertr::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::yes); + if (is_rolled) { + DEBUG("no more records, stop replaying"); + return replay_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::yes); + } else { + cursor_addr = get_start_addr(); + is_rolled = true; + return replay_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::no); + } } auto [r_header, bl] = *ret; - if (last_seq > r_header.committed_to.segment_seq) { - DEBUG("found invalide record. stop replaying"); - return replay_ertr::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::yes); - } bufferlist mdbuf; mdbuf.substr_of(bl, 0, r_header.mdlength); paddr_t record_block_base = paddr_t::make_blk_paddr( @@ -389,27 +345,29 @@ Journal::replay_ret CircularBoundedJournal::replay( auto maybe_record_deltas_list = try_decode_deltas( r_header, mdbuf, record_block_base); if (!maybe_record_deltas_list) { - DEBUG("unable to decode deltas for record {} at {}", - r_header, record_block_base); - return replay_ertr::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::yes); + // This should be impossible, we did check the crc on the mdbuf + ERROR("unable to decode deltas for record {} at {}", + r_header, record_block_base); + return crimson::ct_error::input_output_error::make(); } - DEBUG(" record_group_header_t: {}, cursor_addr: {} ", - r_header, cursor_addr); + DEBUG("{} at {}", r_header, cursor_addr); auto write_result = write_result_t{ r_header.committed_to, (seastore_off_t)bl.length() }; cur_segment_seq = r_header.committed_to.segment_seq + 1; cursor_addr += bl.length(); + if (cursor_addr >= get_journal_end()) { + assert(cursor_addr == get_journal_end()); + cursor_addr = get_start_addr(); + is_rolled = true; + } set_written_to(cursor_addr); last_seq = r_header.committed_to.segment_seq; return seastar::do_with( std::move(*maybe_record_deltas_list), [write_result, - this, &d_handler, - &cursor_addr, FNAME](auto& record_deltas_list) { return crimson::do_for_each( record_deltas_list, @@ -433,14 +391,7 @@ Journal::replay_ret CircularBoundedJournal::replay( locator.write_result.start_seq, modify_time); }); - }).safe_then([this, &cursor_addr]() { - if (cursor_addr >= get_journal_end()) { - cursor_addr = (cursor_addr - get_journal_end()) + get_start_addr(); - } - if (get_written_to() + - ceph::encoded_sizeof_bounded() > get_journal_end()) { - cursor_addr = get_start_addr(); - } + }).safe_then([]() { return replay_ertr::make_ready_future< seastar::stop_iteration>(seastar::stop_iteration::no); }); @@ -455,8 +406,10 @@ CircularBoundedJournal::read_record_ret CircularBoundedJournal::return_record(record_group_header_t& header, bufferlist bl) { LOG_PREFIX(CircularBoundedJournal::return_record); + DEBUG("record size {}", bl.length()); + assert(bl.length() == header.mdlength + header.dlength); bufferlist md_bl, data_bl; - md_bl.substr_of(bl, 0, get_block_size()); + md_bl.substr_of(bl, 0, header.mdlength); data_bl.substr_of(bl, header.mdlength, header.dlength); if (validate_records_metadata(md_bl) && validate_records_data(header, data_bl)) { @@ -471,23 +424,18 @@ CircularBoundedJournal::return_record(record_group_header_t& header, bufferlist } } -CircularBoundedJournal::read_record_ret CircularBoundedJournal::read_record(paddr_t off) +CircularBoundedJournal::read_record_ret +CircularBoundedJournal::read_record(paddr_t off, segment_seq_t last_seq) { LOG_PREFIX(CircularBoundedJournal::read_record); - rbm_abs_addr offset = convert_paddr_to_abs_addr( - off); - rbm_abs_addr addr = offset; + rbm_abs_addr addr = convert_paddr_to_abs_addr(off); auto read_length = get_block_size(); - if (addr + get_block_size() > get_journal_end()) { - addr = get_start_addr(); - read_length = get_journal_end() - offset; - } + assert(addr + read_length <= get_journal_end()); DEBUG("read_record: reading record from abs addr {} read length {}", addr, read_length); auto bptr = bufferptr(ceph::buffer::create_page_aligned(read_length)); - bptr.zero(); return device->read(addr, bptr - ).safe_then([this, addr, read_length, bptr, FNAME]() mutable + ).safe_then([this, addr, bptr, last_seq, FNAME]() mutable -> read_record_ret { record_group_header_t h; bufferlist bl; @@ -500,65 +448,30 @@ CircularBoundedJournal::read_record_ret CircularBoundedJournal::read_record(padd read_record_ertr::ready_future_marker{}, std::nullopt); } - /* - * | journal | - * | record 1 header | | record 1 data - * record 1 data (remaining) | - * - * <---- 1 block ----><-- - * -- 2 block ---> - * - * If record has logner than read_length and its data is located across - * the end of journal and the begining of journal, we need three reads - * ---reads of header, other remaining data before the end, and - * the other remaining data from the begining. - * - */ - if (h.mdlength + h.dlength > read_length) { - rbm_abs_addr next_read_addr = addr + read_length; - auto next_read = h.mdlength + h.dlength - read_length; - DEBUG(" next_read_addr {}, next_read_length {} ", - next_read_addr, next_read); - if (get_journal_end() < next_read_addr + next_read) { - // In this case, need two more reads. - // The first is to read remain bytes to the end of cbjournal - // The second is to read the data at the begining of cbjournal - next_read = get_journal_end() - (addr + read_length); - } - DEBUG("read_entry: additional reading addr {} length {}", - next_read_addr, - next_read); - auto next_bptr = bufferptr(ceph::buffer::create_page_aligned(next_read)); - next_bptr.zero(); - return device->read( - next_read_addr, - next_bptr - ).safe_then([this, h=h, next_bptr=std::move(next_bptr), bl=std::move(bl), - FNAME]() mutable { - bl.append(next_bptr); - if (h.mdlength + h.dlength == bl.length()) { - DEBUG("read_record: record length {} done", bl.length()); - return return_record(h, bl); - } - // need one more read - auto next_read_addr = get_start_addr(); - auto last_bptr = bufferptr(ceph::buffer::create_page_aligned( - h.mdlength + h.dlength - bl.length())); - DEBUG("read_record: last additional reading addr {} length {}", - next_read_addr, - h.mdlength + h.dlength - bl.length()); - return device->read( - next_read_addr, - last_bptr - ).safe_then([this, h=h, last_bptr=std::move(last_bptr), - bl=std::move(bl), FNAME]() mutable { - bl.append(last_bptr); - DEBUG("read_record: complte size {}", bl.length()); - return return_record(h, bl); - }); + if (h.mdlength < get_block_size() || + h.mdlength % get_block_size() != 0 || + h.dlength % get_block_size() != 0 || + addr + h.mdlength + h.dlength > get_journal_end() || + h.committed_to.segment_seq == NULL_SEG_SEQ || + h.committed_to.segment_seq < last_seq) { + return read_record_ret( + read_record_ertr::ready_future_marker{}, + std::nullopt); + } + auto record_size = h.mdlength + h.dlength; + if (record_size > get_block_size()) { + auto next_addr = addr + get_block_size(); + auto next_length = record_size - get_block_size(); + auto next_bptr = bufferptr(ceph::buffer::create_page_aligned(next_length)); + DEBUG("reading record part 2 from abs addr {} read length {}", + next_addr, next_length); + return device->read(next_addr, next_bptr + ).safe_then([this, h, next_bptr=std::move(next_bptr), bl=std::move(bl)]() mutable { + bl.append(next_bptr); + return return_record(h, bl); }); } else { - DEBUG("read_record: complte size {}", bl.length()); + assert(record_size == get_block_size()); return return_record(h, bl); } }); diff --git a/src/crimson/os/seastore/journal/circular_bounded_journal.h b/src/crimson/os/seastore/journal/circular_bounded_journal.h index 230d3fdd1229..265fea984266 100644 --- a/src/crimson/os/seastore/journal/circular_bounded_journal.h +++ b/src/crimson/os/seastore/journal/circular_bounded_journal.h @@ -108,16 +108,6 @@ public: using write_ertr = crimson::errorator< crimson::ct_error::input_output_error, crimson::ct_error::erange>; - /* - * append_record - * - * append data to current write position of CircularBoundedJournal - * - * @param bufferlist to write - * @param rbm_abs_addr where data is written - * - */ - write_ertr::future<> append_record(ceph::bufferlist bl, rbm_abs_addr addr); /* * device_write_bl * @@ -146,9 +136,10 @@ public: * read record from given address * * @param paddr_t to read + * @param last_seq * */ - read_record_ret read_record(paddr_t offset); + read_record_ret read_record(paddr_t offset, segment_seq_t last_seq); /* * read_header * @@ -240,7 +231,7 @@ public: size_t get_used_size() const { return get_written_to() >= get_journal_tail() ? get_written_to() - get_journal_tail() : - get_written_to() + get_total_size() - get_journal_tail(); + get_written_to() + header.size + get_block_size() - get_journal_tail(); } size_t get_total_size() const { return header.size; @@ -272,6 +263,8 @@ public: return written_to; } void set_written_to(rbm_abs_addr addr) { + assert(addr >= get_start_addr()); + assert(addr < get_journal_end()); written_to = addr; } device_id_t get_device_id() const { @@ -281,7 +274,7 @@ public: return header.block_size; } rbm_abs_addr get_journal_end() const { - return get_start_addr() + header.size; // journal size + header length + return get_start_addr() + header.size + get_block_size(); // journal size + header length } void add_device(NVMeBlockDevice* dev) { device = dev; @@ -300,6 +293,7 @@ private: bool initialized = false; segment_seq_t cur_segment_seq = 0; // segment seq to track the sequence to written records // start address where the newest record will be written + // should be in range [get_start_addr(), get_journal_end()) rbm_abs_addr written_to = 0; }; diff --git a/src/test/crimson/seastore/test_cbjournal.cc b/src/test/crimson/seastore/test_cbjournal.cc index a389301b281f..1161f7dfc32f 100644 --- a/src/test/crimson/seastore/test_cbjournal.cc +++ b/src/test/crimson/seastore/test_cbjournal.cc @@ -101,7 +101,7 @@ struct entry_validator_t { paddr_t paddr = convert_abs_addr_to_paddr( addr + offset, cbj.get_device_id()); - auto [header, buf] = *(cbj.read_record(paddr).unsafe_get0()); + auto [header, buf] = *(cbj.read_record(paddr, NULL_SEG_SEQ).unsafe_get0()); auto record = decode_record(buf); validate(*record); offset += header.mdlength + header.dlength; @@ -138,7 +138,7 @@ struct cbjournal_test_t : public seastar_test_suite_t epm(new ExtentPlacementManager(true)), cache(*epm) { - device = new nvme_device::TestMemory(CBTEST_DEFAULT_TEST_SIZE); + device = new nvme_device::TestMemory(CBTEST_DEFAULT_TEST_SIZE + CBTEST_DEFAULT_BLOCK_SIZE); cbj.reset(new CircularBoundedJournal(device, std::string())); device_id_t d_id = 1 << (std::numeric_limits::digits - 1); config.block_size = CBTEST_DEFAULT_BLOCK_SIZE; @@ -232,8 +232,9 @@ struct cbjournal_test_t : public seastar_test_suite_t auto mkfs() { return cbj->mkfs(config).unsafe_get0(); } - auto open() { - return cbj->open_device_read_header().unsafe_get0(); + void open() { + cbj->open_device_read_header().unsafe_get0(); + cbj->open_for_write().unsafe_get0(); } auto get_available_size() { return cbj->get_available_size();