if (is_rewrite_transaction(t.get_src()) &&
!is_root_type(i->get_type())) {
i->new_committer(t);
+ i->committer->block_trans(t);
}
assert(i->is_exist_mutation_pending() ||
i->prior_instance);
if (is_lba_backref_node(i->get_type())) {
committer.sync_checksum();
}
+ committer.block_trans(t);
i->get_prior_instance()->set_io_wait(
CachedExtent::extent_state_t::CLEAN, true);
}
committer.share_prior_data_to_pending();
}
touch_extent_fully(prior, &t_src, t.get_cache_hint());
+ committer.unblock_trans(t);
prior.complete_io();
i->committer.reset();
prior.committer.reset();
t, *i, *i->prior_instance);
assert(i->committer);
auto &committer = *i->committer;
+ committer.unblock_trans(t);
auto &prior = *i->prior_instance;
prior.pending_for_transaction = TRANS_ID_NULL;
ceph_assert(prior.is_valid());
}
}
+SET_SUBSYS(seastore_cache);
+
namespace crimson::os::seastore {
#ifdef DEBUG_CACHED_EXTENT_REF
prior_instance->committer = committer;
}
+void ExtentCommitter::block_trans(Transaction &t) {
+ LOG_PREFIX(ExtentCommitter::block_trans);
+ auto &prior = *extent.prior_instance;
+ for (auto &item : prior.read_transactions) {
+ TRACET("blocking trans {} for rewriting {}",
+ t, item.t->get_trans_id(), *item.ref);
+ item.t->need_wait_rewrite = true;
+ }
+}
+
+void ExtentCommitter::unblock_trans(Transaction &t) {
+ LOG_PREFIX(ExtentCommitter::unblock_trans);
+ auto &prior = *extent.prior_instance;
+ for (auto &item : prior.read_transactions) {
+ TRACET("unblocking trans {} for rewriting {}",
+ t, item.t->get_trans_id(), *item.ref);
+ item.t->need_wait_rewrite = false;
+ }
+}
+
}
ExtentCommitter(CachedExtent &extent, Transaction &t)
: extent(extent), t(t) {}
+ void block_trans(Transaction &);
+ void unblock_trans(Transaction &);
// commit all extent states to the prior instance,
// except poffset and extent content
void commit_state();
ool_write_stats = {};
rewrite_stats = {};
conflicted = false;
+ need_wait_rewrite = false;
assert(backref_entries.empty());
if (!has_reset) {
has_reset = true;
}
btree_cursor_stats_t cursor_stats;
+ bool need_wait_rewrite = false;
+
private:
friend class Cache;
friend Ref make_test_transaction();
tref.get_handle().enter(write_pipeline.prepare)
);
+ while (tref.need_wait_rewrite) {
+ co_await trans_intr::make_interruptible(seastar::yield());
+ }
if (trim_alloc_to && *trim_alloc_to != JOURNAL_SEQ_NULL) {
SUBTRACET(seastore_t, "trim backref_bufs to {}", tref, *trim_alloc_to);
cache->trim_backref_bufs(*trim_alloc_to);