]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson: add node implementation for onode tree
authorKefu Chai <kchai@redhat.com>
Fri, 15 May 2020 16:38:41 +0000 (00:38 +0800)
committerKefu Chai <kchai@redhat.com>
Tue, 9 Jun 2020 01:39:47 +0000 (09:39 +0800)
* node_t and friends are representations of in-mem and on-disk onode
  block/page/node. it might have different names in different contexts.
  but normally, they are the same thing and are loaded as a logical
  cached extent.
* delta_t holds the mutations against a given node onode. they are
  applied to memory once they are committed to disk as part of the
  log record.

regarding to the name of "simple-fltree", it is a short for simple
Flexible Layout Fixed Tree, which was designed by Samuel Just. My
implementation is a naive implementation of this design. But it
tries to be extensive and should be able to adapt to different policies
for rebalancing nodes when overflow/underflow. But there could be
other variant of flexible layout tree implementations in future,
so i added "simple" prefix.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/os/seastore/CMakeLists.txt
src/crimson/os/seastore/onode_manager/simple-fltree/onode_delta.cc [new file with mode: 0644]
src/crimson/os/seastore/onode_manager/simple-fltree/onode_delta.h [new file with mode: 0644]
src/crimson/os/seastore/onode_manager/simple-fltree/onode_node.cc [new file with mode: 0644]
src/crimson/os/seastore/onode_manager/simple-fltree/onode_node.h [new file with mode: 0644]
src/test/crimson/seastore/CMakeLists.txt
src/test/crimson/seastore/onode_tree/CMakeLists.txt [new file with mode: 0644]
src/test/crimson/seastore/onode_tree/test_node.cc [new file with mode: 0644]

index 75e3aaddbf10f24192aacc4b77edcfec171114ee..74a7729bee0fd581441b99ec9e6eb709958361b8 100644 (file)
@@ -10,6 +10,8 @@ add_library(crimson-seastore
   lba_manager/btree/btree_lba_manager.cc
   lba_manager/btree/lba_btree_node_impl.cc
   onode.cc
+  onode_manager/simple-fltree/onode_delta.cc
+  onode_manager/simple-fltree/onode_node.cc
   seastore.cc
   root_block.cc
        )
