]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os/seastore/cache: rewrite transactions don't invalidate other
authorXuehan Xu <xuxuehan@qianxin.com>
Fri, 21 Nov 2025 13:01:15 +0000 (21:01 +0800)
committerXuehan Xu <xuxuehan@qianxin.com>
Tue, 10 Feb 2026 02:58:02 +0000 (10:58 +0800)
transactions anymore

Fixes: https://tracker.ceph.com/issues/73070
Signed-off-by: Xuehan Xu <xuxuehan@qianxin.com>
src/crimson/common/fixed_kv_node_layout.h
src/crimson/os/seastore/btree/fixed_kv_node.h
src/crimson/os/seastore/cache.cc
src/crimson/os/seastore/cache.h
src/crimson/os/seastore/cached_extent.cc
src/crimson/os/seastore/cached_extent.h
src/crimson/os/seastore/linked_tree_node.h

index fb8fc9bd1772689a77795f1a69bc13dab4f3bdb6..7adce7158b3e3e4ecf2adbf808962db7d0832c8c 100644 (file)
@@ -296,7 +296,6 @@ public:
     void copy_out(char *out, size_t len) {
       assert(len == get_bytes());
       ::memcpy(out, reinterpret_cast<const void *>(buffer.data()), get_bytes());
-      buffer.clear();
     }
     void copy_in(const char *out, size_t len) {
       assert(empty());
@@ -308,6 +307,9 @@ public:
     bool operator==(const delta_buffer_t &rhs) const {
       return buffer == rhs.buffer;
     }
+    void clear() {
+      buffer.clear();
+    }
   };
 
   void journal_insert(
index 8dcf1e9e68e25f1635ba9ce296da020018ff0a72..926408f64a99d4afa6736db93b4d998c71ea636d 100644 (file)
@@ -246,6 +246,10 @@ struct FixedKVInternalNode
     return CachedExtentRef(new node_type_t(*this));
   };
 
