]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/onode-staged-tree: implement an extensive Value framework
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 15 Jan 2021 06:35:20 +0000 (14:35 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Fri, 22 Jan 2021 05:43:07 +0000 (13:43 +0800)
The introduced Value/ValueDeltaRecorder classes are designed for a
concrete Onode implementation that supports:
* User defined value layout;
* Locate and read the value payload stored in the leaf node;
* Modify the value payload with transaction;
* Value specific delta encode, decode and replay;
* Pin the according leaf node when the Value is alive;
* (interface only) Extend and trim the value payload in tree;

The goal is to decouple the dependencis between the follow-up
onode-attrs/omap/extentmap integrations and the onode-staged-tree
on-going implementations.

There is one limitation currently, that we cannot guarantee any
alignment of the value payload due to the unaligned node layouts and
unaligned split operations.

See src/test/crimson/seastore/onode_tree/test_value.h for an example
implementation which is implemented for test and benchmark purposes.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
23 files changed:
src/crimson/os/seastore/CMakeLists.txt
src/crimson/os/seastore/onode_manager/staged-fltree/fwd.h
src/crimson/os/seastore/onode_manager/staged-fltree/node.cc
src/crimson/os/seastore/onode_manager/staged-fltree/node.h
src/crimson/os/seastore/onode_manager/staged-fltree/node_delta_recorder.h
src/crimson/os/seastore/onode_manager/staged-fltree/node_extent_accessor.h
src/crimson/os/seastore/onode_manager/staged-fltree/node_extent_manager/seastore.cc
src/crimson/os/seastore/onode_manager/staged-fltree/node_extent_mutable.h
src/crimson/os/seastore/onode_manager/staged-fltree/node_impl.h
src/crimson/os/seastore/onode_manager/staged-fltree/node_layout.h
src/crimson/os/seastore/onode_manager/staged-fltree/node_types.h
src/crimson/os/seastore/onode_manager/staged-fltree/stages/node_stage.h
src/crimson/os/seastore/onode_manager/staged-fltree/stages/stage.h
src/crimson/os/seastore/onode_manager/staged-fltree/stages/stage_types.h
src/crimson/os/seastore/onode_manager/staged-fltree/stages/sub_items_stage.cc
src/crimson/os/seastore/onode_manager/staged-fltree/stages/sub_items_stage.h
src/crimson/os/seastore/onode_manager/staged-fltree/tree.h
src/crimson/os/seastore/onode_manager/staged-fltree/tree_types.h [deleted file]
src/crimson/os/seastore/onode_manager/staged-fltree/tree_utils.h
src/crimson/os/seastore/onode_manager/staged-fltree/value.cc [new file with mode: 0644]
src/crimson/os/seastore/onode_manager/staged-fltree/value.h [new file with mode: 0644]
src/test/crimson/seastore/onode_tree/test_staged_fltree.cc
src/test/crimson/seastore/onode_tree/test_value.h [new file with mode: 0644]

index 988aed2e19adebfc1244d6edd96667d142867b47..bdc51076a4ebc1a7a11ac2c4d4b9e7865689dff0 100644 (file)
@@ -25,6 +25,7 @@ add_library(crimson-seastore STATIC
   onode_manager/staged-fltree/stages/node_stage.cc
   onode_manager/staged-fltree/stages/sub_items_stage.cc
   onode_manager/staged-fltree/super.cc
+  onode_manager/staged-fltree/value.cc
   extentmap_manager.cc
   extentmap_manager/btree/extentmap_btree_node_impl.cc
   extentmap_manager/btree/btree_extentmap_manager.cc
index 4908c691f9122533f6a1737c02b991cd7bc1a869..ac31bcfd8cd000602dbd5da53ae0d66b7159f970 100644 (file)
@@ -7,6 +7,7 @@
 #include <cstring>
 #include <limits>
 #include <memory>
+#include <ostream>
 #include <string>
 
 #include "crimson/common/errorator.h"
@@ -28,12 +29,14 @@ class DeltaRecorder;
 class NodeExtent;
 class NodeExtentManager;
 class RootNodeTracker;
+struct ValueBuilder;
 using DeltaRecorderURef = std::unique_ptr<DeltaRecorder>;
 using NodeExtentRef = crimson::os::seastore::TCachedExtentRef<NodeExtent>;
 using NodeExtentManagerURef = std::unique_ptr<NodeExtentManager>;
 using RootNodeTrackerURef = std::unique_ptr<RootNodeTracker>;
 struct context_t {
   NodeExtentManager& nm;
+  const ValueBuilder& vb;
   Transaction& t;
 };
 
@@ -90,4 +93,71 @@ inline MatchKindCMP reverse(MatchKindCMP cmp) {
   }
 }
 
+struct tree_stats_t {
+  size_t size_persistent_leaf = 0;
+  size_t size_persistent_internal = 0;
+  size_t size_filled_leaf = 0;
+  size_t size_filled_internal = 0;
+  size_t size_logical_leaf = 0;
+  size_t size_logical_internal = 0;
+  size_t size_overhead_leaf = 0;
+  size_t size_overhead_internal = 0;
+  size_t size_value_leaf = 0;
+  size_t size_value_internal = 0;
+  unsigned num_kvs_leaf = 0;
+  unsigned num_kvs_internal = 0;
+  unsigned num_nodes_leaf = 0;
+  unsigned num_nodes_internal = 0;
+  unsigned height = 0;
+
+  size_t size_persistent() const {
+    return size_persistent_leaf + size_persistent_internal; }
+  size_t size_filled() const {
+    return size_filled_leaf + size_filled_internal; }
+  size_t size_logical() const {
+    return size_logical_leaf + size_logical_internal; }
+  size_t size_overhead() const {
+    return size_overhead_leaf + size_overhead_internal; }
+  size_t size_value() const {
+    return size_value_leaf + size_value_internal; }
+  unsigned num_kvs() const {
+    return num_kvs_leaf + num_kvs_internal; }
+  unsigned num_nodes() const {
+    return num_nodes_leaf + num_nodes_internal; }
+
+  double ratio_fullness() const {
+    return (double)size_filled() / size_persistent(); }
+  double ratio_key_compression() const {
+    return (double)(size_filled() - size_value()) / (size_logical() - size_value()); }
+  double ratio_overhead() const {
+    return (double)size_overhead() / size_filled(); }
+  double ratio_keys_leaf() const {
+    return (double)num_kvs_leaf / num_kvs(); }
+  double ratio_nodes_leaf() const {
+    return (double)num_nodes_leaf / num_nodes(); }
+  double ratio_filled_leaf() const {
+    return (double)size_filled_leaf / size_filled(); }
+};
+inline std::ostream& operator<<(std::ostream& os, const tree_stats_t& stats) {
+  os << "Tree stats:"
+     << "\n  height = " << stats.height
+     << "\n  num values = " << stats.num_kvs_leaf
+     << "\n  num nodes  = " << stats.num_nodes()
+     << " (leaf=" << stats.num_nodes_leaf
+     << ", internal=" << stats.num_nodes_internal << ")"
+     << "\n  size persistent = " << stats.size_persistent() << "B"
+     << "\n  size filled     = " << stats.size_filled() << "B"
+     << " (value=" << stats.size_value_leaf << "B"
+     << ", rest=" << stats.size_filled() - stats.size_value_leaf << "B)"
+     << "\n  size logical    = " << stats.size_logical() << "B"
+     << "\n  size overhead   = " << stats.size_overhead() << "B"
+     << "\n  ratio fullness  = " << stats.ratio_fullness()
+     << "\n  ratio keys leaf = " << stats.ratio_keys_leaf()
+     << "\n  ratio nodes leaf  = " << stats.ratio_nodes_leaf()
+     << "\n  ratio filled leaf = " << stats.ratio_filled_leaf()
+     << "\n  ratio key compression = " << stats.ratio_key_compression();
+  assert(stats.num_kvs_internal + 1 == stats.num_nodes());
+  return os;
+}
+
 }
index 3926b0f30b11644e623504fd0a83ab416d096ed3..5e13fe72cf5e1a7db92e9bcba2ab20d977e05476 100644 (file)
@@ -37,10 +37,10 @@ tree_cursor_t::tree_cursor_t(Ref<LeafNode> node, const search_position_t& pos)
 
 tree_cursor_t::tree_cursor_t(
     Ref<LeafNode> node, const search_position_t& pos,
-    const key_view_t& key_view, const onode_t* p_value)
+    const key_view_t& key_view, const value_header_t* p_value_header)
       : ref_leaf_node{node}, position{pos} {
   assert(!is_end());
-  update_cache(*node, key_view, p_value);
+  update_cache(*node, key_view, p_value_header);
   ref_leaf_node->do_track_cursor<true>(*this);
 }
 
@@ -56,6 +56,14 @@ tree_cursor_t::~tree_cursor_t() {
   }
 }
 
