virtual ~FixedKVNodeLayout() = default;
void set_layout_buf(char *_buf) {
- assert(buf == nullptr);
assert(_buf != nullptr);
buf = _buf;
}
virtual ~FixedKVNode() = default;
virtual void do_on_rewrite(Transaction &t, CachedExtent &extent) = 0;
+ virtual void on_state_commit() override {
+ auto &prior = static_cast<FixedKVNode&>(*get_prior_instance());
+ prior.range = std::move(range);
+ }
bool is_in_range(const node_key_t key) const {
return get_node_meta().is_in_range(key);
}
return this->get_split_pivot().get_offset();
}
+ void sync_layout_buf() {
+ this->set_layout_buf(this->get_bptr().c_str());
+ }
+
void prepare_commit() final {
parent_node_t::prepare_commit();
}
}
}
+ void on_data_commit() final {
+ auto &prior = static_cast<this_type_t&>(*this->get_prior_instance());
+ if (this->is_mutation_pending()) {
+ prior.delta_buffer = std::move(delta_buffer);
+ }
+ prior.set_layout_buf(prior.get_bptr().c_str());
+ }
+
void on_invalidated(Transaction &t) final {
this->child_node_t::on_invalidated();
}
assert(this->get_size() >= (get_min_capacity() - 1));
return this->get_size() < get_min_capacity();
}
+
+ void reapply_delta() final {
+ if (delta_buffer.empty()) {
+ return;
+ }
+ delta_buffer.replay(*this);
+ }
+
+ void merge_content_to_pending_versions(Transaction &t) {
+ ceph_assert(is_rewrite_transaction(t.get_src()));
+ this->for_each_copy_dest_set(t, [this, &t](auto ©_dests) {
+ this->merge_content_to(t, copy_dests.dests_by_key);
+ });
+ }
+
+ template <template <typename...> typename Container, typename... T>
+ void merge_content_to(Transaction &t, Container<T...> &container) {
+ auto iter = this->begin();
+ for (auto ©_dest : container) {
+ auto &pending_version = static_cast<this_type_t&>(*copy_dest);
+ auto it = pending_version.begin();
+ while (it != pending_version.end() && iter != this->end()) {
+ if (auto child = pending_version.children[it->get_offset()];
+ (is_valid_child_ptr(child) &&
+ (pending_version.is_pending() || child->_is_pending_io()))) {
+ it++;
+ continue;
+ }
+ if (it->get_key() == iter->get_key()) {
+ it->set_val(iter->get_val());
+ it++;
+ iter++;
+ } else if (it->get_key() > iter->get_key()) {
+ iter++;
+ } else {
+ it++;
+ }
+ }
+ if (pending_version.get_last_committed_crc()) {
+ // if pending_version has already calculated its crc,
+ // calculate it again.
+ pending_version.set_last_committed_crc(pending_version.calc_crc32c());
+ }
+ }
+ }
+
};
template <
// modifications can be detected (see BtreeLBAMapping.parent_modifications)
uint64_t modifications = 0;
+ void on_state_commit() override {
+ base_t::on_state_commit();
+ auto &prior = static_cast<this_type_t&>(*this->get_prior_instance());
+ // We don't touch the prior's modifications field here, because there maybe
+ // other transactions accessing the prior, and the modifications field is
+ // not to be tainted.
+ if (!prior.is_mutation_pending()) {
+ assert(!prior.modifications);
+ }
+ }
+
+ void on_data_commit() final {
+ auto &prior = static_cast<this_type_t&>(*this->get_prior_instance());
+ if (this->is_mutation_pending()) {
+ prior.delta_buffer = std::move(delta_buffer);
+ }
+ prior.set_layout_buf(prior.get_bptr().c_str());
+ }
+
+ void sync_layout_buf() {
+ this->set_layout_buf(this->get_bptr().c_str());
+ }
+
void on_invalidated(Transaction &t) final {
this->child_node_t::on_invalidated();
}
assert(this->get_size() >= (get_min_capacity() - 1));
return this->get_size() < get_min_capacity();
}
+
+ void reapply_delta() final {
+ if (delta_buffer.empty()) {
+ return;
+ }
+ delta_buffer.replay(*this);
+ }
};
} // namespace crimson::os::seastore
#include "crimson/os/seastore/btree/fixed_kv_node.h"
#include "crimson/os/seastore/lba_mapping.h"
#include "crimson/os/seastore/logical_child_node.h"
+#include "crimson/os/seastore/lba/lba_btree_node.h"
+#include "crimson/os/seastore/backref/backref_tree_node.h"
namespace {
[[maybe_unused]] seastar::logger& logger() {
return ptr;
}
+void ExtentCommitter::sync_version() {
+ assert(extent.prior_instance);
+ auto &prior = *extent.prior_instance;
+ for (auto &mext : prior.mutation_pending_extents) {
+ auto &mextent = static_cast<CachedExtent&>(mext);
+ mextent.version = extent.version + 1;
+ }
+}
+
+void ExtentCommitter::sync_dirty_from() {
+ assert(extent.prior_instance);
+ auto &prior = *extent.prior_instance;
+ for (auto &mext : prior.mutation_pending_extents) {
+ auto &mextent = static_cast<CachedExtent&>(mext);
+ mextent.dirty_from = extent.dirty_from;
+ }
+}
+
+void ExtentCommitter::sync_checksum() {
+ assert(extent.prior_instance);
+ auto &prior = *extent.prior_instance;
+ for (auto &mext : prior.mutation_pending_extents) {
+ auto &mextent = static_cast<CachedExtent&>(mext);
+ mextent.set_last_committed_crc(extent.last_committed_crc);
+ }
+}
+
+void ExtentCommitter::commit_data() {
+ assert(extent.prior_instance);
+ // extent and its prior are sharing the same bptr content
+ extent.prior_instance->set_bptr(extent.get_bptr());
+ extent.on_data_commit();
+}
+
+void ExtentCommitter::commit_state() {
+ LOG_PREFIX(CachedExtent::commit_state_to_prior);
+ assert(extent.prior_instance);
+ SUBTRACET(seastore_cache, "{} prior={}",
+ t, extent, *extent.prior_instance);
+ auto &prior = *extent.prior_instance;
+ prior.pending_for_transaction = extent.pending_for_transaction;
+ prior.modify_time = extent.modify_time;
+ prior.last_committed_crc = extent.last_committed_crc;
+ prior.dirty_from = extent.dirty_from;
+ prior.length = extent.length;
+ prior.loaded_length = extent.loaded_length;
+ prior.buffer_space = std::move(extent.buffer_space);
+ // XXX: We can go ahead and change the prior's version because
+ // transactions don't hold a local view of the version field,
+ // unlike FixedKVLeafNode::modifications
+ prior.version = extent.version;
+ prior.user_hint = extent.user_hint;
+ prior.rewrite_generation = extent.rewrite_generation;
+ prior.last_touch_end = extent.last_touch_end;
+ prior.cache_state = extent.cache_state;
+ prior.state = extent.state;
+ extent.on_state_commit();
+}
+
+void ExtentCommitter::commit_and_share_paddr() {
+ auto &prior = *extent.prior_instance;
+ auto old_paddr = prior.get_prior_paddr_and_reset();
+ if (prior.get_paddr() == extent.get_paddr()) {
+ return;
+ }
+ if (prior.read_transactions.empty()) {
+ prior.set_paddr(
+ extent.get_paddr(),
+ prior.get_paddr().is_absolute());
+ return;
+ }
+ for (auto &item : prior.read_transactions) {
+ auto [removed, retired] = item.t->pre_stable_extent_paddr_mod(item);
+ if (prior.get_paddr() != extent.get_paddr()) {
+ prior.set_paddr(
+ extent.get_paddr(),
+ prior.get_paddr().is_absolute());
+ }
+ item.t->post_stable_extent_paddr_mod(item, retired);
+ item.t->maybe_update_pending_paddr(old_paddr, extent.get_paddr());
+ }
+}
+
+void ExtentCommitter::_share_prior_data_to_mutations() {
+ auto &prior = *extent.prior_instance;
+ for (auto &mext : prior.mutation_pending_extents) {
+ auto &mextent = static_cast<CachedExtent&>(mext);
+ extent.get_bptr().copy_out(
+ 0, extent.get_length(), mextent.get_bptr().c_str());
+ mextent.reapply_delta();
+ }
+}
+
+void ExtentCommitter::_share_prior_data_to_pending_versions()
+{
+ auto &prior = *extent.prior_instance;
+ switch (extent.get_type()) {
+ case extent_types_t::LADDR_LEAF:
+ static_cast<lba::LBALeafNode&>(
+ prior).merge_content_to_pending_versions(t);
+ break;
+ case extent_types_t::LADDR_INTERNAL:
+ static_cast<lba::LBAInternalNode&>(prior
+ ).merge_content_to_pending_versions(t);
+ break;
+ case extent_types_t::BACKREF_INTERNAL:
+ static_cast<backref::BackrefInternalNode&>(prior
+ ).merge_content_to_pending_versions(t);
+ break;
+ default:
+ break;
+ }
+}
+
+
}
namespace crimson::os::seastore {
+class ExtentCommitter;
class Transaction;
class CachedExtent;
using CachedExtentRef = boost::intrusive_ptr<CachedExtent>;
Max
};
+class ExtentCommitter {
+public:
+ ExtentCommitter(CachedExtent &extent, Transaction &t)
+ : extent(extent), t(t) {}
+
+ // commit all extent states to the prior instance,
+ // except poffset and extent content
+ void commit_state();
+
+ void commit_data();
+
+ // synchronize last_committed_crc among mutation pending extents
+ void sync_checksum();
+
+ void sync_dirty_from();
+
+ void sync_version();
+
+ void share_prior_data_to_pending() {
+ _share_prior_data_to_mutations();
+ _share_prior_data_to_pending_versions();
+ }
+
+ void commit_and_share_paddr();
+
+private:
+ // the rewritten extent
+ CachedExtent &extent;
+ Transaction &t;
+
+ void _share_prior_data_to_mutations();
+ void _share_prior_data_to_pending_versions();
+};
+using ExtentCommitterRef = boost::intrusive_ptr<ExtentCommitter>;
+
class ExtentIndex;
class CachedExtent
: public boost::intrusive_ref_counter<
using index = boost::intrusive::set<CachedExtent, index_member_options>;
friend class ExtentIndex;
friend class Transaction;
+ friend class ExtentCommitter;
bool is_linked_to_index() {
return extent_index_hook.is_linked();
dirty_from(other.dirty_from),
length(other.get_length()),
loaded_length(other.get_loaded_length()),
- version(other.version),
- poffset(other.poffset) {
+ version(other.version) {
// the extent must be fully loaded before CoW
assert(other.is_fully_loaded());
assert(is_aligned(length, CEPH_PAGE_SIZE));
+ if (other.poffset.is_absolute() ||
+ !other.prior_poffset.has_value()) {
+ poffset = other.poffset;
+ } else {
+ poffset = *other.prior_poffset;
+ }
if (length > 0) {
ptr = create_extent_ptr_rand(length);
other.ptr->copy_out(0, length, ptr->c_str());
length(other.get_length()),
loaded_length(other.get_loaded_length()),
version(other.version),
- poffset(other.poffset) {
+ poffset(other.poffset.is_absolute()
+ ? other.poffset
+ : *other.prior_poffset) {
// the extent must be fully loaded before CoW
assert(other.is_fully_loaded());
assert(is_aligned(length, CEPH_PAGE_SIZE));
return new T();
}
+ /**
+ * on_state_commit
+ *
+ * Called when the current extent's common states
+ * are copied to its prior instance, this should
+ * only be used in the context of rewriting transactions,
+ * e.g. TRIM_DIRTY and CLEANER
+ */
+ virtual void on_state_commit() {}
+
+ /**
+ * on_data_commit
+ *
+ * Called when the current extent's ptr is shared with
+ * its prior instance, this should only be used when
+ * commit extent rewriting transactions, e.g. TRIM_DIRTY
+ * and CLEANER
+ */
+ virtual void on_data_commit() {}
+
+ /**
+ * reapply_delta
+ *
+ * Called when there's need to reapply the current extent's
+ * deltas, this happens when the rewritting transaction
+ * overwrite the data of mutation pending extents, which erase
+ * all modifications and make the deltas needed to be reapplied
+ */
+ virtual void reapply_delta() {}
+
void reset_prior_instance() {
prior_instance.reset();
}
void set_bptr(ceph::bufferptr &&nptr) {
ptr = nptr;
}
+ void set_bptr(ceph::bufferptr &nptr) {
+ ptr = nptr;
+ }
/**
* maybe_generate_relative
}
void erase(CachedExtent &extent) {
+ auto it = extent_index.s_iterator_to(extent);
+ erase(it);
+ }
+
+ void erase(CachedExtent::index::iterator &it) {
+ auto &extent = *it;
assert(extent.parent_index);
assert(extent.is_linked_to_index());
- [[maybe_unused]] auto erased = extent_index.erase(
- extent_index.s_iterator_to(extent));
- extent.parent_index = nullptr;
-
+ [[maybe_unused]] auto erased = extent_index.erase(it);
assert(erased);
+ extent.parent_index = nullptr;
bytes -= extent.get_length();
}
logical_on_delta_write();
}
+ void on_state_commit() final {
+ auto &prior = static_cast<LogicalCachedExtent&>(*get_prior_instance());
+ prior.laddr = laddr;
+ do_on_state_commit();
+ }
+
+ virtual void do_on_state_commit() {}
+
private:
// the logical address of the extent, and if shared,
// it is the intermediate_base, see BtreeLBAMapping comments.
coll_map_t decoded;
delta_buffer_t delta_buffer;
+ void do_on_state_commit() final {
+ auto &prior = static_cast<CollectionNode&>(*get_prior_instance());
+ prior.delta_buffer = std::move(delta_buffer);
+ prior.decoded = std::move(decoded);
+ }
+
CachedExtentRef duplicate_for_write(Transaction&) final {
assert(delta_buffer.empty());
return CachedExtentRef(new CollectionNode(*this));
}
std::ostream &print_detail(std::ostream &out) const final;
+
+ template <template <typename...> typename Container, typename... T>
+ void merge_content_to(Transaction &t, Container<T...> &container) {
+ auto iter = this->begin();
+ for (auto ©_dest : container) {
+ auto &pending_version = static_cast<LBALeafNode&>(*copy_dest);
+ auto it = pending_version.begin();
+ while (it != pending_version.end() && iter != this->end()) {
+ if (iter->get_val().pladdr.is_laddr() ||
+ iter->get_val().pladdr.get_paddr().is_zero()) {
+ iter++;
+ continue;
+ }
+ if (auto child = pending_version.children[it->get_offset()];
+ is_valid_child_ptr(child) &&
+ (pending_version.is_pending() || child->_is_pending_io())) {
+ it++;
+ continue;
+ }
+ if (it->get_key() == iter->get_key()) {
+ it->set_val(iter->get_val());
+ it++;
+ iter++;
+ } else if (it->get_key() > iter->get_key()) {
+ iter++;
+ } else {
+ it++;
+ }
+ }
+ if (pending_version.get_last_committed_crc()) {
+ // if pending_version has already calculated its crc,
+ // calculate it again.
+ pending_version.set_last_committed_crc(pending_version.calc_crc32c());
+ }
+ }
+ }
+
+ void merge_content_to_pending_versions(Transaction &t) {
+ ceph_assert(is_rewrite_transaction(t.get_src()));
+ this->for_each_copy_dest_set(t, [this, &t](auto ©_dests) {
+#ifndef NDEBUG
+ for (auto ©_dest : copy_dests.dests_by_key) {
+ auto &pending_version = static_cast<LBALeafNode&>(*copy_dest);
+ ceph_assert(pending_version.is_pending());
+ }
+#endif
+ this->merge_content_to(t, copy_dests.dests_by_key);
+ });
+ }
};
using LBALeafNodeRef = TCachedExtentRef<LBALeafNode>;
}
virtual key_t node_begin() const = 0;
virtual bool is_retired_placeholder() const = 0;
+ virtual bool _is_pending_io() const = 0;
protected:
parent_tracker_ref<ParentT> parent_tracker;
virtual bool _is_valid() const = 0;
return *it;
}
+ template <typename Func>
+ void for_each_copy_dest_set(Transaction &t, Func &&f) {
+ for (auto &dests : copy_dests_by_trans) {
+ if (dests.pending_for_transaction == t.get_trans_id()) {
+ continue;
+ }
+ auto ©_dests = static_cast<copy_dests_t&>(dests);
+ std::invoke(f, copy_dests);
+ }
+ }
+
void add_copy_dest(Transaction &t, TCachedExtentRef<T> dest) {
ceph_assert(down_cast().is_stable());
ceph_assert(dest->is_pending());
}
parent_tracker_t<T>* my_tracker = nullptr;
+ std::vector<BaseChildNode<T, node_key_t>*> children;
private:
T& down_cast() {
return *static_cast<T*>(this);
}
}
- std::vector<BaseChildNode<T, node_key_t>*> children;
std::set<TCachedExtentRef<T>, Comparator> copy_sources;
// copy dests points from a stable node back to its pending nodes
bool _is_stable() const final {
return down_cast().is_stable();
}
+ bool _is_pending_io() const final {
+ return down_cast().is_pending_io();
+ }
key_t node_begin() const final {
return down_cast().get_begin();
}
explicit ObjectDataBlock(extent_len_t length)
: LogicalChildNode(length) {}
+ void do_on_state_commit() final {
+ auto &prior = static_cast<ObjectDataBlock&>(*get_prior_instance());
+ prior.delta = std::move(delta);
+ prior.modified_region = std::move(modified_region);
+ prior.cached_overwrites = std::move(cached_overwrites);
+ }
+
CachedExtentRef duplicate_for_write(Transaction&) final {
return CachedExtentRef(new ObjectDataBlock(*this, share_buffer_t{}));
};
protected:
NodeExtentRef mutate(context_t, DeltaRecorderURef&&) override;
+ void do_on_state_commit() final {
+ auto &prior = static_cast<SeastoreNodeExtent&>(*get_prior_instance());
+ prior.recorder = std::move(recorder);
+ }
+
DeltaRecorder* get_recorder() const override {
return recorder.get();
}
}
}
+ std::pair<bool, bool> pre_stable_extent_paddr_mod(
+ read_set_item_t<Transaction> &item)
+ {
+ LOG_PREFIX(Transaction::sync_extent_state);
+ SUBTRACET(seastore_t, "{}", *this, *item.ref);
+#ifndef NDEBUG
+ auto [existed, it] = lookup_trans_from_read_extent(item.ref);
+ assert(existed);
+ assert(item.ref.get() == it->ref.get());
+ assert(item.t = it->t);
+#endif
+
+ if (!item.is_extent_attached_to_trans()) {
+ return {false, false};
+ }
+ auto &extent = *item.ref;
+ read_set.erase(read_extent_set_t<Transaction>::s_iterator_to(item));
+ auto where1 = retired_set.find(extent.get_paddr());
+ bool retired = (where1 != retired_set.end());
+ if (where1 != retired_set.end()) {
+ retired_set.erase(where1);
+ }
+ return {true, retired};
+ }
+ void post_stable_extent_paddr_mod(
+ read_set_item_t<Transaction> &item,
+ bool retired) {
+ read_set.insert(item);
+ if (retired) {
+ retired_set.emplace(item.ref, trans_id);
+ }
+ }
+ void maybe_update_pending_paddr(
+ const paddr_t &old_paddr,
+ const paddr_t &new_paddr) {
+ if (!new_paddr.is_absolute()) {
+ return;
+ }
+ auto where2 = write_set.find_offset(old_paddr);
+ if (where2 != write_set.end()) {
+ auto &mextent = *where2;
+ write_set.erase(where2);
+ mextent.set_paddr(new_paddr);
+ write_set.insert(mextent);
+ }
+ }
+
template <typename F>
auto for_each_finalized_fresh_block(F &&f) const {
std::for_each(ool_block_list.begin(), ool_block_list.end(), f);