diff --git a/src/crimson/os/seastore/onode_manager/simple-fltree/onode_delta.cc b/src/crimson/os/seastore/onode_manager/simple-fltree/onode_delta.cc
new file mode 100644 (file)
index 0000000..869685d
--- /dev/null
@@ -0,0 +1,188 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "onode_delta.h"
+
+delta_t::delta_t(delta_t&& delta)
+{
+  assert(op == op_t::nop);
+  op = delta.op;
+  n = delta.n;
+  oid = std::move(delta.oid);
+  onode = std::move(delta.onode);
+  keys = std::move(delta.keys);
+  cells = std::move(delta.cells);
+  delta.op = op_t::nop;
+}
+
+delta_t& delta_t::operator=(delta_t&& delta)
+{
+  assert(op == op_t::nop);
+  op = delta.op;
+  n = delta.n;
+  oid = std::move(delta.oid);
+  onode = std::move(delta.onode);
+  keys = std::move(delta.keys);
+  cells = std::move(delta.cells);
+  delta.op = op_t::nop;
+  return *this;
+}
+
+delta_t delta_t::nop()
+{
+  return delta_t{op_t::nop};
+}
+
+delta_t delta_t::insert_onode(unsigned slot, const ghobject_t& oid, OnodeRef onode)
+{
+  delta_t delta{op_t::insert_onode};
+  delta.n = slot;
+  delta.oid = oid;
+  delta.onode = onode;
+  return delta;
+}
+
+delta_t delta_t::update_onode(unsigned slot, const ghobject_t& oid, OnodeRef onode)
+{
+  delta_t delta{op_t::update_onode};
+  delta.n = slot;
+  delta.oid = oid;
+  delta.onode = onode;
+  return delta;
+}
+
+delta_t delta_t::insert_child(unsigned slot,
+                              const ghobject_t& oid,
+                              crimson::os::seastore::laddr_t addr)
+{
+  delta_t delta{op_t::insert_child};
+  delta.n = slot;
+  delta.oid = oid;
+  delta.addr = addr;
+  return delta;
+}
+
+delta_t delta_t::update_key(unsigned slot, const ghobject_t& oid)
+{
+  delta_t delta{op_t::update_key};
+  delta.n = slot;
+  delta.oid = oid;
+  return delta;
+}
+
+delta_t delta_t::shift_left(unsigned n)
+{
+  delta_t delta{op_t::shift_left};
+  delta.n = n;
+  return delta;
+}
+
+delta_t delta_t::trim_right(unsigned n)
+{
+  delta_t delta{op_t::trim_right};
+  delta.n = n;
+  return delta;
+}
+
+delta_t delta_t::insert_front(ceph::buffer::ptr keys,
+                              ceph::buffer::ptr cells)
+{
+  delta_t delta{op_t::insert_front};
+  delta.keys = std::move(keys);
+  delta.cells = std::move(cells);
+  return delta;
+}
+
+delta_t delta_t::insert_back(ceph::buffer::ptr keys,
+                             ceph::buffer::ptr cells)
+{
+  delta_t delta{op_t::insert_back};
+  delta.keys = std::move(keys);
+  delta.cells = std::move(cells);
+  return delta;
+}
+
+delta_t delta_t::remove_from(unsigned slot)
+{
+  delta_t delta{op_t::remove_from};
+  delta.n = slot;
+  return delta;
+}
+
+void delta_t::encode(ceph::bufferlist& bl)
+{
+  using ceph::encode;
+  switch (op) {
+  case op_t::insert_onode:
+    [[fallthrough]];
+  case op_t::update_onode:
+    // the slot # is not encoded, because we can alway figure it out
+    // when we have to replay the delta by looking the oid up in the
+    // node block
+    encode(oid, bl);
+    encode(*onode, bl);
+    break;
+  case op_t::insert_child:
+    encode(oid, bl);
+    encode(addr, bl);
+  case op_t::update_key:
+    encode(n, bl);
+    encode(oid, bl);
+    break;
+  case op_t::shift_left:
+    encode(n, bl);
+    break;
+  case op_t::trim_right:
+    encode(n, bl);
+    break;
+  case op_t::insert_front:
+    [[fallthrough]];
+  case op_t::insert_back:
+    encode(n, bl);
+    encode(keys, bl);
+    encode(cells, bl);
+    break;
+  case op_t::remove_from:
+    encode(n, bl);
+    break;
+  default:
+    assert(0 == "unknown onode op");
+  }
+}
+
+void delta_t::decode(ceph::bufferlist::const_iterator& p) {
+  using ceph::decode;
+  decode(op, p);
+  switch (op) {
+  case op_t::insert_onode:
+    [[fallthrough]];
+  case op_t::update_onode:
+    decode(oid, p);
+    decode(*onode, p);
+    break;
+  case op_t::insert_child:
+    [[fallthrough]];
+  case op_t::update_key:
+    decode(n, p);
+    decode(oid, p);
+    break;
+  case op_t::shift_left:
+    decode(n, p);
+    break;
+  case op_t::trim_right:
+    decode(n, p);
+    break;
+  case op_t::insert_front:
+    [[fallthrough]];
+  case op_t::insert_back:
+    decode(n, p);
+    decode(keys, p);
+    decode(cells, p);
+    break;
+  case op_t::remove_from:
+    decode(n, p);
+    break;
+  default:
+    assert(0 == "unknown onode op");
+  }
+}
diff --git a/src/crimson/os/seastore/onode_manager/simple-fltree/onode_delta.h b/src/crimson/os/seastore/onode_manager/simple-fltree/onode_delta.h
new file mode 100644 (file)
index 0000000..3e7e731
--- /dev/null
@@ -0,0 +1,70 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <cstdint>
+
+#include "common/hobject.h"
+#include "include/buffer_fwd.h"
+
+#include "crimson/os/seastore/onode.h"
+#include "crimson/os/seastore/seastore_types.h"
+
+using crimson::os::seastore::OnodeRef;
+
+struct delta_t {
+  enum class op_t : uint8_t {
+    nop,
+    insert_onode,
+    update_onode,
+    insert_child,
+    update_key,
+    shift_left,
+    trim_right,
+    insert_front,
+    insert_back,
+    remove_from,
+    // finer grained op?
+    //  - changing the embedded extent map of given oid
+    //  - mutating the embedded xattrs of given oid
+  } op = op_t::nop;
+
+  unsigned n = 0;
+  ghobject_t oid;
+  crimson::os::seastore::laddr_t addr = 0;
+  OnodeRef onode;
+  ceph::bufferptr keys;
+  ceph::bufferptr cells;
+
+  delta_t() = default;
+  delta_t(op_t op)
+    : op{op}
+  {}
+  delta_t(delta_t&& delta);
+  delta_t& operator=(delta_t&& delta);
+
+  static delta_t nop();
+  static delta_t insert_onode(unsigned slot, const ghobject_t& oid, OnodeRef onode);
+  static delta_t update_onode(unsigned slot, const ghobject_t& oid, OnodeRef onode);
+  static delta_t insert_child(unsigned slot, const ghobject_t& oid, crimson::os::seastore::laddr_t addr);
+  static delta_t update_key(unsigned slot, const ghobject_t& oid);
+  static delta_t shift_left(unsigned n);
+  static delta_t trim_right(unsigned n);
+  static delta_t insert_front(ceph::buffer::ptr keys,
+                              ceph::buffer::ptr cells);
+  static delta_t insert_back(ceph::buffer::ptr keys,
+                             ceph::buffer::ptr cells);
+  static delta_t remove_from(unsigned slot);
+
+  // shortcuts
+  static delta_t insert_item(unsigned slot, const ghobject_t& oid, OnodeRef onode) {
+    return insert_onode(slot, oid, onode);
+  }
+  static delta_t insert_item(unsigned slot, const ghobject_t& oid, crimson::os::seastore::laddr_t addr) {
+    return insert_child(slot, oid, addr);
+  }
+
+  void encode(ceph::bufferlist& bl);
+  void decode(ceph::bufferlist::const_iterator& p);
+};
diff --git a/src/crimson/os/seastore/onode_manager/simple-fltree/onode_node.cc b/src/crimson/os/seastore/onode_manager/simple-fltree/onode_node.cc
new file mode 100644 (file)
index 0000000..fdcaa2f
--- /dev/null
@@ -0,0 +1,567 @@
+#include "onode_node.h"
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+auto node_t<BlockSize, N, NodeType>::key_at(unsigned slot) const
+  -> std::pair<const key_prefix_t&, const key_suffix_t&>
+{
+  auto& key = keys[slot];
+  if constexpr (item_in_key) {
+    return {key, key_suffix_t{}};
+  } else {
+    auto p = from_end(key.offset);
+    return {key, *reinterpret_cast<const key_suffix_t*>(p)};
+  }
+}
+
+// update an existing oid with the specified item
+template<size_t BlockSize, int N, ntype_t NodeType>
+ghobject_t
+node_t<BlockSize, N, NodeType>::get_oid_at(unsigned slot,
+                                           const ghobject_t& oid) const
+{
+  auto [prefix, suffix] = key_at(slot);
+  ghobject_t updated = oid;
+  prefix.update_oid(updated);
+  suffix.update_oid(updated);
+  return updated;
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+auto node_t<BlockSize, N, NodeType>::item_at(const key_prefix_t& key) const
+  -> const_item_t
+{
+  if constexpr (item_in_key) {
+    return key.child_addr;
+  } else {
+    assert(key.offset < BlockSize);
+    auto p = from_end(key.offset);
+    auto partial_key = reinterpret_cast<const key_suffix_t*>(p);
+    p += size_of(*partial_key);
+    return *reinterpret_cast<const item_t*>(p);
+  }
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+void node_t<BlockSize, N, NodeType>::dump(std::ostream& os) const
+{
+  for (uint16_t i = 0; i < count; i++) {
+    const auto& [prefix, suffix] = key_at(i);
+    os << " [" << i << '/' << count - 1 << "]\n"
+       << "  key1 = (" << prefix << ")\n"
+       << "  key2 = (" << suffix << ")\n";
+    const auto& item = item_at(prefix);
+    if (_is_leaf()) {
+      os << " item = " << item << "\n";
+    } else {
+      os << " child = " << std::hex << item << std::dec << "\n";
+    }
+  }
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+char* node_t<BlockSize, N, NodeType>::from_end(uint16_t offset)
+{
+  auto end = reinterpret_cast<char*>(this) + BlockSize;
+  return end - static_cast<int>(offset);
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+const char* node_t<BlockSize, N, NodeType>::from_end(uint16_t offset) const
+{
+  auto end = reinterpret_cast<const char*>(this) + BlockSize;
+  return end - static_cast<int>(offset);
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+uint16_t node_t<BlockSize, N, NodeType>::used_space() const
+{
+  if constexpr (item_in_key) {
+    return count * sizeof(key_prefix_t);
+  } else {
+    if (count) {
+      return keys[count - 1].offset + count * sizeof(key_prefix_t);
+    } else {
+      return 0;
+    }
+  }
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+uint16_t node_t<BlockSize, N, NodeType>::capacity()
+{
+  auto p = reinterpret_cast<node_t*>(0);
+  return BlockSize - (reinterpret_cast<char*>(p->keys) -
+                      reinterpret_cast<char*>(p));
+}
+
+// TODO: if it's allowed to update 2 siblings at the same time, we can have
+//       B* tree
+template<size_t BlockSize, int N, ntype_t NodeType>
+constexpr uint16_t node_t<BlockSize, N, NodeType>::min_size()
+{
+  return capacity() / 2;
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+constexpr std::pair<int16_t, int16_t>
+node_t<BlockSize, N, NodeType>::bytes_to_add(uint16_t size)
+{
+  assert(size < min_size());
+  return {min_size() - size, capacity() - size};
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+constexpr std::pair<int16_t, int16_t>
+node_t<BlockSize, N, NodeType>::bytes_to_remove(uint16_t size)
+{
+  assert(size > capacity());
+  return {size - capacity(), size - min_size()};
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+size_state_t node_t<BlockSize, N, NodeType>::size_state(uint16_t size) const
+{
+  if (size > capacity()) {
+    return size_state_t::overflow;
+  } else if (size < capacity() / 2) {
+    return size_state_t::underflow;
+  } else {
+    return size_state_t::okay;
+  }
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+bool node_t<BlockSize, N, NodeType>::is_underflow(uint16_t size) const
+{
+  switch (size_state(size)) {
+  case size_state_t::underflow:
+    return true;
+  case size_state_t::okay:
+    return false;
+  default:
+    assert(0);
+    return false;
+  }
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+int16_t node_t<BlockSize, N, NodeType>::size_with_key(unsigned slot,
+                                                      const ghobject_t& oid) const
+{
+  if constexpr (item_in_key) {
+    return capacity();
+  } else {
+    // the size of fixed key does not change
+    [[maybe_unused]] const auto& [prefix, suffix] = key_at(slot);
+    return capacity() + key_suffix_t::size_from(oid) - suffix.size();
+  }
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+ordering_t node_t<BlockSize, N, NodeType>::compare_with_slot(unsigned slot,
+                                                             const ghobject_t& oid) const
+{
+  const auto& [prefix, suffix] = key_at(slot);
+  if (auto result = prefix.compare(oid); result != ordering_t::equivalent) {
+    return result;
+  } else {
+    return suffix.compare(oid);
+  }
+}
+
+/// return the slot number of the first slot that is greater or equal to
+/// key
+template<size_t BlockSize, int N, ntype_t NodeType>
+std::pair<unsigned, bool> node_t<BlockSize, N, NodeType>::lower_bound(const ghobject_t& oid) const
+{
+  unsigned s = 0, e = count;
+  while (s != e) {
+    unsigned mid = (s + e) / 2;
+    switch (compare_with_slot(mid, oid)) {
+    case ordering_t::less:
+      s = ++mid;
+      break;
+    case ordering_t::greater:
+      e = mid;
+      break;
+    case ordering_t::equivalent:
+      assert(mid == 0 || mid < count);
+      return {mid, true};
+    }
+  }
+  return {s, false};
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+uint16_t node_t<BlockSize, N, NodeType>::size_of_item(const ghobject_t& oid,
+                                                      const item_t& item)
+{
+  if constexpr (item_in_key) {
+    return sizeof(key_prefix_t);
+  } else {
+    return (sizeof(key_prefix_t) +
+            key_suffix_t::size_from(oid) + size_of(item));
+  }
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+bool node_t<BlockSize, N, NodeType>::is_overflow(const ghobject_t& oid,
+                                                 const item_t& item) const
+{
+  return free_space() < size_of_item(oid, item);
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+bool node_t<BlockSize, N, NodeType>::is_overflow(const ghobject_t& oid,
+                                                 const OnodeRef& item) const
+{
+  return free_space() < (sizeof(key_prefix_t) + key_suffix_t::size_from(oid) + item->size());
+}
+
+// inserts an item into the given slot, pushing all subsequent keys forward
+// @note if the item is not embedded in key, shift the right half as well
+template<size_t BlockSize, int N, ntype_t NodeType>
+void node_t<BlockSize, N, NodeType>::insert_at(unsigned slot,
+                                               const ghobject_t& oid,
+                                               const item_t& item)
+{
+  assert(!is_overflow(oid, item));
+  assert(slot <= count);
+  if constexpr (item_in_key) {
+    // shift the keys right
+    key_prefix_t* key = keys + slot;
+    key_prefix_t* last_key = keys + count;
+    std::copy_backward(key, last_key, last_key + 1);
+    key->set(oid, item);
+  } else {
+    const uint16_t size = key_suffix_t::size_from(oid) + size_of(item);
+    uint16_t offset = size;
+    if (slot > 0) {
+      offset += keys[slot - 1].offset;
+    }
+    if (slot < count) {
+      //                                 V
+      // |         |... //    ...|//////||    |
+      // |         |... // ...|//////|   |    |
+      // shift the partial keys and items left
+      auto first = keys[slot - 1].offset;
+      auto last = keys[count - 1].offset;
+      std::memmove(from_end(last + size), from_end(last), last - first);
+      // shift the keys right and update the pointers
+      for (key_prefix_t* dst = keys + count; dst > keys + slot; dst--) {
+        key_prefix_t* src = dst - 1;
+        *dst = *src;
+        dst->offset += size;
+      }
+    }
+    keys[slot].set(oid, offset);
+    auto p = from_end(offset);
+    auto partial_key = reinterpret_cast<key_suffix_t*>(p);
+    partial_key->set(oid);
+    p += size_of(*partial_key);
+    auto item_ptr = reinterpret_cast<item_t*>(p);
+    *item_ptr = item;
+  }
+  count++;
+  assert(used_space() <= capacity());
+}
+
+// used by InnerNode for updating the keys indexing its children when their lower boundaries
+// is updated
+template<size_t BlockSize, int N, ntype_t NodeType>
+void node_t<BlockSize, N, NodeType>::update_key_at(unsigned slot, const ghobject_t& oid)
+{
+  if constexpr (is_leaf()) {
+    assert(0);
+  } else if constexpr (item_in_key) {
+    keys[slot].update(oid);
+  } else {
+    const auto& [prefix, suffix] = key_at(slot);
+    int16_t delta = key_suffix_t::size_from(oid) - suffix.size();
+    if (delta > 0) {
+      // shift the cells sitting at its left side
+      auto first = keys[slot].offset;
+      auto last = keys[count - 1].offset;
+      std::memmove(from_end(last + delta), from_end(last), last - first);
+      // update the pointers
+      for (key_prefix_t* key = keys + slot; key < keys + count; key++) {
+        key->offset += delta;
+      }
+    }
+    keys[slot].update(oid);
+    auto p = from_end(keys[slot].offset);
+    auto partial_key = reinterpret_cast<key_suffix_t*>(p);
+    partial_key->set(oid);
+    // we don't update item here
+  }
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+std::pair<unsigned, uint16_t>
+node_t<BlockSize, N, NodeType>::calc_grab_front(uint16_t min_grab,
+                                                uint16_t max_grab) const
+{
+  // TODO: split by likeness
+  uint16_t grabbed = 0;
+  uint16_t used = used_space();
+  int n = 0;
+  for (; n < count; n++) {
+    const auto& [prefix, suffix] = key_at(n);
+    uint16_t to_grab = sizeof(prefix) + size_of(suffix);
+    if constexpr (!item_in_key) {
+      const auto& item = item_at(prefix);
+      to_grab += size_of(item);
+    }
+    if (grabbed + to_grab > max_grab) {
+      break;
+    }
+    grabbed += to_grab;
+  }
+  if (grabbed >= min_grab) {
+    if (n == count) {
+      return {n, grabbed};
+    } else if (!is_underflow(used - grabbed)) {
+      return {n, grabbed};
+    }
+  }
+  return {0, 0};
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+std::pair<unsigned, uint16_t>
+node_t<BlockSize, N, NodeType>::calc_grab_back(uint16_t min_grab,
+                                               uint16_t max_grab) const
+{
+  // TODO: split by likeness
+  uint16_t grabbed = 0;
+  uint16_t used = used_space();
+  for (int i = count - 1; i >= 0; i--) {
+    const auto& [prefix, suffix] = key_at(i);
+    uint16_t to_grab = sizeof(prefix) + size_of(suffix);
+    if constexpr (!item_in_key) {
+      const auto& item = item_at(prefix);
+      to_grab += size_of(item);
+    }
+    grabbed += to_grab;
+    if (is_underflow(used - grabbed)) {
+      return {0, 0};
+    } else if (grabbed > max_grab) {
+      return {0, 0};
+    } else if (grabbed >= min_grab) {
+      return {i + 1, grabbed};
+    }
+  }
+  return {0, 0};
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+template<int LeftN, class Mover>
+void node_t<BlockSize, N, NodeType>::grab_from_left(node_t<BlockSize, LeftN, NodeType>& left,
+                                                    unsigned n, uint16_t bytes,
+                                                    Mover& mover)
+{
+  // TODO: rebuild keys if moving across different layouts
+  //       group by likeness
+  shift_right(n, bytes);
+  mover.move_from(left.count - n, 0, n);
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+template<int RightN, class Mover>
+delta_t node_t<BlockSize, N, NodeType>::acquire_right(node_t<BlockSize, RightN, NodeType>& right,
+                                                      unsigned whoami, Mover& mover)
+{
+  mover.move_from(0, count, right.count);
+  return mover.to_delta();
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+template<int RightN, class Mover>
+void node_t<BlockSize, N, NodeType>::grab_from_right(node_t<BlockSize, RightN, NodeType>& right,
+                                                     unsigned n, uint16_t bytes,
+                                                     Mover& mover)
+{
+  mover.move_from(0, count, n);
+  right.shift_left(n, 0);
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+template<int LeftN, class Mover>
+void node_t<BlockSize, N, NodeType>::push_to_left(node_t<BlockSize, LeftN, NodeType>& left,
+                                                  unsigned n, uint16_t bytes,
+                                                  Mover& mover)
+{
+  left.grab_from_right(*this, n, bytes, mover);
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+template<int RightN, class Mover>
+void node_t<BlockSize, N, NodeType>::push_to_right(node_t<BlockSize, RightN, NodeType>& right,
+                                                   unsigned n, uint16_t bytes,
+                                                   Mover& mover)
+{
+  right.grab_from_left(*this, n, bytes, mover);
+}
+
+// [to, from) are removed, so we need to shift left
+// actually there are only two use cases:
+// - to = 0: for giving elements in bulk
+// - to = from - 1: for removing a single element
+// old: |////|.....|   |.....|/|........|
+// new: |.....|        |.....||........|
+template<size_t BlockSize, int N, ntype_t NodeType>
+void node_t<BlockSize, N, NodeType>::shift_left(unsigned from, unsigned to)
+{
+  assert(from < count);
+  assert(to < from);
+  if constexpr (item_in_key) {
+    std::copy(keys + from, keys + count, keys + to);
+  } else {
+    const uint16_t cell_hi = keys[count - 1].offset;
+    const uint16_t cell_lo = keys[from - 1].offset;
+    const uint16_t offset_delta = keys[from].offset - keys[to].offset;
+    for (auto src_key = keys + from, dst_key = keys + to;
+         src_key != keys + count;
+         ++src_key, ++dst_key) {
+      // shift the keys left
+      *dst_key = *src_key;
+      // update the pointers
+      dst_key->offset -= offset_delta;
+    }
+    // and cells
+    auto dst = from_end(cell_hi);
+    std::memmove(dst + offset_delta, dst, cell_hi - cell_lo);
+  }
+  count -= (from - to);
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+void node_t<BlockSize, N, NodeType>::insert_front(const ceph::bufferptr& keys_buf,
+                                                  const ceph::bufferptr& cells_buf)
+{
+  unsigned n = keys_buf.length() / sizeof(key_prefix_t);
+  shift_right(n, cells_buf.length());
+  keys_buf.copy_out(0, keys_buf.length(), reinterpret_cast<char*>(keys));
+  if constexpr (item_in_key) {
+    assert(cells_buf.length() == 0);
+  } else {
+    cells_buf.copy_out(0, cells_buf.length(), from_end(keys[n - 1].offset));
+  }
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+void node_t<BlockSize, N, NodeType>::insert_back(const ceph::bufferptr& keys_buf,
+                                                 const ceph::bufferptr& cells_buf)
+{
+  keys_buf.copy_out(0, keys_buf.length(), reinterpret_cast<char*>(keys + count));
+  count += keys_buf.length() / sizeof(key_prefix_t);
+  if constexpr (item_in_key) {
+    assert(cells_buf.length() == 0);
+  } else {
+    cells_buf.copy_out(0, cells_buf.length(), from_end(keys[count - 1].offset));
+  }
+}
+
+// one or more elements are inserted, so we need to shift the elements right
+// actually there are only two use cases:
+// - bytes != 0: for inserting bytes before from
+// - bytes = 0: for inserting a single element before from
+// old: ||.....|
+// new: |/////|.....|
+template<size_t BlockSize, int N, ntype_t NodeType>
+void node_t<BlockSize, N, NodeType>::shift_right(unsigned n, unsigned bytes)
+{
+  assert(bytes + used_space() < capacity());
+  // shift the keys left
+  std::copy_backward(keys, keys + count, keys + count + n);
+  count += n;
+  if constexpr (!item_in_key) {
+    uint16_t cells = keys[count - 1].offset;
+    // copy the partial keys and items
+    std::memmove(from_end(cells + bytes), from_end(cells), cells);
+    // update the pointers
+    for (auto key = keys + n; key < keys + count; ++key) {
+      key->offset += bytes;
+    }
+  }
+}
+
+// shift all keys after slot is removed.
+// @note if the item is not embdedded in key, all items sitting at the left
+//       side of it will be shifted right
+template<size_t BlockSize, int N, ntype_t NodeType>
+void node_t<BlockSize, N, NodeType>::remove_from(unsigned slot)
+{
+  assert(slot < count);
+  if (unsigned next = slot + 1; next < count) {
+    shift_left(next, slot);
+  } else {
+    // slot is the last one
+    count--;
+  }
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+void node_t<BlockSize, N, NodeType>::trim_right(unsigned n)
+{
+  count = n;
+}
+
+template<size_t BlockSize, int N, ntype_t NodeType>
+void node_t<BlockSize, N, NodeType>::play_delta(const delta_t& delta)
+{
+  switch (delta.op) {
+  case delta_t::op_t::insert_onode:
+    if constexpr (is_leaf()) {
+      auto [slot, found] = lower_bound(delta.oid);
+      assert(!found);
+      assert(delta.onode->size() <= std::numeric_limits<unsigned>::max());
+      ceph::bufferptr buf{static_cast<unsigned>(delta.onode->size())};
+      delta.onode->encode(buf.c_str(), buf.length());
+      auto onode = reinterpret_cast<const onode_t*>(buf.c_str());
+      return insert_at(slot, delta.oid, *onode);
+    } else {
+      throw std::invalid_argument("wrong node type");
+    }
+  case delta_t::op_t::update_onode:
+    // TODO
+    assert(0 == "not implemented");
+    break;
+  case delta_t::op_t::insert_child:
+    if constexpr (is_leaf()) {
+      throw std::invalid_argument("wrong node type");
+    } else {
+      auto [slot, found] = lower_bound(delta.oid);
+      assert(!found);
+      insert_at(slot, delta.oid, delta.addr);
+    }
+  case delta_t::op_t::update_key:
+    if constexpr (is_leaf()) {
+      throw std::invalid_argument("wrong node type");
+    } else {
+      return update_key_at(delta.n, delta.oid);
+    }
+  case delta_t::op_t::shift_left:
+    return shift_left(delta.n, 0);
+  case delta_t::op_t::trim_right:
+    return trim_right(delta.n);
+  case delta_t::op_t::insert_front:
+    return insert_front(delta.keys, delta.cells);
+  case delta_t::op_t::insert_back:
+    return insert_back(delta.keys, delta.cells);
+  case delta_t::op_t::remove_from:
+    return remove_from(delta.n);
+  default:
+    assert(0 == "unknown onode delta");
+  }
+}
+
+// explicit instantiate the node_t classes used by test_node.cc
+template class node_t<512, 0, ntype_t::inner>;
+template class node_t<512, 0, ntype_t::leaf>;
+template class node_t<512, 1, ntype_t::inner>;
+template class node_t<512, 1, ntype_t::leaf>;
+template class node_t<512, 2, ntype_t::inner>;
+template class node_t<512, 2, ntype_t::leaf>;
+template class node_t<512, 3, ntype_t::inner>;
+template class node_t<512, 3, ntype_t::leaf>;
diff --git a/src/crimson/os/seastore/onode_manager/simple-fltree/onode_node.h b/src/crimson/os/seastore/onode_manager/simple-fltree/onode_node.h
new file mode 100644 (file)
index 0000000..d833a66
--- /dev/null
@@ -0,0 +1,942 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <algorithm>
+#include <cstdint>
+#include <type_traits>
+#include <variant>
+
+#include "common/hobject.h"
+#include "crimson/common/layout.h"
+#include "crimson/os/seastore/onode.h"
+#include "crimson/os/seastore/seastore_types.h"
+#include "onode_delta.h"
+
+namespace asci = absl::container_internal;
+
+namespace boost::beast {
+  template<class T>
+  bool operator==(const span<T>& lhs, const span<T>& rhs) {
+    return std::equal(
+        lhs.begin(), lhs.end(),
+        rhs.begin(), rhs.end());
+  }
+}
+
+// on-disk onode
+// it only keeps the bits necessary to rebuild an in-memory onode
+struct [[gnu::packed]] onode_t {
+  onode_t& operator=(const onode_t& onode) {
+    len = onode.len;
+    std::memcpy(data, onode.data, len);
+    return *this;
+  }
+  size_t size() const {
+    return sizeof(*this) + len;
+  }
+  OnodeRef decode() const {
+    return new crimson::os::seastore::Onode(std::string_view{data, len});
+  }
+  uint8_t struct_v = 1;
+  uint8_t struct_compat = 1;
+  // TODO:
+  // - use uint16_t for length, as the size of an onode should be less
+  //   than a block (16K for now)
+  // - drop struct_len
+  uint32_t struct_len = 0;
+  uint32_t len;
+  char data[];
+};
+
+static inline std::ostream& operator<<(std::ostream& os, const onode_t& onode) {
+  return os << *onode.decode();
+}
+
+using crimson::os::seastore::laddr_t;
+
+struct [[gnu::packed]] child_addr_t {
+  laddr_t data;
+  child_addr_t(laddr_t data)
+    : data{data}
+  {}
+  child_addr_t& operator=(laddr_t addr) {
+    data = addr;
+    return *this;
+  }
+  laddr_t get() const {
+    return data;
+  }
+  operator laddr_t() const {
+    return data;
+  }
+  size_t size() const {
+    return sizeof(laddr_t);
+  }
+};
+
+// poor man's operator<=>
+enum class ordering_t {
+  less,
+  equivalent,
+  greater,
+};
+
+template<class L, class R>
+ordering_t compare_element(const L& x, const R& y)
+{
+  if constexpr (std::is_arithmetic_v<L>) {
+    static_assert(std::is_arithmetic_v<R>);
+    if (x < y) {
+      return ordering_t::less;
+    } else if (x > y) {
+      return ordering_t::greater;
+    } else {
+      return ordering_t::equivalent;
+    }
+  } else {
+    // string_view::compare(), string::compare(), ...
+    auto result = x.compare(y);
+    if (result < 0) {
+      return ordering_t::less;
+    } else if (result > 0) {
+      return ordering_t::greater;
+    } else {
+      return ordering_t::equivalent;
+    }
+  }
+}
+
+template<typename L, typename R>
+constexpr ordering_t tuple_cmp(const L&, const R&, std::index_sequence<>)
+{
+  return ordering_t::equivalent;
+}
+
+template<typename L, typename R,
+         size_t Head, size_t... Tail>
+constexpr ordering_t tuple_cmp(const L& x, const R& y,
+                               std::index_sequence<Head, Tail...>)
+{
+  auto ordering = compare_element(std::get<Head>(x), std::get<Head>(y));
+  if (ordering != ordering_t::equivalent) {
+    return ordering;
+  } else {
+    return tuple_cmp(x, y, std::index_sequence<Tail...>());
+  }
+}
+
+template<typename... Ls, typename... Rs>
+constexpr ordering_t cmp(const std::tuple<Ls...>& x,
+                         const std::tuple<Rs...>& y)
+{
+  static_assert(sizeof...(Ls) == sizeof...(Rs));
+  return tuple_cmp(x, y, std::index_sequence_for<Ls...>());
+}
+
+enum class likes_t {
+  yes,
+  no,
+  maybe,
+};
+
+struct [[gnu::packed]] variable_key_suffix {
+  uint64_t snap;
+  uint64_t gen;
+  uint8_t nspace_len;
+  uint8_t name_len;
+  char data[];
+  struct index_t {
+    enum {
+      nspace_data = 0,
+      name_data = 1,
+    };
+  };
+  using layout_type = asci::Layout<char, char>;
+  layout_type cell_layout() const {
+    return layout_type{nspace_len, name_len};
+  }
+  void set(const ghobject_t& oid) {
+    snap = oid.hobj.snap;
+    gen = oid.generation;
+    nspace_len = oid.hobj.nspace.size();
+    name_len = oid.hobj.oid.name.size();
+    auto layout = cell_layout();
+    std::memcpy(layout.Pointer<index_t::nspace_data>(data),
+                oid.hobj.nspace.data(), oid.hobj.nspace.size());
+    std::memcpy(layout.Pointer<index_t::name_data>(data),
+                oid.hobj.oid.name.data(), oid.hobj.oid.name.size());
+  }
+
+  void update_oid(ghobject_t& oid) const {
+    oid.hobj.snap = snap;
+    oid.generation = gen;
+    oid.hobj.nspace = nspace();
+    oid.hobj.oid.name = name();
+  }
+
+  variable_key_suffix& operator=(const variable_key_suffix& key) {
+    snap = key.snap;
+    gen = key.gen;
+    auto layout = cell_layout();
+    auto nspace = key.nspace();
+    std::copy_n(nspace.data(), nspace.size(),
+                layout.Pointer<index_t::nspace_data>(data));
+    auto name = key.name();
+    std::copy_n(name.data(), name.size(),
+                layout.Pointer<index_t::name_data>(data));
+    return *this;
+  }
+  const std::string_view nspace() const {
+    auto layout = cell_layout();
+    auto nspace = layout.Slice<index_t::nspace_data>(data);
+    return {nspace.data(), nspace.size()};
+  }
+  const std::string_view name() const {
+    auto layout = cell_layout();
+    auto name = layout.Slice<index_t::name_data>(data);
+    return {name.data(), name.size()};
+  }
+  size_t size() const {
+    return sizeof(*this) + nspace_len + name_len;
+  }
+  static size_t size_from(const ghobject_t& oid) {
+    return (sizeof(variable_key_suffix) +
+            oid.hobj.nspace.size() +
+            oid.hobj.oid.name.size());
+  }
+  ordering_t compare(const ghobject_t& oid) const {
+    return cmp(std::tie(nspace(), name(), snap, gen),
+               std::tie(oid.hobj.nspace, oid.hobj.oid.name, oid.hobj.snap.val,
+                        oid.generation));
+  }
+  bool likes(const variable_key_suffix& key) const {
+    return nspace() == key.nspace() && name() == key.name();
+  }
+};
+
+static inline std::ostream& operator<<(std::ostream& os, const variable_key_suffix& k) {
+  if (k.snap != CEPH_NOSNAP) {
+    os << "s" << k.snap << ",";
+  }
+  if (k.gen != ghobject_t::NO_GEN) {
+    os << "g" << k.gen << ",";
+  }
+  return os << k.nspace() << "/" << k.name();
+}
+
+// should use [[no_unique_address]] in C++20
+struct empty_key_suffix {
+  static constexpr ordering_t compare(const ghobject_t&) {
+    return ordering_t::equivalent;
+  }
+  static void set(const ghobject_t&) {}
+  static constexpr size_t size() {
+    return 0;
+  }
+  static size_t size_from(const ghobject_t&) {
+    return 0;
+  }
+  static void update_oid(ghobject_t&) {}
+};
+
+static inline std::ostream& operator<<(std::ostream& os, const empty_key_suffix&)
+{
+  return os;
+}
+
+enum class ntype_t : uint8_t {
+  leaf = 0u,
+  inner,
+};
+
+constexpr ntype_t flip_ntype(ntype_t ntype) noexcept
+{
+  if (ntype == ntype_t::leaf) {
+    return ntype_t::inner;
+  } else {
+    return ntype_t::leaf;
+  }
+}
+
+template<int N, ntype_t NodeType>
+struct FixedKeyPrefix {};
+
+template<ntype_t NodeType>
+struct FixedKeyPrefix<0, NodeType>
+{
+  static constexpr bool item_in_key = false;
+  int8_t shard = -1;
+  int64_t pool = -1;
+  uint32_t hash = 0;
+  uint16_t offset = 0;
+
+  FixedKeyPrefix() = default;
+  FixedKeyPrefix(const ghobject_t& oid, uint16_t offset)
+    : shard{oid.shard_id},
+      pool{oid.hobj.pool},
+      hash{oid.hobj.get_hash()},
+      offset{offset}
+  {}
+
+  void set(const ghobject_t& oid, uint16_t new_offset) {
+    shard = oid.shard_id;
+    pool = oid.hobj.pool;
+    hash = oid.hobj.get_hash();
+    offset = new_offset;
+  }
+
+  void set(const FixedKeyPrefix& k, uint16_t new_offset) {
+    shard = k.shard;
+    pool = k.pool;
+    hash = k.hash;
+    offset = new_offset;
+  }
+
+  void update(const ghobject_t& oid) {
+    shard = oid.shard_id;
+    pool = oid.hobj.pool;
+    hash = oid.hobj.get_hash();
+  }
+
+  void update_oid(ghobject_t& oid) const {
+    oid.set_shard(shard_id_t{shard});
+    oid.hobj.pool = pool;
+    oid.hobj.set_hash(hash);
+  }
+
+  ordering_t compare(const ghobject_t& oid) const {
+    // so std::tie() can bind them  by reference
+    int8_t rhs_shard = oid.shard_id;
+    uint32_t rhs_hash = oid.hobj.get_hash();
+    return cmp(std::tie(shard, pool, hash),
+               std::tie(rhs_shard, oid.hobj.pool, rhs_hash));
+  }
+  // @return true if i likes @c k, we will can be pushed down to next level
+  //              in the same node
+  likes_t likes(const FixedKeyPrefix& k) const {
+    if (shard == k.shard && pool == k.pool) {
+      return likes_t::yes;
+    } else {
+      return likes_t::no;
+    }
+  }
+};
+
+template<ntype_t NodeType>
+std::ostream& operator<<(std::ostream& os, const FixedKeyPrefix<0, NodeType>& k) {
+  if (k.shard != shard_id_t::NO_SHARD) {
+    os << "s" << k.shard;
+  }
+  return os << "p=" << k.pool << ","
+            << "h=" << std::hex << k.hash << std::dec << ","
+            << ">" << k.offset;
+}
+
+// all elements in this node share the same <shard, pool>
+template<ntype_t NodeType>
+struct FixedKeyPrefix<1, NodeType> {
+  static constexpr bool item_in_key = false;
+  uint32_t hash = 0;
+  uint16_t offset = 0;
+
+  FixedKeyPrefix() = default;
+  FixedKeyPrefix(uint32_t hash, uint16_t offset)
+    : hash{hash},
+      offset{offset}
+  {}
+  FixedKeyPrefix(const ghobject_t& oid, uint16_t offset)
+    : FixedKeyPrefix(oid.hobj.get_hash(), offset)
+  {}
+  void set(const ghobject_t& oid, uint16_t new_offset) {
+    hash = oid.hobj.get_hash();
+    offset = new_offset;
+  }
+  template<int N>
+  void set(const FixedKeyPrefix<N, NodeType>& k, uint16_t new_offset) {
+    static_assert(N < 2, "only N0, N1 have hash");
+    hash = k.hash;
+    offset = new_offset;
+  }
+  void update_oid(ghobject_t& oid) const {
+    oid.hobj.set_hash(hash);
+  }
+  void update(const ghobject_t& oid) {
+    hash = oid.hobj.get_hash();
+  }
+  ordering_t compare(const ghobject_t& oid) const {
+    return compare_element(hash, oid.hobj.get_hash());
+  }
+  likes_t likes(const FixedKeyPrefix& k) const {
+    return hash == k.hash ? likes_t::yes : likes_t::no;
+  }
+};
+
+template<ntype_t NodeType>
+std::ostream& operator<<(std::ostream& os, const FixedKeyPrefix<1, NodeType>& k) {
+  return os << "0x" << std::hex << k.hash << std::dec << ","
+            << ">" << k.offset;
+}
+
+// all elements in this node must share the same <shard, pool, hash>
+template<ntype_t NodeType>
+struct FixedKeyPrefix<2, NodeType> {
+  static constexpr bool item_in_key = false;
+  uint16_t offset = 0;
+
+  FixedKeyPrefix() = default;
+
+  static constexpr ordering_t compare(const ghobject_t& oid) {
+    // need to compare the cell
+    return ordering_t::equivalent;
+  }
+  // always defer to my cell for likeness
+  constexpr likes_t likes(const FixedKeyPrefix&) const {
+    return likes_t::maybe;
+  }
+  void set(const ghobject_t&, uint16_t new_offset) {
+    offset = new_offset;
+  }
+  template<int N>
+  void set(const FixedKeyPrefix<N, NodeType>&, uint16_t new_offset) {
+    offset = new_offset;
+  }
+  void update(const ghobject_t&) {}
+  void update_oid(ghobject_t&) const {}
+};
+
+template<ntype_t NodeType>
+std::ostream& operator<<(std::ostream& os, const FixedKeyPrefix<2, NodeType>& k) {
+  return os << ">" << k.offset;
+}
+
+struct fixed_key_3 {
+  uint64_t snap = 0;
+  uint64_t gen = 0;
+
+  fixed_key_3() = default;
+  fixed_key_3(const ghobject_t& oid)
+  : snap{oid.hobj.snap}, gen{oid.generation}
+  {}
+  ordering_t compare(const ghobject_t& oid) const {
+    return cmp(std::tie(snap, gen),
+               std::tie(oid.hobj.snap.val, oid.generation));
+  }
+  // no object likes each other at this level
+  constexpr likes_t likes(const fixed_key_3&) const {
+    return likes_t::no;
+  }
+  void update_with_oid(const ghobject_t& oid) {
+    snap = oid.hobj.snap;
+    gen = oid.generation;
+  }
+  void update_oid(ghobject_t& oid) const {
+    oid.hobj.snap = snap;
+    oid.generation = gen;
+  }
+};
+
+static inline std::ostream& operator<<(std::ostream& os, const fixed_key_3& k) {
+  if (k.snap != CEPH_NOSNAP) {
+    os << "s" << k.snap << ",";
+  }
+  if (k.gen != ghobject_t::NO_GEN) {
+    os << "g" << k.gen << ",";
+  }
+  return os;
+}
+
+// all elements in this node must share the same <shard, pool, hash, namespace, oid>
+// but the unlike other FixedKeyPrefix<>, a node with FixedKeyPrefix<3> does not have
+// variable_sized_key, so if it is an inner node, we can just embed the child
+// addr right in the key.
+template<>
+struct FixedKeyPrefix<3, ntype_t::inner> : public fixed_key_3 {
+  // the item is embedded in the key
+  static constexpr bool item_in_key = true;
+  laddr_t child_addr = 0;
+
+  FixedKeyPrefix() = default;
+  void set(const ghobject_t& oid, laddr_t new_child_addr) {
+    update_with_oid(oid);
+    child_addr = new_child_addr;
+  }
+  // unlikely get called, though..
+  void update(const ghobject_t& oid) {}
+  template<int N>
+  std::enable_if_t<N < 3> set(const FixedKeyPrefix<N, ntype_t::inner>&,
+                              laddr_t new_child_addr) {
+    child_addr = new_child_addr;
+  }
+  void set(const FixedKeyPrefix& k, laddr_t new_child_addr) {
+    snap = k.snap;
+    gen = k.gen;
+    child_addr = new_child_addr;
+  }
+  void set(const variable_key_suffix& k, laddr_t new_child_addr) {
+    snap = k.snap;
+    gen = k.gen;
+    child_addr = new_child_addr;
+  }
+};
+
+template<>
+struct FixedKeyPrefix<3, ntype_t::leaf> : public fixed_key_3 {
+  static constexpr bool item_in_key = false;
+  uint16_t offset = 0;
+
+  FixedKeyPrefix() = default;
+  void set(const ghobject_t& oid, uint16_t new_offset) {
+    update_with_oid(oid);
+    offset = new_offset;
+  }
+  void set(const FixedKeyPrefix& k, uint16_t new_offset) {
+    snap = k.snap;
+    gen = k.gen;
+    offset = new_offset;
+  }
+  template<int N>
+  std::enable_if_t<N < 3> set(const FixedKeyPrefix<N, ntype_t::leaf>&,
+                              uint16_t new_offset) {
+    offset = new_offset;
+  }
+};
+
+struct tag_t {
+  template<int N, ntype_t node_type>
+  static constexpr tag_t create() {
+    static_assert(std::clamp(N, 0, 3) == N);
+    return tag_t{N, static_cast<uint8_t>(node_type)};
+  }
+  bool is_leaf() const {
+    return type() == ntype_t::leaf;
+  }
+  int layout() const {
+    return layout_type;
+  }
+  ntype_t type() const {
+    return ntype_t{node_type};
+  }
+  int layout_type : 4;
+  uint8_t node_type : 4;
+};
+
+static inline std::ostream& operator<<(std::ostream& os, const tag_t& tag) {
+  return os << "n=" << tag.layout() << ", leaf=" << tag.is_leaf();
+}
+
+// for calculating size of variable-sized item/key
+template<class T>
+size_t size_of(const T& t) {
+  using decayed_t = std::decay_t<T>;
+  if constexpr (std::is_scalar_v<decayed_t>) {
+    return sizeof(decayed_t);
+  } else {
+    return t.size();
+  }
+}
+
+enum class size_state_t {
+  okay,
+  underflow,
+  overflow,
+};
+
+// layout of a node of B+ tree
+//
+// it is different from a typical B+ tree in following ways
+// - the size of keys is not necessarily fixed, neither is the size of value.
+// - the max number of elements in a node is determined by the total size of
+//   the keys and values in the node
+// - in internal nodes, each key maps to the logical address of the child
+//   node whose minimum key is greater or equal to that key.
+template<size_t BlockSize,
+         int N,
+         ntype_t NodeType>
+struct node_t {
+  static_assert(std::clamp(N, 0, 3) == N);
+  constexpr static ntype_t node_type = NodeType;
+  constexpr static int node_n = N;
+
+  using key_prefix_t = FixedKeyPrefix<N, NodeType>;
+  using item_t = std::conditional_t<NodeType == ntype_t::leaf,
+                                    onode_t,
+                                    child_addr_t>;
+  using const_item_t = std::conditional_t<NodeType == ntype_t::leaf,
+                                          const onode_t&,
+                                          child_addr_t>;
+  static constexpr bool item_in_key = key_prefix_t::item_in_key;
+  using key_suffix_t = std::conditional_t<N < 3,
+                                           variable_key_suffix,
+                                           empty_key_suffix>;
+
+  std::pair<const key_prefix_t&, const key_suffix_t&>
+  key_at(unsigned slot) const;
+
+  // update an existing oid with the specified item
+  ghobject_t get_oid_at(unsigned slot, const ghobject_t& oid) const;
+  const_item_t item_at(const key_prefix_t& key) const;
+  void dump(std::ostream& os) const;
+
+  // for debugging only.
+  static constexpr bool is_leaf() {
+    return node_type == ntype_t::leaf;
+  }
+
+  bool _is_leaf() const {
+    return tag.is_leaf();
+  }
+
+  char* from_end(uint16_t offset);
+  const char* from_end(uint16_t offset) const;
+  uint16_t used_space() const;
+  uint16_t free_space() const {
+    return capacity() - used_space();
+  }
+  static uint16_t capacity();
+  // TODO: if it's allowed to update 2 siblings at the same time, we can have
+  //       B* tree
+  static constexpr uint16_t min_size();
+
+
+  // calculate the allowable bounds on bytes to remove from an overflow node
+  // with specified size
+  // @param size the overflowed size
+  // @return <minimum bytes to grab, maximum bytes to grab>
+  static constexpr std::pair<int16_t, int16_t> bytes_to_remove(uint16_t size);
+
+  // calculate the allowable bounds on bytes to add to an underflow node
+  // with specified size
+  // @param size the underflowed size
+  // @return <minimum bytes to push, maximum bytes to push>
+  static constexpr std::pair<int16_t, int16_t> bytes_to_add(uint16_t size);
+
+  size_state_t size_state(uint16_t size) const;
+  bool is_underflow(uint16_t size) const;
+  int16_t size_with_key(unsigned slot, const ghobject_t& oid) const;
+  ordering_t compare_with_slot(unsigned slot, const ghobject_t& oid) const;
+  /// return the slot number of the first slot that is greater or equal to
+  /// key
+  std::pair<unsigned, bool> lower_bound(const ghobject_t& oid) const;
+  static uint16_t size_of_item(const ghobject_t& oid, const item_t& item);
+  bool is_overflow(const ghobject_t& oid, const item_t& item) const;
+  bool is_overflow(const ghobject_t& oid, const OnodeRef& item) const;
+
+  // inserts an item into the given slot, pushing all subsequent keys forward
+  // @note if the item is not embedded in key, shift the right half as well
+  void insert_at(unsigned slot, const ghobject_t& oid, const item_t& item);
+  // used by InnerNode for updating the keys indexing its children when their lower boundaries
+  // is updated
+  void update_key_at(unsigned slot, const ghobject_t& oid);
+  // try to figure out the number of elements and total size when trying to
+  // rebalance by moving the elements from the front of this node when its
+  // left sibling node is underflow
+  //
+  // @param min_grab lower bound of the number of bytes to move
+  // @param max_grab upper bound of the number of bytes to move
+  // @return the number of element to grab
+  // @note return {0, 0} if current node would be underflow if
+  //       @c min_grab bytes of elements are taken from it
+  std::pair<unsigned, uint16_t> calc_grab_front(uint16_t min_grab, uint16_t max_grab) const;
+  // try to figure out the number of elements and their total size when trying to
+  // rebalance by moving the elements from the end of this node when its right
+  // sibling node is underflow
+  //
+  // @param min_grab lower bound of the number of bytes to move
+  // @param max_grab upper bound of the number of bytes to move
+  // @return the number of element to grab
+  // @note return {0, 0} if current node would be underflow if
+  //       @c min_grab bytes of elements are taken from it
+  std::pair<unsigned, uint16_t> calc_grab_back(uint16_t min_grab, uint16_t max_grab) const;
+  template<int LeftN, class Mover> void grab_from_left(
+    node_t<BlockSize, LeftN, NodeType>& left,
+    unsigned n, uint16_t bytes,
+    Mover& mover);
+  template<int RightN, class Mover>
+  delta_t acquire_right(node_t<BlockSize, RightN, NodeType>& right,
+                        unsigned whoami, Mover& mover);
+  // transfer n elements at the front of given node to me
+  template<int RightN, class Mover>
+  void grab_from_right(node_t<BlockSize, RightN, NodeType>& right,
+                       unsigned n, uint16_t bytes,
+                       Mover& mover);
+  template<int LeftN, class Mover>
+  void push_to_left(node_t<BlockSize, LeftN, NodeType>& left,
+                    unsigned n, uint16_t bytes,
+                    Mover& mover);
+  template<int RightN, class Mover>
+  void push_to_right(node_t<BlockSize, RightN, NodeType>& right,
+                     unsigned n, uint16_t bytes,
+                     Mover& mover);
+  // [to, from) are removed, so we need to shift left
+  // actually there are only two use cases:
+  // - to = 0: for giving elements in bulk
+  // - to = from - 1: for removing a single element
+  // old: |////|.....|   |.....|/|........|
+  // new: |.....|        |.....||........|
+  void shift_left(unsigned from, unsigned to);
+  void insert_front(const ceph::bufferptr& keys_buf, const ceph::bufferptr& cells_buf);
+  void insert_back(const ceph::bufferptr& keys_buf, const ceph::bufferptr& cells_buf);
+  // one or more elements are inserted, so we need to shift the elements right
+  // actually there are only two use cases:
+  // - bytes != 0: for inserting bytes before from
+  // - bytes = 0: for inserting a single element before from
+  // old: ||.....|
+  // new: |/////|.....|
+  void shift_right(unsigned n, unsigned bytes);
+  // shift all keys after slot is removed.
+  // @note if the item is not embdedded in key, all items sitting at the left
+  //       side of it will be shifted right
+  void remove_from(unsigned slot);
+  void trim_right(unsigned n);
+  void play_delta(const delta_t& delta);
+  //         /-------------------------------|
+  //         |                               V
+  // |header|k0|k1|k2|...  | / / |k2'v2|k1'v1|k0'.v0| v_m |
+  //        |<-- count  -->|
+  tag_t tag = tag_t::create<N, NodeType>();
+  // the count of values in the node
+  uint16_t count = 0;
+  key_prefix_t keys[];
+};
+
+template<class parent_t,
+         class from_t,
+         class to_t,
+         typename=void>
+class EntryMover {
+public:
+  // a "trap" mover
+  EntryMover(const parent_t&, from_t&, to_t& dst, unsigned) {
+    assert(0);
+  }
+  void move_from(unsigned, unsigned, unsigned) {
+    assert(0);
+  }
+  delta_t get_delta() {
+    return delta_t::nop();
+  }
+};
+
+// lower the layout, for instance, from L0 to L1, no reference oid is used
+template<class parent_t,
+         class from_t,
+         class to_t>
+class EntryMover<parent_t,
+                 from_t,
+                 to_t,
+                 std::enable_if_t<from_t::node_n < to_t::node_n>>
+{
+public:
+  EntryMover(const parent_t&, from_t& src, to_t& dst, unsigned)
+    : src{src}, dst{dst}
+  {}
+  void move_from(unsigned src_first, unsigned dst_first, unsigned n)
+  {
+    ceph::bufferptr keys_buf{n * sizeof(to_t::key_prefix_t)};
+    ceph::bufferptr cells_buf;
+    auto dst_keys = reinterpret_cast<typename to_t::key_prefix_t*>(keys_buf.c_str());
+    if constexpr (to_t::item_in_key) {
+      for (unsigned i = 0; i < n; i++) {
+        const auto& [prefix, suffix] = src.key_at(src_first + i);
+        dst_keys[i].set(suffix, src.item_at(prefix));
+      }
+    } else {
+      // copy keys
+      uint16_t src_offset = src_first > 0 ? src.keys[src_first - 1].offset : 0;
+      uint16_t dst_offset = dst_first > 0 ? dst.keys[dst_first - 1].offset : 0;
+      for (unsigned i = 0; i < n; i++) {
+        auto& src_key = src.keys[src_first + i];
+        uint16_t offset = src_key.offset - src_offset + dst_offset;
+        dst_keys[i].set(src_key, offset);
+      }
+      // copy cells in bulk, yay!
+      auto src_end = src.keys[src_first + n - 1].offset;
+      uint16_t total_cell_size = src_end - src_offset;
+      cells_buf = ceph::bufferptr{total_cell_size};
+      cells_buf.copy_in(0, total_cell_size, src.from_end(src_end));
+    }
+    if (dst_first == dst.count) {
+      dst_delta = delta_t::insert_back(keys_buf, cells_buf);
+    } else {
+      dst_delta = delta_t::insert_front(keys_buf, cells_buf);
+    }
+    if (src_first > 0 && src_first + n == src.count) {
+      src_delta = delta_t::trim_right(src_first);
+    } else if (src_first == 0 && n < src.count) {
+      src_delta = delta_t::shift_left(n);
+    } else if (src_first == 0 && n == src.count) {
+      // the caller will retire the src extent
+    } else {
+      // grab in the middle?
+      assert(0);
+    }
+  }
+
+  delta_t from_delta() {
+    return std::move(src_delta);
+  }
+  delta_t to_delta() {
+    return std::move(dst_delta);
+  }
+private:
+  const from_t& src;
+  const to_t& dst;
+  delta_t dst_delta;
+  delta_t src_delta;
+};
+
+// lift the layout, for instance, from L2 to L0, need a reference oid
+template<class parent_t,
+         class from_t,
+         class to_t>
+class EntryMover<parent_t, from_t, to_t,
+                 std::enable_if_t<(from_t::node_n > to_t::node_n)>>
+{
+public:
+  EntryMover(const parent_t& parent, from_t& src, to_t& dst, unsigned from_slot)
+    : src{src}, dst{dst}, ref_oid{parent->get_oid_at(from_slot, {})}
+  {}
+  void move_from(unsigned src_first, unsigned dst_first, unsigned n)
+  {
+    ceph::bufferptr keys_buf{n * sizeof(to_t::key_prefix_t)};
+    ceph::bufferptr cells_buf;
+    auto dst_keys = reinterpret_cast<typename to_t::key_prefix_t*>(keys_buf.c_str());
+    uint16_t in_node_offset = dst_first > 0 ? dst.keys[dst_first - 1].offset : 0;
+    static_assert(!std::is_same_v<typename to_t::key_suffix_t, empty_key_suffix>);
+    // copy keys
+    uint16_t buf_offset = 0;
+    for (unsigned i = 0; i < n; i++) {
+      auto& src_key = src.keys[src_first + i];
+      if constexpr (std::is_same_v<typename from_t::key_suffix_t, empty_key_suffix>) {
+        // heterogeneous partial key, have to rebuild dst partial key from oid
+        src_key.update_oid(ref_oid);
+        const auto& src_item = src.item_at(src_key);
+        size_t key2_size = to_t::key_suffix_t::size_from(ref_oid);
+        buf_offset += key2_size + size_of(src_item);
+        dst_keys[i].set(ref_oid, in_node_offset + buf_offset);
+        auto p = from_end(cells_buf, buf_offset);
+        auto partial_key = reinterpret_cast<typename to_t::key_suffix_t*>(p);
+        partial_key->set(ref_oid);
+        p += key2_size;
+        auto dst_item = reinterpret_cast<typename to_t::item_t*>(p);
+        *dst_item = src_item;
+      } else {
+        // homogeneous partial key, just update the pointers
+        uint16_t src_offset = src_first > 0 ? src.keys[src_first - 1].offset : 0;
+        uint16_t dst_offset = dst_first > 0 ? dst.keys[dst_first - 1].offset : 0;
+        uint16_t offset = src_key.offset - src_offset + dst_offset;
+        dst_keys[i].set(ref_oid, in_node_offset + offset);
+      }
+    }
+    if constexpr (std::is_same_v<typename to_t::key_suffix_t,
+                                 typename from_t::key_suffix_t>) {
+      // copy cells in bulk, yay!
+      uint16_t src_offset = src_first > 0 ? src.keys[src_first - 1].offset : 0;
+      uint16_t src_end = src.keys[src_first + n - 1].offset;
+      uint16_t total_cell_size = src_end - src_offset;
+      cells_buf.copy_in(0,  total_cell_size, src.from_end(src_end));
+    }
+    if (dst_first == dst.count) {
+      dst_delta = delta_t::insert_back(keys_buf, cells_buf);
+    } else {
+      dst_delta = delta_t::insert_front(keys_buf, cells_buf);
+    }
+    if (src_first + n == src.count && src_first > 0) {
+      src_delta = delta_t::trim_right(src_first);
+    } else {
+      // the caller will retire the src extent
+      assert(src_first == 0);
+    }
+  }
+
+  delta_t from_delta() {
+    return std::move(src_delta);
+  }
+  delta_t to_delta() {
+    return std::move(dst_delta);
+  }
+private:
+  char* from_end(ceph::bufferptr& ptr, uint16_t offset) {
+    return ptr.end_c_str() - static_cast<int>(offset);
+  }
+private:
+  const from_t& src;
+  const to_t& dst;
+  delta_t dst_delta;
+  delta_t src_delta;
+  ghobject_t ref_oid;
+};
+
+// identical layout, yay!
+template<class parent_t,
+         class child_t>
+class EntryMover<parent_t, child_t, child_t>
+{
+public:
+  EntryMover(const parent_t&, child_t& src, child_t& dst, unsigned)
+    : src{src}, dst{dst}
+  {}
+
+  void move_from(unsigned src_first, unsigned dst_first, unsigned n)
+  {
+    ceph::bufferptr keys_buf{static_cast<unsigned>(n * sizeof(typename child_t::key_prefix_t))};
+    ceph::bufferptr cells_buf;
+    auto dst_keys = reinterpret_cast<typename child_t::key_prefix_t*>(keys_buf.c_str());
+
+    // copy keys
+    std::copy(src.keys + src_first, src.keys + src_first + n,
+              dst_keys);
+    if constexpr (!child_t::item_in_key) {
+      uint16_t src_offset = src_first > 0 ? src.keys[src_first - 1].offset : 0;
+      uint16_t dst_offset = dst_first > 0 ? dst.keys[dst_first - 1].offset : 0;
+      const int offset_delta = dst_offset - src_offset;
+      // update the pointers
+      for (unsigned i = 0; i < n; i++) {
+        dst_keys[i].offset += offset_delta;
+      }
+      // copy cells in bulk, yay!
+      auto src_end = src.keys[src_first + n - 1].offset;
+      uint16_t total_cell_size = src_end - src_offset;
+      cells_buf = ceph::bufferptr{total_cell_size};
+      cells_buf.copy_in(0,  total_cell_size, src.from_end(src_end));
+    }
+    if (dst_first == dst.count) {
+      dst_delta = delta_t::insert_back(std::move(keys_buf), std::move(cells_buf));
+    } else {
+      dst_delta = delta_t::insert_front(std::move(keys_buf), std::move(cells_buf));
+    }
+    if (src_first + n == src.count && src_first > 0) {
+      src_delta = delta_t::trim_right(n);
+    } else if (src_first == 0 && n < src.count) {
+      src_delta = delta_t::shift_left(n);
+    } else if (src_first == 0 && n == src.count) {
+      // the caller will retire the src extent
+    } else {
+      // grab in the middle?
+      assert(0);
+    }
+  }
+
+  delta_t from_delta() {
+    return std::move(src_delta);
+  }
+
+  delta_t to_delta() {
+    return std::move(dst_delta);
+  }
+private:
+  char* from_end(ceph::bufferptr& ptr, uint16_t offset) {
+    return ptr.end_c_str() - static_cast<int>(offset);
+  }
+private:
+  const child_t& src;
+  const child_t& dst;
+  delta_t src_delta;
+  delta_t dst_delta;
+};
+
+template<class parent_t, class from_t, class to_t>
+EntryMover<parent_t, from_t, to_t>
+make_mover(const parent_t& parent, from_t& src, to_t& dst, unsigned from_slot) {
+  return EntryMover<parent_t, from_t, to_t>(parent, src, dst, from_slot);
+}
index 9e801cd37119a6cf9c2e39832060b90a48e94509..ec7a01c1b77f52d2d79843b08e2d321d5a83d018 100644 (file)
@@ -33,3 +33,5 @@ target_link_libraries(
   unittest_seastore_cache
   ${CMAKE_DL_LIBS}
   crimson-seastore)
+
+add_subdirectory(onode_tree)
diff --git a/src/test/crimson/seastore/onode_tree/CMakeLists.txt b/src/test/crimson/seastore/onode_tree/CMakeLists.txt
new file mode 100644 (file)
index 0000000..2c79968
--- /dev/null
@@ -0,0 +1,6 @@
+add_executable(test-seastore-onode-tree-node
+  test_node.cc)
+add_ceph_unittest(test-seastore-onode-tree-node)
+target_link_libraries(test-seastore-onode-tree-node
+  crimson-seastore
+  GTest::Main)
diff --git a/src/test/crimson/seastore/onode_tree/test_node.cc b/src/test/crimson/seastore/onode_tree/test_node.cc
new file mode 100644 (file)
index 0000000..178f783
--- /dev/null
@@ -0,0 +1,207 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <gtest/gtest.h>
+
+#include "crimson/os/seastore/onode_manager/simple-fltree/onode_node.h"
+
+using crimson::os::seastore::Onode;
+using crimson::os::seastore::OnodeRef;
+
+TEST(OnodeNode, denc)
+{
+  Onode onode{"hello"};
+  bufferlist bl;
+  ceph::encode(onode, bl);
+  bl.rebuild();
+  auto flattened = reinterpret_cast<const onode_t*>(bl.c_str());
+  auto actual_onode = flattened->decode();
+  ASSERT_EQ(*actual_onode, onode);
+}
+
+TEST(OnodeNode, lookup)
+{
+  static constexpr size_t BLOCK_SIZE = 512;
+  char buf[BLOCK_SIZE];
+  using leaf_node_0 = node_t<BLOCK_SIZE, 0, ntype_t::leaf>;
+  auto leaf = new (buf) leaf_node_0;
+  ghobject_t oid{hobject_t{object_t{"saturn"}, "", 0, 0, 0, "solar"}};
+  {
+    auto [slot, found] = leaf->lower_bound(oid);
+    ASSERT_FALSE(found);
+    ASSERT_EQ(0, slot);
+  }
+  Onode onode{"hello"};
+  bufferlist bl;
+  ceph::encode(onode, bl);
+  bl.rebuild();
+  auto flattened = reinterpret_cast<const onode_t*>(bl.c_str());
+  leaf->insert_at(0, oid, *flattened);
+  {
+    auto [slot, found] = leaf->lower_bound(oid);
+    ASSERT_TRUE(found);
+    ASSERT_EQ(0, slot);
+    const auto& [key1, key2] = leaf->key_at(slot);
+    auto& item = leaf->item_at(key1);
+    auto actual_onode = item.decode();
+    ASSERT_EQ(*actual_onode, onode);
+  }
+}
+
+TEST(OnodeNode, grab_from_right)
+{
+  static constexpr size_t BLOCK_SIZE = 512;
+  char buf1[BLOCK_SIZE];
+  char buf2[BLOCK_SIZE];
+  using leaf_node_0 = node_t<BLOCK_SIZE, 0, ntype_t::leaf>;
+  auto leaf1 = new (buf1) leaf_node_0;
+  auto leaf2 = new (buf2) leaf_node_0;
+  auto& dummy_parent = *leaf1;
+
+  ghobject_t oid1{hobject_t{object_t{"earth"}, "", 0, 0, 0, "solar"}};
+  ghobject_t oid2{hobject_t{object_t{"jupiter"}, "", 0, 0, 0, "solar"}};
+  ghobject_t oid3{hobject_t{object_t{"saturn"}, "", 0, 0, 0, "solar"}};
+  Onode onode{"hello"};
+  bufferlist bl;
+  ceph::encode(onode, bl);
+  bl.rebuild();
+  auto flattened = reinterpret_cast<const onode_t*>(bl.c_str());
+  // so they are ordered as they should
+  leaf1->insert_at(0, oid1, *flattened);
+  ASSERT_EQ(1, leaf1->count);
+  {
+    auto [slot, found] = leaf1->lower_bound(oid1);
+    ASSERT_TRUE(found);
+    ASSERT_EQ(0, slot);
+  }
+  {
+    leaf2->insert_at(0, oid2, *flattened);
+    auto [slot, found] = leaf2->lower_bound(oid2);
+    ASSERT_TRUE(found);
+    ASSERT_EQ(0, slot);
+  }
+  {
+    leaf2->insert_at(1, oid3, *flattened);
+    auto [slot, found] = leaf2->lower_bound(oid3);
+    ASSERT_TRUE(found);
+    ASSERT_EQ(1, slot);
+  }
+  ASSERT_EQ(2, leaf2->count);
+
+  // normally we let left merge right, so we just need to remove an
+  // entry in parent, let's keep this convention here
+  auto mover = make_mover(dummy_parent, *leaf2, *leaf1, 0);
+  // just grab a single item from right
+  mover.move_from(0, 1, 1);
+  auto to_delta = mover.to_delta();
+  ASSERT_EQ(to_delta.op_t::insert_back, to_delta.op);
+  leaf1->insert_back(std::move(to_delta.keys), std::move(to_delta.cells));
+
+  ASSERT_EQ(2, leaf1->count);
+  {
+    auto [slot, found] = leaf1->lower_bound(oid2);
+    ASSERT_TRUE(found);
+    ASSERT_EQ(1, slot);
+  }
+
+  auto from_delta = mover.from_delta();
+  ASSERT_EQ(from_delta.op_t::shift_left, from_delta.op);
+  leaf2->shift_left(from_delta.n, 0);
+  ASSERT_EQ(1, leaf2->count);
+}
+
+TEST(OnodeNode, merge_right)
+{
+  static constexpr size_t BLOCK_SIZE = 512;
+  char buf1[BLOCK_SIZE];
+  char buf2[BLOCK_SIZE];
+  using leaf_node_0 = node_t<BLOCK_SIZE, 0, ntype_t::leaf>;
+  auto leaf1 = new (buf1) leaf_node_0;
+  auto leaf2 = new (buf2) leaf_node_0;
+  auto& dummy_parent = leaf1;
+
+  ghobject_t oid1{hobject_t{object_t{"earth"}, "", 0, 0, 0, "solar"}};
+  ghobject_t oid2{hobject_t{object_t{"jupiter"}, "", 0, 0, 0, "solar"}};
+  ghobject_t oid3{hobject_t{object_t{"saturn"}, "", 0, 0, 0, "solar"}};
+  Onode onode{"hello"};
+  bufferlist bl;
+  ceph::encode(onode, bl);
+  bl.rebuild();
+  auto flattened = reinterpret_cast<const onode_t*>(bl.c_str());
+  // so they are ordered as they should
+  leaf1->insert_at(0, oid1, *flattened);
+  ASSERT_EQ(1, leaf1->count);
+  {
+    auto [slot, found] = leaf1->lower_bound(oid1);
+    ASSERT_TRUE(found);
+    ASSERT_EQ(0, slot);
+  }
+  {
+    leaf2->insert_at(0, oid2, *flattened);
+    auto [slot, found] = leaf2->lower_bound(oid2);
+    ASSERT_TRUE(found);
+    ASSERT_EQ(0, slot);
+  }
+  {
+    leaf2->insert_at(1, oid3, *flattened);
+    auto [slot, found] = leaf2->lower_bound(oid3);
+    ASSERT_TRUE(found);
+    ASSERT_EQ(1, slot);
+  }
+  ASSERT_EQ(2, leaf2->count);
+
+  // normally we let left merge right, so we just need to remove an
+  // entry in parent, let's keep this convention here
+  auto mover = make_mover(dummy_parent, *leaf2, *leaf1, 0);
+  // just grab a single item from right
+  mover.move_from(0, 1, 2);
+  auto to_delta = mover.to_delta();
+  ASSERT_EQ(to_delta.op_t::insert_back, to_delta.op);
+  leaf1->insert_back(std::move(to_delta.keys), std::move(to_delta.cells));
+
+  ASSERT_EQ(3, leaf1->count);
+  {
+    auto [slot, found] = leaf1->lower_bound(oid2);
+    ASSERT_TRUE(found);
+    ASSERT_EQ(1, slot);
+  }
+  {
+    auto [slot, found] = leaf1->lower_bound(oid3);
+    ASSERT_TRUE(found);
+    ASSERT_EQ(2, slot);
+  }
+
+  // its onode tree's responsibility to retire the node
+  auto from_delta = mover.from_delta();
+  ASSERT_EQ(from_delta.op_t::nop, from_delta.op);
+}
+
+TEST(OnodeNode, remove_basic)
+{
+  static constexpr size_t BLOCK_SIZE = 512;
+  char buf[BLOCK_SIZE];
+  using leaf_node_0 = node_t<BLOCK_SIZE, 0, ntype_t::leaf>;
+  auto leaf = new (buf) leaf_node_0;
+  ghobject_t oid{hobject_t{object_t{"saturn"}, "", 0, 0, 0, "solar"}};
+  {
+    auto [slot, found] = leaf->lower_bound(oid);
+    ASSERT_FALSE(found);
+    ASSERT_EQ(0, slot);
+  }
+  Onode onode{"hello"};
+  bufferlist bl;
+  ceph::encode(onode, bl);
+  bl.rebuild();
+  auto flattened = reinterpret_cast<const onode_t*>(bl.c_str());
+  leaf->insert_at(0, oid, *flattened);
+  {
+    auto [slot, found] = leaf->lower_bound(oid);
+    ASSERT_TRUE(found);
+    ASSERT_EQ(0, slot);
+    leaf->remove_from(slot);
+  }
+  {
+    auto [slot, found] = leaf->lower_bound(oid);
+    ASSERT_FALSE(found);
+  }
+}