From: Xuehan Xu Date: Fri, 21 Nov 2025 13:01:15 +0000 (+0800) Subject: crimson/os/seastore/cache: rewrite transactions don't invalidate other X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=72a69a4ae05bf87e8f82be420edb64d6c326f54d;p=ceph-ci.git crimson/os/seastore/cache: rewrite transactions don't invalidate other transactions anymore Fixes: https://tracker.ceph.com/issues/73070 Signed-off-by: Xuehan Xu --- diff --git a/src/crimson/common/fixed_kv_node_layout.h b/src/crimson/common/fixed_kv_node_layout.h index fb8fc9bd177..7adce7158b3 100644 --- a/src/crimson/common/fixed_kv_node_layout.h +++ b/src/crimson/common/fixed_kv_node_layout.h @@ -296,7 +296,6 @@ public: void copy_out(char *out, size_t len) { assert(len == get_bytes()); ::memcpy(out, reinterpret_cast(buffer.data()), get_bytes()); - buffer.clear(); } void copy_in(const char *out, size_t len) { assert(empty()); @@ -308,6 +307,9 @@ public: bool operator==(const delta_buffer_t &rhs) const { return buffer == rhs.buffer; } + void clear() { + buffer.clear(); + } }; void journal_insert( diff --git a/src/crimson/os/seastore/btree/fixed_kv_node.h b/src/crimson/os/seastore/btree/fixed_kv_node.h index 8dcf1e9e68e..926408f64a9 100644 --- a/src/crimson/os/seastore/btree/fixed_kv_node.h +++ b/src/crimson/os/seastore/btree/fixed_kv_node.h @@ -246,6 +246,10 @@ struct FixedKVInternalNode return CachedExtentRef(new node_type_t(*this)); }; + void clear_delta() final { + delta_buffer.clear(); + } + void on_replace_prior() final { this->parent_node_t::on_replace_prior(); if (this->is_btree_root()) { @@ -719,6 +723,10 @@ struct FixedKVLeafNode return CachedExtentRef(new node_type_t(*static_cast(this))); }; + void clear_delta() final { + delta_buffer.clear(); + } + virtual void update( internal_const_iterator_t iter, VAL val) = 0; diff --git a/src/crimson/os/seastore/cache.cc b/src/crimson/os/seastore/cache.cc index a2bf6bea14c..c4ee118fb3b 100644 --- a/src/crimson/os/seastore/cache.cc +++ b/src/crimson/os/seastore/cache.cc @@ -907,20 +907,41 @@ void Cache::commit_replace_extent( CachedExtentRef next, CachedExtentRef prev) { - assert(next->get_paddr() == prev->get_paddr()); + assert(next->get_paddr() == prev->get_paddr() || + // prev is being rewritten by a trim_dirty + // or cleaner transaction + prev->get_paddr().is_record_relative()); assert(next->get_paddr().is_absolute() || next->get_paddr().is_root()); assert(next->version == prev->version + 1); - if (booting) { + const auto t_src = t.get_src(); + bool t_rewrite = is_rewrite_transaction(t_src); + if (booting && !t_rewrite) { extents_index.replace(*next, *prev); } - const auto t_src = t.get_src(); if (is_root_type(prev->get_type())) { assert(prev->is_stable_dirty()); assert(prev->is_linked_to_list()); // add the new dirty root to front remove_from_dirty(prev, nullptr/* exclude root */); add_to_dirty(next, nullptr/* exclude root */); + } else if (t_rewrite) { + bool was_stable_dirty = prev->is_stable_dirty(); + if (!was_stable_dirty) { + pinboard->remove(*prev); + } + prev->set_io_wait(CachedExtent::extent_state_t::DIRTY, true); + ceph_assert(next->committer); + ceph_assert(prev->committer); + ceph_assert(next->committer == prev->committer); + auto &committer = *next->committer; + committer.commit_state(); + if (is_lba_backref_node(next->get_type())) { + committer.sync_checksum(); + } + if (!was_stable_dirty) { + add_to_dirty(prev, &t_src); + } } else if (prev->is_stable_dirty()) { replace_dirty(next, prev, t_src); } else { @@ -928,7 +949,9 @@ void Cache::commit_replace_extent( add_to_dirty(next, &t_src); } - invalidate_extent(t, *prev); + if (!t_rewrite || is_root_type(prev->get_type())) { + invalidate_extent(t, *prev); + } } void Cache::invalidate_extent( @@ -1221,6 +1244,9 @@ CachedExtentRef Cache::duplicate_for_write( auto ret = i->duplicate_for_write(t); ret->pending_for_transaction = t.get_trans_id(); ret->set_prior_instance(i); + if (!is_root_type(ret->get_type())) { + assert(ret->get_paddr().is_absolute()); + } auto [iter, inserted] = i->mutation_pending_extents.insert(*ret); ceph_assert(inserted); if (is_root_type(ret->get_type())) { @@ -1287,6 +1313,10 @@ record_t Cache::prepare_record( DEBUGT("invalid mutated extent -- {}", t, *i); continue; } + if (is_rewrite_transaction(t.get_src()) && + !is_root_type(i->get_type())) { + i->new_committer(t); + } assert(i->is_exist_mutation_pending() || i->prior_instance); get_by_ext(efforts.mutate_by_ext, @@ -1299,27 +1329,19 @@ record_t Cache::prepare_record( t, delta_length, *i); assert(delta_length); - if (i->is_mutation_pending()) { - // If inplace rewrite happens from a concurrent transaction, - // i->prior_instance will be changed from DIRTY to CLEAN implicitly, thus - // i->prior_instance->version become 0. This won't cause conflicts - // intentionally because inplace rewrite won't modify the shared extent. + if (i->is_mutation_pending()) { + DEBUGT("commit replace extent ... -- {}, prior={}", + t, *i, *i->prior_instance); + // Rewrite transactions will be change stable extents' versions implicitly, + // and i->prior_instance->version will become different than i->version + 1. + // This won't cause conflicts intentionally because rewrite transactions + // only modifies lba/backref addresses. // // However, this leads to version mismatch below, thus we reset the - // version to 1 in this case. - if (i->prior_instance->version == 0 && i->version > 1) { - DEBUGT("commit replace extent (inplace-rewrite) ... -- {}, prior={}", - t, *i, *i->prior_instance); - - assert(can_inplace_rewrite(i->get_type())); - assert(can_inplace_rewrite(i->prior_instance->get_type())); - assert(i->prior_instance->dirty_from == JOURNAL_SEQ_MIN); - assert(i->prior_instance->state == CachedExtent::extent_state_t::CLEAN); - assert(i->prior_instance->get_paddr().is_absolute_random_block()); - i->version = 1; - } else { - DEBUGT("commit replace extent ... -- {}, prior={}", - t, *i, *i->prior_instance); + // version to i->prior_instance->1 in this case. + if (i->version != i->prior_instance->version + 1) { + assert(i->prior_instance->is_stable()); + i->version = i->prior_instance->version + 1; } } else { assert(i->is_exist_mutation_pending()); @@ -1432,7 +1454,18 @@ record_t Cache::prepare_record( retire_stat.increment(extent->get_length()); DEBUGT("retired and remove extent {}~0x{:x} -- {}", t, extent->get_paddr(), extent->get_length(), *extent); - commit_retire_extent(t, extent); + if (is_rewrite_transaction(t.get_src())) { + assert(extent->is_stable()); + if (extent->is_stable_dirty()) { + remove_from_dirty(extent, &trans_src); + // set the version to zero because the extent state is now clean + // in order to handle this transparently + extent->version = 0; + } + touch_extent_fully(*extent, &trans_src, t.get_cache_hint()); + } else { + commit_retire_extent(t, extent); + } // Note: commit extents and backref allocations in the same place if (is_backref_mapped_type(extent->get_type()) || @@ -1545,6 +1578,21 @@ record_t Cache::prepare_record( i->set_io_wait(CachedExtent::extent_state_t::CLEAN); // Note, paddr is known until complete_commit(), // so add_extent() later. + if (is_rewrite_transaction(t.get_src())) { + assert(i->get_prior_instance()); + assert(!i->committer); + assert(!i->get_prior_instance()->committer); + i->new_committer(t); + assert(i->committer); + auto &committer = *i->committer; + // this must have been a rewriten extent + committer.commit_state(); + if (is_lba_backref_node(i->get_type())) { + committer.sync_checksum(); + } + i->get_prior_instance()->set_io_wait( + CachedExtent::extent_state_t::CLEAN); + } } for (auto &i: t.ool_block_list) { @@ -1571,6 +1619,22 @@ record_t Cache::prepare_record( i->set_io_wait(CachedExtent::extent_state_t::CLEAN); // Note, paddr is (can be) known until complete_commit(), // so add_extent() later. + if (is_rewrite_transaction(t.get_src())) { + assert(i->get_prior_instance()); + assert(!i->committer); + assert(!i->get_prior_instance()->committer); + i->new_committer(t); + assert(i->committer); + i->get_prior_instance()->committer = i->committer; + auto &committer = *i->committer; + // this must have been a rewriten extent + committer.commit_state(); + if (is_lba_backref_node(i->get_type())) { + committer.sync_checksum(); + } + i->get_prior_instance()->set_io_wait( + CachedExtent::extent_state_t::CLEAN, true); + } } for (auto &i: t.inplace_ool_block_list) { @@ -1848,6 +1912,14 @@ void Cache::complete_commit( LOG_PREFIX(Cache::complete_commit); SUBTRACET(seastore_t, "final_block_start={}, start_seq={}", t, final_block_start, start_seq); + for (auto &i: t.retired_set) { + auto &extent = i.extent; + auto trans_src = t.get_src(); + if (is_rewrite_transaction(trans_src)) { + assert(extent->is_valid()); + } + epm.mark_space_free(extent->get_paddr(), extent->get_length()); + } backref_entry_refs_t backref_entries; t.for_each_finalized_fresh_block([&](const CachedExtentRef &i) { @@ -1870,13 +1942,33 @@ void Cache::complete_commit( #endif i->pending_for_transaction = TRANS_ID_NULL; i->on_initial_write(); - i->reset_prior_instance(); - DEBUGT("add extent as fresh, inline={} -- {}", - t, is_inline, *i); - i->invalidate_hints(); - add_extent(i); const auto t_src = t.get_src(); - touch_extent_fully(*i, &t_src, t.get_cache_hint()); + if (is_rewrite_transaction(t_src)) { + ceph_assert(i->committer); + auto &committer = *i->committer; + auto &prior = *i->get_prior_instance(); + ceph_assert(prior.is_valid()); + TRACET("committing rewritten extent into " + "existing, inline={} -- {}, prior={}", + t, is_inline, *i, prior); + prior.pending_for_transaction = TRANS_ID_NULL; + committer.commit_and_share_paddr(); + if (is_lba_backref_node(i->get_type())) { + committer.commit_data(); + committer.share_prior_data_to_pending(); + } + touch_extent_fully(prior, &t_src, t.get_cache_hint()); + prior.complete_io(); + i->committer.reset(); + prior.committer.reset(); + } else { + TRACET("add extent as fresh, inline={} -- {}", + t, is_inline, *i); + i->invalidate_hints(); + add_extent(i); + touch_extent_fully(*i, &t_src, t.get_cache_hint()); + } + i->reset_prior_instance(); i->complete_io(); epm.commit_space_used(i->get_paddr(), i->get_length()); @@ -1925,23 +2017,46 @@ void Cache::complete_commit( assert(i->io_wait->from_state == CachedExtent::extent_state_t::EXIST_MUTATION_PENDING || (i->io_wait->from_state == CachedExtent::extent_state_t::MUTATION_PENDING && i->prior_instance)); - i->on_delta_write(final_block_start); - i->pending_for_transaction = TRANS_ID_NULL; - i->reset_prior_instance(); - assert(i->version > 0); if (i->version == 1 || is_root_type(i->get_type())) { i->dirty_from = start_seq; DEBUGT("commit extent done, become dirty -- {}", t, *i); + if (is_rewrite_transaction(t.get_src()) && !is_root_type(i->get_type())) { + auto &prior = *i->get_prior_instance(); + prior.dirty_from = start_seq; + ceph_assert(i->committer); + auto &committer = *i->committer; + committer.sync_dirty_from(); + } } else { DEBUGT("commit extent done -- {}", t, *i); } + i->on_delta_write(final_block_start); + if (is_rewrite_transaction(t.get_src()) && + !is_root_type(i->get_type())) { + TRACET("committing paddr to prior for {}, prior={}", + t, *i, *i->prior_instance); + assert(i->committer); + auto &committer = *i->committer; + auto &prior = *i->prior_instance; + prior.pending_for_transaction = TRANS_ID_NULL; + ceph_assert(prior.is_valid()); + if (is_lba_backref_node(i->get_type())) { + committer.commit_data(); + committer.share_prior_data_to_pending(); + } + prior.complete_io(); + prior.clear_delta(); + i->committer.reset(); + prior.committer.reset(); + } + + i->pending_for_transaction = TRANS_ID_NULL; + i->reset_prior_instance(); + assert(i->version > 0); i->complete_io(); + i->clear_delta(); } - for (auto &i: t.retired_set) { - auto &extent = i.extent; - epm.mark_space_free(extent->get_paddr(), extent->get_length()); - } for (auto &i: t.existing_block_list) { if (!i->is_valid()) { continue; @@ -1958,6 +2073,17 @@ void Cache::complete_commit( apply_backref_byseq(t.move_backref_entries(), start_seq); commit_backref_entries(std::move(backref_entries), start_seq); } + + if (is_rewrite_transaction(t.get_src())) { + t.for_each_finalized_fresh_block([&t](const CachedExtentRef &i) { + i->set_invalid(t); + }); + for (auto &i: t.mutated_block_list) { + if (i->get_type() != extent_types_t::ROOT) { + i->set_invalid(t); + } + } + } } void Cache::init() diff --git a/src/crimson/os/seastore/cache.h b/src/crimson/os/seastore/cache.h index 2c0401dde7b..547246a6972 100644 --- a/src/crimson/os/seastore/cache.h +++ b/src/crimson/os/seastore/cache.h @@ -613,7 +613,6 @@ public: p_extent = extent->maybe_get_transactional_view(t); ceph_assert(p_extent); if (p_extent != extent.get()) { - assert(!extent->is_pending_io()); assert(p_extent->is_pending_in_trans(t.get_trans_id())); assert(!p_extent->is_pending_io()); ++access_stats.trans_pending; @@ -666,7 +665,6 @@ public: ++stats.access.trans_pending; if (extent->is_mutable()) { assert(extent->is_fully_loaded()); - assert(!extent->is_pending_io()); return get_extent_iertr::make_ready_future(extent); } else { assert(extent->is_exist_clean()); diff --git a/src/crimson/os/seastore/cached_extent.cc b/src/crimson/os/seastore/cached_extent.cc index e28b2c406fe..72fe6840993 100644 --- a/src/crimson/os/seastore/cached_extent.cc +++ b/src/crimson/os/seastore/cached_extent.cc @@ -359,6 +359,8 @@ void ExtentCommitter::sync_dirty_from() { auto &prior = *extent.prior_instance; for (auto &mext : prior.mutation_pending_extents) { auto &mextent = static_cast(mext); + assert(mextent.dirty_from < extent.dirty_from || + mextent.dirty_from == JOURNAL_SEQ_NULL); mextent.dirty_from = extent.dirty_from; } } @@ -411,17 +413,13 @@ void ExtentCommitter::commit_and_share_paddr() { return; } if (prior.read_transactions.empty()) { - prior.set_paddr( - extent.get_paddr(), - prior.get_paddr().is_absolute()); + prior.set_paddr(extent.get_paddr()); return; } for (auto &item : prior.read_transactions) { auto [removed, retired] = item.t->pre_stable_extent_paddr_mod(item); if (prior.get_paddr() != extent.get_paddr()) { - prior.set_paddr( - extent.get_paddr(), - prior.get_paddr().is_absolute()); + prior.set_paddr(extent.get_paddr()); } item.t->post_stable_extent_paddr_mod(item, retired); item.t->maybe_update_pending_paddr(old_paddr, extent.get_paddr()); @@ -459,5 +457,13 @@ void ExtentCommitter::_share_prior_data_to_pending_versions() } } +void CachedExtent::new_committer(Transaction &t) { + ceph_assert(is_rewrite_transaction(t.get_src())); + ceph_assert(!committer); + committer = new ExtentCommitter(*this, t); + assert(prior_instance); + assert(!prior_instance->committer); + prior_instance->committer = committer; +} } diff --git a/src/crimson/os/seastore/cached_extent.h b/src/crimson/os/seastore/cached_extent.h index bf51eddc745..d2c4e6eed85 100644 --- a/src/crimson/os/seastore/cached_extent.h +++ b/src/crimson/os/seastore/cached_extent.h @@ -273,7 +273,8 @@ enum class extent_2q_state_t : uint8_t { Max }; -class ExtentCommitter { +class ExtentCommitter : public boost::intrusive_ref_counter< + ExtentCommitter, boost::thread_unsafe_counter> { public: ExtentCommitter(CachedExtent &extent, Transaction &t) : extent(extent), t(t) {} @@ -305,6 +306,11 @@ private: void _share_prior_data_to_mutations(); void _share_prior_data_to_pending_versions(); + + template + void _set_invalidaters(Transaction &t); + + friend class Cache; }; using ExtentCommitterRef = boost::intrusive_ptr; @@ -467,6 +473,13 @@ public: */ virtual extent_types_t get_type() const = 0; + /** + * clear_delta + * + * clear the mutation delta buffer of the cached extent. + */ + virtual void clear_delta() {} + virtual bool is_logical() const { assert(!is_logical_type(get_type())); assert(is_physical_type(get_type())); @@ -996,6 +1009,10 @@ private: // This field is unused when the ExtentPinboard use LRU algorithm extent_2q_state_t cache_state = extent_2q_state_t::Fresh; + ExtentCommitterRef committer; + + void new_committer(Transaction &t); + protected: trans_view_set_t mutation_pending_extents; trans_view_set_t retired_transactions; @@ -1032,16 +1049,11 @@ protected: dirty_from(other.dirty_from), length(other.get_length()), loaded_length(other.get_loaded_length()), - version(other.version) { + version(other.version), + poffset(other.poffset) { // the extent must be fully loaded before CoW assert(other.is_fully_loaded()); assert(is_aligned(length, CEPH_PAGE_SIZE)); - if (other.poffset.is_absolute() || - !other.prior_poffset.has_value()) { - poffset = other.poffset; - } else { - poffset = *other.prior_poffset; - } if (length > 0) { ptr = create_extent_ptr_rand(length); other.ptr->copy_out(0, length, ptr->c_str()); @@ -1061,9 +1073,7 @@ protected: length(other.get_length()), loaded_length(other.get_loaded_length()), version(other.version), - poffset(other.poffset.is_absolute() - ? other.poffset - : *other.prior_poffset) { + poffset(other.poffset) { // the extent must be fully loaded before CoW assert(other.is_fully_loaded()); assert(is_aligned(length, CEPH_PAGE_SIZE)); diff --git a/src/crimson/os/seastore/linked_tree_node.h b/src/crimson/os/seastore/linked_tree_node.h index 8a9ddee8462..ec5dd810275 100644 --- a/src/crimson/os/seastore/linked_tree_node.h +++ b/src/crimson/os/seastore/linked_tree_node.h @@ -426,6 +426,33 @@ public: set_child_ptracker(child); } + // copy dests points from a stable node back to its pending nodes + // having copy sources at the same tree level, it serves as a two-level index: + // transaction-id then node-key to the pending node. + // + // The copy dest pointers must be symmetric to the copy source pointers. + // + // copy_dests_t will be automatically unregisterred upon transaction destruction, + // see Transaction::views + struct copy_dests_t : trans_spec_view_t { + std::set, Comparator> dests_by_key; + copy_dests_t(Transaction &t) : trans_spec_view_t{t.get_trans_id()} {} + ~copy_dests_t() { + LOG_PREFIX(~copy_dests_t); + SUBTRACE(seastore_fixedkv_tree, "copy_dests_t destroyed"); + } + }; + + const copy_dests_t *get_copy_dests(Transaction &t) { + auto iter = copy_dests_by_trans.find( + t.get_trans_id(), trans_spec_view_t::cmp_t()); + if (iter == copy_dests_by_trans.end()) { + return nullptr; + } else { + return static_cast(&*iter); + } + } + protected: ParentNode(btreenode_pos_t capacity) : children(capacity, nullptr) {} @@ -1002,6 +1029,7 @@ protected: parent_tracker_t* my_tracker = nullptr; std::vector*> children; + private: T& down_cast() { return *static_cast(this); @@ -1032,23 +1060,6 @@ private: std::set, Comparator> copy_sources; - // copy dests points from a stable node back to its pending nodes - // having copy sources at the same tree level, it serves as a two-level index: - // transaction-id then node-key to the pending node. - // - // The copy dest pointers must be symmetric to the copy source pointers. - // - // copy_dests_t will be automatically unregisterred upon transaction destruction, - // see Transaction::views - struct copy_dests_t : trans_spec_view_t { - std::set, Comparator> dests_by_key; - copy_dests_t(Transaction &t) : trans_spec_view_t{t.get_trans_id()} {} - ~copy_dests_t() { - LOG_PREFIX(~copy_dests_t); - SUBTRACE(seastore_fixedkv_tree, "copy_dests_t destroyed"); - } - }; - using trans_view_set_t = trans_spec_view_t::trans_view_set_t; trans_view_set_t copy_dests_by_trans; template