]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/onode-staged-tree: cleanup -- switch to get_*_slot() interfaces 39332/head
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 5 Feb 2021 16:04:12 +0000 (00:04 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Sun, 7 Feb 2021 02:19:41 +0000 (10:19 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/onode_manager/staged-fltree/node.cc
src/crimson/os/seastore/onode_manager/staged-fltree/node.h
src/crimson/os/seastore/onode_manager/staged-fltree/node_impl.h
src/crimson/os/seastore/onode_manager/staged-fltree/node_layout.h
src/crimson/os/seastore/onode_manager/staged-fltree/stages/stage.h
src/test/crimson/seastore/onode_tree/test_staged_fltree.cc

index 2cea7a4210ef190dc5b3559363076a96fa4cac9a..6ba04af81283f6e4c985dd453ecb9def3e6071e1 100644 (file)
@@ -357,12 +357,13 @@ void Node::as_child(const search_position_t& pos, Ref<InternalNode> parent_node)
 template void Node::as_child<true>(const search_position_t&, Ref<InternalNode>);
 template void Node::as_child<false>(const search_position_t&, Ref<InternalNode>);
 
-node_future<> Node::insert_parent(context_t c, Ref<Node> right_node)
+node_future<> Node::insert_parent(
+    context_t c, const key_view_t& pivot_index, Ref<Node> right_node)
 {
   assert(!is_root());
   // TODO(cross-node string dedup)
   return parent_info().ptr->apply_child_split(
-      c, parent_info().position, this, right_node);
+      c, parent_info().position, pivot_index, this, right_node);
 }
 
 node_future<Ref<tree_cursor_t>>
@@ -429,23 +430,31 @@ InternalNode::get_next_cursor(context_t c, const search_position_t& pos)
 }
 
 node_future<> InternalNode::apply_child_split(
-    context_t c, const search_position_t& pos,
+    context_t c, const search_position_t& pos, const key_view_t& left_key,
     Ref<Node> left_child, Ref<Node> right_child)
 {
 #ifndef NDEBUG
   if (pos.is_end()) {
     assert(impl->is_level_tail());
+    assert(right_child->impl->is_level_tail());
   }
+  auto _left_key = *left_child->impl->get_pivot_index();
+  assert(left_key.compare_to(_left_key) == MatchKindCMP::EQ);
 #endif
   impl->prepare_mutate(c);
 
-  auto left_key = left_child->impl->get_largest_key_view();
   auto left_child_addr = left_child->impl->laddr();
-  auto right_key = right_child->impl->get_largest_key_view();
   auto right_child_addr = right_child->impl->laddr();
-  logger().debug("OTree::Internal::Insert: "
-                 "pos({}), left_child({}, {:#x}), right_child({}, {:#x}) ...",
-                 pos, left_key, left_child_addr, right_key, right_child_addr);
+  auto right_key = right_child->impl->get_pivot_index();
+  if (right_key.has_value()) {
+    logger().debug("OTree::Internal::Insert: "
+                   "pos({}), left_child({}, {:#x}), right_child({}, {:#x}) ...",
+                   pos, left_key, left_child_addr, *right_key, right_child_addr);
+  } else {
+    logger().debug("OTree::Internal::Insert: "
+                   "pos({}), left_child({}, {:#x}), right_child(N/A, {:#x}) ...",
+                   pos, left_key, left_child_addr, right_child_addr);
+  }
   // update pos => left_child to pos => right_child
   impl->replace_child_addr(pos, right_child_addr, left_child_addr);
   replace_track(pos, right_child, left_child);
@@ -489,7 +498,10 @@ node_future<> InternalNode::apply_child_split(
     right_node->validate_tracked_children();
 
     // propagate index to parent
-    return insert_parent(c, right_node);
+    // TODO: get from trim()
+    key_view_t pivot_index;
+    impl->get_largest_slot(nullptr, &pivot_index, nullptr);
+    return insert_parent(c, pivot_index, right_node);
     // TODO (optimize)
     // try to acquire space from siblings before split... see btrfs
   });
