template void Node::as_child<true>(const search_position_t&, Ref<InternalNode>);
template void Node::as_child<false>(const search_position_t&, Ref<InternalNode>);
-node_future<> Node::insert_parent(
- context_t c, const key_view_t& pivot_index, Ref<Node> right_node)
+node_future<> Node::apply_split_to_parent(
+ context_t c, Ref<Node> split_right)
{
assert(!is_root());
// TODO(cross-node string dedup)
return parent_info().ptr->apply_child_split(
- c, parent_info().position, pivot_index, this, right_node);
+ c, this, split_right);
}
node_future<Ref<tree_cursor_t>>
}
node_future<> InternalNode::apply_child_split(
- context_t c, const search_position_t& pos, const key_view_t& left_key,
- Ref<Node> left_child, Ref<Node> right_child)
+ context_t c, Ref<Node> left_child, Ref<Node> right_child)
{
+ auto& left_pos = left_child->parent_info().position;
+
#ifndef NDEBUG
- if (pos.is_end()) {
+ assert(left_child->parent_info().ptr.get() == this);
+ assert(!left_child->impl->is_level_tail());
+ if (left_pos.is_end()) {
assert(impl->is_level_tail());
assert(right_child->impl->is_level_tail());
}
- auto _left_key = *left_child->impl->get_pivot_index();
- assert(left_key.compare_to(_left_key) == MatchKindCMP::EQ);
#endif
+
impl->prepare_mutate(c);
+ auto left_key = *left_child->impl->get_pivot_index();
auto left_child_addr = left_child->impl->laddr();
+ auto op_right_key = right_child->impl->get_pivot_index();
auto right_child_addr = right_child->impl->laddr();
- auto right_key = right_child->impl->get_pivot_index();
- if (right_key.has_value()) {
+ if (op_right_key.has_value()) {
logger().debug("OTree::Internal::Insert: "
- "pos({}), left_child({}, {:#x}), right_child({}, {:#x}) ...",
- pos, left_key, left_child_addr, *right_key, right_child_addr);
+ "left_pos({}), left_child({}, {:#x}), right_child({}, {:#x}) ...",
+ left_pos, left_key, left_child_addr, *op_right_key, right_child_addr);
} else {
logger().debug("OTree::Internal::Insert: "
- "pos({}), left_child({}, {:#x}), right_child(N/A, {:#x}) ...",
- pos, left_key, left_child_addr, right_child_addr);
+ "left_pos({}), left_child({}, {:#x}), right_child(N/A, {:#x}) ...",
+ left_pos, left_key, left_child_addr, right_child_addr);
}
- // update pos => left_child to pos => right_child
- impl->replace_child_addr(pos, right_child_addr, left_child_addr);
- replace_track(pos, right_child, left_child);
+ // update left_pos => left_child to left_pos => right_child
+ impl->replace_child_addr(left_pos, right_child_addr, left_child_addr);
+ replace_track(right_child, left_child);
- search_position_t insert_pos = pos;
+ search_position_t insert_pos = left_pos;
auto [insert_stage, insert_size] = impl->evaluate_insert(
left_key, left_child_addr, insert_pos);
auto free_size = impl->free_size();
[[maybe_unused]] auto p_value = impl->insert(
left_key, left_child_addr, insert_pos, insert_stage, insert_size);
assert(impl->free_size() == free_size - insert_size);
- assert(insert_pos <= pos);
+ assert(insert_pos <= left_pos);
assert(p_value->value == left_child_addr);
track_insert(insert_pos, insert_stage, left_child, right_child);
validate_tracked_children();
validate_tracked_children();
right_node->validate_tracked_children();
- // propagate index to parent
- // TODO: get from trim()
- key_view_t pivot_index;
- impl->get_largest_slot(nullptr, &pivot_index, nullptr);
- return insert_parent(c, pivot_index, right_node);
+ return apply_split_to_parent(c, right_node);
// TODO (optimize)
// try to acquire space from siblings before split... see btrfs
});
assert(position == child->parent_info().position);
std::ignore = position;
std::ignore = child_addr;
- validate_child(*child);
+ validate_child_tracked(*child);
return child;
});
}
}
void InternalNode::replace_track(
- const search_position_t& position, Ref<Node> new_child, Ref<Node> old_child)
+ Ref<Node> new_child, Ref<Node> old_child)
{
- assert(tracked_child_nodes[position] == old_child);
- tracked_child_nodes.erase(position);
- new_child->as_child(position, this);
- assert(tracked_child_nodes[position] == new_child);
+ assert(old_child->parent_info().ptr == this);
+ auto& pos = old_child->parent_info().position;
+ do_untrack_child(*old_child);
+ new_child->as_child(pos, this);
+ // ok, safe to release ref
+ old_child->_parent_info.reset();
+
+ validate_child_tracked(*new_child);
}
void InternalNode::track_split(
validate_tracked_cursors();
right_node->validate_tracked_cursors();
- // propagate insert to parent
- // TODO: get from trim()
- key_view_t pivot_index;
- impl->get_largest_slot(nullptr, &pivot_index, nullptr);
- return insert_parent(c, pivot_index, right_node).safe_then([ret] {
+ return apply_split_to_parent(c, right_node
+ ).safe_then([ret] {
return ret;
});
// TODO (optimize)
// as child/non-root
template <bool VALIDATE = true>
void as_child(const search_position_t&, Ref<InternalNode>);
+
struct parent_info_t {
search_position_t position;
Ref<InternalNode> ptr;
};
const parent_info_t& parent_info() const { return *_parent_info; }
- node_future<> insert_parent(context_t, const key_view_t&, Ref<Node> right_node);
+
+ node_future<> apply_split_to_parent(context_t, Ref<Node>);
node_future<Ref<tree_cursor_t>> get_next_cursor_from_parent(context_t);
private:
node_future<Ref<tree_cursor_t>> get_next_cursor(context_t, const search_position_t&);
- node_future<> apply_child_split(
- context_t, const search_position_t&, const key_view_t& left_key,
- Ref<Node> left, Ref<Node> right);
+ node_future<> apply_child_split(context_t, Ref<Node> left, Ref<Node> right);
template <bool VALIDATE>
void do_track_child(Node& child) {
}
}
+ void validate_child_tracked(const Node& child) const {
+ validate_child(child);
+ assert(tracked_child_nodes.find(child.parent_info().position) !=
+ tracked_child_nodes.end());
+ assert(tracked_child_nodes.find(child.parent_info().position)->second == &child);
+ }
+
static node_future<Ref<InternalNode>> allocate_root(
context_t, level_t, laddr_t, Super::URef&&);
node_future<Ref<Node>> get_or_track_child(context_t, const search_position_t&, laddr_t);
void track_insert(
const search_position_t&, match_stage_t, Ref<Node>, Ref<Node> nxt_child = nullptr);
- void replace_track(const search_position_t&, Ref<Node> new_child, Ref<Node> old_child);
+ void replace_track(Ref<Node> new_child, Ref<Node> old_child);
void track_split(const search_position_t&, Ref<InternalNode>);
void validate_tracked_children() const {
#ifndef NDEBUG