+  void clear_delta() final {
+    delta_buffer.clear();
+  }
+
   void on_replace_prior() final {
     this->parent_node_t::on_replace_prior();
     if (this->is_btree_root()) {
@@ -719,6 +723,10 @@ struct FixedKVLeafNode
     return CachedExtentRef(new node_type_t(*static_cast<node_type_t*>(this)));
   };
 
+  void clear_delta() final {
+    delta_buffer.clear();
+  }
+
   virtual void update(
     internal_const_iterator_t iter,
     VAL val) = 0;
index a2bf6bea14c89d08fa26493a2e6ba36c202b15e3..c4ee118fb3b13cf4a5c2d7adaa02a8bc61101dec 100644 (file)
@@ -907,20 +907,41 @@ void Cache::commit_replace_extent(
     CachedExtentRef next,
     CachedExtentRef prev)
 {
-  assert(next->get_paddr() == prev->get_paddr());
+  assert(next->get_paddr() == prev->get_paddr() ||
+         // prev is being rewritten by a trim_dirty
+         // or cleaner transaction
+         prev->get_paddr().is_record_relative());
   assert(next->get_paddr().is_absolute() || next->get_paddr().is_root());
   assert(next->version == prev->version + 1);
-  if (booting) {
+  const auto t_src = t.get_src();
+  bool t_rewrite = is_rewrite_transaction(t_src);
+  if (booting && !t_rewrite) {
     extents_index.replace(*next, *prev);
   }
 
-  const auto t_src = t.get_src();
   if (is_root_type(prev->get_type())) {
     assert(prev->is_stable_dirty());
     assert(prev->is_linked_to_list());
     // add the new dirty root to front
     remove_from_dirty(prev, nullptr/* exclude root */);
     add_to_dirty(next, nullptr/* exclude root */);
+  } else if (t_rewrite) {
+    bool was_stable_dirty = prev->is_stable_dirty();
+    if (!was_stable_dirty) {
+      pinboard->remove(*prev);
+    }
+    prev->set_io_wait(CachedExtent::extent_state_t::DIRTY, true);
+    ceph_assert(next->committer);
+    ceph_assert(prev->committer);
+    ceph_assert(next->committer == prev->committer);
+    auto &committer = *next->committer;
+    committer.commit_state();
+    if (is_lba_backref_node(next->get_type())) {
+      committer.sync_checksum();
+    }
+    if (!was_stable_dirty) {
+      add_to_dirty(prev, &t_src);
+    }
   } else if (prev->is_stable_dirty()) {
     replace_dirty(next, prev, t_src);
   } else {
@@ -928,7 +949,9 @@ void Cache::commit_replace_extent(
     add_to_dirty(next, &t_src);
   }
 
-  invalidate_extent(t, *prev);
+  if (!t_rewrite || is_root_type(prev->get_type())) {
+    invalidate_extent(t, *prev);
+  }
 }
 
 void Cache::invalidate_extent(
@@ -1221,6 +1244,9 @@ CachedExtentRef Cache::duplicate_for_write(
   auto ret = i->duplicate_for_write(t);
   ret->pending_for_transaction = t.get_trans_id();
   ret->set_prior_instance(i);
+  if (!is_root_type(ret->get_type())) {
+    assert(ret->get_paddr().is_absolute());
+  }
   auto [iter, inserted] = i->mutation_pending_extents.insert(*ret);
   ceph_assert(inserted);
   if (is_root_type(ret->get_type())) {
@@ -1287,6 +1313,10 @@ record_t Cache::prepare_record(
       DEBUGT("invalid mutated extent -- {}", t, *i);
       continue;
     }
+    if (is_rewrite_transaction(t.get_src()) &&
+        !is_root_type(i->get_type())) {
+      i->new_committer(t);
+    }
     assert(i->is_exist_mutation_pending() ||
           i->prior_instance);
     get_by_ext(efforts.mutate_by_ext,
@@ -1299,27 +1329,19 @@ record_t Cache::prepare_record(
           t, delta_length, *i);
     assert(delta_length);
 
-    if (i->is_mutation_pending()) {
-      // If inplace rewrite happens from a concurrent transaction,
-      // i->prior_instance will be changed from DIRTY to CLEAN implicitly, thus
-      // i->prior_instance->version become 0. This won't cause conflicts
-      // intentionally because inplace rewrite won't modify the shared extent.
+   if (i->is_mutation_pending()) {
+      DEBUGT("commit replace extent ... -- {}, prior={}",
+        t, *i, *i->prior_instance);
+      // Rewrite transactions will be change stable extents' versions implicitly,
+      // and i->prior_instance->version will become different than i->version + 1.
+      // This won't cause conflicts intentionally because rewrite transactions
+      // only modifies lba/backref addresses.
       //
       // However, this leads to version mismatch below, thus we reset the
-      // version to 1 in this case.
-      if (i->prior_instance->version == 0 && i->version > 1) {
-        DEBUGT("commit replace extent (inplace-rewrite) ... -- {}, prior={}",
-               t, *i, *i->prior_instance);
-
-       assert(can_inplace_rewrite(i->get_type()));
-       assert(can_inplace_rewrite(i->prior_instance->get_type()));
-       assert(i->prior_instance->dirty_from == JOURNAL_SEQ_MIN);
-       assert(i->prior_instance->state == CachedExtent::extent_state_t::CLEAN);
-       assert(i->prior_instance->get_paddr().is_absolute_random_block());
-       i->version = 1;
-      } else {
-        DEBUGT("commit replace extent ... -- {}, prior={}",
-               t, *i, *i->prior_instance);
+      // version to i->prior_instance->1 in this case.
+      if (i->version != i->prior_instance->version + 1) {
+        assert(i->prior_instance->is_stable());
+        i->version = i->prior_instance->version + 1;
       }
     } else {
       assert(i->is_exist_mutation_pending());
@@ -1432,7 +1454,18 @@ record_t Cache::prepare_record(
     retire_stat.increment(extent->get_length());
     DEBUGT("retired and remove extent {}~0x{:x} -- {}",
           t, extent->get_paddr(), extent->get_length(), *extent);
-    commit_retire_extent(t, extent);
+    if (is_rewrite_transaction(t.get_src())) {
+      assert(extent->is_stable());
+      if (extent->is_stable_dirty()) {
+        remove_from_dirty(extent, &trans_src);
+        // set the version to zero because the extent state is now clean
+        // in order to handle this transparently
+        extent->version = 0;
+      }
+      touch_extent_fully(*extent, &trans_src, t.get_cache_hint());
+    } else {
+      commit_retire_extent(t, extent);
+    }
 
     // Note: commit extents and backref allocations in the same place
     if (is_backref_mapped_type(extent->get_type()) ||
@@ -1545,6 +1578,21 @@ record_t Cache::prepare_record(
     i->set_io_wait(CachedExtent::extent_state_t::CLEAN);
     // Note, paddr is known until complete_commit(),
     // so add_extent() later.
+    if (is_rewrite_transaction(t.get_src())) {
+      assert(i->get_prior_instance());
+      assert(!i->committer);
+      assert(!i->get_prior_instance()->committer);
+      i->new_committer(t);
+      assert(i->committer);
+      auto &committer = *i->committer;
+      // this must have been a rewriten extent
+      committer.commit_state();
+      if (is_lba_backref_node(i->get_type())) {
+        committer.sync_checksum();
+      }
+      i->get_prior_instance()->set_io_wait(
+        CachedExtent::extent_state_t::CLEAN);
+    }
   }
 
   for (auto &i: t.ool_block_list) {
@@ -1571,6 +1619,22 @@ record_t Cache::prepare_record(
     i->set_io_wait(CachedExtent::extent_state_t::CLEAN);
     // Note, paddr is (can be) known until complete_commit(),
     // so add_extent() later.
+    if (is_rewrite_transaction(t.get_src())) {
+      assert(i->get_prior_instance());
+      assert(!i->committer);
+      assert(!i->get_prior_instance()->committer);
+      i->new_committer(t);
+      assert(i->committer);
+      i->get_prior_instance()->committer = i->committer;
+      auto &committer = *i->committer;
+      // this must have been a rewriten extent
+      committer.commit_state();
+      if (is_lba_backref_node(i->get_type())) {
+        committer.sync_checksum();
+      }
+      i->get_prior_instance()->set_io_wait(
+        CachedExtent::extent_state_t::CLEAN, true);
+    }
   }
 
   for (auto &i: t.inplace_ool_block_list) {
@@ -1848,6 +1912,14 @@ void Cache::complete_commit(
   LOG_PREFIX(Cache::complete_commit);
   SUBTRACET(seastore_t, "final_block_start={}, start_seq={}",
             t, final_block_start, start_seq);
+  for (auto &i: t.retired_set) {
+    auto &extent = i.extent;
+    auto trans_src = t.get_src();
+    if (is_rewrite_transaction(trans_src)) {
+      assert(extent->is_valid());
+    }
+    epm.mark_space_free(extent->get_paddr(), extent->get_length());
+  }
 
   backref_entry_refs_t backref_entries;
   t.for_each_finalized_fresh_block([&](const CachedExtentRef &i) {
@@ -1870,13 +1942,33 @@ void Cache::complete_commit(
 #endif
     i->pending_for_transaction = TRANS_ID_NULL;
     i->on_initial_write();
-    i->reset_prior_instance();
-    DEBUGT("add extent as fresh, inline={} -- {}",
-          t, is_inline, *i);
-    i->invalidate_hints();
-    add_extent(i);
     const auto t_src = t.get_src();
-    touch_extent_fully(*i, &t_src, t.get_cache_hint());
+    if (is_rewrite_transaction(t_src)) {
+      ceph_assert(i->committer);
+      auto &committer = *i->committer;
+      auto &prior = *i->get_prior_instance();
+      ceph_assert(prior.is_valid());
+      TRACET("committing rewritten extent into "
+             "existing, inline={} -- {}, prior={}",
+             t, is_inline, *i, prior);
+      prior.pending_for_transaction = TRANS_ID_NULL;
+      committer.commit_and_share_paddr();
+      if (is_lba_backref_node(i->get_type())) {
+        committer.commit_data();
+        committer.share_prior_data_to_pending();
+      }
+      touch_extent_fully(prior, &t_src, t.get_cache_hint());
+      prior.complete_io();
+      i->committer.reset();
+      prior.committer.reset();
+    } else {
+      TRACET("add extent as fresh, inline={} -- {}",
+             t, is_inline, *i);
+      i->invalidate_hints();
+      add_extent(i);
+      touch_extent_fully(*i, &t_src, t.get_cache_hint());
+    }
+    i->reset_prior_instance();
     i->complete_io();
     epm.commit_space_used(i->get_paddr(), i->get_length());
 
@@ -1925,23 +2017,46 @@ void Cache::complete_commit(
     assert(i->io_wait->from_state == CachedExtent::extent_state_t::EXIST_MUTATION_PENDING
            || (i->io_wait->from_state == CachedExtent::extent_state_t::MUTATION_PENDING
                && i->prior_instance));
-    i->on_delta_write(final_block_start);
-    i->pending_for_transaction = TRANS_ID_NULL;
-    i->reset_prior_instance();
-    assert(i->version > 0);
     if (i->version == 1 || is_root_type(i->get_type())) {
       i->dirty_from = start_seq;
       DEBUGT("commit extent done, become dirty -- {}", t, *i);
+      if (is_rewrite_transaction(t.get_src()) && !is_root_type(i->get_type())) {
+        auto &prior = *i->get_prior_instance();
+        prior.dirty_from = start_seq;
+        ceph_assert(i->committer);
+        auto &committer = *i->committer;
+        committer.sync_dirty_from();
+      }
     } else {
       DEBUGT("commit extent done -- {}", t, *i);
     }
+    i->on_delta_write(final_block_start);
+    if (is_rewrite_transaction(t.get_src()) &&
+        !is_root_type(i->get_type())) {
+      TRACET("committing paddr to prior for {}, prior={}",
+        t, *i, *i->prior_instance);
+      assert(i->committer);
+      auto &committer = *i->committer;
+      auto &prior = *i->prior_instance;
+      prior.pending_for_transaction = TRANS_ID_NULL;
+      ceph_assert(prior.is_valid());
+      if (is_lba_backref_node(i->get_type())) {
+        committer.commit_data();
+        committer.share_prior_data_to_pending();
+      }
+      prior.complete_io();
+      prior.clear_delta();
+      i->committer.reset();
+      prior.committer.reset();
+    }
+
+    i->pending_for_transaction = TRANS_ID_NULL;
+    i->reset_prior_instance();
+    assert(i->version > 0);
     i->complete_io();
+    i->clear_delta();
   }
 
-  for (auto &i: t.retired_set) {
-    auto &extent = i.extent;
-    epm.mark_space_free(extent->get_paddr(), extent->get_length());
-  }
   for (auto &i: t.existing_block_list) {
     if (!i->is_valid()) {
       continue;
@@ -1958,6 +2073,17 @@ void Cache::complete_commit(
     apply_backref_byseq(t.move_backref_entries(), start_seq);
     commit_backref_entries(std::move(backref_entries), start_seq);
   }
+
+  if (is_rewrite_transaction(t.get_src())) {
+    t.for_each_finalized_fresh_block([&t](const CachedExtentRef &i) {
+      i->set_invalid(t);
+    });
+    for (auto &i: t.mutated_block_list) {
+      if (i->get_type() != extent_types_t::ROOT) {
+        i->set_invalid(t);
+      }
+    }
+  }
 }
 
 void Cache::init()
index 2c0401dde7b074b29d4f36012b1df79353a80f73..547246a6972f57c8a87a94e52e449989a4913bd4 100644 (file)
@@ -613,7 +613,6 @@ public:
       p_extent = extent->maybe_get_transactional_view(t);
       ceph_assert(p_extent);
       if (p_extent != extent.get()) {
-        assert(!extent->is_pending_io());
         assert(p_extent->is_pending_in_trans(t.get_trans_id()));
         assert(!p_extent->is_pending_io());
         ++access_stats.trans_pending;
@@ -666,7 +665,6 @@ public:
       ++stats.access.trans_pending;
       if (extent->is_mutable()) {
         assert(extent->is_fully_loaded());
-        assert(!extent->is_pending_io());
         return get_extent_iertr::make_ready_future<CachedExtentRef>(extent);
       } else {
         assert(extent->is_exist_clean());
index e28b2c406feeb4ba8aeedd9e7e6096f00d0ee805..72fe6840993dbfa719255fa4d042057b11b7133c 100644 (file)
@@ -359,6 +359,8 @@ void ExtentCommitter::sync_dirty_from() {
   auto &prior = *extent.prior_instance;
   for (auto &mext : prior.mutation_pending_extents) {
     auto &mextent = static_cast<CachedExtent&>(mext);
+    assert(mextent.dirty_from < extent.dirty_from ||
+      mextent.dirty_from == JOURNAL_SEQ_NULL);
     mextent.dirty_from = extent.dirty_from;
   }
 }
@@ -411,17 +413,13 @@ void ExtentCommitter::commit_and_share_paddr() {
     return;
   }
   if (prior.read_transactions.empty()) {
-    prior.set_paddr(
-      extent.get_paddr(),
-      prior.get_paddr().is_absolute());
+    prior.set_paddr(extent.get_paddr());
     return;
   }
   for (auto &item : prior.read_transactions) {
     auto [removed, retired] = item.t->pre_stable_extent_paddr_mod(item);
     if (prior.get_paddr() != extent.get_paddr()) {
-      prior.set_paddr(
-        extent.get_paddr(),
-        prior.get_paddr().is_absolute());
+      prior.set_paddr(extent.get_paddr());
     }
     item.t->post_stable_extent_paddr_mod(item, retired);
     item.t->maybe_update_pending_paddr(old_paddr, extent.get_paddr());
@@ -459,5 +457,13 @@ void ExtentCommitter::_share_prior_data_to_pending_versions()
   }
 }
 
+void CachedExtent::new_committer(Transaction &t) {
+  ceph_assert(is_rewrite_transaction(t.get_src()));
+  ceph_assert(!committer);
+  committer = new ExtentCommitter(*this, t);
+  assert(prior_instance);
+  assert(!prior_instance->committer);
+  prior_instance->committer = committer;
+}
 
 }
index bf51eddc745a78cf98a360068049829d03740932..d2c4e6eed85b4f03def9bdcd69b797ff34f7d544 100644 (file)
@@ -273,7 +273,8 @@ enum class extent_2q_state_t : uint8_t {
   Max
 };
 
-class ExtentCommitter {
+class ExtentCommitter : public boost::intrusive_ref_counter<
+  ExtentCommitter, boost::thread_unsafe_counter> {
 public:
   ExtentCommitter(CachedExtent &extent, Transaction &t)
     : extent(extent), t(t) {}
@@ -305,6 +306,11 @@ private:
 
   void _share_prior_data_to_mutations();
   void _share_prior_data_to_pending_versions();
+
+  template <typename T>
+  void _set_invalidaters(Transaction &t);
+
+  friend class Cache;
 };
 using ExtentCommitterRef = boost::intrusive_ptr<ExtentCommitter>;
 
@@ -467,6 +473,13 @@ public:
    */
   virtual extent_types_t get_type() const = 0;
 
+  /**
+   * clear_delta
+   *
+   * clear the mutation delta buffer of the cached extent.
+   */
+  virtual void clear_delta() {}
+
   virtual bool is_logical() const {
     assert(!is_logical_type(get_type()));
     assert(is_physical_type(get_type()));
@@ -996,6 +1009,10 @@ private:
   // This field is unused when the ExtentPinboard use LRU algorithm
   extent_2q_state_t cache_state = extent_2q_state_t::Fresh;
 
+  ExtentCommitterRef committer;
+
+  void new_committer(Transaction &t);
+
 protected:
   trans_view_set_t mutation_pending_extents;
   trans_view_set_t retired_transactions;
@@ -1032,16 +1049,11 @@ protected:
       dirty_from(other.dirty_from),
       length(other.get_length()),
       loaded_length(other.get_loaded_length()),
-      version(other.version) {
+      version(other.version),
+      poffset(other.poffset) {
     // the extent must be fully loaded before CoW
     assert(other.is_fully_loaded());
     assert(is_aligned(length, CEPH_PAGE_SIZE));
-    if (other.poffset.is_absolute() ||
-        !other.prior_poffset.has_value()) {
-      poffset = other.poffset;
-    } else {
-      poffset = *other.prior_poffset;
-    }
     if (length > 0) {
       ptr = create_extent_ptr_rand(length);
       other.ptr->copy_out(0, length, ptr->c_str());
@@ -1061,9 +1073,7 @@ protected:
       length(other.get_length()),
       loaded_length(other.get_loaded_length()),
       version(other.version),
-      poffset(other.poffset.is_absolute()
-        ? other.poffset
-        : *other.prior_poffset) {
+      poffset(other.poffset) {
     // the extent must be fully loaded before CoW
     assert(other.is_fully_loaded());
     assert(is_aligned(length, CEPH_PAGE_SIZE));
index 8a9ddee84620822e7a522b300bb04c90c3d43ab5..ec5dd810275d46cf1ec6520776ebf22f0af681d6 100644 (file)
@@ -426,6 +426,33 @@ public:
     set_child_ptracker(child);
   }
 
+  // copy dests points from a stable node back to its pending nodes
+  // having copy sources at the same tree level, it serves as a two-level index:
+  // transaction-id then node-key to the pending node.
+  //
+  // The copy dest pointers must be symmetric to the copy source pointers.
+  //
+  // copy_dests_t will be automatically unregisterred upon transaction destruction,
+  // see Transaction::views
+  struct copy_dests_t : trans_spec_view_t {
+    std::set<TCachedExtentRef<T>, Comparator> dests_by_key;
+    copy_dests_t(Transaction &t) : trans_spec_view_t{t.get_trans_id()} {}
+    ~copy_dests_t() {
+      LOG_PREFIX(~copy_dests_t);
+      SUBTRACE(seastore_fixedkv_tree, "copy_dests_t destroyed");
+    }
+  };
+
+  const copy_dests_t *get_copy_dests(Transaction &t) {
+    auto iter = copy_dests_by_trans.find(
+      t.get_trans_id(), trans_spec_view_t::cmp_t());
+    if (iter == copy_dests_by_trans.end()) {
+      return nullptr;
+    } else {
+      return static_cast<copy_dests_t*>(&*iter);
+    }
+  }
+
 protected:
   ParentNode(btreenode_pos_t capacity)
     : children(capacity, nullptr) {}
@@ -1002,6 +1029,7 @@ protected:
 
   parent_tracker_t<T>* my_tracker = nullptr;
   std::vector<BaseChildNode<T, node_key_t>*> children;
+
 private:
   T& down_cast() {
     return *static_cast<T*>(this);
@@ -1032,23 +1060,6 @@ private:
 
   std::set<TCachedExtentRef<T>, Comparator> copy_sources;
 
-  // copy dests points from a stable node back to its pending nodes
-  // having copy sources at the same tree level, it serves as a two-level index:
-  // transaction-id then node-key to the pending node.
-  //
-  // The copy dest pointers must be symmetric to the copy source pointers.
-  //
-  // copy_dests_t will be automatically unregisterred upon transaction destruction,
-  // see Transaction::views
-  struct copy_dests_t : trans_spec_view_t {
-    std::set<TCachedExtentRef<T>, Comparator> dests_by_key;
-    copy_dests_t(Transaction &t) : trans_spec_view_t{t.get_trans_id()} {}
-    ~copy_dests_t() {
-      LOG_PREFIX(~copy_dests_t);
-      SUBTRACE(seastore_fixedkv_tree, "copy_dests_t destroyed");
-    }
-  };
-
   using trans_view_set_t = trans_spec_view_t::trans_view_set_t;
   trans_view_set_t copy_dests_by_trans;
   template <typename, typename, typename>