CachedExtentRef next,
CachedExtentRef prev)
{
- assert(next->get_paddr() == prev->get_paddr());
+ assert(next->get_paddr() == prev->get_paddr() ||
+ // prev is being rewritten by a trim_dirty
+ // or cleaner transaction
+ prev->get_paddr().is_record_relative());
assert(next->get_paddr().is_absolute() || next->get_paddr().is_root());
assert(next->version == prev->version + 1);
- if (booting) {
+ const auto t_src = t.get_src();
+ bool t_rewrite = is_rewrite_transaction(t_src);
+ if (booting && !t_rewrite) {
extents_index.replace(*next, *prev);
}
- const auto t_src = t.get_src();
if (is_root_type(prev->get_type())) {
assert(prev->is_stable_dirty());
assert(prev->is_linked_to_list());
// add the new dirty root to front
remove_from_dirty(prev, nullptr/* exclude root */);
add_to_dirty(next, nullptr/* exclude root */);
+ } else if (t_rewrite) {
+ bool was_stable_dirty = prev->is_stable_dirty();
+ if (!was_stable_dirty) {
+ pinboard->remove(*prev);
+ }
+ prev->set_io_wait(CachedExtent::extent_state_t::DIRTY, true);
+ ceph_assert(next->committer);
+ ceph_assert(prev->committer);
+ ceph_assert(next->committer == prev->committer);
+ auto &committer = *next->committer;
+ committer.commit_state();
+ if (is_lba_backref_node(next->get_type())) {
+ committer.sync_checksum();
+ }
+ if (!was_stable_dirty) {
+ add_to_dirty(prev, &t_src);
+ }
} else if (prev->is_stable_dirty()) {
replace_dirty(next, prev, t_src);
} else {
add_to_dirty(next, &t_src);
}
- invalidate_extent(t, *prev);
+ if (!t_rewrite || is_root_type(prev->get_type())) {
+ invalidate_extent(t, *prev);
+ }
}
void Cache::invalidate_extent(
auto ret = i->duplicate_for_write(t);
ret->pending_for_transaction = t.get_trans_id();
ret->set_prior_instance(i);
+ if (!is_root_type(ret->get_type())) {
+ assert(ret->get_paddr().is_absolute());
+ }
auto [iter, inserted] = i->mutation_pending_extents.insert(*ret);
ceph_assert(inserted);
if (is_root_type(ret->get_type())) {
DEBUGT("invalid mutated extent -- {}", t, *i);
continue;
}
+ if (is_rewrite_transaction(t.get_src()) &&
+ !is_root_type(i->get_type())) {
+ i->new_committer(t);
+ }
assert(i->is_exist_mutation_pending() ||
i->prior_instance);
get_by_ext(efforts.mutate_by_ext,
t, delta_length, *i);
assert(delta_length);
- if (i->is_mutation_pending()) {
- // If inplace rewrite happens from a concurrent transaction,
- // i->prior_instance will be changed from DIRTY to CLEAN implicitly, thus
- // i->prior_instance->version become 0. This won't cause conflicts
- // intentionally because inplace rewrite won't modify the shared extent.
+ if (i->is_mutation_pending()) {
+ DEBUGT("commit replace extent ... -- {}, prior={}",
+ t, *i, *i->prior_instance);
+ // Rewrite transactions may change stable extents' versions implicitly,
+ // so i->version may no longer equal i->prior_instance->version + 1.
+ // This intentionally doesn't cause conflicts because rewrite transactions
+ // only modify lba/backref addresses.
//
// However, this leads to version mismatch below, thus we reset the
- // version to 1 in this case.
- if (i->prior_instance->version == 0 && i->version > 1) {
- DEBUGT("commit replace extent (inplace-rewrite) ... -- {}, prior={}",
- t, *i, *i->prior_instance);
-
- assert(can_inplace_rewrite(i->get_type()));
- assert(can_inplace_rewrite(i->prior_instance->get_type()));
- assert(i->prior_instance->dirty_from == JOURNAL_SEQ_MIN);
- assert(i->prior_instance->state == CachedExtent::extent_state_t::CLEAN);
- assert(i->prior_instance->get_paddr().is_absolute_random_block());
- i->version = 1;
- } else {
- DEBUGT("commit replace extent ... -- {}, prior={}",
- t, *i, *i->prior_instance);
+ // version to i->prior_instance->version + 1 in this case.
+ if (i->version != i->prior_instance->version + 1) {
+ assert(i->prior_instance->is_stable());
+ i->version = i->prior_instance->version + 1;
}
} else {
assert(i->is_exist_mutation_pending());
retire_stat.increment(extent->get_length());
DEBUGT("retired and remove extent {}~0x{:x} -- {}",
t, extent->get_paddr(), extent->get_length(), *extent);
- commit_retire_extent(t, extent);
+ if (is_rewrite_transaction(t.get_src())) {
+ assert(extent->is_stable());
+ if (extent->is_stable_dirty()) {
+ remove_from_dirty(extent, &trans_src);
+ // set the version to zero because the extent state is now clean
+ // in order to handle this transparently
+ extent->version = 0;
+ }
+ touch_extent_fully(*extent, &trans_src, t.get_cache_hint());
+ } else {
+ commit_retire_extent(t, extent);
+ }
// Note: commit extents and backref allocations in the same place
if (is_backref_mapped_type(extent->get_type()) ||
i->set_io_wait(CachedExtent::extent_state_t::CLEAN);
// Note, paddr is known until complete_commit(),
// so add_extent() later.
+ if (is_rewrite_transaction(t.get_src())) {
+ assert(i->get_prior_instance());
+ assert(!i->committer);
+ assert(!i->get_prior_instance()->committer);
+ i->new_committer(t);
+ assert(i->committer);
+ auto &committer = *i->committer;
+ // this must have been a rewritten extent
+ committer.commit_state();
+ if (is_lba_backref_node(i->get_type())) {
+ committer.sync_checksum();
+ }
+ i->get_prior_instance()->set_io_wait(
+ CachedExtent::extent_state_t::CLEAN);
+ }
}
for (auto &i: t.ool_block_list) {
i->set_io_wait(CachedExtent::extent_state_t::CLEAN);
// Note, paddr is (can be) known until complete_commit(),
// so add_extent() later.
+ if (is_rewrite_transaction(t.get_src())) {
+ assert(i->get_prior_instance());
+ assert(!i->committer);
+ assert(!i->get_prior_instance()->committer);
+ i->new_committer(t);
+ assert(i->committer);
+ i->get_prior_instance()->committer = i->committer;
+ auto &committer = *i->committer;
+ // this must have been a rewritten extent
+ committer.commit_state();
+ if (is_lba_backref_node(i->get_type())) {
+ committer.sync_checksum();
+ }
+ i->get_prior_instance()->set_io_wait(
+ CachedExtent::extent_state_t::CLEAN, true);
+ }
}
for (auto &i: t.inplace_ool_block_list) {
LOG_PREFIX(Cache::complete_commit);
SUBTRACET(seastore_t, "final_block_start={}, start_seq={}",
t, final_block_start, start_seq);
+ for (auto &i: t.retired_set) {
+ auto &extent = i.extent;
+ auto trans_src = t.get_src();
+ if (is_rewrite_transaction(trans_src)) {
+ assert(extent->is_valid());
+ }
+ epm.mark_space_free(extent->get_paddr(), extent->get_length());
+ }
backref_entry_refs_t backref_entries;
t.for_each_finalized_fresh_block([&](const CachedExtentRef &i) {
#endif
i->pending_for_transaction = TRANS_ID_NULL;
i->on_initial_write();
- i->reset_prior_instance();
- DEBUGT("add extent as fresh, inline={} -- {}",
- t, is_inline, *i);
- i->invalidate_hints();
- add_extent(i);
const auto t_src = t.get_src();
- touch_extent_fully(*i, &t_src, t.get_cache_hint());
+ if (is_rewrite_transaction(t_src)) {
+ ceph_assert(i->committer);
+ auto &committer = *i->committer;
+ auto &prior = *i->get_prior_instance();
+ ceph_assert(prior.is_valid());
+ TRACET("committing rewritten extent into "
+ "existing, inline={} -- {}, prior={}",
+ t, is_inline, *i, prior);
+ prior.pending_for_transaction = TRANS_ID_NULL;
+ committer.commit_and_share_paddr();
+ if (is_lba_backref_node(i->get_type())) {
+ committer.commit_data();
+ committer.share_prior_data_to_pending();
+ }
+ touch_extent_fully(prior, &t_src, t.get_cache_hint());
+ prior.complete_io();
+ i->committer.reset();
+ prior.committer.reset();
+ } else {
+ TRACET("add extent as fresh, inline={} -- {}",
+ t, is_inline, *i);
+ i->invalidate_hints();
+ add_extent(i);
+ touch_extent_fully(*i, &t_src, t.get_cache_hint());
+ }
+ i->reset_prior_instance();
i->complete_io();
epm.commit_space_used(i->get_paddr(), i->get_length());
assert(i->io_wait->from_state == CachedExtent::extent_state_t::EXIST_MUTATION_PENDING
|| (i->io_wait->from_state == CachedExtent::extent_state_t::MUTATION_PENDING
&& i->prior_instance));
- i->on_delta_write(final_block_start);
- i->pending_for_transaction = TRANS_ID_NULL;
- i->reset_prior_instance();
- assert(i->version > 0);
if (i->version == 1 || is_root_type(i->get_type())) {
i->dirty_from = start_seq;
DEBUGT("commit extent done, become dirty -- {}", t, *i);
+ if (is_rewrite_transaction(t.get_src()) && !is_root_type(i->get_type())) {
+ auto &prior = *i->get_prior_instance();
+ prior.dirty_from = start_seq;
+ ceph_assert(i->committer);
+ auto &committer = *i->committer;
+ committer.sync_dirty_from();
+ }
} else {
DEBUGT("commit extent done -- {}", t, *i);
}
+ i->on_delta_write(final_block_start);
+ if (is_rewrite_transaction(t.get_src()) &&
+ !is_root_type(i->get_type())) {
+ TRACET("committing paddr to prior for {}, prior={}",
+ t, *i, *i->prior_instance);
+ assert(i->committer);
+ auto &committer = *i->committer;
+ auto &prior = *i->prior_instance;
+ prior.pending_for_transaction = TRANS_ID_NULL;
+ ceph_assert(prior.is_valid());
+ if (is_lba_backref_node(i->get_type())) {
+ committer.commit_data();
+ committer.share_prior_data_to_pending();
+ }
+ prior.complete_io();
+ prior.clear_delta();
+ i->committer.reset();
+ prior.committer.reset();
+ }
+
+ i->pending_for_transaction = TRANS_ID_NULL;
+ i->reset_prior_instance();
+ assert(i->version > 0);
i->complete_io();
+ i->clear_delta();
}
- for (auto &i: t.retired_set) {
- auto &extent = i.extent;
- epm.mark_space_free(extent->get_paddr(), extent->get_length());
- }
for (auto &i: t.existing_block_list) {
if (!i->is_valid()) {
continue;
apply_backref_byseq(t.move_backref_entries(), start_seq);
commit_backref_entries(std::move(backref_entries), start_seq);
}
+
+ if (is_rewrite_transaction(t.get_src())) {
+ t.for_each_finalized_fresh_block([&t](const CachedExtentRef &i) {
+ i->set_invalid(t);
+ });
+ for (auto &i: t.mutated_block_list) {
+ if (i->get_type() != extent_types_t::ROOT) {
+ i->set_invalid(t);
+ }
+ }
+ }
}
void Cache::init()
Max
};
-class ExtentCommitter {
+class ExtentCommitter : public boost::intrusive_ref_counter<
+ ExtentCommitter, boost::thread_unsafe_counter> {
public:
ExtentCommitter(CachedExtent &extent, Transaction &t)
: extent(extent), t(t) {}
void _share_prior_data_to_mutations();
void _share_prior_data_to_pending_versions();
+
+ template <typename T>
+ void _set_invalidaters(Transaction &t);
+
+ friend class Cache;
};
using ExtentCommitterRef = boost::intrusive_ptr<ExtentCommitter>;
*/
virtual extent_types_t get_type() const = 0;
+ /**
+ * clear_delta
+ *
+ * clear the mutation delta buffer of the cached extent.
+ */
+ virtual void clear_delta() {}
+
virtual bool is_logical() const {
assert(!is_logical_type(get_type()));
assert(is_physical_type(get_type()));
// This field is unused when the ExtentPinboard use LRU algorithm
extent_2q_state_t cache_state = extent_2q_state_t::Fresh;
+ ExtentCommitterRef committer;
+
+ void new_committer(Transaction &t);
+
protected:
trans_view_set_t mutation_pending_extents;
trans_view_set_t retired_transactions;
dirty_from(other.dirty_from),
length(other.get_length()),
loaded_length(other.get_loaded_length()),
- version(other.version) {
+ version(other.version),
+ poffset(other.poffset) {
// the extent must be fully loaded before CoW
assert(other.is_fully_loaded());
assert(is_aligned(length, CEPH_PAGE_SIZE));
- if (other.poffset.is_absolute() ||
- !other.prior_poffset.has_value()) {
- poffset = other.poffset;
- } else {
- poffset = *other.prior_poffset;
- }
if (length > 0) {
ptr = create_extent_ptr_rand(length);
other.ptr->copy_out(0, length, ptr->c_str());
length(other.get_length()),
loaded_length(other.get_loaded_length()),
version(other.version),
- poffset(other.poffset.is_absolute()
- ? other.poffset
- : *other.prior_poffset) {
+ poffset(other.poffset) {
// the extent must be fully loaded before CoW
assert(other.is_fully_loaded());
assert(is_aligned(length, CEPH_PAGE_SIZE));