]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/onode-staged-tree: cleanup node interfaces
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 1 Apr 2021 06:01:42 +0000 (14:01 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Thu, 29 Apr 2021 08:03:37 +0000 (16:03 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/onode_manager/staged-fltree/node.cc
src/crimson/os/seastore/onode_manager/staged-fltree/node.h
src/test/crimson/seastore/onode_tree/test_staged_fltree.cc

index 99297ff06083d227f03283d6e19a7848492fcfa6..c1358f39040156ac6e41429fbfcaf59dc726014f 100644 (file)
@@ -445,13 +445,13 @@ void Node::as_child(const search_position_t& pos, Ref<InternalNode> parent_node)
 template void Node::as_child<true>(const search_position_t&, Ref<InternalNode>);
 template void Node::as_child<false>(const search_position_t&, Ref<InternalNode>);
 
-node_future<> Node::insert_parent(
-    context_t c, const key_view_t& pivot_index, Ref<Node> right_node)
+node_future<> Node::apply_split_to_parent(
+    context_t c, Ref<Node> split_right)
 {
   assert(!is_root());
   // TODO(cross-node string dedup)
   return parent_info().ptr->apply_child_split(
-      c, parent_info().position, pivot_index, this, right_node);
+      c, this, split_right);
 }
 
 node_future<Ref<tree_cursor_t>>
@@ -518,36 +518,39 @@ InternalNode::get_next_cursor(context_t c, const search_position_t& pos)
 }
 
 node_future<> InternalNode::apply_child_split(
-    context_t c, const search_position_t& pos, const key_view_t& left_key,
-    Ref<Node> left_child, Ref<Node> right_child)
+    context_t c, Ref<Node> left_child, Ref<Node> right_child)
 {
+  auto& left_pos = left_child->parent_info().position;
+
 #ifndef NDEBUG
-  if (pos.is_end()) {
+  assert(left_child->parent_info().ptr.get() == this);
+  assert(!left_child->impl->is_level_tail());
+  if (left_pos.is_end()) {
     assert(impl->is_level_tail());
     assert(right_child->impl->is_level_tail());
   }
-  auto _left_key = *left_child->impl->get_pivot_index();
-  assert(left_key.compare_to(_left_key) == MatchKindCMP::EQ);
 #endif
+
   impl->prepare_mutate(c);
 
+  auto left_key = *left_child->impl->get_pivot_index();
   auto left_child_addr = left_child->impl->laddr();
+  auto op_right_key = right_child->impl->get_pivot_index();
   auto right_child_addr = right_child->impl->laddr();
-  auto right_key = right_child->impl->get_pivot_index();
-  if (right_key.has_value()) {
+  if (op_right_key.has_value()) {
     logger().debug("OTree::Internal::Insert: "
-                   "pos({}), left_child({}, {:#x}), right_child({}, {:#x}) ...",
-                   pos, left_key, left_child_addr, *right_key, right_child_addr);
+                   "left_pos({}), left_child({}, {:#x}), right_child({}, {:#x}) ...",
+                   left_pos, left_key, left_child_addr, *op_right_key, right_child_addr);
   } else {
     logger().debug("OTree::Internal::Insert: "
-                   "pos({}), left_child({}, {:#x}), right_child(N/A, {:#x}) ...",
-                   pos, left_key, left_child_addr, right_child_addr);
+                   "left_pos({}), left_child({}, {:#x}), right_child(N/A, {:#x}) ...",
+                   left_pos, left_key, left_child_addr, right_child_addr);
   }
-  // update pos => left_child to pos => right_child
-  impl->replace_child_addr(pos, right_child_addr, left_child_addr);
-  replace_track(pos, right_child, left_child);
+  // update left_pos => left_child to left_pos => right_child
+  impl->replace_child_addr(left_pos, right_child_addr, left_child_addr);
+  replace_track(right_child, left_child);
 
-  search_position_t insert_pos = pos;
+  search_position_t insert_pos = left_pos;
   auto [insert_stage, insert_size] = impl->evaluate_insert(
       left_key, left_child_addr, insert_pos);
   auto free_size = impl->free_size();
@@ -556,7 +559,7 @@ node_future<> InternalNode::apply_child_split(
     [[maybe_unused]] auto p_value = impl->insert(
         left_key, left_child_addr, insert_pos, insert_stage, insert_size);
     assert(impl->free_size() == free_size - insert_size);
-    assert(insert_pos <= pos);
+    assert(insert_pos <= left_pos);
     assert(p_value->value == left_child_addr);
     track_insert(insert_pos, insert_stage, left_child, right_child);
     validate_tracked_children();
@@ -585,11 +588,7 @@ node_future<> InternalNode::apply_child_split(
     validate_tracked_children();
     right_node->validate_tracked_children();
 
-    // propagate index to parent
-    // TODO: get from trim()
-    key_view_t pivot_index;
-    impl->get_largest_slot(nullptr, &pivot_index, nullptr);
-    return insert_parent(c, pivot_index, right_node);
+    return apply_split_to_parent(c, right_node);
     // TODO (optimize)
     // try to acquire space from siblings before split... see btrfs
   });
@@ -751,7 +750,7 @@ node_future<Ref<Node>> InternalNode::get_or_track_child(
     assert(position == child->parent_info().position);
     std::ignore = position;
     std::ignore = child_addr;
-    validate_child(*child);
+    validate_child_tracked(*child);
     return child;
   });
 }
