t,
LBA_BLOCK_SIZE);
root_leaf->set_size(0);
- root_leaf->set_meta({0, L_ADDR_MAX, 1});
+ lba_node_meta_t meta{0, L_ADDR_MAX, 1};
+ root_leaf->set_meta(meta);
+ root_leaf->pin.set_range(meta);
croot->get_lba_root() =
root_t{
1,
});
}
+static bool is_lba_node(const CachedExtent &e)
+{
+ return e.get_type() == extent_types_t::LADDR_INTERNAL ||
+ e.get_type() == extent_types_t::LADDR_LEAF;
+}
+
+btree_range_pin_t &BtreeLBAManager::get_pin(CachedExtent &e)
+{
+ if (is_lba_node(e)) {
+ return e.cast<LBANode>()->pin;
+ } else if (e.is_logical()) {
+ return static_cast<BtreeLBAPin &>(
+ e.cast<LogicalCachedExtent>()->get_pin()).pin;
+ } else {
+ assert(0 == "impossible");
+ return *static_cast<btree_range_pin_t*>(nullptr);
+ }
+}
+
+static depth_t get_depth(const CachedExtent &e)
+{
+ if (is_lba_node(e)) {
+ return e.cast<LBANode>()->get_node_meta().depth;
+ } else if (e.is_logical()) {
+ return 0;
+ } else {
+ ceph_assert(0 == "currently impossible");
+ return 0;
+ }
+}
+
BtreeLBAManager::complete_transaction_ret
BtreeLBAManager::complete_transaction(
Transaction &t)
{
- // This is a noop for now and may end up not being necessary
+ std::vector<CachedExtentRef> to_clear;
+ to_clear.reserve(t.get_retired_set().size());
+ for (auto &e: t.get_retired_set()) {
+ if (e->is_logical() || is_lba_node(*e))
+ to_clear.push_back(e);
+ }
+ // need to call check_parent from leaf->parent
+ std::sort(
+ to_clear.begin(), to_clear.end(),
+ [](auto &l, auto &r) { return get_depth(*l) < get_depth(*r); });
+
+ for (auto &e: to_clear) {
+ auto &pin = get_pin(*e);
+ logger().debug("{}: retiring {}, {}", __func__, *e, pin);
+ pin_set.retire(pin);
+ }
+
+ // ...but add_pin from parent->leaf
+ std::vector<CachedExtentRef> to_link;
+ to_link.reserve(
+ t.get_fresh_block_list().size() +
+ t.get_mutated_block_list().size());
+ for (auto &l: {t.get_fresh_block_list(), t.get_mutated_block_list()}) {
+ for (auto &e: l) {
+ if (e->is_valid() && (is_lba_node(*e) || e->is_logical()))
+ to_link.push_back(e);
+ }
+ }
+ std::sort(
+ to_link.begin(), to_link.end(),
+ [](auto &l, auto &r) -> bool { return get_depth(*l) > get_depth(*r); });
+
+ for (auto &e : to_link) {
+ logger().debug("{}: linking {}", __func__, *e);
+ pin_set.add_pin(get_pin(*e));
+ }
+
+ for (auto &e: to_clear) {
+ auto &pin = get_pin(*e);
+ logger().debug("{}: checking {}, {}", __func__, *e, pin);
+ pin_set.check_parent(pin);
+ }
return complete_transaction_ertr::now();
}
croot = mut_croot->cast<RootBlock>();
}
auto nroot = cache.alloc_new_extent<LBAInternalNode>(t, LBA_BLOCK_SIZE);
- nroot->set_meta({0, L_ADDR_MAX, croot->root.lba_depth + 1});
+ lba_node_meta_t meta{0, L_ADDR_MAX, root->get_node_meta().depth + 1};
+ nroot->set_meta(meta);
+ nroot->pin.set_range(meta);
nroot->journal_insert(
nroot->begin(),
L_ADDR_MIN,
SegmentManager &segment_manager;
Cache &cache;
+ btree_pin_set_t pin_set;
+
op_context_t get_context(Transaction &t) {
- return op_context_t{cache, t};
+ return op_context_t{cache, pin_set, t};
}
+ static btree_range_pin_t &get_pin(CachedExtent &e);
+
+
/**
* get_root
*
struct op_context_t {
Cache &cache;
+ btree_pin_set_t &pins;
Transaction &trans;
};
using lookup_range_ertr = LBAManager::get_mapping_ertr;
using lookup_range_ret = LBAManager::get_mapping_ret;
+ btree_range_pin_t pin;
- LBANode(ceph::bufferptr &&ptr) : CachedExtent(std::move(ptr)) {}
+ LBANode(ceph::bufferptr &&ptr) : CachedExtent(std::move(ptr)), pin(this) {}
LBANode(const LBANode &rhs)
- : CachedExtent(rhs) {}
+ : CachedExtent(rhs), pin(rhs.pin, this) {}
virtual lba_node_meta_t get_node_meta() const = 0;
val.get_val(),
get_paddr()).safe_then(
[c, &result, addr, len](auto extent) mutable {
- // TODO: add backrefs to ensure cache residence of parents
return extent->lookup_range(
c,
addr,
auto ret = lba_pin_list_t();
auto [i, end] = get_leaf_entries(addr, len);
for (; i != end; ++i) {
- auto val = (*i).get_val();
+ auto val = i->get_val();
+ auto begin = i->get_key();
ret.emplace_back(
std::make_unique<BtreeLBAPin>(
val.paddr,
- (*i).get_key(),
- val.len));
+ lba_node_meta_t{ begin, begin + val.len, 0}));
}
return lookup_range_ertr::make_ready_future<lba_pin_list_t>(
std::move(ret));
insert_pt.get_key(),
insert_pt.get_val().len,
insert_pt.get_val().paddr);
+ auto begin = insert_pt.get_key();
return insert_ret(
insert_ertr::ready_future_marker{},
std::make_unique<BtreeLBAPin>(
val.paddr,
- laddr,
- val.len));
+ lba_node_meta_t{ begin, begin + val.len, 0}));
}
LBALeafNode::mutate_mapping_ret LBALeafNode::mutate_mapping(
return c.cache.get_extent<LBAInternalNode>(
c.trans,
offset,
- LBA_BLOCK_SIZE).safe_then([depth](auto ret) {
+ LBA_BLOCK_SIZE).safe_then([c](auto ret) {
auto meta = ret->get_meta();
if (ret->get_size()) {
ceph_assert(meta.begin <= ret->begin()->get_key());
ceph_assert(meta.end > (ret->end() - 1)->get_key());
}
+ if (!ret->is_pending() && !ret->pin.is_linked()) {
+ ret->pin.set_range(meta);
+ c.pins.add_pin(ret->pin);
+ }
return LBANodeRef(ret.detach(), /* add_ref = */ false);
});
} else {
return c.cache.get_extent<LBALeafNode>(
c.trans,
offset,
- LBA_BLOCK_SIZE).safe_then([offset, depth](auto ret) {
+ LBA_BLOCK_SIZE).safe_then([offset, c](auto ret) {
logger().debug(
"get_lba_btree_extent: read leaf at offset {}",
offset);
ceph_assert(meta.begin <= ret->begin()->get_key());
ceph_assert(meta.end > (ret->end() - 1)->get_key());
}
+ if (!ret->is_pending() && !ret->pin.is_linked()) {
+ ret->pin.set_range(meta);
+ c.pins.add_pin(ret->pin);
+ }
return LBANodeRef(ret.detach(), /* add_ref = */ false);
});
}
c.trans, LBA_BLOCK_SIZE);
auto right = c.cache.alloc_new_extent<LBAInternalNode>(
c.trans, LBA_BLOCK_SIZE);
+ auto pivot = split_into(*left, *right);
+ left->pin.set_range(left->get_meta());
+ right->pin.set_range(right->get_meta());
return std::make_tuple(
left,
right,
- split_into(*left, *right));
+ pivot);
}
LBANodeRef make_full_merge(
auto replacement = c.cache.alloc_new_extent<LBAInternalNode>(
c.trans, LBA_BLOCK_SIZE);
replacement->merge_from(*this, *right->cast<LBAInternalNode>());
+ replacement->pin.set_range(replacement->get_meta());
return replacement;
}
auto replacement_right = c.cache.alloc_new_extent<LBAInternalNode>(
c.trans, LBA_BLOCK_SIZE);
+ auto pivot = balance_into_new_nodes(
+ *this,
+ right,
+ prefer_left,
+ *replacement_left,
+ *replacement_right);
+
+ replacement_left->pin.set_range(replacement_left->get_meta());
+ replacement_right->pin.set_range(replacement_right->get_meta());
return std::make_tuple(
replacement_left,
replacement_right,
- balance_into_new_nodes(
- *this,
- right,
- prefer_left,
- *replacement_left,
- *replacement_right));
+ pivot);
}
/**
c.trans, LBA_BLOCK_SIZE);
auto right = c.cache.alloc_new_extent<LBALeafNode>(
c.trans, LBA_BLOCK_SIZE);
+ auto pivot = split_into(*left, *right);
+ left->pin.set_range(left->get_meta());
+ right->pin.set_range(right->get_meta());
return std::make_tuple(
left,
right,
- split_into(*left, *right));
+ pivot);
}
LBANodeRef make_full_merge(
auto replacement = c.cache.alloc_new_extent<LBALeafNode>(
c.trans, LBA_BLOCK_SIZE);
replacement->merge_from(*this, *right->cast<LBALeafNode>());
+ replacement->pin.set_range(replacement->get_meta());
return replacement;
}
c.trans, LBA_BLOCK_SIZE);
auto replacement_right = c.cache.alloc_new_extent<LBALeafNode>(
c.trans, LBA_BLOCK_SIZE);
+
+ auto pivot = balance_into_new_nodes(
+ *this,
+ right,
+ prefer_left,
+ *replacement_left,
+ *replacement_right);
+
+ replacement_left->pin.set_range(replacement_left->get_meta());
+ replacement_right->pin.set_range(replacement_right->get_meta());
return std::make_tuple(
replacement_left,
replacement_right,
- balance_into_new_nodes(
- *this,
- right,
- prefer_left,
- *replacement_left,
- *replacement_right));
+ pivot);
}
// See LBAInternalNode, same concept