]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/onode-staged-tree: implement Btree::Cursor::get_ghobj()
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 18 Sep 2020 02:49:35 +0000 (10:49 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Tue, 1 Dec 2020 04:50:53 +0000 (12:50 +0800)
Internally, get key_view_t when lookup into the leaf node.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/onode_manager/staged-fltree/node.cc
src/crimson/os/seastore/onode_manager/staged-fltree/node.h
src/crimson/os/seastore/onode_manager/staged-fltree/node_impl.h
src/crimson/os/seastore/onode_manager/staged-fltree/node_layout.h
src/crimson/os/seastore/onode_manager/staged-fltree/stages/key_layout.h
src/crimson/os/seastore/onode_manager/staged-fltree/stages/stage.h
src/crimson/os/seastore/onode_manager/staged-fltree/tree.cc
src/crimson/os/seastore/onode_manager/staged-fltree/tree.h
src/test/crimson/seastore/onode_tree/test_staged_fltree.cc

index f68bc5174b3d2ee58ef21f1b43e29831f80246c2..15a14ab79553e15a9846beb1c09c6c4747980539 100644 (file)
@@ -22,12 +22,18 @@ using node_future = Node::node_future<ValuesT...>;
  * tree_cursor_t
  */
 
+tree_cursor_t::tree_cursor_t(Ref<LeafNode> node, const search_position_t& pos)
+      : leaf_node{node}, position{pos} {
+  assert(!is_end());
+  leaf_node->do_track_cursor(*this);
+}
+
 tree_cursor_t::tree_cursor_t(
     Ref<LeafNode> node, const search_position_t& pos,
-    const onode_t* _p_value, layout_version_t v)
+    const key_view_t& key, const onode_t* _p_value, layout_version_t v)
       : leaf_node{node}, position{pos} {
   assert(!is_end());
-  update_p_value(_p_value, v);
+  update_kv(key, _p_value, v);
   leaf_node->do_track_cursor(*this);
 }
 
@@ -43,13 +49,13 @@ tree_cursor_t::~tree_cursor_t() {
   }
 }
 
+const key_view_t& tree_cursor_t::get_key_view() const {
+  ensure_kv();
+  return *key_view;
+}
+
 const onode_t* tree_cursor_t::get_p_value() const {
-  assert(!is_end());
-  if (!p_value || node_version != leaf_node->get_layout_version()) {
-    // NOTE: the leaf node is always present when we hold its reference.
-    std::tie(p_value, node_version) = leaf_node->get_p_value(position);
-  }
-  assert(p_value);
+  ensure_kv();
   return p_value;
 }
 
@@ -61,18 +67,30 @@ void tree_cursor_t::update_track(
   assert(!is_end());
   leaf_node = node;
   position = pos;
+  key_view.reset();
   p_value = nullptr;
   leaf_node->do_track_cursor(*this);
 }
 
-void tree_cursor_t::update_p_value(const onode_t* _p_value, layout_version_t v) {
+void tree_cursor_t::update_kv(
+    const key_view_t& key, const onode_t* _p_value, layout_version_t v) const {
   assert(!is_end());
   assert(_p_value);
-  assert(std::make_pair(_p_value, v) == leaf_node->get_p_value(position));
+  assert(std::make_tuple(key, _p_value, v) == leaf_node->get_kv(position));
+  key_view = key;
   p_value = _p_value;
   node_version = v;
 }
 
+void tree_cursor_t::ensure_kv() const {
+  assert(!is_end());
+  if (!p_value || node_version != leaf_node->get_layout_version()) {
+    // NOTE: the leaf node is always present when we hold its reference.
+    std::tie(key_view, p_value, node_version) = leaf_node->get_kv(position);
+  }
+  assert(p_value);
+}
+
 /*
  * Node
  */
@@ -489,9 +507,11 @@ bool LeafNode::is_level_tail() const {
   return impl->is_level_tail();
 }
 
-std::pair<const onode_t*, layout_version_t> LeafNode::get_p_value(
+std::tuple<key_view_t, const onode_t*, layout_version_t> LeafNode::get_kv(
     const search_position_t& pos) const {
-  return {impl->get_p_value(pos), layout_version};
+  key_view_t key_view;
+  auto p_value = impl->get_p_value(pos, &key_view);
+  return {key_view, p_value, layout_version};
 }
 
 node_future<Ref<tree_cursor_t>>
@@ -502,9 +522,10 @@ LeafNode::lookup_smallest(context_t) {
         new tree_cursor_t(this));
   }
   auto pos = search_position_t::begin();
-  auto p_value = impl->get_p_value(pos);
+  key_view_t index_key;
+  auto p_value = impl->get_p_value(pos, &index_key);
   return node_ertr::make_ready_future<Ref<tree_cursor_t>>(
-      get_or_track_cursor(pos, p_value));
+      get_or_track_cursor(pos, index_key, p_value));
 }
 
 node_future<Ref<tree_cursor_t>>
