From d7aa90b17c9c570b7c1ba604bf95c102c60f4ad7 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Tue, 5 Jul 2022 22:32:23 +0800 Subject: [PATCH] crimson/os/seastore/circular_bounded_journal: increment seq when roll Similar to segmented_journal, only increment seq when roll circular_bounded_journal. Signed-off-by: Yingxin Cheng --- .../journal/circular_bounded_journal.cc | 42 ++++++++++++------- .../journal/circular_bounded_journal.h | 9 ++-- 2 files changed, 34 insertions(+), 17 deletions(-) diff --git a/src/crimson/os/seastore/journal/circular_bounded_journal.cc b/src/crimson/os/seastore/journal/circular_bounded_journal.cc index 03facde435f7c..38134462fd03d 100644 --- a/src/crimson/os/seastore/journal/circular_bounded_journal.cc +++ b/src/crimson/os/seastore/journal/circular_bounded_journal.cc @@ -104,10 +104,13 @@ CircularBoundedJournal::open_for_write_ret CircularBoundedJournal::open_for_writ paddr_t paddr = convert_abs_addr_to_paddr( get_written_to(), header.device_id); + if (circulation_seq == NULL_SEG_SEQ) { + circulation_seq = 0; + } return open_for_write_ret( open_for_write_ertr::ready_future_marker{}, journal_seq_t{ - cur_segment_seq, + circulation_seq, paddr }); } @@ -149,7 +152,7 @@ CircularBoundedJournal::open_device_read_header() return open_for_write_ret( open_for_write_ertr::ready_future_marker{}, journal_seq_t{ - cur_segment_seq, + circulation_seq, paddr }); }); @@ -166,6 +169,7 @@ CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record( { LOG_PREFIX(CircularBoundedJournal::submit_record); assert(write_pipeline); + assert(circulation_seq != NULL_SEG_SEQ); auto r_size = record_group_size_t(record.size, get_block_size()); auto encoded_size = r_size.get_encoded_length(); if (encoded_size > get_available_size()) { @@ -179,6 +183,7 @@ CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record( if (encoded_size + get_written_to() > get_journal_end()) { DEBUG("roll"); set_written_to(get_start_addr()); + ++circulation_seq; if (encoded_size > get_available_size()) { ERROR("rolled, record size {}, but available size {}", encoded_size, get_available_size()); @@ -187,7 +192,7 @@ CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record( } journal_seq_t j_seq { - cur_segment_seq++, + circulation_seq, convert_abs_addr_to_paddr( get_written_to(), header.device_id)}; @@ -200,6 +205,7 @@ CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record( assert(new_written_to == get_journal_end()); DEBUG("roll"); set_written_to(get_start_addr()); + ++circulation_seq; } else { set_written_to(new_written_to); } @@ -309,29 +315,31 @@ Journal::replay_ret CircularBoundedJournal::replay( LOG_PREFIX(CircularBoundedJournal::replay); return open_device_read_header( ).safe_then([this, FNAME, delta_handler=std::move(delta_handler)] (auto) { + circulation_seq = NULL_SEG_SEQ; set_written_to(get_journal_tail()); return seastar::do_with( bool(false), rbm_abs_addr(get_journal_tail()), std::move(delta_handler), - segment_seq_t(), - [this, FNAME](auto &is_rolled, auto &cursor_addr, auto &d_handler, auto &last_seq) { + segment_seq_t(NULL_SEG_SEQ), + [this, FNAME](auto &is_rolled, auto &cursor_addr, auto &d_handler, auto &expected_seq) { return crimson::repeat( - [this, &is_rolled, &cursor_addr, &d_handler, &last_seq, FNAME]() mutable + [this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME]() mutable -> replay_ertr::future { paddr_t record_paddr = convert_abs_addr_to_paddr( cursor_addr, header.device_id); - return read_record(record_paddr, last_seq - ).safe_then([this, &is_rolled, &cursor_addr, &d_handler, &last_seq, FNAME](auto ret) + return read_record(record_paddr, expected_seq + ).safe_then([this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME](auto ret) -> replay_ertr::future { if (!ret.has_value()) { - if (is_rolled) { + if (expected_seq == NULL_SEG_SEQ || is_rolled) { DEBUG("no more records, stop replaying"); return replay_ertr::make_ready_future< seastar::stop_iteration>(seastar::stop_iteration::yes); } else { cursor_addr = get_start_addr(); + ++expected_seq; is_rolled = true; return replay_ertr::make_ready_future< seastar::stop_iteration>(seastar::stop_iteration::no); @@ -355,15 +363,20 @@ Journal::replay_ret CircularBoundedJournal::replay( r_header.committed_to, (seastore_off_t)bl.length() }; - cur_segment_seq = r_header.committed_to.segment_seq + 1; + if (expected_seq == NULL_SEG_SEQ) { + expected_seq = r_header.committed_to.segment_seq; + } else { + assert(expected_seq == r_header.committed_to.segment_seq); + } cursor_addr += bl.length(); if (cursor_addr >= get_journal_end()) { assert(cursor_addr == get_journal_end()); cursor_addr = get_start_addr(); + ++expected_seq; is_rolled = true; } set_written_to(cursor_addr); - last_seq = r_header.committed_to.segment_seq; + circulation_seq = expected_seq; return seastar::do_with( std::move(*maybe_record_deltas_list), [write_result, @@ -425,7 +438,7 @@ CircularBoundedJournal::return_record(record_group_header_t& header, bufferlist } CircularBoundedJournal::read_record_ret -CircularBoundedJournal::read_record(paddr_t off, segment_seq_t last_seq) +CircularBoundedJournal::read_record(paddr_t off, segment_seq_t expected_seq) { LOG_PREFIX(CircularBoundedJournal::read_record); rbm_abs_addr addr = convert_paddr_to_abs_addr(off); @@ -435,7 +448,7 @@ CircularBoundedJournal::read_record(paddr_t off, segment_seq_t last_seq) addr, read_length); auto bptr = bufferptr(ceph::buffer::create_page_aligned(read_length)); return device->read(addr, bptr - ).safe_then([this, addr, bptr, last_seq, FNAME]() mutable + ).safe_then([this, addr, bptr, expected_seq, FNAME]() mutable -> read_record_ret { record_group_header_t h; bufferlist bl; @@ -453,7 +466,8 @@ CircularBoundedJournal::read_record(paddr_t off, segment_seq_t last_seq) h.dlength % get_block_size() != 0 || addr + h.mdlength + h.dlength > get_journal_end() || h.committed_to.segment_seq == NULL_SEG_SEQ || - h.committed_to.segment_seq < last_seq) { + (expected_seq != NULL_SEG_SEQ && + h.committed_to.segment_seq != expected_seq)) { return read_record_ret( read_record_ertr::ready_future_marker{}, std::nullopt); diff --git a/src/crimson/os/seastore/journal/circular_bounded_journal.h b/src/crimson/os/seastore/journal/circular_bounded_journal.h index 265fea984266c..4d869f017223a 100644 --- a/src/crimson/os/seastore/journal/circular_bounded_journal.h +++ b/src/crimson/os/seastore/journal/circular_bounded_journal.h @@ -136,10 +136,10 @@ public: * read record from given address * * @param paddr_t to read - * @param last_seq + * @param expected_seq * */ - read_record_ret read_record(paddr_t offset, segment_seq_t last_seq); + read_record_ret read_record(paddr_t offset, segment_seq_t expected_seq); /* * read_header * @@ -291,7 +291,10 @@ private: * Indicates that device is open and in-memory header is valid. */ bool initialized = false; - segment_seq_t cur_segment_seq = 0; // segment seq to track the sequence to written records + + // circulation seq to track the sequence to written records + segment_seq_t circulation_seq = NULL_SEG_SEQ; + // start address where the newest record will be written // should be in range [get_start_addr(), get_journal_end()) rbm_abs_addr written_to = 0; -- 2.39.5