@@ -790,12 +789,16 @@ void InternalNode::track_insert(
 }
 
 void InternalNode::replace_track(
-    const search_position_t& position, Ref<Node> new_child, Ref<Node> old_child)
+    Ref<Node> new_child, Ref<Node> old_child)
 {
-  assert(tracked_child_nodes[position] == old_child);
-  tracked_child_nodes.erase(position);
-  new_child->as_child(position, this);
-  assert(tracked_child_nodes[position] == new_child);
+  assert(old_child->parent_info().ptr == this);
+  auto& pos = old_child->parent_info().position;
+  do_untrack_child(*old_child);
+  new_child->as_child(pos, this);
+  // ok, safe to release ref
+  old_child->_parent_info.reset();
+
+  validate_child_tracked(*new_child);
 }
 
 void InternalNode::track_split(
@@ -1060,11 +1063,8 @@ node_future<Ref<tree_cursor_t>> LeafNode::insert_value(
     validate_tracked_cursors();
     right_node->validate_tracked_cursors();
 
-    // propagate insert to parent
-    // TODO: get from trim()
-    key_view_t pivot_index;
-    impl->get_largest_slot(nullptr, &pivot_index, nullptr);
-    return insert_parent(c, pivot_index, right_node).safe_then([ret] {
+    return apply_split_to_parent(c, right_node
+    ).safe_then([ret] {
       return ret;
     });
     // TODO (optimize)
index 08fa369d7817e11ab0bbe77a0f6562b989ef847a..fd2271fd1460dddf42f06fcd22ace839d9cb39c2 100644 (file)
@@ -415,12 +415,14 @@ class Node
   // as child/non-root
   template <bool VALIDATE = true>
   void as_child(const search_position_t&, Ref<InternalNode>);
+
   struct parent_info_t {
     search_position_t position;
     Ref<InternalNode> ptr;
   };
   const parent_info_t& parent_info() const { return *_parent_info; }
-  node_future<> insert_parent(context_t, const key_view_t&, Ref<Node> right_node);
+
+  node_future<> apply_split_to_parent(context_t, Ref<Node>);
   node_future<Ref<tree_cursor_t>> get_next_cursor_from_parent(context_t);
 
  private:
@@ -467,9 +469,7 @@ class InternalNode final : public Node {
 
   node_future<Ref<tree_cursor_t>> get_next_cursor(context_t, const search_position_t&);
 
-  node_future<> apply_child_split(
-      context_t, const search_position_t&, const key_view_t& left_key,
-      Ref<Node> left, Ref<Node> right);
+  node_future<> apply_child_split(context_t, Ref<Node> left, Ref<Node> right);
 
   template <bool VALIDATE>
   void do_track_child(Node& child) {
@@ -498,6 +498,13 @@ class InternalNode final : public Node {
     }
   }
 
+  void validate_child_tracked(const Node& child) const {
+    validate_child(child);
+    assert(tracked_child_nodes.find(child.parent_info().position) !=
+           tracked_child_nodes.end());
+    assert(tracked_child_nodes.find(child.parent_info().position)->second == &child);
+  }
+
   static node_future<Ref<InternalNode>> allocate_root(
       context_t, level_t, laddr_t, Super::URef&&);
 
@@ -516,7 +523,7 @@ class InternalNode final : public Node {
   node_future<Ref<Node>> get_or_track_child(context_t, const search_position_t&, laddr_t);
   void track_insert(
       const search_position_t&, match_stage_t, Ref<Node>, Ref<Node> nxt_child = nullptr);
-  void replace_track(const search_position_t&, Ref<Node> new_child, Ref<Node> old_child);
+  void replace_track(Ref<Node> new_child, Ref<Node> old_child);
   void track_split(const search_position_t&, Ref<InternalNode>);
   void validate_tracked_children() const {
 #ifndef NDEBUG
index 12c02f33d0457342fa736e84088e3c9c46db91a4..09f3fc90bf5c392c2a82631f6f9b0a773906852a 100644 (file)
@@ -771,7 +771,7 @@ class DummyChildPool {
       if (right_child->can_split()) {
         splitable_nodes.insert(right_child);
       }
-      return insert_parent(c, *impl->get_pivot_index(), right_child);
+      return apply_split_to_parent(c, right_child);
     }
 
     node_future<> insert_and_split(