@@ -503,7 +515,8 @@ node_future<Ref<InternalNode>> InternalNode::allocate_root(
   ).safe_then([c, old_root_addr,
                super = std::move(super)](auto fresh_node) mutable {
     auto root = fresh_node.node;
-    auto p_value = root->impl->get_p_value(search_position_t::end());
+    assert(root->impl->is_empty());
+    auto p_value = root->impl->get_tail_value();
     fresh_node.mut.copy_in_absolute(
         const_cast<laddr_packed_t*>(p_value), old_root_addr);
     root->make_root_from(c, std::move(super), old_root_addr);
@@ -514,9 +527,11 @@ node_future<Ref<InternalNode>> InternalNode::allocate_root(
 node_future<Ref<tree_cursor_t>>
 InternalNode::lookup_smallest(context_t c)
 {
+  assert(!impl->is_empty());
   auto position = search_position_t::begin();
-  laddr_t child_addr = impl->get_p_value(position)->value;
-  return get_or_track_child(c, position, child_addr
+  const laddr_packed_t* p_child_addr;
+  impl->get_slot(position, nullptr, &p_child_addr);
+  return get_or_track_child(c, position, p_child_addr->value
   ).safe_then([c](auto child) {
     return child->lookup_smallest(c);
   });
@@ -527,9 +542,11 @@ InternalNode::lookup_largest(context_t c)
 {
   // NOTE: unlike LeafNode::lookup_largest(), this only works for the tail
   // internal node to return the tail child address.
-  auto position = search_position_t::end();
-  laddr_t child_addr = impl->get_p_value(position)->value;
-  return get_or_track_child(c, position, child_addr).safe_then([c](auto child) {
+  assert(!impl->is_empty());
+  assert(impl->is_level_tail());
+  auto p_child_addr = impl->get_tail_value();
+  return get_or_track_child(c, search_position_t::end(), p_child_addr->value
+  ).safe_then([c](auto child) {
     return child->lookup_largest(c);
   });
 }
@@ -713,14 +730,17 @@ void InternalNode::validate_child(const Node& child) const
   assert(impl->level() - 1 == child.impl->level());
   assert(this == child.parent_info().ptr);
   auto& child_pos = child.parent_info().position;
-  assert(impl->get_p_value(child_pos)->value == child.impl->laddr());
   if (child_pos.is_end()) {
     assert(impl->is_level_tail());
     assert(child.impl->is_level_tail());
+    assert(impl->get_tail_value()->value == child.impl->laddr());
   } else {
     assert(!child.impl->is_level_tail());
-    assert(impl->get_key_view(child_pos).compare_to(
-           child.impl->get_largest_key_view()) == MatchKindCMP::EQ);
+    key_view_t index_key;
+    const laddr_packed_t* p_child_addr;
+    impl->get_slot(child_pos, &index_key, &p_child_addr);
+    assert(index_key.compare_to(*child.impl->get_pivot_index()) == MatchKindCMP::EQ);
+    assert(p_child_addr->value == child.impl->laddr());
   }
   // XXX(multi-type)
   assert(impl->field_type() <= child.impl->field_type());
@@ -754,7 +774,8 @@ std::tuple<key_view_t, const value_header_t*>
 LeafNode::get_kv(const search_position_t& pos) const
 {
   key_view_t key_view;
-  auto p_value_header = impl->get_p_value(pos, &key_view);
+  const value_header_t* p_value_header;
+  impl->get_slot(pos, &key_view, &p_value_header);
   return {key_view, p_value_header};
 }
 
@@ -809,7 +830,8 @@ LeafNode::lookup_smallest(context_t)
   }
   auto pos = search_position_t::begin();
   key_view_t index_key;
-  auto p_value_header = impl->get_p_value(pos, &index_key);
+  const value_header_t* p_value_header;
+  impl->get_slot(pos, &index_key, &p_value_header);
   return node_ertr::make_ready_future<Ref<tree_cursor_t>>(
       get_or_track_cursor(pos, index_key, p_value_header));
 }
@@ -823,9 +845,9 @@ LeafNode::lookup_largest(context_t)
         tree_cursor_t::create_end(this));
   }
   search_position_t pos;
-  const value_header_t* p_value_header = nullptr;
   key_view_t index_key;
-  impl->get_largest_slot(pos, index_key, &p_value_header);
+  const value_header_t* p_value_header = nullptr;
+  impl->get_largest_slot(&pos, &index_key, &p_value_header);
   return node_ertr::make_ready_future<Ref<tree_cursor_t>>(
       get_or_track_cursor(pos, index_key, p_value_header));
 }
@@ -933,7 +955,10 @@ node_future<Ref<tree_cursor_t>> LeafNode::insert_value(
     right_node->validate_tracked_cursors();
 
     // propagate insert to parent
-    return insert_parent(c, right_node).safe_then([ret] {
+    // TODO: get from trim()
+    key_view_t pivot_index;
+    impl->get_largest_slot(nullptr, &pivot_index, nullptr);
+    return insert_parent(c, pivot_index, right_node).safe_then([ret] {
       return ret;
     });
     // TODO (optimize)
index cf4a4508c262d89d8e5b24b3479dc4b77afd4166..edbde7541888f2006577962b58bdbd53394c4280 100644 (file)
@@ -350,7 +350,7 @@ class Node
     Ref<InternalNode> ptr;
   };
   const parent_info_t& parent_info() const { return *_parent_info; }
-  node_future<> insert_parent(context_t, Ref<Node> right_node);
+  node_future<> insert_parent(context_t, const key_view_t&, Ref<Node> right_node);
   node_future<Ref<tree_cursor_t>> get_next_cursor_from_parent(context_t);
 
  private:
@@ -398,7 +398,8 @@ class InternalNode final : public Node {
   node_future<Ref<tree_cursor_t>> get_next_cursor(context_t, const search_position_t&);
 
   node_future<> apply_child_split(
-      context_t, const search_position_t&, Ref<Node> left, Ref<Node> right);
+      context_t, const search_position_t&, const key_view_t& left_key,
+      Ref<Node> left, Ref<Node> right);
 
   template <bool VALIDATE>
   void do_track_child(Node& child) {
index 14b1fad2e9f028f3d695b49b3954cc061f72d920..63946f798edee46fea9ab676a831ed23fac3eb46 100644 (file)
@@ -67,6 +67,7 @@ class NodeImpl {
    >;
   virtual ~NodeImpl() = default;
 
+  virtual node_type_t node_type() const = 0;
   virtual field_type_t field_type() const = 0;
   virtual laddr_t laddr() const = 0;
   virtual void prepare_mutate(context_t) = 0;
@@ -74,8 +75,7 @@ class NodeImpl {
   virtual bool is_empty() const = 0;
   virtual level_t level() const = 0;
   virtual node_offset_t free_size() const = 0;
-  virtual key_view_t get_key_view(const search_position_t&) const = 0;
-  virtual key_view_t get_largest_key_view() const = 0;
+  virtual std::optional<key_view_t> get_pivot_index() const = 0;
 
   virtual node_stats_t get_stats() const = 0;
   virtual std::ostream& dump(std::ostream&) const = 0;
@@ -114,22 +114,25 @@ class InternalNodeImpl : public NodeImpl {
   }
 
   #pragma GCC diagnostic ignored "-Woverloaded-virtual"
-  virtual const laddr_packed_t* get_p_value(
-      const search_position_t&,
-      key_view_t* = nullptr, internal_marker_t = {}) const {
+  virtual void get_largest_slot(search_position_t* = nullptr,             // OUT
+                                key_view_t* = nullptr,                    // OUT
+                                const laddr_packed_t** = nullptr) const { // OUT
     ceph_abort("impossible path");
   }
+
   #pragma GCC diagnostic ignored "-Woverloaded-virtual"
   virtual lookup_result_t<node_type_t::INTERNAL> lower_bound(
       const key_hobj_t&, MatchHistory&,
       key_view_t* = nullptr, internal_marker_t = {}) const {
     ceph_abort("impossible path");
   }
+
   #pragma GCC diagnostic ignored "-Woverloaded-virtual"
   virtual const laddr_packed_t* insert(
       const key_view_t&, const laddr_t&, search_position_t&, match_stage_t&, node_offset_t&) {
     ceph_abort("impossible path");
   }
+
   #pragma GCC diagnostic ignored "-Woverloaded-virtual"
   virtual std::tuple<search_position_t, bool, const laddr_packed_t*> split_insert(
       NodeExtentMutable&, NodeImpl&, const key_view_t&, const laddr_t&,
@@ -140,6 +143,7 @@ class InternalNodeImpl : public NodeImpl {
   virtual const laddr_packed_t* get_tail_value() const = 0;
 
   virtual void replace_child_addr(const search_position_t&, laddr_t dst, laddr_t src) = 0;
+
   virtual std::tuple<match_stage_t, node_offset_t> evaluate_insert(
       const key_view_t&, const laddr_t&, search_position_t&) const = 0;
 
@@ -151,6 +155,7 @@ class InternalNodeImpl : public NodeImpl {
     }
   };
   static alloc_ertr::future<fresh_impl_t> allocate(context_t, field_type_t, bool, level_t);
+
   static InternalNodeImplURef load(NodeExtentRef, field_type_t, bool);
 
  protected:
@@ -182,22 +187,25 @@ class LeafNodeImpl : public NodeImpl {
   }
 
   #pragma GCC diagnostic ignored "-Woverloaded-virtual"
-  virtual const value_header_t* get_p_value(
-      const search_position_t&,
-      key_view_t* = nullptr, leaf_marker_t={}) const {
+  virtual void get_largest_slot(search_position_t* = nullptr,             // OUT
+                                key_view_t* = nullptr,                    // OUT
+                                const value_header_t** = nullptr) const { // OUT
     ceph_abort("impossible path");
   }
+
   #pragma GCC diagnostic ignored "-Woverloaded-virtual"
   virtual lookup_result_t<node_type_t::LEAF> lower_bound(
       const key_hobj_t&, MatchHistory&,
       key_view_t* = nullptr, leaf_marker_t = {}) const {
     ceph_abort("impossible path");
   }
+
   #pragma GCC diagnostic ignored "-Woverloaded-virtual"
   virtual const value_header_t* insert(
       const key_hobj_t&, const value_config_t&, search_position_t&, match_stage_t&, node_offset_t&) {
     ceph_abort("impossible path");
   }
+
   #pragma GCC diagnostic ignored "-Woverloaded-virtual"
   virtual std::tuple<search_position_t, bool, const value_header_t*> split_insert(
       NodeExtentMutable&, NodeImpl&, const key_hobj_t&, const value_config_t&,
@@ -205,9 +213,6 @@ class LeafNodeImpl : public NodeImpl {
     ceph_abort("impossible path");
   }
 
-  virtual void get_largest_slot(
-      search_position_t&, key_view_t&, const value_header_t**) const = 0;
-
   virtual std::tuple<match_stage_t, node_offset_t> evaluate_insert(
       const key_hobj_t&, const value_config_t&,
       const MatchHistory&, match_stat_t, search_position_t&) const = 0;
@@ -223,6 +228,7 @@ class LeafNodeImpl : public NodeImpl {
     }
   };
   static alloc_ertr::future<fresh_impl_t> allocate(context_t, field_type_t, bool);
+
   static LeafNodeImplURef load(NodeExtentRef, field_type_t, bool);
 
  protected:
index db8a561bb6b216a4d72a37a19b1c2278f3be18a7..17a6b6b882b34e2530eaabe052f16c415b020cfc 100644 (file)
@@ -86,6 +86,7 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
   /*
    * NodeImpl
    */
+  node_type_t node_type() const override { return NODE_TYPE; }
   field_type_t field_type() const override { return FIELD_TYPE; }
   laddr_t laddr() const override { return extent.get_laddr(); }
   void prepare_mutate(context_t c) override { return extent.prepare_mutate(c); }
@@ -94,17 +95,16 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
   level_t level() const override { return extent.read().level(); }
   node_offset_t free_size() const override { return extent.read().free_size(); }
 
-  key_view_t get_key_view(const search_position_t& position) const override {
-    key_view_t ret;
-    STAGE_T::get_key_view(extent.read(), cast_down<STAGE>(position), ret);
-    return ret;
-  }
-
-  key_view_t get_largest_key_view() const override {
-    key_view_t index_key;
-    STAGE_T::template lookup_largest_slot<false, true, false>(
-        extent.read(), nullptr, &index_key, nullptr);
-    return index_key;
+  std::optional<key_view_t> get_pivot_index() const override {
+    assert(!is_empty());
+    if constexpr (NODE_TYPE == node_type_t::INTERNAL) {
+      if (is_level_tail()) {
+        return std::nullopt;
+      }
+    }
+    key_view_t pivot_index;
+    get_largest_slot(nullptr, &pivot_index, nullptr);
+    return {pivot_index};
   }
 
   node_stats_t get_stats() const override {
@@ -195,11 +195,17 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
                 const value_t** pp_value = nullptr) const override {
     assert(!is_empty());
     assert(!pos.is_end());
-    if (!p_index_key && pp_value) {
+    if (p_index_key && pp_value) {
+      STAGE_T::template get_slot<true, true>(
+          extent.read(), cast_down<STAGE>(pos), p_index_key, pp_value);
+    } else if (!p_index_key && pp_value) {
       STAGE_T::template get_slot<false, true>(
           extent.read(), cast_down<STAGE>(pos), nullptr, pp_value);
+    } else if (p_index_key && !pp_value) {
+      STAGE_T::template get_slot<true, false>(
+          extent.read(), cast_down<STAGE>(pos), p_index_key, nullptr);
     } else {
-      ceph_abort("not implemented");
+      ceph_abort("impossible path");
     }
   }
 
@@ -223,26 +229,25 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
     }
   }
 
-  const value_t* get_p_value(const search_position_t& position,
-                             key_view_t* index_key=nullptr, marker_t={}) const override {
-    auto& node_stage = extent.read();
+  void get_largest_slot(search_position_t* p_pos = nullptr,
+                        key_view_t* p_index_key = nullptr,
+                        const value_t** pp_value = nullptr) const override {
+    assert(!is_empty());
     if constexpr (NODE_TYPE == node_type_t::INTERNAL) {
-      assert(!index_key);
-      if (position.is_end()) {
-        assert(is_level_tail());
-        return node_stage.get_end_p_laddr();
-      }
-    } else {
-      assert(!position.is_end());
+      assert(!is_level_tail());
     }
-    if (index_key) {
-      return STAGE_T::template get_p_value<true>(
-          node_stage, cast_down<STAGE>(position), index_key);
+    if (p_pos && p_index_key && pp_value) {
+      STAGE_T::template get_largest_slot<true, true, true>(
+          extent.read(), &cast_down_fill_0<STAGE>(*p_pos), p_index_key, pp_value);
+    } else if (!p_pos && p_index_key && !pp_value) {
+      STAGE_T::template get_largest_slot<false, true, false>(
+          extent.read(), nullptr, p_index_key, nullptr);
     } else {
-      return STAGE_T::get_p_value(node_stage, cast_down<STAGE>(position));
+      ceph_abort("not implemented");
     }
   }
 
+
   lookup_result_t<NODE_TYPE> lower_bound(
       const key_hobj_t& key, MatchHistory& history,
       key_view_t* index_key=nullptr, marker_t={}) const override {
@@ -261,7 +266,8 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
 #ifndef NDEBUG
       if (!result_raw.is_end()) {
         full_key_t<KeyT::VIEW> index;
-        STAGE_T::get_key_view(node_stage, result_raw.position, index);
+        STAGE_T::template get_slot<true, false>(
+            node_stage, result_raw.position, &index, nullptr);
         assert(index.compare_to(*index_key) == MatchKindCMP::EQ);
       }
 #endif
@@ -273,7 +279,8 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
       assert(result_raw.mstat == MSTAT_END);
     } else {
       full_key_t<KeyT::VIEW> index;
-      STAGE_T::get_key_view(node_stage, result_raw.position, index);
+      STAGE_T::template get_slot<true, false>(
+          node_stage, result_raw.position, &index, nullptr);
       assert_mstat(key, index, result_raw.mstat);
     }
 #endif
@@ -329,15 +336,22 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
       logger().trace("OTree::Layout::Insert: -- dump\n{}", sos.str());
     }
     validate_layout();
-    assert(get_key_view(insert_pos).compare_to(key) == MatchKindCMP::EQ);
+#ifndef NDEBUG
+    full_key_t<KeyT::VIEW> index;
+    get_slot(insert_pos, &index, nullptr);
+    assert(index.compare_to(key) == MatchKindCMP::EQ);
+#endif
     return ret;
   }
 
   std::tuple<search_position_t, bool, const value_t*> split_insert(
-      NodeExtentMutable& right_mut, NodeImpl& right_impl,
+      NodeExtentMutable& right_mut, NodeImpl& _right_impl,
       const full_key_t<KEY_TYPE>& key, const value_input_t& value,
       search_position_t& _insert_pos, match_stage_t& insert_stage,
       node_offset_t& insert_size) override {
+    assert(_right_impl.node_type() == NODE_TYPE);
+    assert(_right_impl.field_type() == FIELD_TYPE);
+    auto& right_impl = dynamic_cast<NodeLayoutT&>(_right_impl);
     logger().info("OTree::Layout::Split: begin at "
                   "insert_pos({}), insert_stage={}, insert_size={}B, "
                   "{:#x}=>{:#x} ...",
@@ -499,10 +513,18 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
                      insert_pos, insert_stage);
       p_value = extent.template split_insert_replayable<KEY_TYPE>(
           split_at, key, value, insert_pos, insert_stage, insert_size);
-      assert(get_key_view(_insert_pos).compare_to(key) == MatchKindCMP::EQ);
+#ifndef NDEBUG
+      full_key_t<KeyT::VIEW> index;
+      get_slot(_insert_pos, &index, nullptr);
+      assert(index.compare_to(key) == MatchKindCMP::EQ);
+#endif
     } else {
       logger().debug("OTree::Layout::Split: -- left trim ...");
-      assert(right_impl.get_key_view(_insert_pos).compare_to(key) == MatchKindCMP::EQ);
+#ifndef NDEBUG
+      full_key_t<KeyT::VIEW> index;
+      right_impl.get_slot(_insert_pos, &index, nullptr);
+      assert(index.compare_to(key) == MatchKindCMP::EQ);
+#endif
       extent.split_replayable(split_at);
     }
     if (unlikely(logger().is_enabled(seastar::log_level::debug))) {
@@ -525,11 +547,11 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
     InsertType insert_type;
     search_position_t last_pos;
     if (is_insert_left) {
-      STAGE_T::template lookup_largest_slot<true, false, false>(
+      STAGE_T::template get_largest_slot<true, false, false>(
           extent.read(), &cast_down_fill_0<STAGE>(last_pos), nullptr, nullptr);
     } else {
       node_stage_t right_stage{reinterpret_cast<FieldType*>(right_mut.get_write())};
-      STAGE_T::template lookup_largest_slot<true, false, false>(
+      STAGE_T::template get_largest_slot<true, false, false>(
           right_stage, &cast_down_fill_0<STAGE>(last_pos), nullptr, nullptr);
     }
     if (_insert_pos == search_position_t::begin()) {
@@ -559,7 +581,13 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
   void replace_child_addr(
       const search_position_t& pos, laddr_t dst, laddr_t src) override {
     if constexpr (NODE_TYPE == node_type_t::INTERNAL) {
-      const laddr_packed_t* p_value = get_p_value(pos);
+      const laddr_packed_t* p_value;
+      if (pos.is_end()) {
+        assert(is_level_tail());
+        p_value = get_tail_value();
+      } else {
+        get_slot(pos, nullptr, &p_value);
+      }
       assert(p_value->value == src);
       extent.update_child_addr_replayable(dst, const_cast<laddr_packed_t*>(p_value));
     } else {
@@ -591,17 +619,6 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
   /*
    * LeafNodeImpl
    */
-  void get_largest_slot(search_position_t& pos,
-                        key_view_t& index_key,
-                        const value_header_t** pp_value) const override {
-    if constexpr (NODE_TYPE == node_type_t::LEAF) {
-      STAGE_T::template lookup_largest_slot<true, true, true>(
-          extent.read(), &cast_down_fill_0<STAGE>(pos), &index_key, pp_value);
-    } else {
-      ceph_abort("impossible path");
-    }
-  }
-
   std::tuple<match_stage_t, node_offset_t> evaluate_insert(
       const key_hobj_t& key, const value_config_t& value,
       const MatchHistory& history, match_stat_t mstat,
index 871add3068447a9b9560440652682d3a5c6b715d..b9a4dcb9961fb35170d7d28369d0d30d8998ac16 100644 (file)
@@ -858,17 +858,21 @@ struct staged {
 
   template <bool GET_KEY>
   static result_t smallest_result(
-      const iterator_t& iter, full_key_t<KeyT::VIEW>* index_key) {
+      const iterator_t& iter, full_key_t<KeyT::VIEW>* p_index_key) {
     static_assert(!IS_BOTTOM);
     assert(!iter.is_end());
-    auto pos_smallest = NXT_STAGE_T::position_t::begin();
     auto nxt_container = iter.get_nxt_container();
-    auto value_ptr = NXT_STAGE_T::template get_p_value<GET_KEY>(
-        nxt_container, pos_smallest, index_key);
+    auto pos_smallest = NXT_STAGE_T::position_t::begin();
+    const value_t* p_value;
+    NXT_STAGE_T::template get_slot<GET_KEY, true>(
+        nxt_container, pos_smallest, p_index_key, &p_value);
     if constexpr (GET_KEY) {
-      index_key->set(iter.get_key());
+      assert(p_index_key);
+      p_index_key->set(iter.get_key());
+    } else {
+      assert(!p_index_key);
     }
-    return result_t{{iter.index(), pos_smallest}, value_ptr, STAGE};
+    return result_t{{iter.index(), pos_smallest}, p_value, STAGE};
   }
 
   template <bool GET_KEY>
@@ -895,54 +899,44 @@ struct staged {
   }
 
   template <bool GET_POS, bool GET_KEY, bool GET_VAL>
-  static void lookup_largest_slot(
-      const container_t& container, position_t* p_position,
-      full_key_t<KeyT::VIEW>* p_index_key, const value_t** pp_value) {
+  static void get_largest_slot(
+      const container_t& container,        // IN
+      position_t* p_position,              // OUT
+      full_key_t<KeyT::VIEW>* p_index_key, // OUT
+      const value_t** pp_value) {          // OUT
     auto iter = iterator_t(container);
     iter.seek_last();
     if constexpr (GET_KEY) {
       assert(p_index_key);
       p_index_key->set(iter.get_key());
+    } else {
+      assert(!p_index_key);
     }
     if constexpr (GET_POS) {
       assert(p_position);
       p_position->index = iter.index();
+    } else {
+      assert(!p_position);
     }
     if constexpr (IS_BOTTOM) {
       if constexpr (GET_VAL) {
         assert(pp_value);
         *pp_value = iter.get_p_value();
+      } else {
+        assert(!pp_value);
       }
     } else {
       auto nxt_container = iter.get_nxt_container();
       if constexpr (GET_POS) {
-        NXT_STAGE_T::template lookup_largest_slot<true, GET_KEY, GET_VAL>(
+        NXT_STAGE_T::template get_largest_slot<true, GET_KEY, GET_VAL>(
             nxt_container, &p_position->nxt, p_index_key, pp_value);
       } else {
-        NXT_STAGE_T::template lookup_largest_slot<false, GET_KEY, GET_VAL>(
+        NXT_STAGE_T::template get_largest_slot<false, GET_KEY, GET_VAL>(
             nxt_container, nullptr, p_index_key, pp_value);
       }
     }
   }
 
-  template <bool GET_KEY = false>
-  static const value_t* get_p_value(
-      const container_t& container, const position_t& position,
-      full_key_t<KeyT::VIEW>* index_key = nullptr) {
-    auto iter = iterator_t(container);
-    iter.seek_at(position.index);
-    if constexpr (GET_KEY) {
-      index_key->set(iter.get_key());
-    }
-    if constexpr (!IS_BOTTOM) {
-      auto nxt_container = iter.get_nxt_container();
-      return NXT_STAGE_T::template get_p_value<GET_KEY>(
-          nxt_container, position.nxt, index_key);
-    } else {
-      return iter.get_p_value();
-    }
-  }
-
   template <bool GET_KEY, bool GET_VAL>
   static void get_slot(
       const container_t& container,        // IN
@@ -973,19 +967,6 @@ struct staged {
     }
   }
 
-  static void get_key_view(
-      const container_t& container,
-      const position_t& position,
-      full_key_t<KeyT::VIEW>& index_key) {
-    auto iter = iterator_t(container);
-    iter.seek_at(position.index);
-    index_key.set(iter.get_key());
-    if constexpr (!IS_BOTTOM) {
-      auto nxt_container = iter.get_nxt_container();
-      return NXT_STAGE_T::get_key_view(nxt_container, position.nxt, index_key);
-    }
-  }
-
   template <bool GET_KEY = false>
   static result_t lower_bound(
       const container_t& container,
index 384e824b22ec8c35525a8d5c28f7f0ef90aef586..2366a6e4dc0a5c4a0540bc531845305e657349c2 100644 (file)
@@ -703,19 +703,18 @@ class DummyChildPool {
    public:
     laddr_t laddr() const override { return _laddr; }
     bool is_level_tail() const override { return _is_level_tail; }
+    std::optional<key_view_t> get_pivot_index() const override { return {key_view}; }
 
    protected:
+    node_type_t node_type() const override { return node_type_t::LEAF; }
     field_type_t field_type() const override { return field_type_t::N0; }
     level_t level() const override { return 0u; }
-    key_view_t get_largest_key_view() const override { return key_view; }
     void prepare_mutate(context_t) override {
       ceph_abort("impossible path"); }
     bool is_empty() const override {
       ceph_abort("impossible path"); }
     node_offset_t free_size() const override {
       ceph_abort("impossible path"); }
-    key_view_t get_key_view(const search_position_t&) const override {
-      ceph_abort("impossible path"); }
     node_stats_t get_stats() const override {
       ceph_abort("impossible path"); }
     std::ostream& dump(std::ostream&) const override {
@@ -768,7 +767,7 @@ class DummyChildPool {
       if (right_child->can_split()) {
         splitable_nodes.insert(right_child);
       }
-      return insert_parent(c, right_child);
+      return insert_parent(c, *impl->get_pivot_index(), right_child);
     }
 
     node_future<> insert_and_split(