+node_future<> tree_cursor_t::extend_value(context_t c, value_size_t extend_size) {
+  return ref_leaf_node->extend_value(c, position, extend_size);
+}
+
+node_future<> tree_cursor_t::trim_value(context_t c, value_size_t trim_size) {
+  return ref_leaf_node->trim_value(c, position, trim_size);
+}
+
 template <bool VALIDATE>
 void tree_cursor_t::update_track(
     Ref<LeafNode> node, const search_position_t& pos) {
@@ -73,18 +81,23 @@ template void tree_cursor_t::update_track<false>(Ref<LeafNode>, const search_pos
 
 void tree_cursor_t::update_cache(LeafNode& node,
                                  const key_view_t& key_view,
-                                 const onode_t* p_value) const {
+                                 const value_header_t* p_value_header) const {
   assert(!is_end());
   assert(ref_leaf_node.get() == &node);
-  cache.update(node, key_view, p_value);
+  cache.update(node, key_view, p_value_header);
   cache.validate_is_latest(node, position);
 }
 
-void tree_cursor_t::maybe_update_cache() const {
+void tree_cursor_t::maybe_update_cache(value_magic_t magic) const {
   assert(!is_end());
   if (!cache.is_latest()) {
-    auto [key_view, p_value] = ref_leaf_node->get_kv(position);
-    cache.update(*ref_leaf_node, key_view, p_value);
+    auto [key_view, p_value_header] = ref_leaf_node->get_kv(position);
+    if (p_value_header->magic != magic) {
+      logger().error("OTree::Value::Load: magic mismatch, expect {} but got {}",
+                     magic, p_value_header->magic);
+      ceph_abort();
+    }
+    cache.update(*ref_leaf_node, key_view, p_value_header);
   }
   cache.validate_is_latest(*ref_leaf_node, position);
 }
@@ -97,12 +110,14 @@ bool tree_cursor_t::Cache::is_latest() const {
 
 void tree_cursor_t::Cache::update(LeafNode& node,
                                   const key_view_t& _key_view,
-                                  const onode_t* _p_value) {
-  assert(_p_value);
+                                  const value_header_t* _p_value_header) {
+  assert(_p_value_header);
   p_leaf_node = &node;
   version = node.get_layout_version();
   key_view = _key_view;
-  p_value = _p_value;
+  p_value_header = _p_value_header;
+  value_payload_mut.reset();
+  p_value_recorder = nullptr;
   valid = true;
   assert(is_latest());
 }
@@ -112,12 +127,25 @@ void tree_cursor_t::Cache::validate_is_latest(const LeafNode& node,
   assert(p_leaf_node == &node);
   assert(is_latest());
 #ifndef NDEBUG
-  auto [_key_view, _p_value] = node.get_kv(pos);
+  auto [_key_view, _p_value_header] = node.get_kv(pos);
   assert(*key_view == _key_view);
-  assert(p_value == _p_value);
+  assert(p_value_header == _p_value_header);
 #endif
 }
 
+std::pair<NodeExtentMutable&, ValueDeltaRecorder*>
+tree_cursor_t::Cache::prepare_mutate_value_payload(context_t c) {
+  assert(is_latest());
+  assert(p_leaf_node && p_value_header);
+  assert(p_value_header->magic == c.vb.get_header_magic());
+  if (!value_payload_mut.has_value()) {
+    auto value_mutable = p_leaf_node->prepare_mutate_value_payload(c);
+    value_payload_mut = p_value_header->get_payload_mutable(value_mutable.first);
+    p_value_recorder = value_mutable.second;
+  }
+  return {*value_payload_mut, p_value_recorder};
+}
+
 /*
  * Node
  */
@@ -147,18 +175,18 @@ node_future<Node::search_result_t> Node::lower_bound(
 }
 
 node_future<std::pair<Ref<tree_cursor_t>, bool>> Node::insert(
-    context_t c, const key_hobj_t& key, const onode_t& value) {
+    context_t c, const key_hobj_t& key, value_config_t vconf) {
   return seastar::do_with(
-    MatchHistory(), [this, c, &key, &value](auto& history) {
+    MatchHistory(), [this, c, &key, vconf](auto& history) {
       return lower_bound_tracked(c, key, history
-      ).safe_then([c, &key, &value, &history](auto result) {
+      ).safe_then([c, &key, vconf, &history](auto result) {
         if (result.match() == MatchKindBS::EQ) {
           return node_ertr::make_ready_future<std::pair<Ref<tree_cursor_t>, bool>>(
               std::make_pair(result.p_cursor, false));
         } else {
           auto leaf_node = result.p_cursor->get_leaf_node();
           return leaf_node->insert_value(
-              c, key, value, result.p_cursor->get_position(), history, result.mstat
+              c, key, vconf, result.p_cursor->get_position(), history, result.mstat
           ).safe_then([](auto p_cursor) {
             return node_ertr::make_ready_future<std::pair<Ref<tree_cursor_t>, bool>>(
                 std::make_pair(p_cursor, true));
@@ -588,11 +616,28 @@ bool LeafNode::is_level_tail() const {
   return impl->is_level_tail();
 }
 
-std::tuple<key_view_t, const onode_t*>
+std::tuple<key_view_t, const value_header_t*>
 LeafNode::get_kv(const search_position_t& pos) const {
   key_view_t key_view;
-  auto p_value = impl->get_p_value(pos, &key_view);
-  return {key_view, p_value};
+  auto p_value_header = impl->get_p_value(pos, &key_view);
+  return {key_view, p_value_header};
+}
+
+node_future<> LeafNode::extend_value(
+    context_t c, const search_position_t& pos, value_size_t extend_size) {
+  ceph_abort("not implemented");
+  return node_ertr::now();
+}
+
+node_future<> LeafNode::trim_value(
+    context_t c, const search_position_t& pos, value_size_t trim_size) {
+  ceph_abort("not implemented");
+  return node_ertr::now();
+}
+
+std::pair<NodeExtentMutable&, ValueDeltaRecorder*>
+LeafNode::prepare_mutate_value_payload(context_t c) {
+  return impl->prepare_mutate_value_payload(c);
 }
 
 node_future<Ref<tree_cursor_t>>
@@ -604,9 +649,9 @@ LeafNode::lookup_smallest(context_t) {
   }
   auto pos = search_position_t::begin();
   key_view_t index_key;
-  auto p_value = impl->get_p_value(pos, &index_key);
+  auto p_value_header = impl->get_p_value(pos, &index_key);
   return node_ertr::make_ready_future<Ref<tree_cursor_t>>(
-      get_or_track_cursor(pos, index_key, p_value));
+      get_or_track_cursor(pos, index_key, p_value_header));
 }
 
 node_future<Ref<tree_cursor_t>>
@@ -617,11 +662,11 @@ LeafNode::lookup_largest(context_t) {
         new tree_cursor_t(this));
   }
   search_position_t pos;
-  const onode_t* p_value = nullptr;
+  const value_header_t* p_value_header = nullptr;
   key_view_t index_key;
-  impl->get_largest_slot(pos, index_key, &p_value);
+  impl->get_largest_slot(pos, index_key, &p_value_header);
   return node_ertr::make_ready_future<Ref<tree_cursor_t>>(
-      get_or_track_cursor(pos, index_key, p_value));
+      get_or_track_cursor(pos, index_key, p_value_header));
 }
 
 node_future<Node::search_result_t>
@@ -670,7 +715,7 @@ node_future<> LeafNode::test_clone_root(
 }
 
 node_future<Ref<tree_cursor_t>> LeafNode::insert_value(
-    context_t c, const key_hobj_t& key, const onode_t& value,
+    context_t c, const key_hobj_t& key, value_config_t vconf,
     const search_position_t& pos, const MatchHistory& history,
     match_stat_t mstat) {
 #ifndef NDEBUG
@@ -680,20 +725,20 @@ node_future<Ref<tree_cursor_t>> LeafNode::insert_value(
 #endif
   logger().debug("OTree::Leaf::Insert: "
                  "pos({}), {}, {}, {}, mstat({}) ...",
-                 pos, key, value, history, mstat);
+                 pos, key, vconf, history, mstat);
   search_position_t insert_pos = pos;
   auto [insert_stage, insert_size] = impl->evaluate_insert(
-      key, value, history, mstat, insert_pos);
+      key, vconf, history, mstat, insert_pos);
   auto free_size = impl->free_size();
   if (free_size >= insert_size) {
     // insert
     on_layout_change();
     impl->prepare_mutate(c);
-    auto p_value = impl->insert(key, value, insert_pos, insert_stage, insert_size);
+    auto p_value_header = impl->insert(key, vconf, insert_pos, insert_stage, insert_size);
     assert(impl->free_size() == free_size - insert_size);
     assert(insert_pos <= pos);
-    assert(p_value->size == value.size);
-    auto ret = track_insert(insert_pos, insert_stage, p_value);
+    assert(p_value_header->payload_size == vconf.payload_size);
+    auto ret = track_insert(insert_pos, insert_stage, p_value_header);
     validate_tracked_cursors();
     return node_ertr::make_ready_future<Ref<tree_cursor_t>>(ret);
   }
@@ -702,22 +747,22 @@ node_future<Ref<tree_cursor_t>> LeafNode::insert_value(
   return (is_root() ? upgrade_root(c) : node_ertr::now()
   ).safe_then([this, c] {
     return LeafNode::allocate(c, impl->field_type(), impl->is_level_tail());
-  }).safe_then([this_ref, this, c, &key, &value,
+  }).safe_then([this_ref, this, c, &key, vconf,
                 insert_pos, insert_stage=insert_stage, insert_size=insert_size](auto fresh_right) mutable {
     auto right_node = fresh_right.node;
     // no need to bump version for right node, as it is fresh
     on_layout_change();
     impl->prepare_mutate(c);
-    auto [split_pos, is_insert_left, p_value] = impl->split_insert(
-        fresh_right.mut, *right_node->impl, key, value,
+    auto [split_pos, is_insert_left, p_value_header] = impl->split_insert(
+        fresh_right.mut, *right_node->impl, key, vconf,
         insert_pos, insert_stage, insert_size);
-    assert(p_value->size == value.size);
+    assert(p_value_header->payload_size == vconf.payload_size);
     track_split(split_pos, right_node);
     Ref<tree_cursor_t> ret;
     if (is_insert_left) {
-      ret = track_insert(insert_pos, insert_stage, p_value);
+      ret = track_insert(insert_pos, insert_stage, p_value_header);
     } else {
-      ret = right_node->track_insert(insert_pos, insert_stage, p_value);
+      ret = right_node->track_insert(insert_pos, insert_stage, p_value_header);
     }
     validate_tracked_cursors();
     right_node->validate_tracked_cursors();
@@ -746,18 +791,18 @@ node_future<Ref<LeafNode>> LeafNode::allocate_root(
 
 Ref<tree_cursor_t> LeafNode::get_or_track_cursor(
     const search_position_t& position,
-    const key_view_t& key, const onode_t* p_value) {
+    const key_view_t& key, const value_header_t* p_value_header) {
   assert(!position.is_end());
-  assert(p_value);
+  assert(p_value_header);
   Ref<tree_cursor_t> p_cursor;
   auto found = tracked_cursors.find(position);
   if (found == tracked_cursors.end()) {
-    p_cursor = new tree_cursor_t(this, position, key, p_value);
+    p_cursor = new tree_cursor_t(this, position, key, p_value_header);
   } else {
     p_cursor = found->second;
     assert(p_cursor->get_leaf_node() == this);
     assert(p_cursor->get_position() == position);
-    p_cursor->update_cache(*this, key, p_value);
+    p_cursor->update_cache(*this, key, p_value_header);
   }
   return p_cursor;
 }
@@ -766,15 +811,16 @@ void LeafNode::validate_cursor(tree_cursor_t& cursor) const {
 #ifndef NDEBUG
   assert(this == cursor.get_leaf_node().get());
   assert(!cursor.is_end());
-  auto [key, p_value] = get_kv(cursor.get_position());
-  assert(key == cursor.get_key_view());
-  assert(p_value == cursor.get_p_value());
+  auto [key, p_value_header] = get_kv(cursor.get_position());
+  auto magic = p_value_header->magic;
+  assert(key == cursor.get_key_view(magic));
+  assert(p_value_header == cursor.read_value_header(magic));
 #endif
 }
 
 Ref<tree_cursor_t> LeafNode::track_insert(
     const search_position_t& insert_pos, match_stage_t insert_stage,
-    const onode_t* p_onode) {
+    const value_header_t* p_value_header) {
   // update cursor position
   auto pos_upper_bound = insert_pos;
   pos_upper_bound.index_by_stage(insert_stage) = INDEX_UPPER_BOUND;
index f508ff63851724bcbb03fe1c3050ddf5e7b5e519..4384e15d8b8c5eda43252672990b6d9bf9e01ff1 100644 (file)
@@ -14,7 +14,7 @@
 #include "stages/key_layout.h"
 #include "stages/stage_types.h"
 #include "super.h"
-#include "tree_types.h"
+#include "value.h"
 
 /**
  * Tree example (2 levels):
@@ -62,6 +62,14 @@ class tree_cursor_t final
   : public boost::intrusive_ref_counter<
            tree_cursor_t, boost::thread_unsafe_counter> {
  public:
+  using ertr = crimson::errorator<
+    crimson::ct_error::input_output_error,
+    crimson::ct_error::invarg,
+    crimson::ct_error::enoent,
+    crimson::ct_error::erange>;
+  template <class ValueT=void>
+  using future = ertr::future<ValueT>;
+
   ~tree_cursor_t();
   tree_cursor_t(const tree_cursor_t&) = delete;
   tree_cursor_t(tree_cursor_t&&) = delete;
@@ -80,21 +88,36 @@ class tree_cursor_t final
   bool is_end() const { return position.is_end(); }
 
   /// Returns the key view in tree if it is not an end cursor.
-  const key_view_t& get_key_view() const {
-    maybe_update_cache();
+  const key_view_t& get_key_view(value_magic_t magic) const {
+    maybe_update_cache(magic);
     return cache.get_key_view();
   }
 
-  /// Returns the value pointer in tree if it is not an end cursor.
-  const onode_t* get_p_value() const {
-    maybe_update_cache();
-    return cache.get_p_value();
+  // public to Value
+
+  /// Get the latest value_header_t pointer for read.
+  const value_header_t* read_value_header(value_magic_t magic) const {
+    maybe_update_cache(magic);
+    return cache.get_p_value_header();
   }
 
+  /// Prepare the node extent to be mutable and recorded.
+  std::pair<NodeExtentMutable&, ValueDeltaRecorder*>
+  prepare_mutate_value_payload(context_t c) {
+    maybe_update_cache(c.vb.get_header_magic());
+    return cache.prepare_mutate_value_payload(c);
+  }
+
+  /// Extends the size of value payload.
+  future<> extend_value(context_t, value_size_t);
+
+  /// Trim and shrink the value payload.
+  future<> trim_value(context_t, value_size_t);
+
  private:
   tree_cursor_t(Ref<LeafNode>, const search_position_t&);
   tree_cursor_t(Ref<LeafNode>, const search_position_t&,
-                const key_view_t&, const onode_t*);
+                const key_view_t&, const value_header_t*);
   // lookup reaches the end, contain leaf node for further insert
   tree_cursor_t(Ref<LeafNode>);
 
@@ -102,8 +125,8 @@ class tree_cursor_t final
   Ref<LeafNode> get_leaf_node() { return ref_leaf_node; }
   template <bool VALIDATE>
   void update_track(Ref<LeafNode>, const search_position_t&);
-  void update_cache(LeafNode&, const key_view_t&, const onode_t*) const;
-  void maybe_update_cache() const;
+  void update_cache(LeafNode&, const key_view_t&, const value_header_t*) const;
+  void maybe_update_cache(value_magic_t magic) const;
 
   /**
    * Reversed resource management (tree_cursor_t)
@@ -124,7 +147,7 @@ class tree_cursor_t final
     Cache();
     bool is_latest() const;
     void invalidate() { valid = false; }
-    void update(LeafNode&, const key_view_t&, const onode_t*);
+    void update(LeafNode&, const key_view_t&, const value_header_t*);
     void validate_is_latest(const LeafNode&, const search_position_t&) const;
 
     const key_view_t& get_key_view() const {
@@ -132,16 +155,23 @@ class tree_cursor_t final
       assert(key_view.has_value());
       return *key_view;
     }
-    const onode_t* get_p_value() const {
+    const value_header_t* get_p_value_header() const {
       assert(is_latest());
-      assert(p_value);
-      return p_value;
+      assert(p_value_header);
+      return p_value_header;
     }
+    std::pair<NodeExtentMutable&, ValueDeltaRecorder*>
+    prepare_mutate_value_payload(context_t);
 
    private:
     LeafNode* p_leaf_node = nullptr;
     std::optional<key_view_t> key_view;
-    const onode_t* p_value = nullptr;
+    const value_header_t* p_value_header = nullptr;
+
+    // to update value payload
+    std::optional<NodeExtentMutable> value_payload_mut;
+    ValueDeltaRecorder* p_value_recorder = nullptr;
+
     layout_version_t version;
     bool valid = false;
   };
@@ -239,7 +269,7 @@ class Node
    * - If false, the returned cursor points to the conflicting element in tree;
    */
   node_future<std::pair<Ref<tree_cursor_t>, bool>> insert(
-      context_t, const key_hobj_t&, const onode_t&);
+      context_t, const key_hobj_t&, value_config_t);
 
   /// Recursively collects the statistics of the sub-tree formed by this node
   node_future<tree_stats_t> get_tree_stats(context_t);
@@ -428,7 +458,7 @@ class LeafNode final : public Node {
 
   bool is_level_tail() const;
   layout_version_t get_layout_version() const { return layout_version; }
-  std::tuple<key_view_t, const onode_t*> get_kv(const search_position_t&) const;
+  std::tuple<key_view_t, const value_header_t*> get_kv(const search_position_t&) const;
 
   template <bool VALIDATE>
   void do_track_cursor(tree_cursor_t& cursor) {
@@ -447,6 +477,12 @@ class LeafNode final : public Node {
     assert(removed);
   }
 
+  node_future<> extend_value(context_t, const search_position_t&, value_size_t);
+  node_future<> trim_value(context_t, const search_position_t&, value_size_t);
+
+  std::pair<NodeExtentMutable&, ValueDeltaRecorder*>
+  prepare_mutate_value_payload(context_t);
+
  protected:
   node_future<Ref<tree_cursor_t>> lookup_smallest(context_t) override;
   node_future<Ref<tree_cursor_t>> lookup_largest(context_t) override;
@@ -459,7 +495,7 @@ class LeafNode final : public Node {
  private:
   LeafNode(LeafNodeImpl*, NodeImplURef&&);
   node_future<Ref<tree_cursor_t>> insert_value(
-      context_t, const key_hobj_t&, const onode_t&,
+      context_t, const key_hobj_t&, value_config_t,
       const search_position_t&, const MatchHistory&,
       match_stat_t mstat);
   static node_future<Ref<LeafNode>> allocate_root(context_t, RootNodeTracker&);
@@ -469,9 +505,9 @@ class LeafNode final : public Node {
   // XXX: extract a common tracker for InternalNode to track Node,
   // and LeafNode to track tree_cursor_t.
   Ref<tree_cursor_t> get_or_track_cursor(
-      const search_position_t&, const key_view_t&, const onode_t*);
+      const search_position_t&, const key_view_t&, const value_header_t*);
   Ref<tree_cursor_t> track_insert(
-      const search_position_t&, match_stage_t, const onode_t*);
+      const search_position_t&, match_stage_t, const value_header_t*);
   void track_split(const search_position_t&, Ref<LeafNode>);
   void validate_tracked_cursors() const {
 #ifndef NDEBUG
index 4d323c05ce9a5cfe569f0385d649c24eb1d250c1..63aa16c63520f8259a9d66719f06bfe4e8506db7 100644 (file)
@@ -5,6 +5,7 @@
 
 #include "include/buffer.h"
 #include "node_types.h"
+#include "value.h"
 
 namespace crimson::os::seastore::onode {
 
@@ -29,6 +30,11 @@ class DeltaRecorder {
     return std::move(encoded);
   }
 
+  ValueDeltaRecorder* get_value_recorder() const {
+    assert(value_recorder);
+    return value_recorder.get();
+  }
+
   virtual node_type_t node_type() const = 0;
   virtual field_type_t field_type() const = 0;
   virtual void apply_delta(ceph::bufferlist::const_iterator&,
@@ -37,7 +43,11 @@ class DeltaRecorder {
 
  protected:
   DeltaRecorder() = default;
+  DeltaRecorder(const ValueBuilder& vb)
+    : value_recorder{vb.build_value_recorder(encoded)} {}
+
   ceph::bufferlist encoded;
+  std::unique_ptr<ValueDeltaRecorder> value_recorder;
 };
 
 }
index f22de27b2295d9ee9e9b99e983e60141db57ea44..29b13b9ea715b687d6483356f98de1d32be8de90 100644 (file)
@@ -7,6 +7,7 @@
 #include "node_extent_manager.h"
 #include "node_delta_recorder.h"
 #include "node_layout_replayable.h"
+#include "value.h"
 
 #ifndef NDEBUG
 #include "node_extent_manager/test_replay.h"
@@ -83,12 +84,17 @@ class DeltaRecorderT final: public DeltaRecorder {
     ceph::encode(static_cast<node_offset_t>(node_offset), encoded);
   }
 
-  static DeltaRecorderURef create() {
+  static DeltaRecorderURef create_for_encode(const ValueBuilder& v_builder) {
+    return std::unique_ptr<DeltaRecorder>(new DeltaRecorderT(v_builder));
+  }
+
+  static DeltaRecorderURef create_for_replay() {
     return std::unique_ptr<DeltaRecorder>(new DeltaRecorderT());
   }
 
  protected:
-  DeltaRecorderT() = default;
+  DeltaRecorderT() : DeltaRecorder() {}
+  DeltaRecorderT(const ValueBuilder& vb) : DeltaRecorder(vb) {}
   node_type_t node_type() const override { return NODE_TYPE; }
   field_type_t field_type() const override { return FIELD_TYPE; }
   void apply_delta(ceph::bufferlist::const_iterator& delta,
@@ -103,11 +109,7 @@ class DeltaRecorderT final: public DeltaRecorder {
       case node_delta_op_t::INSERT: {
         logger().debug("OTree::Extent::Replay: decoding INSERT ...");
         auto key = key_hobj_t::decode(delta);
-
-        std::unique_ptr<char[]> value_storage_heap;
-        value_input_t value_storage_stack;
-        auto p_value = decode_value(delta, value_storage_heap, value_storage_stack);
-
+        auto value = decode_value(delta);
         auto insert_pos = position_t::decode(delta);
         match_stage_t insert_stage;
         ceph::decode(insert_stage, delta);
@@ -115,9 +117,9 @@ class DeltaRecorderT final: public DeltaRecorder {
         ceph::decode(insert_size, delta);
         logger().debug("OTree::Extent::Replay: apply {}, {}, "
                        "insert_pos({}), insert_stage={}, insert_size={}B ...",
-                       key, *p_value, insert_pos, insert_stage, insert_size);
+                       key, value, insert_pos, insert_stage, insert_size);
         layout_t::template insert<KeyT::HOBJ>(
-          node, stage, key, *p_value, insert_pos, insert_stage, insert_size);
+          node, stage, key, value, insert_pos, insert_stage, insert_size);
         break;
       }
       case node_delta_op_t::SPLIT: {
@@ -131,11 +133,7 @@ class DeltaRecorderT final: public DeltaRecorder {
         logger().debug("OTree::Extent::Replay: decoding SPLIT_INSERT ...");
         auto split_at = StagedIterator::decode(stage.p_start(), delta);
         auto key = key_hobj_t::decode(delta);
-
-        std::unique_ptr<char[]> value_storage_heap;
-        value_input_t value_storage_stack;
-        auto p_value = decode_value(delta, value_storage_heap, value_storage_stack);
-
+        auto value = decode_value(delta);
         auto insert_pos = position_t::decode(delta);
         match_stage_t insert_stage;
         ceph::decode(insert_stage, delta);
@@ -143,9 +141,9 @@ class DeltaRecorderT final: public DeltaRecorder {
         ceph::decode(insert_size, delta);
         logger().debug("OTree::Extent::Replay: apply split_at={}, {}, {}, "
                        "insert_pos({}), insert_stage={}, insert_size={}B ...",
-                       split_at, key, *p_value, insert_pos, insert_stage, insert_size);
+                       split_at, key, value, insert_pos, insert_stage, insert_size);
         layout_t::template split_insert<KeyT::HOBJ>(
-          node, stage, split_at, key, *p_value, insert_pos, insert_stage, insert_size);
+          node, stage, split_at, key, value, insert_pos, insert_stage, insert_size);
         break;
       }
       case node_delta_op_t::UPDATE_CHILD_ADDR: {
@@ -161,6 +159,20 @@ class DeltaRecorderT final: public DeltaRecorder {
         layout_t::update_child_addr(node, new_addr, p_addr);
         break;
       }
+      case node_delta_op_t::SUBOP_UPDATE_VALUE: {
+        logger().debug("OTree::Extent::Replay: decoding SUBOP_UPDATE_VALUE ...");
+        node_offset_t value_header_offset;
+        ceph::decode(value_header_offset, delta);
+        auto p_header = node.get_read() + value_header_offset;
+        auto p_header_ = reinterpret_cast<const value_header_t*>(p_header);
+        logger().debug("OTree::Extent::Replay: update {} at {:#x} ...",
+                       *p_header_, value_header_offset);
+        auto payload_mut = p_header_->get_payload_mutable(node);
+        auto value_addr = node_laddr + payload_mut.get_node_offset();
+        get_value_replayer(p_header_->magic)->apply_value_delta(
+            delta, payload_mut, value_addr);
+        break;
+      }
       default:
         logger().error("OTree::Extent::Replay: got unknown op {} when replay {:#x}",
                        op, node_laddr);
@@ -174,11 +186,30 @@ class DeltaRecorderT final: public DeltaRecorder {
   }
 
  private:
-  static void encode_value(const value_input_t& value, ceph::bufferlist& encoded) {
+  ValueDeltaRecorder* get_value_replayer(value_magic_t magic) {
+    // Replay procedure is independent of Btree and happens at lower level in
+    // seastore. There is no ValueBuilder so the recoder needs to build the
+    // ValueDeltaRecorder by itself.
+    if (value_replayer) {
+      if (value_replayer->get_header_magic() != magic) {
+        ceph_abort_msgf("OTree::Extent::Replay: value magic mismatch %x != %x",
+                        value_replayer->get_header_magic(), magic);
+      }
+    } else {
+      value_replayer = build_value_recorder_by_type(encoded, magic);
+      if (!value_replayer) {
+        ceph_abort_msgf("OTree::Extent::Replay: got unexpected value magic = %x",
+                        magic);
+      }
+    }
+    return value_replayer.get();
+  }
+
+  void encode_value(const value_input_t& value, ceph::bufferlist& encoded) const {
     if constexpr (std::is_same_v<value_input_t, laddr_t>) {
       // NODE_TYPE == node_type_t::INTERNAL
       ceph::encode(value, encoded);
-    } else if constexpr (std::is_same_v<value_input_t, onode_t>) {
+    } else if constexpr (std::is_same_v<value_input_t, value_config_t>) {
       // NODE_TYPE == node_type_t::LEAF
       value.encode(encoded);
     } else {
@@ -186,20 +217,15 @@ class DeltaRecorderT final: public DeltaRecorder {
     }
   }
 
-  static value_input_t* decode_value(ceph::bufferlist::const_iterator& delta,
-                                     std::unique_ptr<char[]>& value_storage_heap,
-                                     value_input_t& value_storage_stack) {
+  value_input_t decode_value(ceph::bufferlist::const_iterator& delta) const {
     if constexpr (std::is_same_v<value_input_t, laddr_t>) {
       // NODE_TYPE == node_type_t::INTERNAL
       laddr_t value;
       ceph::decode(value, delta);
-      value_storage_stack = value;
-      return &value_storage_stack;
-    } else if constexpr (std::is_same_v<value_input_t, onode_t>) {
+      return value;
+    } else if constexpr (std::is_same_v<value_input_t, value_config_t>) {
       // NODE_TYPE == node_type_t::LEAF
-      auto value_config = onode_t::decode(delta);
-      value_storage_heap = onode_t::allocate(value_config);
-      return reinterpret_cast<onode_t*>(value_storage_heap.get());
+      return value_config_t::decode(delta);
     } else {
       ceph_abort("impossible path");
     }
@@ -208,6 +234,8 @@ class DeltaRecorderT final: public DeltaRecorder {
   static seastar::logger& logger() {
     return crimson::get_logger(ceph_subsys_filestore);
   }
+
+  std::unique_ptr<ValueDeltaRecorder> value_replayer;
 };
 
 /**
@@ -252,7 +280,7 @@ class NodeExtentAccessorT {
       ceph_abort("impossible path");
     }
 #ifndef NDEBUG
-    auto ref_recorder = recorder_t::create();
+    auto ref_recorder = recorder_t::create_for_replay();
     test_recorder = static_cast<recorder_t*>(ref_recorder.get());
     test_extent = TestReplayExtent::create(
         extent->get_length(), std::move(ref_recorder));
@@ -271,7 +299,7 @@ class NodeExtentAccessorT {
   // for the safety of mixed read and mutate, call before read.
   void prepare_mutate(context_t c) {
     if (needs_mutate()) {
-      auto ref_recorder = recorder_t::create();
+      auto ref_recorder = recorder_t::create_for_encode(c.vb);
       recorder = static_cast<recorder_t*>(ref_recorder.get());
       extent = extent->mutate(c, std::move(ref_recorder));
       assert(needs_recording());
@@ -368,6 +396,16 @@ class NodeExtentAccessorT {
 #endif
   }
 
+  std::pair<NodeExtentMutable&, ValueDeltaRecorder*>
+  prepare_mutate_value_payload(context_t c) {
+    prepare_mutate(c);
+    ValueDeltaRecorder* p_value_recorder = nullptr;
+    if (needs_recording()) {
+      p_value_recorder = recorder->get_value_recorder();
+    }
+    return {*mut, p_value_recorder};
+  }
+
   void test_copy_to(NodeExtentMutable& to) const {
     assert(extent->get_length() == to.get_length());
     std::memcpy(to.get_write(), extent->get_read(), extent->get_length());
index c3d9ba0c45b7f1d5c48fbf8bd3ca7adf120534e9..dc9333295b7799bdf0b2a1bb1839fb89c1bb275c 100644 (file)
@@ -16,29 +16,29 @@ seastar::logger& logger() {
 
 namespace crimson::os::seastore::onode {
 
-static DeltaRecorderURef create_recorder(
+static DeltaRecorderURef create_replay_recorder(
     node_type_t node_type, field_type_t field_type) {
   if (node_type == node_type_t::LEAF) {
     if (field_type == field_type_t::N0) {
-      return DeltaRecorderT<node_fields_0_t, node_type_t::LEAF>::create();
+      return DeltaRecorderT<node_fields_0_t, node_type_t::LEAF>::create_for_replay();
     } else if (field_type == field_type_t::N1) {
-      return DeltaRecorderT<node_fields_1_t, node_type_t::LEAF>::create();
+      return DeltaRecorderT<node_fields_1_t, node_type_t::LEAF>::create_for_replay();
     } else if (field_type == field_type_t::N2) {
-      return DeltaRecorderT<node_fields_2_t, node_type_t::LEAF>::create();
+      return DeltaRecorderT<node_fields_2_t, node_type_t::LEAF>::create_for_replay();
     } else if (field_type == field_type_t::N3) {
-      return DeltaRecorderT<leaf_fields_3_t, node_type_t::LEAF>::create();
+      return DeltaRecorderT<leaf_fields_3_t, node_type_t::LEAF>::create_for_replay();
     } else {
       ceph_abort("impossible path");
     }
   } else if (node_type == node_type_t::INTERNAL) {
     if (field_type == field_type_t::N0) {
-      return DeltaRecorderT<node_fields_0_t, node_type_t::INTERNAL>::create();
+      return DeltaRecorderT<node_fields_0_t, node_type_t::INTERNAL>::create_for_replay();
     } else if (field_type == field_type_t::N1) {
-      return DeltaRecorderT<node_fields_1_t, node_type_t::INTERNAL>::create();
+      return DeltaRecorderT<node_fields_1_t, node_type_t::INTERNAL>::create_for_replay();
     } else if (field_type == field_type_t::N2) {
-      return DeltaRecorderT<node_fields_2_t, node_type_t::INTERNAL>::create();
+      return DeltaRecorderT<node_fields_2_t, node_type_t::INTERNAL>::create_for_replay();
     } else if (field_type == field_type_t::N3) {
-      return DeltaRecorderT<internal_fields_3_t, node_type_t::INTERNAL>::create();
+      return DeltaRecorderT<internal_fields_3_t, node_type_t::INTERNAL>::create_for_replay();
     } else {
       ceph_abort("impossible path");
     }
@@ -60,6 +60,8 @@ NodeExtentRef SeastoreNodeExtent::mutate(
   auto nm = static_cast<SeastoreNodeExtentManager*>(&c.nm);
   auto extent = nm->get_tm().get_mutable_extent(c.t, this);
   auto ret = extent->cast<SeastoreNodeExtent>();
+  // A replayed extent may already have an empty recorder, we discard it for
+  // simplicity.
   assert(!ret->recorder || ret->recorder->is_empty());
   ret->recorder = std::move(_recorder);
   return ret;
@@ -69,7 +71,7 @@ void SeastoreNodeExtent::apply_delta(const ceph::bufferlist& bl) {
   logger().debug("OTree::Seastore: replay {:#x} ...", get_laddr());
   if (!recorder) {
     auto [node_type, field_type] = get_types();
-    recorder = create_recorder(node_type, field_type);
+    recorder = create_replay_recorder(node_type, field_type);
   } else {
 #ifndef NDEBUG
     auto [node_type, field_type] = get_types();
index 0a55020832e38bf611dd2ca7c8ba24488164cbee..5082361740fda27a300b62248a715362db59094c 100644 (file)
@@ -52,6 +52,15 @@ class NodeExtentMutable {
     shift_absolute(get_write() + src_offset, len, offset);
   }
 
+  void set_absolute(void* dst, int value, extent_len_t len) {
+    assert(is_safe(dst, len));
+    std::memset(dst, value, len);
+  }
+  void set_relative(extent_len_t dst_offset, int value, extent_len_t len) {
+    auto dst = get_write() + dst_offset;
+    set_absolute(dst, value, len);
+  }
+
   template <typename T>
   void validate_inplace_update(const T& updated) {
     assert(is_safe(&updated, sizeof(T)));
@@ -60,6 +69,23 @@ class NodeExtentMutable {
   const char* get_read() const { return p_start; }
   char* get_write() { return p_start; }
   extent_len_t get_length() const { return length; }
+  node_offset_t get_node_offset() const { return node_offset; }
+
+  NodeExtentMutable get_mutable_absolute(const void* dst, node_offset_t len) const {
+    assert(node_offset == 0);
+    assert(is_safe(dst, len));
+    assert((const char*)dst != get_read());
+    auto ret = *this;
+    node_offset_t offset = (const char*)dst - get_read();
+    ret.p_start += offset;
+    ret.length = len;
+    ret.node_offset = offset;
+    return ret;
+  }
+  NodeExtentMutable get_mutable_relative(
+      node_offset_t offset, node_offset_t len) const {
+    return get_mutable_absolute(get_read() + offset, len);
+  }
 
  private:
   NodeExtentMutable(char* p_start, extent_len_t length)
@@ -71,6 +97,7 @@ class NodeExtentMutable {
 
   char* p_start;
   extent_len_t length;
+  node_offset_t node_offset = 0;
 
   friend class NodeExtent;
 };
index 319562d5d19a23152d84f923882e840e4e54ea2d..88c8a77b02781467800d605fdcf70f749e429a23 100644 (file)
@@ -151,7 +151,7 @@ class LeafNodeImpl : public NodeImpl {
   virtual ~LeafNodeImpl() = default;
 
   #pragma GCC diagnostic ignored "-Woverloaded-virtual"
-  virtual const onode_t* get_p_value(
+  virtual const value_header_t* get_p_value(
       const search_position_t&,
       key_view_t* = nullptr, leaf_marker_t={}) const {
     ceph_abort("impossible path");
@@ -163,23 +163,27 @@ class LeafNodeImpl : public NodeImpl {
     ceph_abort("impossible path");
   }
   #pragma GCC diagnostic ignored "-Woverloaded-virtual"
-  virtual const onode_t* insert(
-      const key_hobj_t&, const onode_t&, search_position_t&, match_stage_t&, node_offset_t&) {
+  virtual const value_header_t* insert(
+      const key_hobj_t&, const value_config_t&, search_position_t&, match_stage_t&, node_offset_t&) {
     ceph_abort("impossible path");
   }
   #pragma GCC diagnostic ignored "-Woverloaded-virtual"
-  virtual std::tuple<search_position_t, bool, const onode_t*> split_insert(
-      NodeExtentMutable&, NodeImpl&, const key_hobj_t&, const onode_t&,
+  virtual std::tuple<search_position_t, bool, const value_header_t*> split_insert(
+      NodeExtentMutable&, NodeImpl&, const key_hobj_t&, const value_config_t&,
       search_position_t&, match_stage_t&, node_offset_t&) {
     ceph_abort("impossible path");
   }
 
   virtual void get_largest_slot(
-      search_position_t&, key_view_t&, const onode_t**) const = 0;
+      search_position_t&, key_view_t&, const value_header_t**) const = 0;
+
   virtual std::tuple<match_stage_t, node_offset_t> evaluate_insert(
-      const key_hobj_t&, const onode_t&,
+      const key_hobj_t&, const value_config_t&,
       const MatchHistory&, match_stat_t, search_position_t&) const = 0;
 
+  virtual std::pair<NodeExtentMutable&, ValueDeltaRecorder*>
+  prepare_mutate_value_payload(context_t) = 0;
+
   struct fresh_impl_t {
     LeafNodeImplURef impl;
     NodeExtentMutable mut;
index cc5a83b6db047fa98837661643173bade03862e3..1088d1223e1ba406f1a9a546c257346ac813ff31 100644 (file)
@@ -391,7 +391,7 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
        * - I > 2 + 2/S (S > 1)
        *
        * Now back to NODE_BLOCK_SIZE calculation, if we have limits of at most
-       * X KiB ns-oid string and Y KiB of onode_t to store in this BTree, then:
+       * X KiB ns-oid string and Y KiB of value to store in this BTree, then:
        * - largest_insert_size ~= X+Y KiB
        * - 1/S == X/(X+Y)
        * - I > (4X+2Y)/(X+Y)
@@ -558,7 +558,8 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
    * LeafNodeImpl
    */
   void get_largest_slot(search_position_t& pos,
-                        key_view_t& index_key, const onode_t** pp_value) const override {
+                        key_view_t& index_key,
+                        const value_header_t** pp_value) const override {
     if constexpr (NODE_TYPE == node_type_t::LEAF) {
       STAGE_T::template lookup_largest_slot<true, true, true>(
           extent.read(), &cast_down_fill_0<STAGE>(pos), &index_key, pp_value);
@@ -568,7 +569,7 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
   }
 
   std::tuple<match_stage_t, node_offset_t> evaluate_insert(
-      const key_hobj_t& key, const onode_t& value,
+      const key_hobj_t& key, const value_config_t& value,
       const MatchHistory& history, match_stat_t mstat,
       search_position_t& insert_pos) const override {
     if constexpr (NODE_TYPE == node_type_t::LEAF) {
@@ -584,6 +585,11 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
     }
   }
 
+  std::pair<NodeExtentMutable&, ValueDeltaRecorder*>
+  prepare_mutate_value_payload(context_t c) {
+    return extent.prepare_mutate_value_payload(c);
+  }
+
  private:
   NodeLayoutT(NodeExtentRef extent) : extent{extent} {}
 
index 11cf64b121e2a349133cb24ae9a62d8423829c52..821fc6961df399743d470fcb556a97946b034a65 100644 (file)
@@ -66,6 +66,7 @@ enum class node_delta_op_t : uint8_t {
   SPLIT,
   SPLIT_INSERT,
   UPDATE_CHILD_ADDR,
+  SUBOP_UPDATE_VALUE = 0xff,
 };
 
 }
index be98eb17ec490c5973e8aa5eed339050c81b8fc5..1d88d0dc93782e4c73ea0b2e762882b15c3828d1 100644 (file)
@@ -100,8 +100,8 @@ class node_extent_t {
       return &p_fields->child_addrs[index];
     } else {
       auto range = get_nxt_container(index);
-      auto ret = reinterpret_cast<const onode_t*>(range.p_start);
-      assert(range.p_start + ret->size == range.p_end);
+      auto ret = reinterpret_cast<const value_header_t*>(range.p_start);
+      assert(range.p_start + ret->allocation_size() == range.p_end);
       return ret;
     }
   }
@@ -144,7 +144,7 @@ class node_extent_t {
       size += ns_oid_view_t::estimate_size<KT>(key);
     } else if constexpr (FIELD_TYPE == field_type_t::N3 &&
                          NODE_TYPE == node_type_t::LEAF) {
-      size += value.size;
+      size += value.allocation_size();
     }
     return size;
   }
index 794d654624291aad0276b5be0f51adcf513396a0..7e5d70a34238c2969c2dd8d9ac31485721e125e0 100644 (file)
@@ -1174,7 +1174,7 @@ struct staged {
 
   template <typename T = std::tuple<match_stage_t, node_offset_t>>
   static std::enable_if_t<NODE_TYPE == node_type_t::LEAF, T> evaluate_insert(
-      const full_key_t<KeyT::HOBJ>& key, const onode_t& value,
+      const full_key_t<KeyT::HOBJ>& key, const value_config_t& value,
       const MatchHistory& history, match_stat_t mstat, position_t& position) {
     match_stage_t insert_stage = STAGE_TOP;
     while (*history.get_by_stage(insert_stage) == MatchKindCMP::EQ) {
@@ -1408,7 +1408,7 @@ struct staged {
         size_t kv_logical_size = index_key.size_logical();
         size_t value_size;
         if constexpr (NODE_TYPE == node_type_t::LEAF) {
-          value_size = iter.get_p_value()->size;
+          value_size = iter.get_p_value()->allocation_size();
         } else {
           value_size = sizeof(value_t);
         }
index 02fe11ab97f9cc0fa223c523eeb52bbef43da376..24652761518c754d36c14aa292a4908be3f0732e 100644 (file)
@@ -9,7 +9,7 @@
 
 #include "crimson/os/seastore/onode_manager/staged-fltree/fwd.h"
 #include "crimson/os/seastore/onode_manager/staged-fltree/node_types.h"
-#include "crimson/os/seastore/onode_manager/staged-fltree/tree_types.h"
+#include "crimson/os/seastore/onode_manager/staged-fltree/value.h"
 
 namespace crimson::os::seastore::onode {
 
@@ -359,13 +359,13 @@ enum class ContainerType { ITERATIVE, INDEXABLE };
 // the input type to construct the value during insert.
 template <node_type_t> struct value_input_type;
 template<> struct value_input_type<node_type_t::INTERNAL> { using type = laddr_t; };
-template<> struct value_input_type<node_type_t::LEAF> { using type = onode_t; };
+template<> struct value_input_type<node_type_t::LEAF> { using type = value_config_t; };
 template <node_type_t NODE_TYPE>
 using value_input_type_t = typename value_input_type<NODE_TYPE>::type;
 
 template <node_type_t> struct value_type;
 template<> struct value_type<node_type_t::INTERNAL> { using type = laddr_packed_t; };
-template<> struct value_type<node_type_t::LEAF> { using type = onode_t; };
+template<> struct value_type<node_type_t::LEAF> { using type = value_header_t; };
 template <node_type_t NODE_TYPE>
 using value_type_t = typename value_type<NODE_TYPE>::type;
 
index 10f157809625614e397d83920c0e189172a37b73..dd98c230e100678f49fcbb4701ce843e3ade000c 100644 (file)
@@ -68,9 +68,9 @@ void internal_sub_items_t::Appender<KT>::append(
 }
 
 template <KeyT KT>
-const onode_t* leaf_sub_items_t::insert_at(
+const value_header_t* leaf_sub_items_t::insert_at(
     NodeExtentMutable& mut, const leaf_sub_items_t& sub_items,
-    const full_key_t<KT>& key, const onode_t& value,
+    const full_key_t<KT>& key, const value_config_t& value,
     index_t index, node_offset_t size, const char* p_left_bound) {
   assert(index <= sub_items.keys());
   assert(size == estimate_insert<KT>(key, value));
@@ -81,14 +81,14 @@ const onode_t* leaf_sub_items_t::insert_at(
 
   // b. insert item
   auto p_insert = const_cast<char*>(p_shift_end - size);
-  auto p_value = reinterpret_cast<const onode_t*>(p_insert);
-  mut.copy_in_absolute(p_insert, &value, value.size);
-  p_insert += value.size;
+  auto p_value = reinterpret_cast<value_header_t*>(p_insert);
+  p_value->initiate(mut, value);
+  p_insert += value.allocation_size();
   mut.copy_in_absolute(p_insert, snap_gen_t::template from_key<KT>(key));
   assert(p_insert + sizeof(snap_gen_t) + sizeof(node_offset_t) == p_shift_end);
 
   // c. compensate affected offsets
-  auto item_size = value.size + sizeof(snap_gen_t);
+  auto item_size = value.allocation_size() + sizeof(snap_gen_t);
   for (auto i = index; i < sub_items.keys(); ++i) {
     const node_offset_packed_t& offset_i = sub_items.get_offset(i);
     mut.copy_in_absolute((void*)&offset_i, node_offset_t(offset_i.value + item_size));
@@ -112,9 +112,9 @@ const onode_t* leaf_sub_items_t::insert_at(
 
   return p_value;
 }
-template const onode_t* leaf_sub_items_t::insert_at<KeyT::HOBJ>(
+template const value_header_t* leaf_sub_items_t::insert_at<KeyT::HOBJ>(
     NodeExtentMutable&, const leaf_sub_items_t&, const full_key_t<KeyT::HOBJ>&,
-    const onode_t&, index_t, node_offset_t, const char*);
+    const value_config_t&, index_t, node_offset_t, const char*);
 
 node_offset_t leaf_sub_items_t::trim_until(
     NodeExtentMutable& mut, leaf_sub_items_t& items, index_t index) {
@@ -175,7 +175,7 @@ char* leaf_sub_items_t::Appender<KT>::wrap() {
         last_offset = offset;
       },
       [&] (const kv_item_t& arg) {
-        last_offset += sizeof(snap_gen_t) + arg.p_value->size;
+        last_offset += sizeof(snap_gen_t) + arg.value_config.allocation_size();
         p_cur -= sizeof(node_offset_t);
         p_mut->copy_in_absolute(p_cur, last_offset);
       }
@@ -195,9 +195,10 @@ char* leaf_sub_items_t::Appender<KT>::wrap() {
         assert(pp_value);
         p_cur -= sizeof(snap_gen_t);
         p_mut->copy_in_absolute(p_cur, snap_gen_t::template from_key<KT>(*arg.p_key));
-        p_cur -= arg.p_value->size;
-        p_mut->copy_in_absolute(p_cur, arg.p_value, arg.p_value->size);
-        *pp_value = reinterpret_cast<const onode_t*>(p_cur);
+        p_cur -= arg.value_config.allocation_size();
+        auto p_value = reinterpret_cast<value_header_t*>(p_cur);
+        p_value->initiate(*p_mut, arg.value_config);
+        *pp_value = p_value;
       }
     }, a);
   }
index e19664427edd76167e69e4c81d6a45c2b0c439cc..b4666ede2d0b128cd4e3558b58094bd3af88280b 100644 (file)
@@ -135,7 +135,7 @@ class internal_sub_items_t::Appender {
  * leaf_sub_items_t
  *
  * The STAGE_RIGHT implementation for leaf node N0/N1/N2, implements staged
- * contract as an indexable container to index snap-gen to onode_t.
+ * contract as an indexable container to index snap-gen to value_header_t.
  *
  * The layout of the contaner storing n sub-items:
  *
@@ -143,7 +143,7 @@ class internal_sub_items_t::Appender {
  * # <---------- sub-items ----------------> # <--- offsets ---------#          #
  * #<~># sub-items [2, n)                    #<~>| offsets [2, n)    #          #
  * #   # <- sub-item 1 -> # <- sub-item 0 -> #   |                   #          #
- * #...# snap-gen | onode # snap-gen | onode #...| offset1 | offset0 # num_keys #
+ * #...# snap-gen | value # snap-gen | value #...| offset1 | offset0 # num_keys #
  *                                           ^             ^         ^
  *                                           |             |         |
  *                               p_items_end +   p_offsets +         |
@@ -152,7 +152,7 @@ class internal_sub_items_t::Appender {
 class leaf_sub_items_t {
  public:
   // TODO: decide by NODE_BLOCK_SIZE, sizeof(snap_gen_t),
-  //       and the minimal size of onode_t
+  //       and the minimal size of value
   using num_keys_t = uint8_t;
 
   leaf_sub_items_t(const memory_range_t& range) {
@@ -222,11 +222,12 @@ class leaf_sub_items_t {
     return ret;
   }
   node_offset_t size_overhead_at(index_t index) const { return sizeof(node_offset_t); }
-  const onode_t* get_p_value(index_t index) const {
+  const value_header_t* get_p_value(index_t index) const {
     assert(index < keys());
     auto pointer = get_item_start(index);
-    auto value = reinterpret_cast<const onode_t*>(pointer);
-    assert(pointer + value->size + sizeof(snap_gen_t) == get_item_end(index));
+    auto value = reinterpret_cast<const value_header_t*>(pointer);
+    assert(pointer + value->allocation_size() + sizeof(snap_gen_t) ==
+           get_item_end(index));
     return value;
   }
   void encode(const char* p_node_start, ceph::bufferlist& encoded) const {
@@ -256,14 +257,15 @@ class leaf_sub_items_t {
   static node_offset_t header_size() { return sizeof(num_keys_t); }
 
   template <KeyT KT>
-  static node_offset_t estimate_insert(const full_key_t<KT>&, const onode_t& value) {
-    return value.size + sizeof(snap_gen_t) + sizeof(node_offset_t);
+  static node_offset_t estimate_insert(
+      const full_key_t<KT>&, const value_config_t& value) {
+    return value.allocation_size() + sizeof(snap_gen_t) + sizeof(node_offset_t);
   }
 
   template <KeyT KT>
-  static const onode_t* insert_at(
+  static const value_header_t* insert_at(
       NodeExtentMutable&, const leaf_sub_items_t&,
-      const full_key_t<KT>&, const onode_t&,
+      const full_key_t<KT>&, const value_config_t&,
       index_t index, node_offset_t size, const char* p_left_bound);
 
   static node_offset_t trim_until(NodeExtentMutable&, leaf_sub_items_t&, index_t index);
@@ -288,7 +290,7 @@ class leaf_sub_items_t::Appender {
   };
   struct kv_item_t {
     const full_key_t<KT>* p_key;
-    const onode_t* p_value;
+    value_config_t value_config;
   };
   using var_t = std::variant<range_items_t, kv_item_t>;
 
@@ -314,10 +316,10 @@ class leaf_sub_items_t::Appender {
     ++cnt;
   }
   void append(const full_key_t<KT>& key,
-              const onode_t& value, const onode_t*& p_value) {
+              const value_config_t& value, const value_header_t*& p_value) {
     assert(pp_value == nullptr);
     assert(cnt <= APPENDER_LIMIT);
-    appends[cnt] = kv_item_t{&key, &value};
+    appends[cnt] = kv_item_t{&key, value};
     ++cnt;
     pp_value = &p_value;
   }
@@ -325,7 +327,7 @@ class leaf_sub_items_t::Appender {
 
  private:
   std::optional<leaf_sub_items_t> op_src;
-  const onode_t** pp_value = nullptr;
+  const value_header_t** pp_value = nullptr;
   NodeExtentMutable* p_mut;
   char* p_append;
   var_t appends[APPENDER_LIMIT];
index 6cab27f47eaa4b37c7a3bc0434870884d4256f33..48b4a39c61dcda2c069eab4192eecca80151de49 100644 (file)
@@ -13,7 +13,7 @@
 #include "node_extent_manager.h"
 #include "stages/key_layout.h"
 #include "super.h"
-#include "tree_types.h"
+#include "value.h"
 
 /**
  * tree.h
@@ -33,6 +33,7 @@ namespace crimson::os::seastore::onode {
 class Node;
 class tree_cursor_t;
 
+template <typename ValueImpl>
 class Btree {
  public:
   using btree_ertr = crimson::errorator<
@@ -77,12 +78,14 @@ class Btree {
     // XXX: return key_view_t to avoid unecessary ghobject_t constructions
     ghobject_t get_ghobj() const {
       assert(!is_end());
-      return p_cursor->get_key_view().to_ghobj();
+      return p_cursor->get_key_view(
+          p_tree->value_builder.get_header_magic()).to_ghobj();
     }
 
-    const onode_t* value() const {
+    ValueImpl value() {
       assert(!is_end());
-      return p_cursor->get_p_value();
+      return p_tree->value_builder.build_value(
+          *p_tree->nm, p_tree->value_builder, p_cursor);
     }
 
     bool operator==(const Cursor& x) const {
@@ -195,14 +198,17 @@ class Btree {
    * modifiers
    */
 
-  // TODO: replace onode_t
+  struct tree_value_config_t {
+    value_size_t payload_size = 256;
+  };
   btree_future<std::pair<Cursor, bool>>
-  insert(Transaction& t, const ghobject_t& obj, const onode_t& value) {
+  insert(Transaction& t, const ghobject_t& obj, tree_value_config_t _vconf) {
+    value_config_t vconf{value_builder.get_header_magic(), _vconf.payload_size};
     return seastar::do_with(
       full_key_t<KeyT::HOBJ>(obj),
-      [this, &t, &value](auto& key) -> btree_future<std::pair<Cursor, bool>> {
-        return get_root(t).safe_then([this, &t, &key, &value](auto root) {
-          return root->insert(get_context(t), key, value);
+      [this, &t, vconf](auto& key) -> btree_future<std::pair<Cursor, bool>> {
+        return get_root(t).safe_then([this, &t, &key, vconf](auto root) {
+          return root->insert(get_context(t), key, vconf);
         }).safe_then([this](auto ret) {
           auto& [cursor, success] = ret;
           return std::make_pair(Cursor(this, cursor), success);
@@ -284,7 +290,7 @@ class Btree {
 
  private:
   context_t get_context(Transaction& t) {
-    return {*nm, t};
+    return {*nm, value_builder, t};
   }
 
   btree_future<Ref<Node>> get_root(Transaction& t) {
@@ -297,11 +303,14 @@ class Btree {
   }
 
   NodeExtentManagerURef nm;
+  const ValueBuilderImpl<ValueImpl> value_builder;
   RootNodeTrackerURef root_tracker;
 
   friend class DummyChildPool;
 };
-inline std::ostream& operator<<(std::ostream& os, const Btree& tree) {
+
+template <typename ValueImpl>
+inline std::ostream& operator<<(std::ostream& os, const Btree<ValueImpl>& tree) {
   return tree.print(os);
 }
 
diff --git a/src/crimson/os/seastore/onode_manager/staged-fltree/tree_types.h b/src/crimson/os/seastore/onode_manager/staged-fltree/tree_types.h
deleted file mode 100644 (file)
index 0bb345e..0000000
+++ /dev/null
@@ -1,125 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
-// vim: ts=8 sw=2 smarttab
-
-#pragma once
-
-#include <ostream>
-
-namespace crimson::os::seastore::onode {
-
-// TODO: Redesign according to real requirement from onode manager
-struct onode_t {
-  // onode should be smaller than a node
-  uint16_t size; // address up to 64 KiB sized node
-  uint16_t id;
-  // omap, extent_map, inline data
-
-  bool operator==(const onode_t& o) const { return size == o.size && id == o.id; }
-  bool operator!=(const onode_t& o) const { return !(*this == o); }
-
-  void encode(ceph::bufferlist& encoded) const {
-    ceph::encode(size, encoded);
-    ceph::encode(id, encoded);
-  }
-  static onode_t decode(ceph::bufferlist::const_iterator& delta) {
-    uint16_t size;
-    ceph::decode(size, delta);
-    uint16_t id;
-    ceph::decode(id, delta);
-    onode_t ret{size, id};
-    return ret;
-  }
-  static void validate_tail_magic(const onode_t& onode) {
-    auto p_target = (const char*)&onode + onode.size - sizeof(uint32_t);
-    uint32_t target;
-    std::memcpy(&target, p_target, sizeof(uint32_t));
-    ceph_assert(target == onode.size * 137);
-  }
-  static std::unique_ptr<char[]> allocate(const onode_t& config) {
-    ceph_assert(config.size >= sizeof(onode_t) + sizeof(uint32_t));
-
-    auto ret = std::make_unique<char[]>(config.size);
-    char* p_mem = ret.get();
-    auto p_onode = reinterpret_cast<onode_t*>(p_mem);
-    *p_onode = config;
-
-    uint32_t tail_magic = config.size * 137;
-    p_mem += (config.size - sizeof(uint32_t));
-    std::memcpy(p_mem, &tail_magic, sizeof(uint32_t));
-    validate_tail_magic(*p_onode);
-
-    return ret;
-  }
-} __attribute__((packed));
-inline std::ostream& operator<<(std::ostream& os, const onode_t& node) {
-  return os << "onode(" << node.id << ", " << node.size << "B)";
-}
-
-struct tree_stats_t {
-  size_t size_persistent_leaf = 0;
-  size_t size_persistent_internal = 0;
-  size_t size_filled_leaf = 0;
-  size_t size_filled_internal = 0;
-  size_t size_logical_leaf = 0;
-  size_t size_logical_internal = 0;
-  size_t size_overhead_leaf = 0;
-  size_t size_overhead_internal = 0;
-  size_t size_value_leaf = 0;
-  size_t size_value_internal = 0;
-  unsigned num_kvs_leaf = 0;
-  unsigned num_kvs_internal = 0;
-  unsigned num_nodes_leaf = 0;
-  unsigned num_nodes_internal = 0;
-  unsigned height = 0;
-
-  size_t size_persistent() const {
-    return size_persistent_leaf + size_persistent_internal; }
-  size_t size_filled() const {
-    return size_filled_leaf + size_filled_internal; }
-  size_t size_logical() const {
-    return size_logical_leaf + size_logical_internal; }
-  size_t size_overhead() const {
-    return size_overhead_leaf + size_overhead_internal; }
-  size_t size_value() const {
-    return size_value_leaf + size_value_internal; }
-  unsigned num_kvs() const {
-    return num_kvs_leaf + num_kvs_internal; }
-  unsigned num_nodes() const {
-    return num_nodes_leaf + num_nodes_internal; }
-
-  double ratio_fullness() const {
-    return (double)size_filled() / size_persistent(); }
-  double ratio_key_compression() const {
-    return (double)(size_filled() - size_value()) / (size_logical() - size_value()); }
-  double ratio_overhead() const {
-    return (double)size_overhead() / size_filled(); }
-  double ratio_keys_leaf() const {
-    return (double)num_kvs_leaf / num_kvs(); }
-  double ratio_nodes_leaf() const {
-    return (double)num_nodes_leaf / num_nodes(); }
-  double ratio_filled_leaf() const {
-    return (double)size_filled_leaf / size_filled(); }
-};
-inline std::ostream& operator<<(std::ostream& os, const tree_stats_t& stats) {
-  os << "Tree stats:"
-     << "\n  height = " << stats.height
-     << "\n  num values = " << stats.num_kvs_leaf
-     << "\n  num nodes  = " << stats.num_nodes()
-     << " (leaf=" << stats.num_nodes_leaf
-     << ", internal=" << stats.num_nodes_internal << ")"
-     << "\n  size persistent = " << stats.size_persistent() << "B"
-     << "\n  size filled     = " << stats.size_filled() << "B"
-     << " (value=" << stats.size_value_leaf << "B"
-     << ", rest=" << stats.size_filled() - stats.size_value_leaf << "B)"
-     << "\n  size logical    = " << stats.size_logical() << "B"
-     << "\n  size overhead   = " << stats.size_overhead() << "B"
-     << "\n  ratio fullness  = " << stats.ratio_fullness()
-     << "\n  ratio keys leaf = " << stats.ratio_keys_leaf()
-     << "\n  ratio nodes leaf  = " << stats.ratio_nodes_leaf()
-     << "\n  ratio filled leaf = " << stats.ratio_filled_leaf()
-     << "\n  ratio key compression = " << stats.ratio_key_compression();
-  assert(stats.num_kvs_internal + 1 == stats.num_nodes());
-  return os;
-}
-
-}
index 536052003df4d16b265e20ddb41478b3e873c8ab..a17df0183b1f72064aa665048dfe81508e49a72d 100644 (file)
@@ -14,6 +14,7 @@
 #include "crimson/common/log.h"
 #include "stages/key_layout.h"
 #include "tree.h"
+#include "test/crimson/seastore/onode_tree/test_value.h"
 
 /**
  * tree_utils.h
 
 namespace crimson::os::seastore::onode {
 
-class Onodes {
+using TestBtree = Btree<TestValue>;
+
+struct value_item_t {
+  value_size_t size;
+  TestValue::id_t id;
+  TestValue::magic_t magic;
+
+  TestBtree::tree_value_config_t get_config() const {
+    assert(size > sizeof(value_header_t));
+    return {static_cast<value_size_t>(size - sizeof(value_header_t))};
+  }
+};
+inline std::ostream& operator<<(std::ostream& os, const value_item_t& item) {
+  return os << "ValueItem(#" << item.id << ", " << item.size << "B)";
+}
+
+class Values {
  public:
-  Onodes(size_t n) {
+  Values(size_t n) {
     for (size_t i = 1; i <= n; ++i) {
-      auto p_onode = &create(i * 8);
-      onodes.push_back(p_onode);
+      auto item = create(i * 8);
+      values.push_back(item);
     }
   }
 
-  Onodes(std::vector<size_t> sizes) {
+  Values(std::vector<size_t> sizes) {
     for (auto& size : sizes) {
-      auto p_onode = &create(size);
-      onodes.push_back(p_onode);
+      auto item = create(size);
+      values.push_back(item);
     }
   }
 
-  ~Onodes() = default;
+  ~Values() = default;
 
-  const onode_t& create(size_t size) {
-    ceph_assert(size <= std::numeric_limits<uint16_t>::max());
-    onode_t config{static_cast<uint16_t>(size), id++};
-    auto onode = onode_t::allocate(config);
-    auto p_onode = onode.get();
-    tracked_onodes.push_back(std::move(onode));
-    return *reinterpret_cast<onode_t*>(p_onode);
+  value_item_t create(size_t _size) {
+    ceph_assert(_size <= std::numeric_limits<value_size_t>::max());
+    ceph_assert(_size > sizeof(value_header_t));
+    value_size_t size = _size;
+    auto current_id = id++;
+    return value_item_t{size, current_id, (TestValue::magic_t)current_id * 137};
   }
 
-  const onode_t& pick() const {
-    auto index = rd() % onodes.size();
-    return *onodes[index];
+  value_item_t pick() const {
+    auto index = rd() % values.size();
+    return values[index];
   }
 
-  const onode_t& pick_largest() const {
-    return *onodes[onodes.size() - 1];
+  static void initialize_cursor(
+      Transaction& t,
+      TestBtree::Cursor& cursor,
+      const value_item_t& item) {
+    ceph_assert(!cursor.is_end());
+    auto value = cursor.value();
+    ceph_assert(value.get_payload_size() + sizeof(value_header_t) == item.size);
+    value.set_id_replayable(t, item.id);
+    value.set_tail_magic_replayable(t, item.magic);
   }
 
   static void validate_cursor(
-      const Btree::Cursor& cursor, const ghobject_t& key, const onode_t& onode) {
+      TestBtree::Cursor& cursor,
+      const ghobject_t& key,
+      const value_item_t& item) {
     ceph_assert(!cursor.is_end());
     ceph_assert(cursor.get_ghobj() == key);
-    ceph_assert(cursor.value());
-    ceph_assert(cursor.value() != &onode);
-    ceph_assert(*cursor.value() == onode);
-    onode_t::validate_tail_magic(*cursor.value());
+    auto value = cursor.value();
+    ceph_assert(value.get_payload_size() + sizeof(value_header_t) == item.size);
+    ceph_assert(value.get_id() == item.id);
+    ceph_assert(value.get_tail_magic() == item.magic);
   }
 
  private:
-  uint16_t id = 0;
+  TestValue::id_t id = 0;
   mutable std::random_device rd;
-  std::vector<const onode_t*> onodes;
-  std::vector<std::unique_ptr<char[]>> tracked_onodes;
+  std::vector<value_item_t> values;
 };
 
 class KVPool {
@@ -83,7 +107,7 @@ class KVPool {
     unsigned index0;
     size_t ns_size;
     size_t oid_size;
-    const onode_t* p_value;
+    value_item_t value;
 
     ghobject_t get_ghobj() const {
       assert(index1 < 10);
@@ -106,14 +130,14 @@ class KVPool {
   using kv_vector_t = std::vector<kv_conf_t>;
 
  public:
-  using kv_t = std::pair<ghobject_t, const onode_t*>;
+  using kv_t = std::pair<ghobject_t, value_item_t>;
 
   KVPool(const std::vector<size_t>& str_sizes,
-         const std::vector<size_t>& onode_sizes,
+         const std::vector<size_t>& value_sizes,
          const std::pair<unsigned, unsigned>& range2,
          const std::pair<unsigned, unsigned>& range1,
          const std::pair<unsigned, unsigned>& range0)
-      : str_sizes{str_sizes}, onodes{onode_sizes} {
+      : str_sizes{str_sizes}, values{value_sizes} {
     ceph_assert(range2.first < range2.second);
     ceph_assert(range2.second - 1 <= (unsigned)std::numeric_limits<shard_t>::max());
     ceph_assert(range2.second - 1 <= std::numeric_limits<crush_hash_t>::max());
@@ -126,7 +150,7 @@ class KVPool {
         auto ns_size = (unsigned)str_sizes[rd() % str_sizes.size()];
         auto oid_size = (unsigned)str_sizes[rd() % str_sizes.size()];
         for (unsigned k = range0.first; k < range0.second; ++k) {
-          kvs.emplace_back(kv_conf_t{i, j, k, ns_size, oid_size, &onodes.pick()});
+          kvs.emplace_back(kv_conf_t{i, j, k, ns_size, oid_size, values.pick()});
         }
       }
     }
@@ -145,7 +169,7 @@ class KVPool {
     kv_t get_kv() const {
       assert(!is_end());
       auto& conf = (*p_kvs)[i];
-      return std::make_pair(conf.get_ghobj(), conf.p_value);
+      return std::make_pair(conf.get_ghobj(), conf.value);
     }
     bool is_end() const { return !p_kvs || i >= p_kvs->size(); }
     size_t index() const { return i; }
@@ -184,7 +208,7 @@ class KVPool {
 
  private:
   std::vector<size_t> str_sizes;
-  Onodes onodes;
+  Values values;
   kv_vector_t kvs;
   kv_vector_t random_kvs;
 };
@@ -192,7 +216,7 @@ class KVPool {
 template <bool TRACK>
 class TreeBuilder {
  public:
-  using ertr = Btree::btree_ertr;
+  using ertr = TestBtree::btree_ertr;
   template <class ValueT=void>
   using future = ertr::future<ValueT>;
 
@@ -225,29 +249,32 @@ class TreeBuilder {
 
   future<> insert(Transaction& t) {
     kv_iter = kvs.random_begin();
-    auto cursors = seastar::make_lw_shared<std::vector<Btree::Cursor>>();
+    auto cursors = seastar::make_lw_shared<std::vector<TestBtree::Cursor>>();
     logger().warn("start inserting {} kvs ...", kvs.size());
     auto start_time = mono_clock::now();
     return crimson::do_until([&t, this, cursors]() -> future<bool> {
       if (kv_iter.is_end()) {
         return ertr::make_ready_future<bool>(true);
       }
-      auto [key, p_value] = kv_iter.get_kv();
-      logger().debug("[{}] {} -> {}", kv_iter.index(), key_hobj_t{key}, *p_value);
-      return tree->insert(t, key, *p_value
-      ).safe_then([&t, this, cursors](auto ret) {
+      auto [key, value] = kv_iter.get_kv();
+      logger().debug("[{}] {} -> {}", kv_iter.index(), key_hobj_t{key}, value);
+      return tree->insert(t, key, value.get_config()
+      ).safe_then([&t, this, cursors, value](auto ret) {
         auto& [cursor, success] = ret;
         assert(success == true);
+        Values::initialize_cursor(t, cursor, value);
         if constexpr (TRACK) {
           cursors->emplace_back(cursor);
         }
 #ifndef NDEBUG
-        auto [key, p_value] = kv_iter.get_kv();
-        Onodes::validate_cursor(cursor, key, *p_value);
-        return tree->lower_bound(t, key).safe_then([this, cursor](auto cursor_) {
-          auto [key, p_value] = kv_iter.get_kv();
+        auto [key, value] = kv_iter.get_kv();
+        Values::validate_cursor(cursor, key, value);
+        return tree->lower_bound(t, key
+        ).safe_then([this, cursor](auto cursor_) mutable {
+          auto [key, value] = kv_iter.get_kv();
           ceph_assert(cursor_.get_ghobj() == key);
           ceph_assert(cursor_.value() == cursor.value());
+          Values::validate_cursor(cursor_, key, value);
           ++kv_iter;
           return ertr::make_ready_future<bool>(false);
         });
@@ -274,9 +301,9 @@ class TreeBuilder {
             // validate values in tree keep intact
             return tree->lower_bound(t, k).safe_then([this, &c_iter](auto cursor) {
               auto [k, v] = kv_iter.get_kv();
-              Onodes::validate_cursor(cursor, k, *v);
+              Values::validate_cursor(cursor, k, v);
               // validate values in cursors keep intact
-              Onodes::validate_cursor(*c_iter, k, *v);
+              Values::validate_cursor(*c_iter, k, v);
               ++kv_iter;
               ++c_iter;
               return ertr::make_ready_future<bool>(false);
@@ -312,7 +339,7 @@ class TreeBuilder {
         auto [k, v] = kvs_iter.get_kv();
         return tree->lower_bound(t, k
         ).safe_then([&kvs_iter, k=k, v=v] (auto cursor) {
-          Onodes::validate_cursor(cursor, k, *v);
+          Values::validate_cursor(cursor, k, v);
           ++kvs_iter;
           return ertr::make_ready_future<bool>(false);
         });
@@ -326,7 +353,7 @@ class TreeBuilder {
   }
 
   KVPool& kvs;
-  std::optional<Btree> tree;
+  std::optional<TestBtree> tree;
   KVPool::iterator_t kv_iter;
 };
 
diff --git a/src/crimson/os/seastore/onode_manager/staged-fltree/value.cc b/src/crimson/os/seastore/onode_manager/staged-fltree/value.cc
new file mode 100644 (file)
index 0000000..96ebb20
--- /dev/null
@@ -0,0 +1,82 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "value.h"
+
+#include "node.h"
+#include "node_delta_recorder.h"
+
+// value implementations
+#include "test/crimson/seastore/onode_tree/test_value.h"
+
+namespace crimson::os::seastore::onode {
+
+using ertr = Value::ertr;
+template <class ValueT=void>
+using future = Value::future<ValueT>;
+
+ceph::bufferlist&
+ValueDeltaRecorder::get_encoded(NodeExtentMutable& payload_mut) {
+  ceph::encode(node_delta_op_t::SUBOP_UPDATE_VALUE, encoded);
+  node_offset_t offset = payload_mut.get_node_offset();
+  assert(offset > sizeof(value_header_t));
+  offset -= sizeof(value_header_t);
+  ceph::encode(offset, encoded);
+  return encoded;
+}
+
+Value::Value(NodeExtentManager& nm,
+             const ValueBuilder& vb,
+             Ref<tree_cursor_t>& p_cursor)
+  : nm{nm}, vb{vb}, p_cursor{p_cursor} {}
+
+Value::~Value() {}
+
+future<> Value::extend(Transaction& t, value_size_t extend_size) {
+  auto target_size = get_payload_size() + extend_size;
+  return p_cursor->extend_value(get_context(t), extend_size
+  ).safe_then([this, target_size] {
+    assert(target_size == get_payload_size());
+  });
+}
+
+future<> Value::trim(Transaction& t, value_size_t trim_size) {
+  assert(get_payload_size() > trim_size);
+  auto target_size = get_payload_size() - trim_size;
+  return p_cursor->trim_value(get_context(t), trim_size
+  ).safe_then([this, target_size] {
+    assert(target_size == get_payload_size());
+  });
+}
+
+const value_header_t* Value::read_value_header() const {
+  return p_cursor->read_value_header(vb.get_header_magic());
+}
+
+std::pair<NodeExtentMutable&, ValueDeltaRecorder*>
+Value::do_prepare_mutate_payload(Transaction& t) {
+   return p_cursor->prepare_mutate_value_payload(get_context(t));
+}
+
+std::unique_ptr<ValueDeltaRecorder>
+build_value_recorder_by_type(ceph::bufferlist& encoded,
+                             const value_magic_t& magic) {
+  std::unique_ptr<ValueDeltaRecorder> ret;
+  switch (magic) {
+  case value_magic_t::TEST:
+    ret = std::make_unique<TestValue::Recorder>(encoded);
+    break;
+  case value_magic_t::ONODE:
+    // TODO: onode implementation
+    ceph_abort("not implemented");
+    ret = nullptr;
+    break;
+  default:
+    ret = nullptr;
+    break;
+  }
+  assert(!ret || ret->get_header_magic() == magic);
+  return ret;
+}
+
+}
diff --git a/src/crimson/os/seastore/onode_manager/staged-fltree/value.h b/src/crimson/os/seastore/onode_manager/staged-fltree/value.h
new file mode 100644 (file)
index 0000000..14ff762
--- /dev/null
@@ -0,0 +1,270 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <ostream>
+
+#include "include/buffer.h"
+#include "crimson/common/type_helpers.h"
+
+#include "fwd.h"
+#include "node_extent_mutable.h"
+
+namespace crimson::os::seastore::onode {
+
+// value size up to 64 KiB
+using value_size_t = uint16_t;
+enum class value_magic_t : uint8_t {
+  TEST = 0x52,
+  ONODE,
+};
+inline std::ostream& operator<<(std::ostream& os, const value_magic_t& magic) {
+  switch (magic) {
+  case value_magic_t::TEST:
+    return os << "TEST";
+  case value_magic_t::ONODE:
+    return os << "ONODE";
+  default:
+    return os << "UNKNOWN(" << magic << ")";
+  }
+}
+
+/**
+ * value_config_t
+ *
+ * Parameters to create a value.
+ */
+struct value_config_t {
+  value_magic_t magic;
+  value_size_t payload_size;
+
+  value_size_t allocation_size() const;
+
+  void encode(ceph::bufferlist& encoded) const {
+    ceph::encode(magic, encoded);
+    ceph::encode(payload_size, encoded);
+  }
+
+  static value_config_t decode(ceph::bufferlist::const_iterator& delta) {
+    value_magic_t magic;
+    ceph::decode(magic, delta);
+    value_size_t payload_size;
+    ceph::decode(payload_size, delta);
+    return {magic, payload_size};
+  }
+};
+inline std::ostream& operator<<(std::ostream& os, const value_config_t& conf) {
+  return os << "ValueConf(" << conf.magic
+            << ", " << conf.payload_size << "B)";
+}
+
+/**
+ * value_header_t
+ *
+ * The header structure in value layout.
+ *
+ * Value layout:
+ *
+ * # <- alloc size -> #
+ * # header | payload #
+ */
+struct value_header_t {
+  value_magic_t magic;
+  value_size_t payload_size;
+
+  value_size_t allocation_size() const {
+    return payload_size + sizeof(value_header_t);
+  }
+
+  const char* get_payload() const {
+    return reinterpret_cast<const char*>(this) + sizeof(value_header_t);
+  }
+
+  NodeExtentMutable get_payload_mutable(NodeExtentMutable& node) const {
+    return node.get_mutable_absolute(get_payload(), payload_size);
+  }
+
+  char* get_payload() {
+    return reinterpret_cast<char*>(this) + sizeof(value_header_t);
+  }
+
+  void initiate(NodeExtentMutable& mut, const value_config_t& config) {
+    value_header_t header{config.magic, config.payload_size};
+    mut.copy_in_absolute(this, header);
+    mut.set_absolute(get_payload(), 0, config.payload_size);
+  }
+
+  static value_size_t estimate_allocation_size(value_size_t payload_size) {
+    return payload_size + sizeof(value_header_t);
+  }
+} __attribute__((packed));
+inline std::ostream& operator<<(std::ostream& os, const value_header_t& header) {
+  return os << "Value(" << header.magic
+            << ", " << header.payload_size << "B)";
+}
+
+inline value_size_t value_config_t::allocation_size() const {
+  return value_header_t::estimate_allocation_size(payload_size);
+}
+
+/**
+ * ValueDeltaRecorder
+ *
+ * An abstracted class to handle user-defined value delta encode, decode and
+ * replay.
+ */
+class ValueDeltaRecorder {
+ public:
+  virtual ~ValueDeltaRecorder() = default;
+  ValueDeltaRecorder(const ValueDeltaRecorder&) = delete;
+  ValueDeltaRecorder(ValueDeltaRecorder&&) = delete;
+  ValueDeltaRecorder& operator=(const ValueDeltaRecorder&) = delete;
+  ValueDeltaRecorder& operator=(ValueDeltaRecorder&&) = delete;
+
+  /// Returns the value header magic for validation purpose.
+  virtual value_magic_t get_header_magic() const = 0;
+
+  /// Called by DeltaRecorderT to apply user-defined value delta.
+  virtual void apply_value_delta(ceph::bufferlist::const_iterator&,
+                                 NodeExtentMutable&,
+                                 laddr_t) = 0;
+
+ protected:
+  ValueDeltaRecorder(ceph::bufferlist& encoded) : encoded{encoded} {}
+
+  /// Get the delta buffer to encode user-defined value delta.
+  ceph::bufferlist& get_encoded(NodeExtentMutable&);
+
+ private:
+  ceph::bufferlist& encoded;
+};
+
+class tree_cursor_t;
+/**
+ * Value
+ *
+ * Value is a stateless view of the underlying value header and payload content
+ * stored in a tree leaf node, with the support to implement user-defined value
+ * deltas and to extend and trim the underlying payload data (not implemented
+ * yet).
+ *
+ * In the current implementation, we don't guarantee any alignment for value
+ * payload due to unaligned node layout and the according merge and split
+ * operations.
+ */
+class Value {
+ public:
+  using ertr = crimson::errorator<
+    crimson::ct_error::input_output_error,
+    crimson::ct_error::invarg,
+    crimson::ct_error::enoent,
+    crimson::ct_error::erange>;
+  template <class ValueT=void>
+  using future = ertr::future<ValueT>;
+
+  virtual ~Value();
+  Value(const Value&) = default;
+  Value(Value&&) = default;
+  Value& operator=(const Value&) = default;
+  Value& operator=(Value&&) = default;
+
+  /// Returns the value payload size.
+  value_size_t get_payload_size() const {
+    return read_value_header()->payload_size;
+  }
+
+  bool operator==(const Value& v) const { return p_cursor == v.p_cursor; }
+  bool operator!=(const Value& v) const { return !(*this == v); }
+
+ protected:
+  Value(NodeExtentManager&, const ValueBuilder&, Ref<tree_cursor_t>&);
+
+  /// Extends the payload size.
+  future<> extend(Transaction&, value_size_t extend_size);
+
+  /// Trim and shrink the payload.
+  future<> trim(Transaction&, value_size_t trim_size);
+
+  /// Get the permission to mutate the payload with the optional value recorder.
+  template <typename PayloadT, typename ValueDeltaRecorderT>
+  std::pair<NodeExtentMutable&, ValueDeltaRecorderT*>
+  prepare_mutate_payload(Transaction& t) {
+    assert(sizeof(PayloadT) <= get_payload_size());
+
+    auto value_mutable = do_prepare_mutate_payload(t);
+    assert(value_mutable.first.get_write() ==
+           const_cast<const Value*>(this)->template read_payload<char>());
+    assert(value_mutable.first.get_length() == get_payload_size());
+    return {value_mutable.first,
+            static_cast<ValueDeltaRecorderT*>(value_mutable.second)};
+  }
+
+  /// Get the latest payload pointer for read.
+  template <typename PayloadT>
+  const PayloadT* read_payload() const {
+    // see Value documentation
+    static_assert(alignof(PayloadT) == 1);
+    assert(sizeof(PayloadT) <= get_payload_size());
+    return reinterpret_cast<const PayloadT*>(read_value_header()->get_payload());
+  }
+
+ private:
+  const value_header_t* read_value_header() const;
+  context_t get_context(Transaction& t) { return {nm, vb, t}; }
+
+  std::pair<NodeExtentMutable&, ValueDeltaRecorder*>
+  do_prepare_mutate_payload(Transaction&);
+
+  NodeExtentManager& nm;
+  const ValueBuilder& vb;
+  Ref<tree_cursor_t> p_cursor;
+};
+
+/**
+ * ValueBuilder
+ *
+ * For tree nodes to build values without the need to depend on the actual
+ * implementation.
+ */
+struct ValueBuilder {
+  virtual value_magic_t get_header_magic() const = 0;
+  virtual std::unique_ptr<ValueDeltaRecorder>
+  build_value_recorder(ceph::bufferlist&) const = 0;
+};
+
+/**
+ * ValueBuilderImpl
+ *
+ * The concrete ValueBuilder implementation in Btree.
+ */
+template <typename ValueImpl>
+struct ValueBuilderImpl final : public ValueBuilder {
+  value_magic_t get_header_magic() const {
+    return ValueImpl::HEADER_MAGIC;
+  }
+
+  std::unique_ptr<ValueDeltaRecorder>
+  build_value_recorder(ceph::bufferlist& encoded) const override {
+    std::unique_ptr<ValueDeltaRecorder> ret =
+      std::make_unique<typename ValueImpl::Recorder>(encoded);
+    assert(ret->get_header_magic() == get_header_magic());
+    return ret;
+  }
+
+  ValueImpl build_value(NodeExtentManager& nm,
+                        const ValueBuilder& vb,
+                        Ref<tree_cursor_t>& p_cursor) const {
+    assert(vb.get_header_magic() == get_header_magic());
+    return ValueImpl(nm, vb, p_cursor);
+  }
+};
+
+/**
+ * Get the value recorder by type (the magic value) when the ValueBuilder is
+ * unavailable.
+ */
+std::unique_ptr<ValueDeltaRecorder>
+build_value_recorder_by_type(ceph::bufferlist& encoded, const value_magic_t& magic);
+
+}
index ad14c34502dc90e09eaff74217cac2e87d4d306b..398bc567d694dd4c5a8ce7c48eeb1b817e7e18dc 100644 (file)
@@ -17,6 +17,7 @@
 
 #include "test/crimson/gtest_seastar.h"
 #include "test/crimson/seastore/transaction_manager_test_state.h"
+#include "test_value.h"
 
 using namespace crimson::os::seastore::onode;
 
@@ -94,13 +95,14 @@ TEST_F(a_basic_test_t, 1_basic_sizes)
   auto hobj = make_ghobj(0, 0, 0, "n", "o", 0, 0);
   key_hobj_t key(hobj);
   auto [key_view, p_mem] = build_key_view(hobj);
-  onode_t value = {2};
+  value_config_t value;
+  value.payload_size = 8;
 #define _STAGE_T(NodeType) node_to_stage_t<typename NodeType::node_stage_t>
 #define NXT_T(StageType)  staged<typename StageType::next_param_t>
   laddr_t i_value{0};
   logger().info("\n"
     "Bytes of a key-value insertion (full-string):\n"
-    "  s-p-c, 'n'-'o', s-g => onode_t(2): typically internal 41B, leaf 35B\n"
+    "  s-p-c, 'n'-'o', s-g => value_payload(8): typically internal 43B, leaf 59B\n"
     "  InternalNode0: {} {} {}\n"
     "  InternalNode1: {} {} {}\n"
     "  InternalNode2: {} {}\n"
@@ -136,7 +138,8 @@ TEST_F(a_basic_test_t, 2_node_sizes)
   run_async([this] {
     auto nm = NodeExtentManager::create_dummy(IS_DUMMY_SYNC);
     auto t = make_transaction();
-    context_t c{*nm, *t};
+    ValueBuilderImpl<TestValue> vb;
+    context_t c{*nm, vb, *t};
     std::array<std::pair<NodeImplURef, NodeExtentMutable>, 16> nodes = {
       InternalNode0::allocate(c, false, 1u).unsafe_get0().make_pair(),
       InternalNode1::allocate(c, false, 1u).unsafe_get0().make_pair(),
@@ -170,14 +173,15 @@ struct b_dummy_tree_test_t : public seastar_test_suite_t {
   NodeExtentManagerURef moved_nm;
   TransactionRef ref_t;
   Transaction& t;
+  ValueBuilderImpl<TestValue> vb;
   context_t c;
-  Btree tree;
+  TestBtree tree;
 
   b_dummy_tree_test_t()
     : moved_nm{NodeExtentManager::create_dummy(IS_DUMMY_SYNC)},
       ref_t{make_transaction()},
       t{*ref_t},
-      c{*moved_nm, t},
+      c{*moved_nm, vb, t},
       tree{std::move(moved_nm)} {}
 
   seastar::future<> set_up_fut() override final {
@@ -202,119 +206,121 @@ TEST_F(b_dummy_tree_test_t, 3_random_insert_leaf_node)
     ASSERT_TRUE(tree.last(t).unsafe_get0().is_end());
 
     std::vector<std::tuple<ghobject_t,
-                           const onode_t*,
-                           Btree::Cursor>> insert_history;
+                           value_item_t,
+                           TestBtree::Cursor>> insert_history;
     auto f_validate_insert_new = [this, &insert_history] (
-        const ghobject_t& key, const onode_t& value) {
-      auto [cursor, success] = tree.insert(t, key, value).unsafe_get0();
+        const ghobject_t& key, const value_item_t& value) {
+      auto [cursor, success] = tree.insert(
+          t, key, value.get_config()).unsafe_get0();
       ceph_assert(success);
-      insert_history.emplace_back(key, &value, cursor);
-      Onodes::validate_cursor(cursor, key, value);
+      ceph_assert(cursor.get_ghobj() == key);
+      Values::initialize_cursor(t, cursor, value);
+      insert_history.emplace_back(key, value, cursor);
       auto cursor_ = tree.lower_bound(t, key).unsafe_get0();
-      ceph_assert(cursor_.get_ghobj() == key);
       ceph_assert(cursor_.value() == cursor.value());
+      Values::validate_cursor(cursor_, key, value);
       return cursor.value();
     };
-    auto onodes = Onodes(15);
+    auto values = Values(15);
 
-    // insert key1, onode1 at STAGE_LEFT
+    // insert key1, value1 at STAGE_LEFT
     auto key1 = make_ghobj(3, 3, 3, "ns3", "oid3", 3, 3);
-    auto& onode1 = onodes.pick();
-    auto p_value1 = f_validate_insert_new(key1, onode1);
+    auto value1 = values.pick();
+    auto test_value1 = f_validate_insert_new(key1, value1);
 
     // validate lookup
     {
       auto cursor1_s = tree.lower_bound(t, key_s).unsafe_get0();
       ASSERT_EQ(cursor1_s.get_ghobj(), key1);
-      ASSERT_EQ(cursor1_s.value(), p_value1);
+      ASSERT_EQ(cursor1_s.value(), test_value1);
       auto cursor1_e = tree.lower_bound(t, key_e).unsafe_get0();
       ASSERT_TRUE(cursor1_e.is_end());
     }
 
-    // insert the same key1 with a different onode
+    // insert the same key1 with a different value
     {
-      auto& onode1_dup = onodes.pick();
+      auto value1_dup = values.pick();
       auto [cursor1_dup, ret1_dup] = tree.insert(
-          t, key1, onode1_dup).unsafe_get0();
+          t, key1, value1_dup.get_config()).unsafe_get0();
       ASSERT_FALSE(ret1_dup);
-      Onodes::validate_cursor(cursor1_dup, key1, onode1);
+      Values::validate_cursor(cursor1_dup, key1, value1);
     }
 
-    // insert key2, onode2 to key1's left at STAGE_LEFT
+    // insert key2, value2 to key1's left at STAGE_LEFT
     // insert node front at STAGE_LEFT
     auto key2 = make_ghobj(2, 2, 2, "ns3", "oid3", 3, 3);
-    auto& onode2 = onodes.pick();
-    f_validate_insert_new(key2, onode2);
+    auto value2 = values.pick();
+    f_validate_insert_new(key2, value2);
 
-    // insert key3, onode3 to key1's right at STAGE_LEFT
+    // insert key3, value3 to key1's right at STAGE_LEFT
     // insert node last at STAGE_LEFT
     auto key3 = make_ghobj(4, 4, 4, "ns3", "oid3", 3, 3);
-    auto& onode3 = onodes.pick();
-    f_validate_insert_new(key3, onode3);
+    auto value3 = values.pick();
+    f_validate_insert_new(key3, value3);
 
-    // insert key4, onode4 to key1's left at STAGE_STRING (collision)
+    // insert key4, value4 to key1's left at STAGE_STRING (collision)
     auto key4 = make_ghobj(3, 3, 3, "ns2", "oid2", 3, 3);
-    auto& onode4 = onodes.pick();
-    f_validate_insert_new(key4, onode4);
+    auto value4 = values.pick();
+    f_validate_insert_new(key4, value4);
 
-    // insert key5, onode5 to key1's right at STAGE_STRING (collision)
+    // insert key5, value5 to key1's right at STAGE_STRING (collision)
     auto key5 = make_ghobj(3, 3, 3, "ns4", "oid4", 3, 3);
-    auto& onode5 = onodes.pick();
-    f_validate_insert_new(key5, onode5);
+    auto value5 = values.pick();
+    f_validate_insert_new(key5, value5);
 
-    // insert key6, onode6 to key1's left at STAGE_RIGHT
+    // insert key6, value6 to key1's left at STAGE_RIGHT
     auto key6 = make_ghobj(3, 3, 3, "ns3", "oid3", 2, 2);
-    auto& onode6 = onodes.pick();
-    f_validate_insert_new(key6, onode6);
+    auto value6 = values.pick();
+    f_validate_insert_new(key6, value6);
 
-    // insert key7, onode7 to key1's right at STAGE_RIGHT
+    // insert key7, value7 to key1's right at STAGE_RIGHT
     auto key7 = make_ghobj(3, 3, 3, "ns3", "oid3", 4, 4);
-    auto& onode7 = onodes.pick();
-    f_validate_insert_new(key7, onode7);
+    auto value7 = values.pick();
+    f_validate_insert_new(key7, value7);
 
     // insert node front at STAGE_RIGHT
     auto key8 = make_ghobj(2, 2, 2, "ns3", "oid3", 2, 2);
-    auto& onode8 = onodes.pick();
-    f_validate_insert_new(key8, onode8);
+    auto value8 = values.pick();
+    f_validate_insert_new(key8, value8);
 
     // insert node front at STAGE_STRING (collision)
     auto key9 = make_ghobj(2, 2, 2, "ns2", "oid2", 3, 3);
-    auto& onode9 = onodes.pick();
-    f_validate_insert_new(key9, onode9);
+    auto value9 = values.pick();
+    f_validate_insert_new(key9, value9);
 
     // insert node last at STAGE_RIGHT
     auto key10 = make_ghobj(4, 4, 4, "ns3", "oid3", 4, 4);
-    auto& onode10 = onodes.pick();
-    f_validate_insert_new(key10, onode10);
+    auto value10 = values.pick();
+    f_validate_insert_new(key10, value10);
 
     // insert node last at STAGE_STRING (collision)
     auto key11 = make_ghobj(4, 4, 4, "ns4", "oid4", 3, 3);
-    auto& onode11 = onodes.pick();
-    f_validate_insert_new(key11, onode11);
+    auto value11 = values.pick();
+    f_validate_insert_new(key11, value11);
 
     // insert key, value randomly until a perfect 3-ary tree is formed
-    std::vector<std::pair<ghobject_t, const onode_t*>> kvs{
-      {make_ghobj(2, 2, 2, "ns2", "oid2", 2, 2), &onodes.pick()},
-      {make_ghobj(2, 2, 2, "ns2", "oid2", 4, 4), &onodes.pick()},
-      {make_ghobj(2, 2, 2, "ns3", "oid3", 4, 4), &onodes.pick()},
-      {make_ghobj(2, 2, 2, "ns4", "oid4", 2, 2), &onodes.pick()},
-      {make_ghobj(2, 2, 2, "ns4", "oid4", 3, 3), &onodes.pick()},
-      {make_ghobj(2, 2, 2, "ns4", "oid4", 4, 4), &onodes.pick()},
-      {make_ghobj(3, 3, 3, "ns2", "oid2", 2, 2), &onodes.pick()},
-      {make_ghobj(3, 3, 3, "ns2", "oid2", 4, 4), &onodes.pick()},
-      {make_ghobj(3, 3, 3, "ns4", "oid4", 2, 2), &onodes.pick()},
-      {make_ghobj(3, 3, 3, "ns4", "oid4", 4, 4), &onodes.pick()},
-      {make_ghobj(4, 4, 4, "ns2", "oid2", 2, 2), &onodes.pick()},
-      {make_ghobj(4, 4, 4, "ns2", "oid2", 3, 3), &onodes.pick()},
-      {make_ghobj(4, 4, 4, "ns2", "oid2", 4, 4), &onodes.pick()},
-      {make_ghobj(4, 4, 4, "ns3", "oid3", 2, 2), &onodes.pick()},
-      {make_ghobj(4, 4, 4, "ns4", "oid4", 2, 2), &onodes.pick()},
-      {make_ghobj(4, 4, 4, "ns4", "oid4", 4, 4), &onodes.pick()}};
+    std::vector<std::pair<ghobject_t, value_item_t>> kvs{
+      {make_ghobj(2, 2, 2, "ns2", "oid2", 2, 2), values.pick()},
+      {make_ghobj(2, 2, 2, "ns2", "oid2", 4, 4), values.pick()},
+      {make_ghobj(2, 2, 2, "ns3", "oid3", 4, 4), values.pick()},
+      {make_ghobj(2, 2, 2, "ns4", "oid4", 2, 2), values.pick()},
+      {make_ghobj(2, 2, 2, "ns4", "oid4", 3, 3), values.pick()},
+      {make_ghobj(2, 2, 2, "ns4", "oid4", 4, 4), values.pick()},
+      {make_ghobj(3, 3, 3, "ns2", "oid2", 2, 2), values.pick()},
+      {make_ghobj(3, 3, 3, "ns2", "oid2", 4, 4), values.pick()},
+      {make_ghobj(3, 3, 3, "ns4", "oid4", 2, 2), values.pick()},
+      {make_ghobj(3, 3, 3, "ns4", "oid4", 4, 4), values.pick()},
+      {make_ghobj(4, 4, 4, "ns2", "oid2", 2, 2), values.pick()},
+      {make_ghobj(4, 4, 4, "ns2", "oid2", 3, 3), values.pick()},
+      {make_ghobj(4, 4, 4, "ns2", "oid2", 4, 4), values.pick()},
+      {make_ghobj(4, 4, 4, "ns3", "oid3", 2, 2), values.pick()},
+      {make_ghobj(4, 4, 4, "ns4", "oid4", 2, 2), values.pick()},
+      {make_ghobj(4, 4, 4, "ns4", "oid4", 4, 4), values.pick()}};
     auto [smallest_key, smallest_value] = kvs[0];
     auto [largest_key, largest_value] = kvs[kvs.size() - 1];
     std::random_shuffle(kvs.begin(), kvs.end());
     std::for_each(kvs.begin(), kvs.end(), [&f_validate_insert_new] (auto& kv) {
-      f_validate_insert_new(kv.first, *kv.second);
+      f_validate_insert_new(kv.first, kv.second);
     });
     ASSERT_EQ(tree.height(t).unsafe_get0(), 1);
     ASSERT_FALSE(tree.test_is_clean());
@@ -322,16 +328,22 @@ TEST_F(b_dummy_tree_test_t, 3_random_insert_leaf_node)
     for (auto& [k, v, c] : insert_history) {
       // validate values in tree keep intact
       auto cursor = tree.lower_bound(t, k).unsafe_get0();
-      Onodes::validate_cursor(cursor, k, *v);
+      Values::validate_cursor(cursor, k, v);
       // validate values in cursors keep intact
-      Onodes::validate_cursor(c, k, *v);
+      Values::validate_cursor(c, k, v);
+    }
+    {
+      auto cursor = tree.lower_bound(t, key_s).unsafe_get0();
+      Values::validate_cursor(cursor, smallest_key, smallest_value);
+    }
+    {
+      auto cursor = tree.begin(t).unsafe_get0();
+      Values::validate_cursor(cursor, smallest_key, smallest_value);
+    }
+    {
+      auto cursor = tree.last(t).unsafe_get0();
+      Values::validate_cursor(cursor, largest_key, largest_value);
     }
-    Onodes::validate_cursor(
-        tree.lower_bound(t, key_s).unsafe_get0(), smallest_key, *smallest_value);
-    Onodes::validate_cursor(
-        tree.begin(t).unsafe_get0(), smallest_key, *smallest_value);
-    Onodes::validate_cursor(
-        tree.last(t).unsafe_get0(), largest_key, *largest_value);
 
     std::ostringstream oss;
     tree.dump(t, oss);
@@ -374,22 +386,22 @@ class TestTree {
     : moved_nm{NodeExtentManager::create_dummy(IS_DUMMY_SYNC)},
       ref_t{make_transaction()},
       t{*ref_t},
-      c{*moved_nm, t},
+      c{*moved_nm, vb, t},
       tree{std::move(moved_nm)},
-      onodes{0} {}
+      values{0} {}
 
   seastar::future<> build_tree(
       std::pair<unsigned, unsigned> range_2,
       std::pair<unsigned, unsigned> range_1,
       std::pair<unsigned, unsigned> range_0,
-      size_t onode_size) {
-    return seastar::async([this, range_2, range_1, range_0, onode_size] {
+      size_t value_size) {
+    return seastar::async([this, range_2, range_1, range_0, value_size] {
       tree.mkfs(t).unsafe_get0();
       //logger().info("\n---------------------------------------------"
       //              "\nbefore leaf node split:\n");
       auto keys = build_key_set(range_2, range_1, range_0);
       for (auto& key : keys) {
-        auto& value = onodes.create(onode_size);
+        auto value = values.create(value_size);
         insert_tree(key, value).get0();
       }
       ASSERT_EQ(tree.height(t).unsafe_get0(), 1);
@@ -401,7 +413,7 @@ class TestTree {
   }
 
   seastar::future<> build_tree(
-      const std::vector<ghobject_t>& keys, const std::vector<const onode_t*>& values) {
+      const std::vector<ghobject_t>& keys, const std::vector<value_item_t>& values) {
     return seastar::async([this, keys, values] {
       tree.mkfs(t).unsafe_get0();
       //logger().info("\n---------------------------------------------"
@@ -410,7 +422,7 @@ class TestTree {
       auto key_iter = keys.begin();
       auto value_iter = values.begin();
       while (key_iter != keys.end()) {
-        insert_tree(*key_iter, **value_iter).get0();
+        insert_tree(*key_iter, *value_iter).get0();
         ++key_iter;
         ++value_iter;
       }
@@ -422,18 +434,20 @@ class TestTree {
     });
   }
 
-  seastar::future<> split(const ghobject_t& key, const onode_t& value,
+  seastar::future<> split(const ghobject_t& key, const value_item_t& value,
                           const split_expectation_t& expected) {
-    return seastar::async([this, key, &value, expected] {
-      Btree tree_clone(NodeExtentManager::create_dummy(IS_DUMMY_SYNC));
+    return seastar::async([this, key, value, expected] {
+      TestBtree tree_clone(NodeExtentManager::create_dummy(IS_DUMMY_SYNC));
       auto ref_t_clone = make_transaction();
       Transaction& t_clone = *ref_t_clone;
       tree_clone.test_clone_from(t_clone, t, tree).unsafe_get0();
 
       logger().info("insert {}:", key_hobj_t(key));
-      auto [cursor, success] = tree_clone.insert(t_clone, key, value).unsafe_get0();
+      auto [cursor, success] = tree_clone.insert(
+          t_clone, key, value.get_config()).unsafe_get0();
       ASSERT_TRUE(success);
-      Onodes::validate_cursor(cursor, key, value);
+      ASSERT_EQ(cursor.get_ghobj(), key);
+      Values::initialize_cursor(t_clone, cursor, value);
 
       std::ostringstream oss;
       tree_clone.dump(t_clone, oss);
@@ -442,36 +456,39 @@ class TestTree {
 
       for (auto& [k, v, c] : insert_history) {
         auto result = tree_clone.lower_bound(t_clone, k).unsafe_get0();
-        Onodes::validate_cursor(result, k, *v);
+        Values::validate_cursor(result, k, v);
       }
       auto result = tree_clone.lower_bound(t_clone, key).unsafe_get0();
-      Onodes::validate_cursor(result, key, value);
+      Values::validate_cursor(result, key, value);
       EXPECT_TRUE(last_split.match(expected));
     });
   }
 
-  const onode_t& create_onode(size_t size) {
-    return onodes.create(size);
+  value_item_t create_value(size_t size) {
+    return values.create(size);
   }
 
  private:
-  seastar::future<> insert_tree(const ghobject_t& key, const onode_t& value) {
+  seastar::future<> insert_tree(const ghobject_t& key, const value_item_t& value) {
     return seastar::async([this, &key, &value] {
-      auto [cursor, success] = tree.insert(t, key, value).unsafe_get0();
+      auto [cursor, success] = tree.insert(
+          t, key, value.get_config()).unsafe_get0();
       ASSERT_TRUE(success);
-      Onodes::validate_cursor(cursor, key, value);
-      insert_history.emplace_back(key, &value, cursor);
+      ASSERT_EQ(cursor.get_ghobj(), key);
+      Values::initialize_cursor(t, cursor, value);
+      insert_history.emplace_back(key, value, cursor);
     });
   }
 
   NodeExtentManagerURef moved_nm;
   TransactionRef ref_t;
   Transaction& t;
+  ValueBuilderImpl<TestValue> vb;
   context_t c;
-  Btree tree;
-  Onodes onodes;
+  TestBtree tree;
+  Values values;
   std::vector<std::tuple<
-    ghobject_t, const onode_t*, Btree::Cursor>> insert_history;
+    ghobject_t, value_item_t, TestBtree::Cursor>> insert_history;
 };
 
 struct c_dummy_test_t : public seastar_test_suite_t {};
@@ -483,137 +500,137 @@ TEST_F(c_dummy_test_t, 4_split_leaf_node)
       TestTree test;
       test.build_tree({2, 5}, {2, 5}, {2, 5}, 120).get0();
 
-      auto& onode = test.create_onode(1144);
+      auto value = test.create_value(1144);
       logger().info("\n---------------------------------------------"
                     "\nsplit at stage 2; insert to left front at stage 2, 1, 0\n");
-      test.split(make_ghobj(1, 1, 1, "ns3", "oid3", 3, 3), onode,
+      test.split(make_ghobj(1, 1, 1, "ns3", "oid3", 3, 3), value,
                  {2u, 2u, true, InsertType::BEGIN}).get0();
-      test.split(make_ghobj(2, 2, 2, "ns1", "oid1", 3, 3), onode,
+      test.split(make_ghobj(2, 2, 2, "ns1", "oid1", 3, 3), value,
                  {2u, 1u, true, InsertType::BEGIN}).get0();
-      test.split(make_ghobj(2, 2, 2, "ns2", "oid2", 1, 1), onode,
+      test.split(make_ghobj(2, 2, 2, "ns2", "oid2", 1, 1), value,
                  {2u, 0u, true, InsertType::BEGIN}).get0();
 
       logger().info("\n---------------------------------------------"
                     "\nsplit at stage 2; insert to left back at stage 0, 1, 2, 1, 0\n");
-      test.split(make_ghobj(2, 2, 2, "ns4", "oid4", 5, 5), onode,
+      test.split(make_ghobj(2, 2, 2, "ns4", "oid4", 5, 5), value,
                  {2u, 0u, true, InsertType::LAST}).get0();
-      test.split(make_ghobj(2, 2, 2, "ns5", "oid5", 3, 3), onode,
+      test.split(make_ghobj(2, 2, 2, "ns5", "oid5", 3, 3), value,
                  {2u, 1u, true, InsertType::LAST}).get0();
-      test.split(make_ghobj(2, 3, 3, "ns3", "oid3", 3, 3), onode,
+      test.split(make_ghobj(2, 3, 3, "ns3", "oid3", 3, 3), value,
                  {2u, 2u, true, InsertType::LAST}).get0();
-      test.split(make_ghobj(3, 3, 3, "ns1", "oid1", 3, 3), onode,
+      test.split(make_ghobj(3, 3, 3, "ns1", "oid1", 3, 3), value,
                  {2u, 1u, true, InsertType::LAST}).get0();
-      test.split(make_ghobj(3, 3, 3, "ns2", "oid2", 1, 1), onode,
+      test.split(make_ghobj(3, 3, 3, "ns2", "oid2", 1, 1), value,
                  {2u, 0u, true, InsertType::LAST}).get0();
 
-      auto& onode0 = test.create_onode(1416);
+      auto value0 = test.create_value(1416);
       logger().info("\n---------------------------------------------"
                     "\nsplit at stage 2; insert to right front at stage 0, 1, 2, 1, 0\n");
-      test.split(make_ghobj(3, 3, 3, "ns4", "oid4", 5, 5), onode0,
+      test.split(make_ghobj(3, 3, 3, "ns4", "oid4", 5, 5), value0,
                  {2u, 0u, false, InsertType::BEGIN}).get0();
-      test.split(make_ghobj(3, 3, 3, "ns5", "oid5", 3, 3), onode0,
+      test.split(make_ghobj(3, 3, 3, "ns5", "oid5", 3, 3), value0,
                  {2u, 1u, false, InsertType::BEGIN}).get0();
-      test.split(make_ghobj(3, 4, 4, "ns3", "oid3", 3, 3), onode0,
+      test.split(make_ghobj(3, 4, 4, "ns3", "oid3", 3, 3), value0,
                  {2u, 2u, false, InsertType::BEGIN}).get0();
-      test.split(make_ghobj(4, 4, 4, "ns1", "oid1", 3, 3), onode0,
+      test.split(make_ghobj(4, 4, 4, "ns1", "oid1", 3, 3), value0,
                  {2u, 1u, false, InsertType::BEGIN}).get0();
-      test.split(make_ghobj(4, 4, 4, "ns2", "oid2", 1, 1), onode0,
+      test.split(make_ghobj(4, 4, 4, "ns2", "oid2", 1, 1), value0,
                  {2u, 0u, false, InsertType::BEGIN}).get0();
 
       logger().info("\n---------------------------------------------"
                     "\nsplit at stage 2; insert to right back at stage 0, 1, 2\n");
-      test.split(make_ghobj(4, 4, 4, "ns4", "oid4", 5, 5), onode0,
+      test.split(make_ghobj(4, 4, 4, "ns4", "oid4", 5, 5), value0,
                  {2u, 0u, false, InsertType::LAST}).get0();
-      test.split(make_ghobj(4, 4, 4, "ns5", "oid5", 3, 3), onode0,
+      test.split(make_ghobj(4, 4, 4, "ns5", "oid5", 3, 3), value0,
                  {2u, 1u, false, InsertType::LAST}).get0();
-      test.split(make_ghobj(5, 5, 5, "ns3", "oid3", 3, 3), onode0,
+      test.split(make_ghobj(5, 5, 5, "ns3", "oid3", 3, 3), value0,
                  {2u, 2u, false, InsertType::LAST}).get0();
 
-      auto& onode1 = test.create_onode(316);
+      auto value1 = test.create_value(316);
       logger().info("\n---------------------------------------------"
                     "\nsplit at stage 1; insert to left middle at stage 0, 1, 2, 1, 0\n");
-      test.split(make_ghobj(2, 2, 2, "ns4", "oid4", 5, 5), onode1,
+      test.split(make_ghobj(2, 2, 2, "ns4", "oid4", 5, 5), value1,
                  {1u, 0u, true, InsertType::MID}).get0();
-      test.split(make_ghobj(2, 2, 2, "ns5", "oid5", 3, 3), onode1,
+      test.split(make_ghobj(2, 2, 2, "ns5", "oid5", 3, 3), value1,
                  {1u, 1u, true, InsertType::MID}).get0();
-      test.split(make_ghobj(2, 2, 3, "ns3", "oid3", 3, 3), onode1,
+      test.split(make_ghobj(2, 2, 3, "ns3", "oid3", 3, 3), value1,
                  {1u, 2u, true, InsertType::MID}).get0();
-      test.split(make_ghobj(3, 3, 3, "ns1", "oid1", 3, 3), onode1,
+      test.split(make_ghobj(3, 3, 3, "ns1", "oid1", 3, 3), value1,
                  {1u, 1u, true, InsertType::MID}).get0();
-      test.split(make_ghobj(3, 3, 3, "ns2", "oid2", 1, 1), onode1,
+      test.split(make_ghobj(3, 3, 3, "ns2", "oid2", 1, 1), value1,
                  {1u, 0u, true, InsertType::MID}).get0();
 
       logger().info("\n---------------------------------------------"
                     "\nsplit at stage 1; insert to left back at stage 0, 1, 0\n");
-      test.split(make_ghobj(3, 3, 3, "ns2", "oid2", 5, 5), onode1,
+      test.split(make_ghobj(3, 3, 3, "ns2", "oid2", 5, 5), value1,
                  {1u, 0u, true, InsertType::LAST}).get0();
-      test.split(make_ghobj(3, 3, 3, "ns2", "oid3", 3, 3), onode1,
+      test.split(make_ghobj(3, 3, 3, "ns2", "oid3", 3, 3), value1,
                  {1u, 1u, true, InsertType::LAST}).get0();
-      test.split(make_ghobj(3, 3, 3, "ns3", "oid3", 1, 1), onode1,
+      test.split(make_ghobj(3, 3, 3, "ns3", "oid3", 1, 1), value1,
                  {1u, 0u, true, InsertType::LAST}).get0();
 
-      auto& onode2 = test.create_onode(452);
+      auto value2 = test.create_value(452);
       logger().info("\n---------------------------------------------"
                     "\nsplit at stage 1; insert to right front at stage 0, 1, 0\n");
-      test.split(make_ghobj(3, 3, 3, "ns3", "oid3", 5, 5), onode2,
+      test.split(make_ghobj(3, 3, 3, "ns3", "oid3", 5, 5), value2,
                  {1u, 0u, false, InsertType::BEGIN}).get0();
-      test.split(make_ghobj(3, 3, 3, "ns3", "oid4", 3, 3), onode2,
+      test.split(make_ghobj(3, 3, 3, "ns3", "oid4", 3, 3), value2,
                  {1u, 1u, false, InsertType::BEGIN}).get0();
-      test.split(make_ghobj(3, 3, 3, "ns4", "oid4", 1, 1), onode2,
+      test.split(make_ghobj(3, 3, 3, "ns4", "oid4", 1, 1), value2,
                  {1u, 0u, false, InsertType::BEGIN}).get0();
 
       logger().info("\n---------------------------------------------"
                     "\nsplit at stage 1; insert to right middle at stage 0, 1, 2, 1, 0\n");
-      test.split(make_ghobj(3, 3, 3, "ns4", "oid4", 5, 5), onode2,
+      test.split(make_ghobj(3, 3, 3, "ns4", "oid4", 5, 5), value2,
                  {1u, 0u, false, InsertType::MID}).get0();
-      test.split(make_ghobj(3, 3, 3, "ns5", "oid5", 3, 3), onode2,
+      test.split(make_ghobj(3, 3, 3, "ns5", "oid5", 3, 3), value2,
                  {1u, 1u, false, InsertType::MID}).get0();
-      test.split(make_ghobj(3, 3, 4, "ns3", "oid3", 3, 3), onode2,
+      test.split(make_ghobj(3, 3, 4, "ns3", "oid3", 3, 3), value2,
                  {1u, 2u, false, InsertType::MID}).get0();
-      test.split(make_ghobj(4, 4, 4, "ns1", "oid1", 3, 3), onode2,
+      test.split(make_ghobj(4, 4, 4, "ns1", "oid1", 3, 3), value2,
                  {1u, 1u, false, InsertType::MID}).get0();
-      test.split(make_ghobj(4, 4, 4, "ns2", "oid2", 1, 1), onode2,
+      test.split(make_ghobj(4, 4, 4, "ns2", "oid2", 1, 1), value2,
                  {1u, 0u, false, InsertType::MID}).get0();
 
-      auto& onode3 = test.create_onode(834);
+      auto value3 = test.create_value(834);
       logger().info("\n---------------------------------------------"
                     "\nsplit at stage 0; insert to right middle at stage 0, 1, 2, 1, 0\n");
-      test.split(make_ghobj(3, 3, 3, "ns4", "oid4", 5, 5), onode3,
+      test.split(make_ghobj(3, 3, 3, "ns4", "oid4", 5, 5), value3,
                  {0u, 0u, false, InsertType::MID}).get0();
-      test.split(make_ghobj(3, 3, 3, "ns5", "oid5", 3, 3), onode3,
+      test.split(make_ghobj(3, 3, 3, "ns5", "oid5", 3, 3), value3,
                  {0u, 1u, false, InsertType::MID}).get0();
-      test.split(make_ghobj(3, 3, 4, "ns3", "oid3", 3, 3), onode3,
+      test.split(make_ghobj(3, 3, 4, "ns3", "oid3", 3, 3), value3,
                  {0u, 2u, false, InsertType::MID}).get0();
-      test.split(make_ghobj(4, 4, 4, "ns1", "oid1", 3, 3), onode3,
+      test.split(make_ghobj(4, 4, 4, "ns1", "oid1", 3, 3), value3,
                  {0u, 1u, false, InsertType::MID}).get0();
-      test.split(make_ghobj(4, 4, 4, "ns2", "oid2", 1, 1), onode3,
+      test.split(make_ghobj(4, 4, 4, "ns2", "oid2", 1, 1), value3,
                  {0u, 0u, false, InsertType::MID}).get0();
 
       logger().info("\n---------------------------------------------"
                     "\nsplit at stage 0; insert to right front at stage 0\n");
-      test.split(make_ghobj(3, 3, 3, "ns4", "oid4", 2, 3), onode3,
+      test.split(make_ghobj(3, 3, 3, "ns4", "oid4", 2, 3), value3,
                  {0u, 0u, false, InsertType::BEGIN}).get0();
 
-      auto& onode4 = test.create_onode(572);
+      auto value4 = test.create_value(572);
       logger().info("\n---------------------------------------------"
                     "\nsplit at stage 0; insert to left back at stage 0\n");
-      test.split(make_ghobj(3, 3, 3, "ns2", "oid2", 3, 4), onode4,
+      test.split(make_ghobj(3, 3, 3, "ns2", "oid2", 3, 4), value4,
                  {0u, 0u, true, InsertType::LAST}).get0();
     }
 
     {
       TestTree test;
       test.build_tree({2, 4}, {2, 4}, {2, 4}, 232).get0();
-      auto& onode = test.create_onode(1996);
+      auto value = test.create_value(1996);
       logger().info("\n---------------------------------------------"
                     "\nsplit at [0, 0, 0]; insert to left front at stage 2, 1, 0\n");
-      test.split(make_ghobj(1, 1, 1, "ns3", "oid3", 3, 3), onode,
+      test.split(make_ghobj(1, 1, 1, "ns3", "oid3", 3, 3), value,
                  {2u, 2u, true, InsertType::BEGIN}).get0();
       EXPECT_TRUE(last_split.match_split_pos({0, {0, {0}}}));
-      test.split(make_ghobj(2, 2, 2, "ns1", "oid1", 3, 3), onode,
+      test.split(make_ghobj(2, 2, 2, "ns1", "oid1", 3, 3), value,
                  {2u, 1u, true, InsertType::BEGIN}).get0();
       EXPECT_TRUE(last_split.match_split_pos({0, {0, {0}}}));
-      test.split(make_ghobj(2, 2, 2, "ns2", "oid2", 1, 1), onode,
+      test.split(make_ghobj(2, 2, 2, "ns2", "oid2", 1, 1), value,
                  {2u, 0u, true, InsertType::BEGIN}).get0();
       EXPECT_TRUE(last_split.match_split_pos({0, {0, {0}}}));
     }
@@ -623,20 +640,20 @@ TEST_F(c_dummy_test_t, 4_split_leaf_node)
       std::vector<ghobject_t> keys = {
         make_ghobj(2, 2, 2, "ns3", "oid3", 3, 3),
         make_ghobj(3, 3, 3, "ns3", "oid3", 3, 3)};
-      std::vector<const onode_t*> values = {
-        &test.create_onode(1360),
-        &test.create_onode(1632)};
+      std::vector<value_item_t> values = {
+        test.create_value(1360),
+        test.create_value(1632)};
       test.build_tree(keys, values).get0();
-      auto& onode = test.create_onode(1640);
+      auto value = test.create_value(1640);
       logger().info("\n---------------------------------------------"
                     "\nsplit at [END, END, END]; insert to right at stage 0, 1, 2\n");
-      test.split(make_ghobj(3, 3, 3, "ns3", "oid3", 4, 4), onode,
+      test.split(make_ghobj(3, 3, 3, "ns3", "oid3", 4, 4), value,
                  {0u, 0u, false, InsertType::BEGIN}).get0();
       EXPECT_TRUE(last_split.match_split_pos({1, {0, {1}}}));
-      test.split(make_ghobj(3, 3, 3, "ns4", "oid4", 3, 3), onode,
+      test.split(make_ghobj(3, 3, 3, "ns4", "oid4", 3, 3), value,
                  {1u, 1u, false, InsertType::BEGIN}).get0();
       EXPECT_TRUE(last_split.match_split_pos({1, {1, {0}}}));
-      test.split(make_ghobj(4, 4, 4, "ns3", "oid3", 3, 3), onode,
+      test.split(make_ghobj(4, 4, 4, "ns3", "oid3", 3, 3), value,
                  {2u, 2u, false, InsertType::BEGIN}).get0();
       EXPECT_TRUE(last_split.match_split_pos({2, {0, {0}}}));
     }
@@ -922,14 +939,15 @@ class DummyChildPool {
 
   context_t get_context() {
     ceph_assert(p_nm != nullptr);
-    return {*p_nm, t()};
+    return {*p_nm, vb, t()};
   }
 
   Transaction& t() const { return *ref_t; }
 
   std::set<Ref<DummyChild>> tracked_children;
-  std::optional<Btree> p_btree;
+  std::optional<TestBtree> p_btree;
   NodeExtentManager* p_nm = nullptr;
+  ValueBuilderImpl<TestValue> vb;
   TransactionRef ref_t = make_transaction();
 
   std::random_device rd;
diff --git a/src/test/crimson/seastore/onode_tree/test_value.h b/src/test/crimson/seastore/onode_tree/test_value.h
new file mode 100644 (file)
index 0000000..35ec25f
--- /dev/null
@@ -0,0 +1,143 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/common/log.h"
+#include "crimson/os/seastore/onode_manager/staged-fltree/value.h"
+
+namespace crimson::os::seastore::onode {
+
+class TestValue final : public Value {
+ public:
+  static constexpr auto HEADER_MAGIC = value_magic_t::TEST;
+  using id_t = uint16_t;
+  using magic_t = uint32_t;
+  struct magic_packed_t {
+    magic_t value;
+  } __attribute__((packed));
+
+ private:
+  struct payload_t {
+    id_t id;
+  } __attribute__((packed));
+
+  struct Replayable {
+    static void set_id(NodeExtentMutable& payload_mut, id_t id) {
+      auto p_payload = get_write(payload_mut);
+      p_payload->id = id;
+    }
+
+    static void set_tail_magic(NodeExtentMutable& payload_mut, magic_t magic) {
+      auto length = payload_mut.get_length();
+      auto offset_magic = length - sizeof(magic_t);
+      payload_mut.copy_in_relative(offset_magic, magic);
+    }
+
+   private:
+    static payload_t* get_write(NodeExtentMutable& payload_mut) {
+      return reinterpret_cast<payload_t*>(payload_mut.get_write());
+    }
+  };
+
+ public:
+  class Recorder final : public ValueDeltaRecorder {
+    enum class delta_op_t : uint8_t {
+      UPDATE_ID,
+      UPDATE_TAIL_MAGIC,
+    };
+
+   public:
+    Recorder(ceph::bufferlist& encoded)
+      : ValueDeltaRecorder(encoded) {}
+    ~Recorder() override = default;
+
+    void encode_set_id(NodeExtentMutable& payload_mut, id_t id) {
+      auto& encoded = get_encoded(payload_mut);
+      ceph::encode(delta_op_t::UPDATE_ID, encoded);
+      ceph::encode(id, encoded);
+    }
+
+    void encode_set_tail_magic(NodeExtentMutable& payload_mut, magic_t magic) {
+      auto& encoded = get_encoded(payload_mut);
+      ceph::encode(delta_op_t::UPDATE_TAIL_MAGIC, encoded);
+      ceph::encode(magic, encoded);
+    }
+
+   protected:
+    value_magic_t get_header_magic() const override {
+      return HEADER_MAGIC;
+    }
+
+    void apply_value_delta(ceph::bufferlist::const_iterator& delta,
+                           NodeExtentMutable& payload_mut,
+                           laddr_t value_addr) override {
+      delta_op_t op;
+      try {
+        ceph::decode(op, delta);
+        switch (op) {
+        case delta_op_t::UPDATE_ID: {
+          logger().debug("OTree::TestValue::Replay: decoding UPDATE_ID ...");
+          id_t id;
+          ceph::decode(id, delta);
+          logger().debug("OTree::TestValue::Replay: apply id={} ...", id);
+          Replayable::set_id(payload_mut, id);
+          break;
+        }
+        case delta_op_t::UPDATE_TAIL_MAGIC: {
+          logger().debug("OTree::TestValue::Replay: decoding UPDATE_TAIL_MAGIC ...");
+          magic_t magic;
+          ceph::decode(magic, delta);
+          logger().debug("OTree::TestValue::Replay: apply magic={} ...", magic);
+          Replayable::set_tail_magic(payload_mut, magic);
+          break;
+        }
+        default:
+          logger().error("OTree::TestValue::Replay: got unknown op {} when replay {:#x}+{:#x}",
+                         op, value_addr, payload_mut.get_length());
+          ceph_abort();
+        }
+      } catch (buffer::error& e) {
+        logger().error("OTree::TestValue::Replay: got decode error {} when replay {:#x}+{:#x}",
+                       e, value_addr, payload_mut.get_length());
+        ceph_abort();
+      }
+    }
+
+   private:
+    seastar::logger& logger() {
+      return crimson::get_logger(ceph_subsys_test);
+    }
+  };
+
+  TestValue(NodeExtentManager& nm, const ValueBuilder& vb, Ref<tree_cursor_t>& p_cursor)
+    : Value(nm, vb, p_cursor) {}
+  ~TestValue() override = default;
+
+  id_t get_id() const {
+    return read_payload<payload_t>()->id;
+  }
+  void set_id_replayable(Transaction& t, id_t id) {
+    auto value_mutable = prepare_mutate_payload<payload_t, Recorder>(t);
+    if (value_mutable.second) {
+      value_mutable.second->encode_set_id(value_mutable.first, id);
+    }
+    Replayable::set_id(value_mutable.first, id);
+  }
+
+  magic_t get_tail_magic() const {
+    auto p_payload = read_payload<payload_t>();
+    auto offset_magic = get_payload_size() - sizeof(magic_t);
+    auto p_magic = reinterpret_cast<const char*>(p_payload) + offset_magic;
+    return reinterpret_cast<const magic_packed_t*>(p_magic)->value;
+  }
+  void set_tail_magic_replayable(Transaction& t, magic_t magic) {
+    auto value_mutable = prepare_mutate_payload<payload_t, Recorder>(t);
+    if (value_mutable.second) {
+      value_mutable.second->encode_set_tail_magic(value_mutable.first, magic);
+    }
+    Replayable::set_tail_magic(value_mutable.first, magic);
+  }
+};
+
+}