@@ -516,21 +537,23 @@ LeafNode::lookup_largest(context_t) {
   }
   search_position_t pos;
   const onode_t* p_value = nullptr;
-  impl->get_largest_value(pos, p_value);
+  key_view_t index_key;
+  impl->get_largest_slot(pos, index_key, &p_value);
   return node_ertr::make_ready_future<Ref<tree_cursor_t>>(
-      get_or_track_cursor(pos, p_value));
+      get_or_track_cursor(pos, index_key, p_value));
 }
 
 node_future<Node::search_result_t>
 LeafNode::lower_bound_tracked(
     context_t c, const key_hobj_t& key, MatchHistory& history) {
-  auto result = impl->lower_bound(key, history);
+  key_view_t index_key;
+  auto result = impl->lower_bound(key, history, &index_key);
   Ref<tree_cursor_t> cursor;
   if (result.position.is_end()) {
     assert(!result.p_value);
     cursor = new tree_cursor_t(this);
   } else {
-    cursor = get_or_track_cursor(result.position, result.p_value);
+    cursor = get_or_track_cursor(result.position, index_key, result.p_value);
   }
   return node_ertr::make_ready_future<search_result_t>(
       search_result_t{cursor, result.match()});
@@ -630,26 +653,31 @@ node_future<Ref<LeafNode>> LeafNode::allocate_root(
 }
 
 Ref<tree_cursor_t> LeafNode::get_or_track_cursor(
-    const search_position_t& position, const onode_t* p_value) {
+    const search_position_t& position,
+    const key_view_t& key, const onode_t* p_value) {
   assert(!position.is_end());
   assert(p_value);
   Ref<tree_cursor_t> p_cursor;
   auto found = tracked_cursors.find(position);
   if (found == tracked_cursors.end()) {
-    p_cursor = new tree_cursor_t(this, position, p_value, layout_version);
+    p_cursor = new tree_cursor_t(this, position, key, p_value, layout_version);
   } else {
     p_cursor = found->second;
     assert(p_cursor->get_leaf_node() == this);
     assert(p_cursor->get_position() == position);
-    p_cursor->update_p_value(p_value, layout_version);
+    p_cursor->update_kv(key, p_value, layout_version);
   }
   return p_cursor;
 }
 
 void LeafNode::validate_cursor(tree_cursor_t& cursor) const {
+#ifndef NDEBUG
   assert(this == cursor.get_leaf_node().get());
   assert(!cursor.is_end());
-  assert(impl->get_p_value(cursor.get_position()) == cursor.get_p_value());
+  auto [key, val, ver] = get_kv(cursor.get_position());
+  assert(key == cursor.get_key_view());
+  assert(val == cursor.get_p_value());
+#endif
 }
 
 Ref<tree_cursor_t> LeafNode::track_insert(
@@ -672,7 +700,9 @@ Ref<tree_cursor_t> LeafNode::track_insert(
   }
 
   // track insert
-  return new tree_cursor_t(this, insert_pos, p_onode, layout_version);
+  // TODO: getting key_view_t from stage::proceed_insert() and
+  // stage::append_insert() has not supported yet
+  return new tree_cursor_t(this, insert_pos);
 }
 
 void LeafNode::track_split(
index 203b93b3c8bef4a0ce6cf1c9d3b37b5a4ecdd064..97f7cfad02d25a29ec1bb0047a3e3db9685094c2 100644 (file)
@@ -11,6 +11,7 @@
 #include "crimson/common/type_helpers.h"
 
 #include "node_extent_mutable.h"
+#include "stages/key_layout.h"
 #include "stages/stage_types.h"
 #include "super.h"
 #include "tree_types.h"
@@ -34,7 +35,6 @@ namespace crimson::os::seastore::onode {
  * LeafNode      --> tree_cursor_t*
  */
 
-struct key_hobj_t;
 class LeafNode;
 class InternalNode;
 
@@ -51,22 +51,26 @@ class tree_cursor_t final
   tree_cursor_t& operator=(tree_cursor_t&&) = delete;
 
   bool is_end() const { return position.is_end(); }
+  const key_view_t& get_key_view() const;
   const onode_t* get_p_value() const;
 
  private:
+  tree_cursor_t(Ref<LeafNode>, const search_position_t&);
   tree_cursor_t(Ref<LeafNode>, const search_position_t&,
-                const onode_t*, layout_version_t);
+                const key_view_t& key, const onode_t*, layout_version_t);
   // lookup reaches the end, contain leaf node for further insert
   tree_cursor_t(Ref<LeafNode>);
   const search_position_t& get_position() const { return position; }
   Ref<LeafNode> get_leaf_node() { return leaf_node; }
   void update_track(Ref<LeafNode>, const search_position_t&);
-  void update_p_value(const onode_t*, layout_version_t);
+  void update_kv(const key_view_t&, const onode_t*, layout_version_t) const;
+  void ensure_kv() const;
 
  private:
   Ref<LeafNode> leaf_node;
   search_position_t position;
   // cached information
+  mutable std::optional<key_view_t> key_view;
   mutable const onode_t* p_value;
   mutable layout_version_t node_version;
 
@@ -251,7 +255,7 @@ class LeafNode final : public Node {
 
   bool is_level_tail() const;
   layout_version_t get_layout_version() const { return layout_version; }
-  std::pair<const onode_t*, layout_version_t> get_p_value(
+  std::tuple<key_view_t, const onode_t*, layout_version_t> get_kv(
       const search_position_t&) const;
   void do_track_cursor(tree_cursor_t& cursor) {
     validate_cursor(cursor);
@@ -286,7 +290,8 @@ class LeafNode final : public Node {
  private:
   // XXX: extract a common tracker for InternalNode to track Node,
   // and LeafNode to track tree_cursor_t.
-  Ref<tree_cursor_t> get_or_track_cursor(const search_position_t&, const onode_t*);
+  Ref<tree_cursor_t> get_or_track_cursor(
+      const search_position_t&, const key_view_t&, const onode_t*);
   Ref<tree_cursor_t> track_insert(
       const search_position_t&, match_stage_t, const onode_t*);
   void track_split(const search_position_t&, Ref<LeafNode>);
index d39e6784270321776bb15bae56312598a74d922d..acd00c964e5a35845895e4749b50a23708a519e7 100644 (file)
@@ -51,12 +51,14 @@ class InternalNodeImpl : public NodeImpl {
 
   #pragma GCC diagnostic ignored "-Woverloaded-virtual"
   virtual const laddr_t* get_p_value(
-      const search_position_t&, internal_marker_t={}) const {
+      const search_position_t&,
+      key_view_t* = nullptr, internal_marker_t = {}) const {
     assert(false && "impossible path");
   }
   #pragma GCC diagnostic ignored "-Woverloaded-virtual"
   virtual lookup_result_t<node_type_t::INTERNAL> lower_bound(
-      const key_hobj_t&, MatchHistory&, internal_marker_t={}) const {
+      const key_hobj_t&, MatchHistory&,
+      key_view_t* = nullptr, internal_marker_t = {}) const {
     assert(false && "impossible path");
   }
   #pragma GCC diagnostic ignored "-Woverloaded-virtual"
@@ -96,12 +98,14 @@ class LeafNodeImpl : public NodeImpl {
 
   #pragma GCC diagnostic ignored "-Woverloaded-virtual"
   virtual const onode_t* get_p_value(
-      const search_position_t&, leaf_marker_t={}) const {
+      const search_position_t&,
+      key_view_t* = nullptr, leaf_marker_t={}) const {
     assert(false && "impossible path");
   }
   #pragma GCC diagnostic ignored "-Woverloaded-virtual"
   virtual lookup_result_t<node_type_t::LEAF> lower_bound(
-      const key_hobj_t&, MatchHistory&, leaf_marker_t={}) const {
+      const key_hobj_t&, MatchHistory&,
+      key_view_t* = nullptr, leaf_marker_t = {}) const {
     assert(false && "impossible path");
   }
   #pragma GCC diagnostic ignored "-Woverloaded-virtual"
@@ -116,7 +120,8 @@ class LeafNodeImpl : public NodeImpl {
     assert(false && "impossible path");
   }
 
-  virtual void get_largest_value(search_position_t&, const onode_t*&) const = 0;
+  virtual void get_largest_slot(
+      search_position_t&, key_view_t&, const onode_t**) const = 0;
   virtual std::tuple<match_stage_t, node_offset_t> evaluate_insert(
       const key_hobj_t&, const onode_t&,
       const MatchHistory&, search_position_t&) const = 0;
index 7ad1366bac03d9307e8df154d93c624aed775eca..c3244aa3154e5ec51c9c76405874a9391f604a85 100644 (file)
@@ -93,9 +93,10 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
   }
 
   key_view_t get_largest_key_view() const override {
-    key_view_t ret;
-    STAGE_T::lookup_largest_index(extent.read(), ret);
-    return ret;
+    key_view_t index_key;
+    STAGE_T::template lookup_largest_slot<false, true, false>(
+        extent.read(), nullptr, &index_key, nullptr);
+    return index_key;
   }
 
   std::ostream& dump(std::ostream& os) const override {
@@ -155,10 +156,11 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
   /*
    * Common
    */
-  const value_t* get_p_value(
-      const search_position_t& position, marker_t={}) const override {
+  const value_t* get_p_value(const search_position_t& position,
+                             key_view_t* index_key=nullptr, marker_t={}) const override {
     auto& node_stage = extent.read();
     if constexpr (NODE_TYPE == node_type_t::INTERNAL) {
+      assert(!index_key);
       if (position.is_end()) {
         assert(is_level_tail());
         return node_stage.get_end_p_laddr();
@@ -166,11 +168,17 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
     } else {
       assert(!position.is_end());
     }
-    return STAGE_T::get_p_value(node_stage, cast_down<STAGE>(position));
+    if (index_key) {
+      return STAGE_T::template get_p_value<true>(
+          node_stage, cast_down<STAGE>(position), index_key);
+    } else {
+      return STAGE_T::get_p_value(node_stage, cast_down<STAGE>(position));
+    }
   }
 
   lookup_result_t<NODE_TYPE> lower_bound(
-      const key_hobj_t& key, MatchHistory& history, marker_t={}) const override {
+      const key_hobj_t& key, MatchHistory& history,
+      key_view_t* index_key=nullptr, marker_t={}) const override {
     auto& node_stage = extent.read();
     if constexpr (NODE_TYPE == node_type_t::LEAF) {
       if (unlikely(node_stage.keys() == 0)) {
@@ -179,7 +187,20 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
       }
     }
 
-    auto result_raw = STAGE_T::lower_bound(node_stage, key, history);
+    typename STAGE_T::result_t result_raw;
+    if (index_key) {
+      result_raw = STAGE_T::template lower_bound<true>(
+          node_stage, key, history, index_key);
+#ifndef NDEBUG
+      if (!result_raw.is_end()) {
+        full_key_t<KeyT::VIEW> index;
+        STAGE_T::get_key_view(node_stage, result_raw.position, index);
+        assert(index == *index_key);
+      }
+#endif
+    } else {
+      result_raw = STAGE_T::lower_bound(node_stage, key, history);
+    }
 #ifndef NDEBUG
     if (result_raw.is_end()) {
       assert(result_raw.mstat == MSTAT_END);
@@ -333,9 +354,11 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
   /*
    * LeafNodeImpl
    */
-  void get_largest_value(search_position_t& pos, const onode_t*& p_value) const override {
+  void get_largest_slot(search_position_t& pos,
+                        key_view_t& index_key, const onode_t** pp_value) const override {
     if constexpr (NODE_TYPE == node_type_t::LEAF) {
-      STAGE_T::lookup_largest(extent.read(), cast_down_fill_0<STAGE>(pos), p_value);
+      STAGE_T::template lookup_largest_slot<true, true, true>(
+          extent.read(), &cast_down_fill_0<STAGE>(pos), &index_key, pp_value);
     } else {
       assert(false && "impossible path");
     }
index 203545842aa3dd3e7ab421a8633f9267cc0c6231..ae61f60c0d98ab0c89b60c88ab036d83be69ee04 100644 (file)
@@ -134,6 +134,10 @@ struct string_key_view_t {
     }
   }
   size_t size() const { return length + sizeof(string_size_t); }
+  std::string_view to_string_view() const {
+    assert(type() == Type::STR);
+    return {p_key, length};
+  }
   bool operator==(const string_key_view_t& x) const {
     if (type() == x.type() && type() != Type::STR)
       return true;
@@ -433,6 +437,18 @@ class key_view_t {
     return *p_snap_gen;
   }
 
+  ghobject_t to_ghobj() const {
+    ghobject_t ghobj;
+    ghobj.shard_id.id = shard();
+    ghobj.hobj.pool = pool();
+    ghobj.hobj.set_hash(crush());
+    ghobj.hobj.nspace = nspace().to_string_view();
+    ghobj.hobj.oid.name = oid().to_string_view();
+    ghobj.hobj.snap = snap();
+    ghobj.generation = gen();
+    return ghobj;
+  }
+
   void set(const crush_t& key) {
     assert(!has_crush());
     p_crush = &key;
index c3378c1c655967e5f5ef19317622d8414168927e..ac82e03a1a098bd1ae4221f96da1f3909bf6f376 100644 (file)
@@ -773,63 +773,88 @@ struct staged {
    * Lookup internals (hide?)
    */
 
-  static result_t smallest_result(const iterator_t& iter) {
+  template <bool GET_KEY>
+  static result_t smallest_result(
+      const iterator_t& iter, full_key_t<KeyT::VIEW>* index_key) {
     static_assert(!IS_BOTTOM);
     assert(!iter.is_end());
     auto pos_smallest = NXT_STAGE_T::position_t::begin();
     auto nxt_container = iter.get_nxt_container();
-    auto value_ptr = NXT_STAGE_T::get_p_value(nxt_container, pos_smallest);
+    auto value_ptr = NXT_STAGE_T::template get_p_value<GET_KEY>(
+        nxt_container, pos_smallest, index_key);
+    if constexpr (GET_KEY) {
+      index_key->set(iter.get_key());
+    }
     return result_t{{iter.index(), pos_smallest}, value_ptr, STAGE};
   }
 
+  template <bool GET_KEY>
   static result_t nxt_lower_bound(
-      const full_key_t<KeyT::HOBJ>& key, iterator_t& iter, MatchHistory& history) {
+      const full_key_t<KeyT::HOBJ>& key, iterator_t& iter,
+      MatchHistory& history, full_key_t<KeyT::VIEW>* index_key) {
     static_assert(!IS_BOTTOM);
     assert(!iter.is_end());
     auto nxt_container = iter.get_nxt_container();
-    auto nxt_result = NXT_STAGE_T::lower_bound(nxt_container, key, history);
+    auto nxt_result = NXT_STAGE_T::template lower_bound<GET_KEY>(
+        nxt_container, key, history, index_key);
     if (nxt_result.is_end()) {
       if (iter.is_last()) {
         return result_t::end();
       } else {
-        return smallest_result(++iter);
+        return smallest_result<GET_KEY>(++iter, index_key);
       }
     } else {
+      if constexpr (GET_KEY) {
+        index_key->set(iter.get_key());
+      }
       return result_t::from_nxt(iter.index(), nxt_result);
     }
   }
 
-  static void lookup_largest(
-      const container_t& container, position_t& position, const value_t*& p_value) {
+  template <bool GET_POS, bool GET_KEY, bool GET_VAL>
+  static void lookup_largest_slot(
+      const container_t& container, position_t* p_position,
+      full_key_t<KeyT::VIEW>* p_index_key, const value_t** pp_value) {
     auto iter = iterator_t(container);
     iter.seek_last();
-    position.index = iter.index();
+    if constexpr (GET_KEY) {
+      assert(p_index_key);
+      p_index_key->set(iter.get_key());
+    }
+    if constexpr (GET_POS) {
+      assert(p_position);
+      p_position->index = iter.index();
+    }
     if constexpr (IS_BOTTOM) {
-      p_value = iter.get_p_value();
+      if constexpr (GET_VAL) {
+        assert(pp_value);
+        *pp_value = iter.get_p_value();
+      }
     } else {
       auto nxt_container = iter.get_nxt_container();
-      NXT_STAGE_T::lookup_largest(nxt_container, position.nxt, p_value);
-    }
-  }
-
-  static void lookup_largest_index(
-      const container_t& container, full_key_t<KeyT::VIEW>& output) {
-    auto iter = iterator_t(container);
-    iter.seek_last();
-    output.set(iter.get_key());
-    if constexpr (!IS_BOTTOM) {
-      auto nxt_container = iter.get_nxt_container();
-      NXT_STAGE_T::lookup_largest_index(nxt_container, output);
+      if constexpr (GET_POS) {
+        NXT_STAGE_T::template lookup_largest_slot<true, GET_KEY, GET_VAL>(
+            nxt_container, &p_position->nxt, p_index_key, pp_value);
+      } else {
+        NXT_STAGE_T::template lookup_largest_slot<false, GET_KEY, GET_VAL>(
+            nxt_container, nullptr, p_index_key, pp_value);
+      }
     }
   }
 
+  template <bool GET_KEY = false>
   static const value_t* get_p_value(
-      const container_t& container, const position_t& position) {
+      const container_t& container, const position_t& position,
+      full_key_t<KeyT::VIEW>* index_key = nullptr) {
     auto iter = iterator_t(container);
     iter.seek_at(position.index);
+    if constexpr (GET_KEY) {
+      index_key->set(iter.get_key());
+    }
     if constexpr (!IS_BOTTOM) {
       auto nxt_container = iter.get_nxt_container();
-      return NXT_STAGE_T::get_p_value(nxt_container, position.nxt);
+      return NXT_STAGE_T::template get_p_value<GET_KEY>(
+          nxt_container, position.nxt, index_key);
     } else {
       return iter.get_p_value();
     }
@@ -838,20 +863,22 @@ struct staged {
   static void get_key_view(
       const container_t& container,
       const position_t& position,
-      full_key_t<KeyT::VIEW>& output) {
+      full_key_t<KeyT::VIEW>& index_key) {
     auto iter = iterator_t(container);
     iter.seek_at(position.index);
-    output.set(iter.get_key());
+    index_key.set(iter.get_key());
     if constexpr (!IS_BOTTOM) {
       auto nxt_container = iter.get_nxt_container();
-      return NXT_STAGE_T::get_key_view(nxt_container, position.nxt, output);
+      return NXT_STAGE_T::get_key_view(nxt_container, position.nxt, index_key);
     }
   }
 
+  template <bool GET_KEY = false>
   static result_t lower_bound(
       const container_t& container,
       const full_key_t<KeyT::HOBJ>& key,
-      MatchHistory& history) {
+      MatchHistory& history,
+      full_key_t<KeyT::VIEW>* index_key = nullptr) {
     bool exclude_last = false;
     if (history.get<STAGE>().has_value()) {
       if (*history.get<STAGE>() == MatchKindCMP::EQ) {
@@ -876,10 +903,10 @@ struct staged {
               test_key_equal = (cmp == MatchKindCMP::EQ);
             }
             if (test_key_equal) {
-              return nxt_lower_bound(key, iter, history);
+              return nxt_lower_bound<GET_KEY>(key, iter, history, index_key);
             } else {
               // key[stage] < index[stage][left-most]
-              return smallest_result(iter);
+              return smallest_result<GET_KEY>(iter, index_key);
             }
           }
         }
@@ -893,12 +920,16 @@ struct staged {
         } else {
           assert(compare_to<KeyT::HOBJ>(key, iter.get_key()) == MatchKindCMP::EQ);
         }
+        if constexpr (GET_KEY) {
+          index_key->set(iter.get_key());
+        }
         if constexpr (IS_BOTTOM) {
           auto value_ptr = iter.get_p_value();
           return result_t{{iter.index()}, value_ptr, MSTAT_EQ};
         } else {
           auto nxt_container = iter.get_nxt_container();
-          auto nxt_result = NXT_STAGE_T::lower_bound(nxt_container, key, history);
+          auto nxt_result = NXT_STAGE_T::template lower_bound<GET_KEY>(
+              nxt_container, key, history, index_key);
           // !history.is_PO<STAGE - 1>() means
           // key[stage+1 ...] <= index[stage+1 ...][*]
           assert(!nxt_result.is_end());
@@ -919,14 +950,17 @@ struct staged {
     history.set<STAGE>(bs_match == MatchKindBS::EQ ?
                        MatchKindCMP::EQ : MatchKindCMP::NE);
     if constexpr (IS_BOTTOM) {
+      if constexpr (GET_KEY) {
+        index_key->set(iter.get_key());
+      }
       auto value_ptr = iter.get_p_value();
       return result_t{{iter.index()}, value_ptr,
                       (bs_match == MatchKindBS::EQ ? MSTAT_EQ : MSTAT_NE0)};
     } else {
       if (bs_match == MatchKindBS::EQ) {
-        return nxt_lower_bound(key, iter, history);
+        return nxt_lower_bound<GET_KEY>(key, iter, history, index_key);
       } else {
-        return smallest_result(iter);
+        return smallest_result<GET_KEY>(iter, index_key);
       }
     }
   }
index 67b68384621f7f45ea2ef0c58ebe53c4593a4907..b5ccc60c23b631b6ceaba81fc701f6020dde8a7b 100644 (file)
@@ -5,7 +5,7 @@
 
 #include "node.h"
 #include "node_extent_manager.h"
-#include "stages/key_layout.h" // full_key_t<KeyT::HOBJ>
+#include "stages/key_layout.h"
 #include "super.h"
 
 namespace crimson::os::seastore::onode {
@@ -39,9 +39,8 @@ bool Cursor::is_end() const {
   }
 }
 
-const ghobject_t& Cursor::get_ghobj() {
-  // TODO
-  assert(false && "not implemented");
+ghobject_t Cursor::get_ghobj() const {
+  return p_cursor->get_key_view().to_ghobj();
 }
 
 const onode_t* Cursor::value() const {
index 697301b03324141de61b02ba33469504f52c918a..9f9895ce2927217e1352969cf9a1dda36d9692fc 100644 (file)
@@ -86,7 +86,8 @@ class Btree::Cursor {
   ~Cursor();
 
   bool is_end() const;
-  const ghobject_t& get_ghobj();
+  // XXX: return key_view_t to avoid unecessary ghobject_t constructions
+  ghobject_t get_ghobj() const;
   const onode_t* value() const;
   bool operator==(const Cursor& x) const;
   bool operator!=(const Cursor& x) const { return !(*this == x); }
index 98356b65e27d89f3ec653dde16f2130e2df082fe..f7727daf906e7188eac9710e4b10e821ffbf04a0 100644 (file)
@@ -212,8 +212,9 @@ class Onodes {
   }
 
   static void validate_cursor(
-      const Btree::Cursor& cursor, const onode_t& onode) {
+      const Btree::Cursor& cursor, const ghobject_t& key, const onode_t& onode) {
     assert(!cursor.is_end());
+    assert(cursor.get_ghobj() == key);
     assert(cursor.value());
     assert(cursor.value() != &onode);
     assert(*cursor.value() == onode);
@@ -277,7 +278,7 @@ TEST_F(b_dummy_tree_test_t, 3_random_insert_leaf_node)
       auto [cursor, success] = tree.insert(t, key, value).unsafe_get0();
       assert(success == true);
       insert_history.emplace_back(key, &value, cursor);
-      Onodes::validate_cursor(cursor, value);
+      Onodes::validate_cursor(cursor, key, value);
       auto cursor_ = tree.lower_bound(t, key).unsafe_get0();
       assert(cursor_.value() == cursor.value());
       return cursor.value();
@@ -303,7 +304,7 @@ TEST_F(b_dummy_tree_test_t, 3_random_insert_leaf_node)
       auto [cursor1_dup, ret1_dup] = tree.insert(
           t, key1, onode1_dup).unsafe_get0();
       assert(ret1_dup == false);
-      Onodes::validate_cursor(cursor1_dup, onode1);
+      Onodes::validate_cursor(cursor1_dup, key1, onode1);
     }
 
     // insert key2, onode2 to key1's left at STAGE_LEFT
@@ -376,8 +377,8 @@ TEST_F(b_dummy_tree_test_t, 3_random_insert_leaf_node)
       {make_ghobj(4, 4, 4, "ns3", "oid3", 2, 2), &onodes.pick()},
       {make_ghobj(4, 4, 4, "ns4", "oid4", 2, 2), &onodes.pick()},
       {make_ghobj(4, 4, 4, "ns4", "oid4", 4, 4), &onodes.pick()}};
-    auto& smallest_value = *kvs[0].second;
-    auto& largest_value = *kvs[kvs.size() - 1].second;
+    auto [smallest_key, smallest_value] = kvs[0];
+    auto [largest_key, largest_value] = kvs[kvs.size() - 1];
     std::random_shuffle(kvs.begin(), kvs.end());
     std::for_each(kvs.begin(), kvs.end(), [&f_validate_insert_new] (auto& kv) {
       f_validate_insert_new(kv.first, *kv.second);
@@ -388,16 +389,16 @@ TEST_F(b_dummy_tree_test_t, 3_random_insert_leaf_node)
     for (auto& [k, v, c] : insert_history) {
       // validate values in tree keep intact
       auto cursor = tree.lower_bound(t, k).unsafe_get0();
-      Onodes::validate_cursor(cursor, *v);
+      Onodes::validate_cursor(cursor, k, *v);
       // validate values in cursors keep intact
-      Onodes::validate_cursor(c, *v);
+      Onodes::validate_cursor(c, k, *v);
     }
     Onodes::validate_cursor(
-        tree.lower_bound(t, key_s).unsafe_get0(), smallest_value);
+        tree.lower_bound(t, key_s).unsafe_get0(), smallest_key, *smallest_value);
     Onodes::validate_cursor(
-        tree.begin(t).unsafe_get0(), smallest_value);
+        tree.begin(t).unsafe_get0(), smallest_key, *smallest_value);
     Onodes::validate_cursor(
-        tree.last(t).unsafe_get0(), largest_value);
+        tree.last(t).unsafe_get0(), largest_key, *largest_value);
 
     std::ostringstream oss;
     tree.dump(t, oss);
@@ -447,7 +448,7 @@ TEST_F(b_dummy_tree_test_t, 4_split_leaf_node)
       auto& value = onodes.create(120);
       auto [cursor, success] = tree.insert(t, key, value).unsafe_get0();
       assert(success == true);
-      Onodes::validate_cursor(cursor, value);
+      Onodes::validate_cursor(cursor, key, value);
       insert_history.emplace_back(key, &value, cursor);
     }
     assert(tree.height(t).unsafe_get0() == 1);
@@ -466,7 +467,7 @@ TEST_F(b_dummy_tree_test_t, 4_split_leaf_node)
       logger().info("insert {}:", key_hobj_t(key));
       auto [cursor, success] = tree_clone.insert(t_clone, key, value).unsafe_get0();
       assert(success == true);
-      Onodes::validate_cursor(cursor, value);
+      Onodes::validate_cursor(cursor, key, value);
 
       std::ostringstream oss;
       tree_clone.dump(t_clone, oss);
@@ -475,10 +476,10 @@ TEST_F(b_dummy_tree_test_t, 4_split_leaf_node)
 
       for (auto& [k, v, c] : insert_history) {
         auto result = tree_clone.lower_bound(t_clone, k).unsafe_get0();
-        Onodes::validate_cursor(result, *v);
+        Onodes::validate_cursor(result, k, *v);
       }
       auto result = tree_clone.lower_bound(t_clone, key).unsafe_get0();
-      Onodes::validate_cursor(result, value);
+      Onodes::validate_cursor(result, key, value);
     };
 
     auto& onode = onodes.create(1280);