for (auto &item : prior.read_transactions) {
TRACET("blocking trans {} for rewriting {}",
t, item.t->get_trans_id(), *item.ref);
- item.t->need_wait_visibility = true;
+ item.t->blocked_by.insert(t.get_trans_id());
}
}
for (auto &item : prior.read_transactions) {
TRACET("unblocking trans {} for rewriting {}",
t, item.t->get_trans_id(), *item.ref);
- item.t->need_wait_visibility = false;
+ item.t->blocked_by.erase(t.get_trans_id());
}
}
ool_write_stats = {};
rewrite_stats = {};
conflicted = false;
- need_wait_visibility = false;
+ blocked_by.clear();
assert(backref_entries.empty());
if (!has_reset) {
has_reset = true;
btree_cursor_stats_t cursor_stats;
- bool need_wait_visibility = false;
+ // XXX: To allow multiple rewrite transactions to block the same
+ // transaction, we can't simply use a counter. Because a client
+ // transaction may be add to the read_transactions of an extent
+ // when the rewrite transactions that modifies the extent are
+ // committing.
+ std::set<transaction_id_t> blocked_by;;
private:
friend class Cache;
tref.get_handle().enter(write_pipeline.prepare)
);
- while (tref.need_wait_visibility) {
+ while (!tref.blocked_by.empty()) {
co_await trans_intr::make_interruptible(seastar::yield());
}
if (trim_alloc_to && *trim_alloc_to != JOURNAL_SEQ_NULL) {
rewrite_gen_printer_t{target_generation},
sea_time_point_printer_t{modify_time},
*extent);
- ceph_assert(!extent->is_pending_io());
}
assert(extent->is_valid() && !extent->is_initial_pending());