]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/onode-staged-tree: be explicit about node num-key/value invariants
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 2 Apr 2021 07:27:45 +0000 (15:27 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Thu, 29 Apr 2021 08:03:37 +0000 (16:03 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/onode_manager/staged-fltree/node.cc
src/crimson/os/seastore/onode_manager/staged-fltree/node_impl.h
src/crimson/os/seastore/onode_manager/staged-fltree/node_layout.h
src/test/crimson/seastore/onode_tree/test_staged_fltree.cc

index c1358f39040156ac6e41429fbfcaf59dc726014f..e55ff710cd392740054487a7b2831ec6ec87c631 100644 (file)
@@ -494,7 +494,7 @@ InternalNode::InternalNode(InternalNodeImpl* impl, NodeImplURef&& impl_ref)
 node_future<Ref<tree_cursor_t>>
 InternalNode::get_next_cursor(context_t c, const search_position_t& pos)
 {
-  assert(!impl->is_empty());
+  impl->validate_non_empty();
   if (pos.is_end()) {
     assert(impl->is_level_tail());
     return get_next_cursor_from_parent(c);
@@ -602,7 +602,7 @@ node_future<Ref<InternalNode>> InternalNode::allocate_root(
   ).safe_then([c, old_root_addr,
                super = std::move(super)](auto fresh_node) mutable {
     auto root = fresh_node.node;
-    assert(root->impl->is_empty());
+    assert(root->impl->is_keys_empty());
     auto p_value = root->impl->get_tail_value();
     fresh_node.mut.copy_in_absolute(
         const_cast<laddr_packed_t*>(p_value), old_root_addr);
@@ -614,7 +614,7 @@ node_future<Ref<InternalNode>> InternalNode::allocate_root(
 node_future<Ref<tree_cursor_t>>
 InternalNode::lookup_smallest(context_t c)
 {
-  assert(!impl->is_empty());
+  impl->validate_non_empty();
   auto position = search_position_t::begin();
   const laddr_packed_t* p_child_addr;
   impl->get_slot(position, nullptr, &p_child_addr);
@@ -629,7 +629,7 @@ InternalNode::lookup_largest(context_t c)
 {
   // NOTE: unlike LeafNode::lookup_largest(), this only works for the tail
   // internal node to return the tail child address.
-  assert(!impl->is_empty());
+  impl->validate_non_empty();
   assert(impl->is_level_tail());
   auto p_child_addr = impl->get_tail_value();
   return get_or_track_child(c, search_position_t::end(), p_child_addr->value
@@ -653,7 +653,7 @@ InternalNode::lower_bound_tracked(
 node_future<> InternalNode::do_get_tree_stats(
     context_t c, tree_stats_t& stats)
 {
-  assert(!impl->is_empty());
+  impl->validate_non_empty();
   auto nstats = impl->get_stats();
   stats.size_persistent_internal += nstats.size_persistent;
   stats.size_filled_internal += nstats.size_filled;
@@ -882,7 +882,7 @@ LeafNode::get_kv(const search_position_t& pos) const
 node_future<Ref<tree_cursor_t>>
 LeafNode::get_next_cursor(context_t c, const search_position_t& pos)
 {
-  assert(!impl->is_empty());
+  impl->validate_non_empty();
   search_position_t next_pos = pos;
   key_view_t index_key;
   const value_header_t* p_value_header = nullptr;
@@ -931,7 +931,7 @@ LeafNode::prepare_mutate_value_payload(context_t c)
 node_future<Ref<tree_cursor_t>>
 LeafNode::lookup_smallest(context_t)
 {
-  if (unlikely(impl->is_empty())) {
+  if (unlikely(impl->is_keys_empty())) {
     assert(is_root());
     return node_ertr::make_ready_future<Ref<tree_cursor_t>>(
         tree_cursor_t::create_end(this));
@@ -947,7 +947,7 @@ LeafNode::lookup_smallest(context_t)
 node_future<Ref<tree_cursor_t>>
 LeafNode::lookup_largest(context_t)
 {
-  if (unlikely(impl->is_empty())) {
+  if (unlikely(impl->is_keys_empty())) {
     assert(is_root());
     return node_ertr::make_ready_future<Ref<tree_cursor_t>>(
         tree_cursor_t::create_end(this));
index f359daf3692d54b413da03b7dc035f8d70ca56fe..f0805ac7d1d2281803e238e3b080c88259811d99 100644 (file)
@@ -74,7 +74,16 @@ class NodeImpl {
   virtual nextent_state_t get_extent_state() const = 0;
   virtual void prepare_mutate(context_t) = 0;
   virtual bool is_level_tail() const = 0;
-  virtual bool is_empty() const = 0;
+
+  /* Invariants for num_keys and num_values:
+   * - for leaf node and non-tail internal node, num_keys == num_values;
+   * - for tail internal node, num_keys + 1 == num_values;
+   * - all node must have at least 1 value, except the root leaf node;
+   * - the root internal node must have more than 1 values;
+   */
+  virtual void validate_non_empty() const = 0;
+  virtual bool is_keys_empty() const = 0;
+
   virtual level_t level() const = 0;
   virtual node_offset_t free_size() const = 0;
   virtual std::optional<key_view_t> get_pivot_index() const = 0;
index 712ea55e903d487a811f55a664f2584caa9ec8a2..5533c70812e562325630883d2f811939729614c9 100644 (file)
@@ -93,17 +93,27 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
   nextent_state_t get_extent_state() const override { return extent.get_state(); }
   void prepare_mutate(context_t c) override { return extent.prepare_mutate(c); }
   bool is_level_tail() const override { return extent.read().is_level_tail(); }
-  bool is_empty() const override { return extent.read().keys() == 0; }
+
+  void validate_non_empty() const override {
+    if constexpr (NODE_TYPE == node_type_t::INTERNAL) {
+      if (is_level_tail()) {
+        return;
+      }
+    }
+    assert(!is_keys_empty());
+  }
+  bool is_keys_empty() const override { return extent.read().keys() == 0; }
+
   level_t level() const override { return extent.read().level(); }
   node_offset_t free_size() const override { return extent.read().free_size(); }
 
   std::optional<key_view_t> get_pivot_index() const override {
-    assert(!is_empty());
     if constexpr (NODE_TYPE == node_type_t::INTERNAL) {
       if (is_level_tail()) {
         return std::nullopt;
       }
     }
+    assert(!is_keys_empty());
     key_view_t pivot_index;
     get_largest_slot(nullptr, &pivot_index, nullptr);
     return {pivot_index};
@@ -113,7 +123,7 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
     node_stats_t stats;
     auto& node_stage = extent.read();
     key_view_t index_key;
-    if (node_stage.keys()) {
+    if (!is_keys_empty()) {
       STAGE_T::get_stats(node_stage, stats, index_key);
     }
     stats.size_persistent = node_stage_t::EXTENT_SIZE;
@@ -139,7 +149,7 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
        << "B, value=" << stats.size_value << "B";
     os << ":\n  header: " << node_stage_t::header_size() << "B";
     size_t size = 0u;
-    if (node_stage.keys()) {
+    if (!is_keys_empty()) {
       STAGE_T::dump(node_stage, os, "  ", size, p_start);
     } else {
       size += node_stage_t::header_size();
@@ -195,7 +205,7 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
   void get_slot(const search_position_t& pos,
                 key_view_t* p_index_key = nullptr,
                 const value_t** pp_value = nullptr) const override {
-    assert(!is_empty());
+    assert(!is_keys_empty());
     assert(!pos.is_end());
     if (p_index_key && pp_value) {
       STAGE_T::template get_slot<true, true>(
@@ -214,7 +224,7 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
   void get_next_slot(search_position_t& pos,
                      key_view_t* p_index_key = nullptr,
                      const value_t** pp_value = nullptr) const override {
-    assert(!is_empty());
+    assert(!is_keys_empty());
     assert(!pos.is_end());
     bool find_next;
     if (p_index_key && pp_value) {
@@ -234,10 +244,7 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
   void get_largest_slot(search_position_t* p_pos = nullptr,
                         key_view_t* p_index_key = nullptr,
                         const value_t** pp_value = nullptr) const override {
-    assert(!is_empty());
-    if constexpr (NODE_TYPE == node_type_t::INTERNAL) {
-      assert(!is_level_tail());
-    }
+    assert(!is_keys_empty());
     if (p_pos && p_index_key && pp_value) {
       STAGE_T::template get_largest_slot<true, true, true>(
           extent.read(), &cast_down_fill_0<STAGE>(*p_pos), p_index_key, pp_value);
@@ -255,11 +262,12 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
       key_view_t* index_key=nullptr, marker_t={}) const override {
     auto& node_stage = extent.read();
     if constexpr (NODE_TYPE == node_type_t::LEAF) {
-      if (unlikely(node_stage.keys() == 0)) {
+      if (unlikely(is_keys_empty())) {
         history.set<STAGE_LEFT>(MatchKindCMP::LT);
         return lookup_result_t<NODE_TYPE>::end();
       }
     }
+    assert(!is_keys_empty());
 
     typename STAGE_T::result_t result_raw;
     if (index_key) {
@@ -604,8 +612,9 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
       auto& node_stage = extent.read();
       match_stage_t insert_stage;
       node_offset_t insert_size;
-      if (unlikely(!node_stage.keys())) {
+      if (unlikely(is_keys_empty())) {
         assert(insert_pos.is_end());
+        assert(is_level_tail());
         insert_stage = STAGE;
         insert_size = STAGE_T::template insert_size<KeyT::VIEW>(key, value);
       } else {
@@ -626,8 +635,9 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
       const MatchHistory& history, match_stat_t mstat,
       search_position_t& insert_pos) const override {
     if constexpr (NODE_TYPE == node_type_t::LEAF) {
-      if (unlikely(is_empty())) {
+      if (unlikely(is_keys_empty())) {
         assert(insert_pos.is_end());
+        assert(is_level_tail());
         return {STAGE, STAGE_T::template insert_size<KeyT::HOBJ>(key, value)};
       } else {
         return STAGE_T::evaluate_insert(
index 09f3fc90bf5c392c2a82631f6f9b0a773906852a..d2bb0ebd730a5b7448d32b921731cf1886dbc166 100644 (file)
@@ -715,7 +715,9 @@ class DummyChildPool {
     level_t level() const override { return 0u; }
     void prepare_mutate(context_t) override {
       ceph_abort("impossible path"); }
-    bool is_empty() const override {
+    void validate_non_empty() const override {
+      ceph_abort("impossible path"); }
+    bool is_keys_empty() const override {
       ceph_abort("impossible path"); }
     node_offset_t free_size() const override {
       ceph_abort("impossible path"); }