From 0ad74ec02698e47235bb0530eef87fc040af22ae Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Mon, 1 Nov 2021 16:28:59 +0800 Subject: [PATCH] crimson/os/seastore: fix ordered updates to JournalSegmentManager::committed_to Journal segment should not update committed_to during rolling as there might be still pending writes from the previous segment. A side-effect here is that committed_to now needs to include segment_seq_t to point to a previous segment. Signed-off-by: Yingxin Cheng --- .../os/seastore/extent_placement_manager.h | 4 +- src/crimson/os/seastore/extent_reader.cc | 37 +++++++++++-------- src/crimson/os/seastore/journal.cc | 23 ++++-------- src/crimson/os/seastore/journal.h | 10 ++--- src/crimson/os/seastore/seastore_types.cc | 2 +- src/crimson/os/seastore/seastore_types.h | 30 +++++++++------ src/crimson/os/seastore/segment_cleaner.cc | 10 ++--- src/crimson/os/seastore/segment_cleaner.h | 24 ++++++------ 8 files changed, 69 insertions(+), 71 deletions(-) diff --git a/src/crimson/os/seastore/extent_placement_manager.h b/src/crimson/os/seastore/extent_placement_manager.h index e8bde09b44d0c..a691f036ee4af 100644 --- a/src/crimson/os/seastore/extent_placement_manager.h +++ b/src/crimson/os/seastore/extent_placement_manager.h @@ -69,7 +69,7 @@ public: extent_offset += extent.get_bptr().length(); } assert(extent_offset == (segment_off_t)(base + rsize.mdlength + rsize.dlength)); - return encode_record(rsize, std::move(record), block_size, base, nonce); + return encode_record(rsize, std::move(record), block_size, journal_seq_t(), nonce); } void add_extent(LogicalCachedExtentRef& extent) { extents.emplace_back(extent); @@ -87,7 +87,7 @@ public: void set_base(segment_off_t b) { base = b; } - segment_off_t get_base() { + segment_off_t get_base() const { return base; } void clear() { diff --git a/src/crimson/os/seastore/extent_reader.cc b/src/crimson/os/seastore/extent_reader.cc index 984be2d5198be..a3dca456bf542 100644 --- a/src/crimson/os/seastore/extent_reader.cc +++ b/src/crimson/os/seastore/extent_reader.cc @@ -62,7 +62,7 @@ ExtentReader::scan_extents_ret ExtentReader::scan_extents( { auto ret = std::make_unique(); auto* extents = ret.get(); - return read_segment_header(cursor.get_offset().segment + return read_segment_header(cursor.get_segment_id() ).handle_error( scan_extents_ertr::pass_further{}, crimson::ct_error::assert_all{ @@ -114,9 +114,9 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records( found_record_handler_t &handler) { auto& segment_manager = - *segment_managers[cursor.offset.segment.device_id()]; - if (cursor.offset.offset == 0) { - cursor.offset.offset = segment_manager.get_block_size(); + *segment_managers[cursor.get_segment_id().device_id()]; + if (cursor.get_segment_offset() == 0) { + cursor.increment(segment_manager.get_block_size()); } auto retref = std::make_unique(0); auto &budget_used = *retref; @@ -125,30 +125,33 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records( -> scan_valid_records_ertr::future { return [=, &handler, &cursor, &budget_used] { if (!cursor.last_valid_header_found) { - return read_validate_record_metadata(cursor.offset, nonce + return read_validate_record_metadata(cursor.seq.offset, nonce ).safe_then([=, &cursor](auto md) { logger().debug( "ExtentReader::scan_valid_records: read complete {}", - cursor.offset); + cursor.seq); if (!md) { logger().debug( "ExtentReader::scan_valid_records: found invalid header at {}, presumably at end", - cursor.offset); + cursor.seq); cursor.last_valid_header_found = true; return scan_valid_records_ertr::now(); } else { + auto new_committed_to = md->first.committed_to; logger().debug( - "ExtentReader::scan_valid_records: valid record read at {}", - cursor.offset); - cursor.last_committed = paddr_t{ - cursor.offset.segment, - md->first.committed_to}; + "ExtentReader::scan_valid_records: valid record read at {}, now committed at {}", + cursor.seq, + new_committed_to); + ceph_assert(cursor.last_committed == journal_seq_t() || + cursor.last_committed <= new_committed_to); + cursor.last_committed = new_committed_to; cursor.pending_records.emplace_back( - cursor.offset, + cursor.seq.offset, md->first, md->second); - cursor.offset.offset += - md->first.dlength + md->first.mdlength; + cursor.increment(md->first.dlength + md->first.mdlength); + ceph_assert(new_committed_to == journal_seq_t() || + new_committed_to < cursor.seq); return scan_valid_records_ertr::now(); } }).safe_then([=, &cursor, &budget_used, &handler] { @@ -166,7 +169,9 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records( seastar::stop_iteration>(seastar::stop_iteration::yes); } auto &next = cursor.pending_records.front(); - if (next.offset > cursor.last_committed) { + journal_seq_t next_seq = {cursor.seq.segment_seq, next.offset}; + if (cursor.last_committed == journal_seq_t() || + next_seq > cursor.last_committed) { return scan_valid_records_ertr::make_ready_future< seastar::stop_iteration>(seastar::stop_iteration::yes); } diff --git a/src/crimson/os/seastore/journal.cc b/src/crimson/os/seastore/journal.cc index 55057b88bdf38..b2d4aaf43ea08 100644 --- a/src/crimson/os/seastore/journal.cc +++ b/src/crimson/os/seastore/journal.cc @@ -170,7 +170,7 @@ Journal::replay_segment( { logger().debug("Journal::replay_segment: starting at {}", seq); return seastar::do_with( - scan_valid_records_cursor(seq.offset), + scan_valid_records_cursor(seq), ExtentReader::found_record_handler_t( [=, &handler](paddr_t base, const record_header_t &header, @@ -378,13 +378,9 @@ void Journal::JournalSegmentManager::mark_committed( logger().debug( "JournalSegmentManager::mark_committed: committed_to {} => {}", committed_to, new_committed_to); - assert(new_committed_to.segment_seq <= - get_segment_seq()); - if (new_committed_to.segment_seq == - get_segment_seq()) { - assert(committed_to.offset.offset < new_committed_to.offset.offset); - committed_to = new_committed_to; - } + assert(committed_to == journal_seq_t() || + committed_to <= new_committed_to); + committed_to = new_committed_to; } Journal::JournalSegmentManager::initialize_segment_ertr::future<> @@ -401,7 +397,7 @@ Journal::JournalSegmentManager::initialize_segment(Segment& segment) auto header = segment_header_t{ seq, segment.get_segment_id(), - segment_provider->get_journal_tail_target(), + new_tail, current_segment_nonce, false}; logger().debug( @@ -421,14 +417,9 @@ Journal::JournalSegmentManager::initialize_segment(Segment& segment) bl.append(bp); written_to = 0; - // FIXME: improve committed_to to point to another segment - committed_to = get_current_write_seq(); return write(bl ).safe_then([this, new_tail, write_size=bl.length() ](journal_seq_t write_start_seq) { - auto committed_to = write_start_seq; - committed_to.offset.offset += write_size; - mark_committed(committed_to); segment_provider->update_journal_tail_committed(new_tail); }); } @@ -475,7 +466,7 @@ Journal::RecordBatch::add_pending( ceph::bufferlist Journal::RecordBatch::encode_records( size_t block_size, - segment_off_t committed_to, + const journal_seq_t& committed_to, segment_nonce_t segment_nonce) { logger().debug( @@ -532,7 +523,7 @@ ceph::bufferlist Journal::RecordBatch::submit_pending_fast( record_t&& record, const record_size_t& rsize, size_t block_size, - segment_off_t committed_to, + const journal_seq_t& committed_to, segment_nonce_t segment_nonce) { logger().debug( diff --git a/src/crimson/os/seastore/journal.h b/src/crimson/os/seastore/journal.h index 6e0aa3942ff84..be7c49695617a 100644 --- a/src/crimson/os/seastore/journal.h +++ b/src/crimson/os/seastore/journal.h @@ -140,10 +140,8 @@ private: return current_segment_nonce; } - segment_off_t get_committed_to() const { - assert(committed_to.segment_seq == - get_segment_seq()); - return committed_to.offset.offset; + journal_seq_t get_committed_to() const { + return committed_to; } segment_seq_t get_segment_seq() const { @@ -287,7 +285,7 @@ private: // Encode the batched records for write. ceph::bufferlist encode_records( size_t block_size, - segment_off_t committed_to, + const journal_seq_t& committed_to, segment_nonce_t segment_nonce); // Set the write result and reset for reuse @@ -304,7 +302,7 @@ private: record_t&&, const record_size_t&, size_t block_size, - segment_off_t committed_to, + const journal_seq_t& committed_to, segment_nonce_t segment_nonce); private: diff --git a/src/crimson/os/seastore/seastore_types.cc b/src/crimson/os/seastore/seastore_types.cc index 63fa9759f3e9d..cafe6fe6a8de0 100644 --- a/src/crimson/os/seastore/seastore_types.cc +++ b/src/crimson/os/seastore/seastore_types.cc @@ -150,7 +150,7 @@ ceph::bufferlist encode_record( record_size_t rsize, record_t &&record, size_t block_size, - segment_off_t committed_to, + const journal_seq_t& committed_to, segment_nonce_t current_segment_nonce) { bufferlist data_bl; diff --git a/src/crimson/os/seastore/seastore_types.h b/src/crimson/os/seastore/seastore_types.h index 226010713884a..a7fc29a46ffc3 100644 --- a/src/crimson/os/seastore/seastore_types.h +++ b/src/crimson/os/seastore/seastore_types.h @@ -1143,11 +1143,11 @@ struct record_header_t { // Fixed portion extent_len_t mdlength; // block aligned, length of metadata extent_len_t dlength; // block aligned, length of data - uint32_t deltas; // number of deltas - uint32_t extents; // number of extents + uint32_t deltas; // number of deltas + uint32_t extents; // number of extents segment_nonce_t segment_nonce;// nonce of containing segment - segment_off_t committed_to; // records in this segment prior to committed_to - // have been fully written + journal_seq_t committed_to; // records prior to committed_to have been + // fully written, maybe in another segment. checksum_t data_crc; // crc of data payload @@ -1188,14 +1188,14 @@ ceph::bufferlist encode_record( record_size_t rsize, record_t &&record, size_t block_size, - segment_off_t committed_to, + const journal_seq_t& committed_to, segment_nonce_t current_segment_nonce = 0); /// scan segment for end incrementally struct scan_valid_records_cursor { bool last_valid_header_found = false; - paddr_t offset; - paddr_t last_committed; + journal_seq_t seq; + journal_seq_t last_committed; struct found_record_t { paddr_t offset; @@ -1214,13 +1214,21 @@ struct scan_valid_records_cursor { return last_valid_header_found && pending_records.empty(); } - paddr_t get_offset() const { - return offset; + segment_id_t get_segment_id() const { + return seq.offset.segment; + } + + segment_off_t get_segment_offset() const { + return seq.offset.offset; + } + + void increment(segment_off_t off) { + seq.offset.offset += off; } scan_valid_records_cursor( - paddr_t offset) - : offset(offset) {} + journal_seq_t seq) + : seq(seq) {} }; } diff --git a/src/crimson/os/seastore/segment_cleaner.cc b/src/crimson/os/seastore/segment_cleaner.cc index 23e0aebc648d3..e209cf47fbf9f 100644 --- a/src/crimson/os/seastore/segment_cleaner.cc +++ b/src/crimson/os/seastore/segment_cleaner.cc @@ -321,20 +321,18 @@ SegmentCleaner::gc_trim_journal_ret SegmentCleaner::gc_trim_journal() SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space() { if (!scan_cursor) { - paddr_t next = P_ADDR_NULL; - next.segment = get_next_gc_target(); - if (next == P_ADDR_NULL) { + journal_seq_t next = get_next_gc_target(); + if (next == journal_seq_t()) { logger().debug( "SegmentCleaner::do_gc: no segments to gc"); return seastar::now(); } - next.offset = 0; scan_cursor = std::make_unique( next); logger().debug( "SegmentCleaner::do_gc: starting gc on segment {}", - scan_cursor->get_offset().segment); + scan_cursor->seq); } else { ceph_assert(!scan_cursor->is_complete()); } @@ -384,7 +382,7 @@ SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space() }); }).si_then([this, &t] { if (scan_cursor->is_complete()) { - t.mark_segment_to_release(scan_cursor->get_offset().segment); + t.mark_segment_to_release(scan_cursor->get_segment_id()); } return ecb->submit_transaction_direct(t); }); diff --git a/src/crimson/os/seastore/segment_cleaner.h b/src/crimson/os/seastore/segment_cleaner.h index aaac218395cf9..c1732aca6ce09 100644 --- a/src/crimson/os/seastore/segment_cleaner.h +++ b/src/crimson/os/seastore/segment_cleaner.h @@ -683,10 +683,6 @@ public: void update_journal_tail_target(journal_seq_t target); - void init_journal_tail(journal_seq_t tail) { - journal_tail_target = journal_tail_committed = tail; - } - void init_mkfs(journal_seq_t head) { journal_tail_target = head; journal_tail_committed = head; @@ -773,30 +769,32 @@ public: assert(ret >= 0); } - segment_id_t get_next_gc_target() const { - segment_id_t ret = NULL_SEG_ID; + journal_seq_t get_next_gc_target() const { + segment_id_t id = NULL_SEG_ID; segment_seq_t seq = NULL_SEG_SEQ; int64_t least_live_bytes = std::numeric_limits::max(); for (auto it = segments.begin(); it != segments.end(); ++it) { - auto id = it->first; + auto _id = it->first; const auto& segment_info = it->second; if (segment_info.is_closed() && !segment_info.is_in_journal(journal_tail_committed) && - space_tracker->get_usage(id) < least_live_bytes) { - ret = id; + space_tracker->get_usage(_id) < least_live_bytes) { + id = _id; seq = segment_info.journal_segment_seq; least_live_bytes = space_tracker->get_usage(id); } } - if (ret != NULL_SEG_ID) { + if (id != NULL_SEG_ID) { crimson::get_logger(ceph_subsys_seastore).debug( "SegmentCleaner::get_next_gc_target: segment {} seq {}", - ret, + id, seq); + return journal_seq_t{seq, {id, 0}}; + } else { + return journal_seq_t(); } - return ret; } SpaceTrackerIRef get_empty_space_tracker() const { @@ -987,7 +985,7 @@ private: if (!scan_cursor) return 0; - return scan_cursor->get_offset().offset; + return scan_cursor->get_segment_offset(); } /// Returns free space available for writes -- 2.47.3