git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore: add lba_manager interface with btree_lba_manager impl
author Samuel Just <sjust@redhat.com>
Wed, 27 May 2020 01:14:47 +0000 (18:14 -0700)
committer Samuel Just <sjust@redhat.com>
Tue, 2 Jun 2020 23:56:41 +0000 (16:56 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
12 files changed:
src/crimson/os/seastore/CMakeLists.txt
src/crimson/os/seastore/cached_extent.h
src/crimson/os/seastore/lba_manager.cc [new file with mode: 0644]
src/crimson/os/seastore/lba_manager.h [new file with mode: 0644]
src/crimson/os/seastore/lba_manager/btree/btree_lba_manager.cc [new file with mode: 0644]
src/crimson/os/seastore/lba_manager/btree/btree_lba_manager.h [new file with mode: 0644]
src/crimson/os/seastore/lba_manager/btree/lba_btree_node.h [new file with mode: 0644]
src/crimson/os/seastore/lba_manager/btree/lba_btree_node_impl.cc [new file with mode: 0644]
src/crimson/os/seastore/lba_manager/btree/lba_btree_node_impl.h [new file with mode: 0644]
src/crimson/os/seastore/seastore_types.h
src/test/crimson/seastore/CMakeLists.txt
src/test/crimson/seastore/test_btree_lba_manager.cc [new file with mode: 0644]

index 5df0d94123f5ccb7b41945976d678da60a43f8fd..eda36b4f2721b51fafc59c9204d10e19f60af037 100644 (file)
@@ -5,6 +5,9 @@ add_library(crimson-seastore
   segment_manager.cc
   journal.cc
   cache.cc
+  lba_manager.cc
+  lba_manager/btree/btree_lba_manager.cc
+  lba_manager/btree/lba_btree_node_impl.cc
   root_block.cc
        )
 target_link_libraries(crimson-seastore
index 6a464d3d907f2d1e7192a04b88c0272e642da3b6..d85da1011b9aacc0be0749c1ef129f394297e11d 100644 (file)
@@ -108,14 +108,16 @@ public:
   virtual extent_types_t get_type() const = 0;
 
   friend std::ostream &operator<<(std::ostream &, extent_state_t);
+  virtual std::ostream &print_detail(std::ostream &out) const { return out; }
   std::ostream &print(std::ostream &out) const {
-    return out << "CachedExtent(addr=" << this
-              << ", type=" << get_type()
-              << ", version=" << version
-              << ", paddr=" << get_paddr()
-              << ", state=" << state
-              << ", refcount=" << use_count()
-              << ")";
+    out << "CachedExtent(addr=" << this
+       << ", type=" << get_type()
+       << ", version=" << version
+       << ", paddr=" << get_paddr()
+       << ", state=" << state
+       << ", refcount=" << use_count();
+    print_detail(out);
+    return out << ")";
   }
 
   /**
diff --git a/src/crimson/os/seastore/lba_manager.cc b/src/crimson/os/seastore/lba_manager.cc
new file mode 100644 (file)
index 0000000..2fe1345
--- /dev/null
@@ -0,0 +1,38 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/os/seastore/segment_manager.h"
+#include "crimson/os/seastore/cache.h"
+#include "crimson/os/seastore/lba_manager.h"
+#include "crimson/os/seastore/lba_manager/btree/btree_lba_manager.h"
+
+namespace crimson::os::seastore::lba_manager {
+
+LBAManagerRef create_lba_manager(
+  SegmentManager &segment_manager,
+  Cache &cache) {
+  return LBAManagerRef(new btree::BtreeLBAManager(segment_manager, cache));
+}
+
+}
+
+namespace crimson::os::seastore {
+
+std::ostream &operator<<(std::ostream &out, const LBAPin &rhs)
+{
+  return out << "LBAPin(" << rhs.get_laddr() << "~" << rhs.get_length()
+            << "->" << rhs.get_paddr() << ")";
+}
+
+std::ostream &operator<<(std::ostream &out, const lba_pin_list_t &rhs)
+{
+  bool first = true;
+  out << '[';
+  for (auto &i: rhs) {
+    out << (first ? "" : ",") << *i;
+    first = false;
+  }
+  return out << ']';
+}
+
+};
diff --git a/src/crimson/os/seastore/lba_manager.h b/src/crimson/os/seastore/lba_manager.h
new file mode 100644 (file)
index 0000000..c29652c
--- /dev/null
@@ -0,0 +1,151 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <iostream>
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+
+#include <seastar/core/future.hh>
+
+#include "include/ceph_assert.h"
+#include "include/buffer_fwd.h"
+#include "include/interval_set.h"
+#include "common/interval_map.h"
+
+#include "crimson/osd/exceptions.h"
+
+#include "crimson/os/seastore/cache.h"
+#include "crimson/os/seastore/seastore_types.h"
+#include "crimson/os/seastore/segment_manager.h"
+
+namespace crimson::os::seastore {
+
+class LBAPin;
+using LBAPinRef = std::unique_ptr<LBAPin>;
+class LBAPin {
+public:
+  virtual extent_len_t get_length() const = 0;
+  virtual paddr_t get_paddr() const = 0;
+  virtual laddr_t get_laddr() const = 0;
+  virtual LBAPinRef duplicate() const = 0;
+
+  virtual ~LBAPin() {}
+};
+std::ostream &operator<<(std::ostream &out, const LBAPin &rhs);
+
+using lba_pin_list_t = std::list<LBAPinRef>;
+
+std::ostream &operator<<(std::ostream &out, const lba_pin_list_t &rhs);
+
+/**
+ * Abstract interface for managing the logical to physical mapping
+ */
+class LBAManager {
+public:
+  using mkfs_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error>;
+  using mkfs_ret = mkfs_ertr::future<>;
+  virtual mkfs_ret mkfs(
+    Transaction &t
+  ) = 0;
+
+  /**
+   * Fetches mappings for laddr_t in range [offset, offset + len)
+   *
+   * Future will not resolve until all pins have resolved (set_paddr called)
+   */
+  using get_mapping_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error>;
+  using get_mapping_ret = get_mapping_ertr::future<lba_pin_list_t>;
+  virtual get_mapping_ret get_mapping(
+    Transaction &t,
+    laddr_t offset, extent_len_t length) = 0;
+
+  /**
+   * Fetches mappings for each (laddr_t, extent_len_t) pair in the given list
+   *
+   * Future will not resolve until all pins have resolved (set_paddr called)
+   */
+  using get_mappings_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error>;
+  using get_mappings_ret = get_mappings_ertr::future<lba_pin_list_t>;
+  virtual get_mappings_ret get_mappings(
+    Transaction &t,
+    laddr_list_t &&extent_list) = 0;
+
+  /**
+   * Allocates a new mapping referenced by LBAPinRef
+   *
+   * Offset will be relative to the block offset of the record.
+   * This mapping will block transaction submission until set_paddr
+   * is called on the LBAPin.
+   */
+  using alloc_extent_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error>;
+  using alloc_extent_ret = alloc_extent_ertr::future<LBAPinRef>;
+  virtual alloc_extent_ret alloc_extent(
+    Transaction &t,
+    laddr_t hint,
+    extent_len_t len,
+    paddr_t addr) = 0;
+
+  /**
+   * Creates a new absolute mapping.
+   *
+   * off~len must be unreferenced
+   */
+  using set_extent_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error,
+    crimson::ct_error::invarg>;
+  using set_extent_ret = set_extent_ertr::future<LBAPinRef>;
+  virtual set_extent_ret set_extent(
+    Transaction &t,
+    laddr_t off, extent_len_t len, paddr_t addr) = 0;
+
+  using ref_ertr = crimson::errorator<
+    crimson::ct_error::enoent,
+    crimson::ct_error::input_output_error>;
+  using ref_ret = ref_ertr::future<unsigned>;
+
+  /**
+   * Decrements ref count on extent
+   *
+   * @return returns resulting refcount
+   */
+  virtual ref_ret decref_extent(
+    Transaction &t,
+    laddr_t addr) = 0;
+
+  /**
+   * Increments ref count on extent
+   *
+   * @return returns resulting refcount
+   */
+  virtual ref_ret incref_extent(
+    Transaction &t,
+    laddr_t addr) = 0;
+
+  // TODO: probably unused, remove
+  using submit_lba_transaction_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error>;
+  using submit_lba_transaction_ret = submit_lba_transaction_ertr::future<>;
+  virtual submit_lba_transaction_ret submit_lba_transaction(
+    Transaction &t) = 0;
+
+  virtual TransactionRef create_transaction() = 0;
+
+  virtual ~LBAManager() {}
+};
+using LBAManagerRef = std::unique_ptr<LBAManager>;
+
+class Cache;
+namespace lba_manager {
+LBAManagerRef create_lba_manager(
+  SegmentManager &segment_manager,
+  Cache &cache);
+}
+
+}
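
For orientation, a minimal sketch of driving the interface above from caller code (not part of this commit; it mirrors the unit test added at the end of this commit and uses unsafe_get0() for brevity, which is appropriate only in test code; segment_manager and cache are assumed to already exist):

  auto lba = lba_manager::create_lba_manager(segment_manager, cache);
  auto t = lba->create_transaction();
  // allocate a 4k mapping near laddr hint 0; P_ADDR_MIN stands in for a real
  // physical address, just as the unit test below does
  LBAPinRef pin = lba->alloc_extent(*t, 0, 4096, P_ADDR_MIN).unsafe_get0();
  // the new mapping is visible to lookups within the same transaction
  lba_pin_list_t pins =
    lba->get_mapping(*t, pin->get_laddr(), pin->get_length()).unsafe_get0();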
diff --git a/src/crimson/os/seastore/lba_manager/btree/btree_lba_manager.cc b/src/crimson/os/seastore/lba_manager/btree/btree_lba_manager.cc
new file mode 100644 (file)
index 0000000..beb69d1
--- /dev/null
@@ -0,0 +1,254 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <sys/mman.h>
+#include <string.h>
+
+#include "crimson/common/log.h"
+
+#include "include/buffer.h"
+#include "crimson/os/seastore/lba_manager/btree/btree_lba_manager.h"
+#include "crimson/os/seastore/lba_manager/btree/lba_btree_node_impl.h"
+
+
+namespace {
+  seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_filestore);
+  }
+}
+
+namespace crimson::os::seastore::lba_manager::btree {
+
+BtreeLBAManager::mkfs_ret BtreeLBAManager::mkfs(
+  Transaction &t)
+{
+  logger().debug("BtreeLBAManager::mkfs");
+  return cache.get_root(t).safe_then([this, &t](auto root) {
+    auto root_leaf = cache.alloc_new_extent<LBALeafNode>(
+      t,
+      LBA_BLOCK_SIZE);
+    root_leaf->set_size(0);
+    root->get_lba_root() =
+      btree_lba_root_t{
+        0,
+        0,
+        root_leaf->get_paddr() - root->get_paddr(),
+        paddr_t{}};
+    return mkfs_ertr::now();
+  });
+}
+
+BtreeLBAManager::get_root_ret
+BtreeLBAManager::get_root(Transaction &t)
+{
+  return cache.get_root(t).safe_then([this, &t](auto root) {
+    paddr_t root_addr = root->get_lba_root().lba_root_addr;
+    root_addr = root_addr.maybe_relative_to(root->get_paddr());
+    logger().debug(
+      "BtreeLBAManager::get_root: reading root at {} depth {}",
+      root->get_lba_root().lba_root_addr,
+      root->get_lba_root().lba_depth);
+    return get_lba_btree_extent(
+      cache,
+      t,
+      root->get_lba_root().lba_depth,
+      root->get_lba_root().lba_root_addr,
+      root->get_paddr());
+  });
+}
+
+BtreeLBAManager::get_mapping_ret
+BtreeLBAManager::get_mapping(
+  Transaction &t,
+  laddr_t offset, extent_len_t length)
+{
+  logger().debug("BtreeLBAManager::get_mapping: {}, {}", offset, length);
+  return get_root(
+    t).safe_then([this, &t, offset, length](auto extent) {
+      return extent->lookup_range(
+       cache, t, offset, length);
+    }).safe_then([](auto &&e) {
+      logger().debug("BtreeLBAManager::get_mapping: got mapping {}", e);
+      return get_mapping_ret(
+       get_mapping_ertr::ready_future_marker{},
+       std::move(e));
+    });
+}
+
+
+BtreeLBAManager::get_mappings_ret
+BtreeLBAManager::get_mappings(
+  Transaction &t,
+  laddr_list_t &&list)
+{
+  logger().debug("BtreeLBAManager::get_mappings: {}", list);
+  auto l = std::make_unique<laddr_list_t>(std::move(list));
+  auto retptr = std::make_unique<lba_pin_list_t>();
+  auto &ret = *retptr;
+  return crimson::do_for_each(
+    l->begin(),
+    l->end(),
+    [this, &t, &ret](const auto &p) {
+      return get_mapping(t, p.first, p.second).safe_then(
+       [&ret](auto res) {
+         ret.splice(ret.end(), res, res.begin(), res.end());
+       });
+    }).safe_then([l=std::move(l), retptr=std::move(retptr)]() mutable {
+      return std::move(*retptr);
+    });
+}
+
+BtreeLBAManager::alloc_extent_ret
+BtreeLBAManager::alloc_extent(
+  Transaction &t,
+  laddr_t hint,
+  extent_len_t len,
+  paddr_t addr)
+{
+  // TODO: we can certainly combine the lookup and the insert.
+  return get_root(
+    t).safe_then([this, &t, hint, len](auto extent) {
+      logger().debug(
+       "BtreeLBAManager::alloc_extent: beginning search at {}",
+       *extent);
+      return extent->find_hole(
+       cache,
+       t,
+       hint,
+       L_ADDR_MAX,
+       len).safe_then([extent](auto ret) {
+         return std::make_pair(ret, extent);
+       });
+    }).safe_then([this, &t, len, addr](auto allocation_pair) {
+      auto &[laddr, extent] = allocation_pair;
+      ceph_assert(laddr != L_ADDR_MAX);
+      return insert_mapping(
+       t,
+       extent,
+       laddr,
+       { len, addr, 1, 0 }
+      ).safe_then([laddr=laddr, addr, len](auto pin) {
+       logger().debug(
+         "BtreeLBAManager::alloc_extent: alloc {}~{} for {}",
+         laddr,
+         len,
+         addr);
+       return alloc_extent_ret(
+         alloc_extent_ertr::ready_future_marker{},
+         LBAPinRef(pin.release()));
+      });
+    });
+}
+
+BtreeLBAManager::set_extent_ret
+BtreeLBAManager::set_extent(
+  Transaction &t,
+  laddr_t off, extent_len_t len, paddr_t addr)
+{
+  return get_root(
+    t).safe_then([this, &t, off, len, addr](auto root) {
+      return insert_mapping(
+       t,
+       root,
+       off,
+       { len, addr, 1, 0 });
+    }).safe_then([](auto ret) {
+      return set_extent_ret(
+       set_extent_ertr::ready_future_marker{},
+       LBAPinRef(ret.release()));
+    });
+}
+
+BtreeLBAManager::submit_lba_transaction_ret
+BtreeLBAManager::submit_lba_transaction(
+  Transaction &t)
+{
+  // This is a noop for now and may end up not being necessary
+  return submit_lba_transaction_ertr::now();
+}
+
+BtreeLBAManager::BtreeLBAManager(
+  SegmentManager &segment_manager,
+  Cache &cache)
+  : segment_manager(segment_manager),
+    cache(cache) {}
+
+BtreeLBAManager::insert_mapping_ret BtreeLBAManager::insert_mapping(
+  Transaction &t,
+  LBANodeRef root,
+  laddr_t laddr,
+  lba_map_val_t val)
+{
+  auto split = insert_mapping_ertr::future<LBANodeRef>(
+    insert_mapping_ertr::ready_future_marker{},
+    root);
+  if (root->at_max_capacity()) {
+    split = cache.get_root(t).safe_then(
+      [this, root, laddr, &t](RootBlockRef croot) {
+       logger().debug(
+         "BtreeLBAManager::insert_mapping: splitting root {}",
+         *croot);
+       {
+         auto mut_croot = cache.duplicate_for_write(t, croot);
+         croot = mut_croot->cast<RootBlock>();
+       }
+       auto nroot = cache.alloc_new_extent<LBAInternalNode>(t, LBA_BLOCK_SIZE);
+       nroot->set_depth(root->depth + 1);
+       nroot->begin()->set_key(L_ADDR_MIN);
+       nroot->begin()->set_val(root->get_paddr());
+       nroot->set_size(1);
+       croot->get_lba_root().lba_root_addr = nroot->get_paddr();
+       croot->get_lba_root().lba_depth = root->depth + 1;
+       return nroot->split_entry(cache, t, laddr, nroot->begin(), root);
+      });
+  }
+  return split.safe_then([this, &t, laddr, val](LBANodeRef node) {
+    node = cache.duplicate_for_write(t, node)->cast<LBANode>();
+    return node->insert(cache, t, laddr, val);
+  });
+}
+
+BtreeLBAManager::update_refcount_ret BtreeLBAManager::update_refcount(
+  Transaction &t,
+  laddr_t addr,
+  int delta)
+{
+  return update_mapping(
+    t,
+    addr,
+    [delta](const lba_map_val_t &in) {
+      lba_map_val_t out = in;
+      ceph_assert((int)out.refcount + delta >= 0);
+      out.refcount += delta;
+      if (out.refcount == 0) {
+       return std::optional<lba_map_val_t>();
+      } else {
+       return std::optional<lba_map_val_t>(out);
+      }
+    }).safe_then([](auto result) {
+      if (!result)
+       return 0u;
+      else
+       return result->refcount;
+    });
+}
+
+BtreeLBAManager::update_mapping_ret BtreeLBAManager::update_mapping(
+  Transaction &t,
+  laddr_t addr,
+  update_func_t &&f)
+{
+  return get_root(t
+  ).safe_then([this, f=std::move(f), &t, addr](LBANodeRef root) mutable {
+    if (root->depth == 0) {
+      root = cache.duplicate_for_write(t, root)->cast<LBANode>();
+    }
+    return root->mutate_mapping(
+      cache,
+      t,
+      addr,
+      std::move(f));
+  });
+}
+
+}
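
As a reading aid (not part of this commit), the refcount contract implemented by update_refcount() above and exposed through incref_extent()/decref_extent() can be sketched as follows; lba_manager, t and laddr are assumed to be in scope, and unsafe_get0() is test-only shorthand:

  unsigned cnt;
  cnt = lba_manager->incref_extent(*t, laddr).unsafe_get0();  // e.g. 1 -> 2
  cnt = lba_manager->decref_extent(*t, laddr).unsafe_get0();  // back to 1
  cnt = lba_manager->decref_extent(*t, laddr).unsafe_get0();  // 0: mapping removed from the tree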
diff --git a/src/crimson/os/seastore/lba_manager/btree/btree_lba_manager.h b/src/crimson/os/seastore/lba_manager/btree/btree_lba_manager.h
new file mode 100644 (file)
index 0000000..6d9543e
--- /dev/null
@@ -0,0 +1,145 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <iostream>
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include <seastar/core/future.hh>
+
+#include "include/ceph_assert.h"
+#include "include/buffer_fwd.h"
+#include "include/interval_set.h"
+#include "common/interval_map.h"
+#include "crimson/osd/exceptions.h"
+
+#include "crimson/os/seastore/seastore_types.h"
+#include "crimson/os/seastore/lba_manager.h"
+#include "crimson/os/seastore/cache.h"
+#include "crimson/os/seastore/segment_manager.h"
+
+#include "crimson/os/seastore/lba_manager/btree/lba_btree_node.h"
+
+namespace crimson::os::seastore::lba_manager::btree {
+
+/**
+ * BtreeLBAManager
+ *
+ * Uses a wandering btree to track two things:
+ * 1) lba state including laddr_t -> paddr_t mapping
+ * 2) reverse paddr_t -> laddr_t mapping for gc (TODO)
+ *
+ * Generally, any transaction will involve
+ * 1) deltas against lba tree nodes
+ * 2) new lba tree nodes
+ *    - Note, there must necessarily be a delta linking
+ *      these new nodes into the tree -- might be a
+ *      bootstrap_state_t delta if new root
+ *
+ * get_mappings, alloc_extent_*, etc populate a Transaction
+ * which then gets submitted
+ */
+class BtreeLBAManager : public LBAManager {
+public:
+  BtreeLBAManager(
+    SegmentManager &segment_manager,
+    Cache &cache);
+
+  mkfs_ret mkfs(
+    Transaction &t) final;
+
+  get_mapping_ret get_mapping(
+    Transaction &t,
+    laddr_t offset, extent_len_t length) final;
+
+  get_mappings_ret get_mappings(
+    Transaction &t,
+    laddr_list_t &&list) final;
+
+  alloc_extent_ret alloc_extent(
+    Transaction &t,
+    laddr_t hint,
+    extent_len_t len,
+    paddr_t addr) final;
+
+  set_extent_ret set_extent(
+    Transaction &t,
+    laddr_t off, extent_len_t len, paddr_t addr) final;
+
+  ref_ret decref_extent(
+    Transaction &t,
+    laddr_t addr) final {
+    return update_refcount(t, addr, -1);
+  }
+
+  ref_ret incref_extent(
+    Transaction &t,
+    laddr_t addr) final {
+    return update_refcount(t, addr, 1);
+  }
+
+  submit_lba_transaction_ret submit_lba_transaction(
+    Transaction &t) final;
+
+  TransactionRef create_transaction() final {
+    auto t = new Transaction;
+    return TransactionRef(t);
+  }
+
+private:
+  SegmentManager &segment_manager;
+  Cache &cache;
+
+  /**
+   * get_root
+   *
+   * Get a reference to the root LBANode.
+   */
+  using get_root_ertr = Cache::get_extent_ertr;
+  using get_root_ret = get_root_ertr::future<LBANodeRef>;
+  get_root_ret get_root(Transaction &);
+
+  /**
+   * insert_mapping
+   *
+   * Insert a lba mapping into the tree
+   */
+  using insert_mapping_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error>;
+  using insert_mapping_ret = insert_mapping_ertr::future<LBAPinRef>;
+  insert_mapping_ret insert_mapping(
+    Transaction &t,   ///< [in,out] transaction
+    LBANodeRef root,  ///< [in] root node
+    laddr_t laddr,    ///< [in] logical addr to insert
+    lba_map_val_t val ///< [in] mapping to insert
+  );
+
+  /**
+   * update_refcount
+   *
+   * Updates refcount, returns resulting refcount
+   */
+  using update_refcount_ret = ref_ret;
+  update_refcount_ret update_refcount(
+    Transaction &t,
+    laddr_t addr,
+    int delta);
+
+  /**
+   * update_mapping
+   *
+   * Updates mapping, removes if f returns nullopt
+   */
+  using update_mapping_ertr = ref_ertr;
+  using update_mapping_ret = ref_ertr::future<std::optional<lba_map_val_t>>;
+  using update_func_t = LBANode::mutate_func_t;
+  update_mapping_ret update_mapping(
+    Transaction &t,
+    laddr_t addr,
+    update_func_t &&f);
+};
+using BtreeLBAManagerRef = std::unique_ptr<BtreeLBAManager>;
+
+}
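
The class comment above notes that get_mappings, alloc_extent, etc. only populate a Transaction which then gets submitted; a sketch of that submission step, following submit_transaction() in test_btree_lba_manager.cc later in this commit (cache, journal and a populated TransactionRef t are assumed):

  auto record = cache.try_construct_record(*t);   // collects deltas and fresh extents
  ceph_assert(record);
  journal.submit_record(std::move(*record)).safe_then(
    [&cache, t=std::move(t)](paddr_t addr) {
      cache.complete_commit(*t, addr);            // finalize the transaction's extents at addr
    },
    crimson::ct_error::all_same_way([](auto e) {
      ceph_assert(0 == "Hit error submitting to journal");
    }));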
diff --git a/src/crimson/os/seastore/lba_manager/btree/lba_btree_node.h b/src/crimson/os/seastore/lba_manager/btree/lba_btree_node.h
new file mode 100644 (file)
index 0000000..e4e1809
--- /dev/null
@@ -0,0 +1,184 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <sys/mman.h>
+#include <string.h>
+
+#include <memory>
+#include <string.h>
+
+#include "crimson/common/log.h"
+
+namespace crimson::os::seastore::lba_manager::btree {
+
+/**
+ * lba_map_val_t
+ *
+ * struct representing a single lba mapping
+ */
+struct lba_map_val_t {
+  extent_len_t len = 0;  ///< length of mapping
+  paddr_t paddr;         ///< physical addr of mapping
+  uint32_t refcount = 0; ///< refcount
+  uint32_t checksum = 0; ///< checksum of original block written at paddr (TODO)
+
+  lba_map_val_t(
+    extent_len_t len,
+    paddr_t paddr,
+    uint32_t refcount,
+    uint32_t checksum)
+    : len(len), paddr(paddr), refcount(refcount), checksum(checksum) {}
+};
+
+class BtreeLBAPin;
+using BtreeLBAPinRef = std::unique_ptr<BtreeLBAPin>;
+
+/**
+ * LBANode
+ *
+ * Base class enabling recursive lookup between internal and leaf nodes.
+ */
+struct LBANode : CachedExtent {
+  using LBANodeRef = TCachedExtentRef<LBANode>;
+  using lookup_range_ertr = LBAManager::get_mapping_ertr;
+  using lookup_range_ret = LBAManager::get_mapping_ret;
+
+  depth_t depth = 0;
+
+  LBANode(ceph::bufferptr &&ptr) : CachedExtent(std::move(ptr)) {}
+  LBANode(const LBANode &rhs) = default;
+
+  void set_depth(depth_t _depth) { depth = _depth; }
+
+  /**
+   * lookup_range
+   *
+   * Returns mappings within range [addr, addr+len)
+   */
+  virtual lookup_range_ret lookup_range(
+    Cache &cache,
+    Transaction &transaction,
+    laddr_t addr,
+    extent_len_t len) = 0;
+
+  /**
+   * insert
+   *
+   * Recursively inserts into subtree rooted at *this.  Caller
+   * must already have handled splitting if at_max_capacity().
+   *
+   * Precondition: !at_max_capacity()
+   */
+  using insert_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error
+    >;
+  using insert_ret = insert_ertr::future<LBAPinRef>;
+  virtual insert_ret insert(
+    Cache &cache,
+    Transaction &transaction,
+    laddr_t laddr,
+    lba_map_val_t val) = 0;
+
+  /**
+   * find_hole
+   *
+   * Finds the lowest-addressed hole of at least size len in [min, max)
+   *
+   * @return addr of hole, L_ADDR_NULL if no hole found
+   */
+  using find_hole_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error>;
+  using find_hole_ret = find_hole_ertr::future<laddr_t>;
+  virtual find_hole_ret find_hole(
+    Cache &cache,
+    Transaction &t,
+    laddr_t min,
+    laddr_t max,
+    extent_len_t len) = 0;
+
+  /**
+   * mutate_mapping
+   *
+   * Looks up laddr and calls f on its value. If f returns a value, inserts it.
+   * If it returns nullopt, removes the value.
+   * Caller must already have merged if at_min_capacity().
+   *
+   * Precondition: !at_min_capacity()
+   */
+  using mutate_mapping_ertr = crimson::errorator<
+    crimson::ct_error::enoent,            ///< mapping does not exist
+    crimson::ct_error::input_output_error
+    >;
+  using mutate_mapping_ret = mutate_mapping_ertr::future<
+    std::optional<lba_map_val_t>>;
+  using mutate_func_t = std::function<
+    std::optional<lba_map_val_t>(const lba_map_val_t &v)
+    >;
+  virtual mutate_mapping_ret mutate_mapping(
+    Cache &cache,
+    Transaction &transaction,
+    laddr_t laddr,
+    mutate_func_t &&f) = 0;
+
+  /**
+   * make_split_children
+   *
+   * Generates appropriately typed left and right nodes formed from the
+   * contents of *this.
+   *
+   * Returns <left, right, pivot> where pivot is the first value of right.
+   */
+  virtual std::tuple<
+    LBANodeRef,
+    LBANodeRef,
+    laddr_t>
+  make_split_children(Cache &cache, Transaction &t) = 0;
+
+  /**
+   * make_full_merge
+   *
+   * Returns a single node formed from merging *this and right.
+   * Precondition: at_min_capacity() && right.at_min_capacity()
+   */
+  virtual LBANodeRef make_full_merge(
+    Cache &cache, Transaction &t, LBANodeRef &right) = 0;
+
+  /**
+   * make_balanced
+   *
+   * Returns nodes formed by balancing the contents of *this and right.
+   *
+   * Returns <left, right, pivot> where pivot is the first value of right.
+   */
+  virtual std::tuple<
+    LBANodeRef,
+    LBANodeRef,
+    laddr_t>
+  make_balanced(
+    Cache &cache, Transaction &t, LBANodeRef &right,
+    bool prefer_left) = 0;
+
+  virtual bool at_max_capacity() const = 0;
+  virtual bool at_min_capacity() const = 0;
+
+  virtual ~LBANode() = default;
+};
+using LBANodeRef = LBANode::LBANodeRef;
+
+/**
+ * get_lba_btree_extent
+ *
+ * Fetches node at depth of the appropriate type.
+ */
+Cache::get_extent_ertr::future<LBANodeRef> get_lba_btree_extent(
+  Cache &cache,
+  Transaction &t,
+  depth_t depth,  ///< [in] depth of node to fetch
+  paddr_t offset, ///< [in] physical addr of node
+  paddr_t base    ///< [in] depending on user, block addr or record addr
+                  ///       in case offset is relative
+);
+
+}
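
A sketch of a mutate_func_t for mutate_mapping() above (illustration only): returning a modified value updates the mapping in place, while returning std::nullopt removes it, which is exactly how BtreeLBAManager::update_refcount() handles a refcount reaching zero:

  LBANode::mutate_func_t bump_refcount = [](const lba_map_val_t &in) {
    lba_map_val_t out = in;
    ++out.refcount;                             // keep the mapping with the new value
    return std::optional<lba_map_val_t>(out);   // std::nullopt here would erase it
  };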
diff --git a/src/crimson/os/seastore/lba_manager/btree/lba_btree_node_impl.cc b/src/crimson/os/seastore/lba_manager/btree/lba_btree_node_impl.cc
new file mode 100644 (file)
index 0000000..f221f68
--- /dev/null
@@ -0,0 +1,536 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <sys/mman.h>
+#include <string.h>
+
+#include <memory>
+#include <string.h>
+
+#include "include/buffer.h"
+#include "include/byteorder.h"
+
+#include "crimson/os/seastore/lba_manager/btree/lba_btree_node_impl.h"
+
+namespace {
+  seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_filestore);
+  }
+}
+
+namespace crimson::os::seastore::lba_manager::btree {
+
+std::ostream &LBAInternalNode::print_detail(std::ostream &out) const
+{
+  return out << ", size=" << get_size()
+            << ", depth=" << depth;
+}
+
+LBAInternalNode::lookup_range_ret LBAInternalNode::lookup_range(
+  Cache &cache,
+  Transaction &t,
+  laddr_t addr,
+  extent_len_t len)
+{
+  auto [begin, end] = bound(addr, addr + len);
+  auto result_up = std::make_unique<lba_pin_list_t>();
+  auto &result = *result_up;
+  return crimson::do_for_each(
+    std::move(begin),
+    std::move(end),
+    [this, &cache, &t, &result, addr, len](const auto &val) mutable {
+      return get_lba_btree_extent(
+       cache,
+       t,
+       depth-1,
+       val.get_val(),
+       get_paddr()).safe_then(
+         [&cache, &t, &result, addr, len](auto extent) mutable {
+           // TODO: add backrefs to ensure cache residence of parents
+           return extent->lookup_range(
+             cache,
+             t,
+             addr,
+             len).safe_then(
+               [&result](auto pin_list) mutable {
+                 result.splice(result.end(), pin_list,
+                               pin_list.begin(), pin_list.end());
+               });
+         });
+    }).safe_then([result=std::move(result_up)] {
+      return lookup_range_ertr::make_ready_future<lba_pin_list_t>(
+       std::move(*result));
+    });
+}
+
+LBAInternalNode::insert_ret LBAInternalNode::insert(
+  Cache &cache,
+  Transaction &t,
+  laddr_t laddr,
+  lba_map_val_t val)
+{
+  auto insertion_pt = get_containing_child(laddr);
+  return get_lba_btree_extent(
+    cache,
+    t,
+    depth-1,
+    insertion_pt->get_val(),
+    get_paddr()).safe_then(
+      [this, insertion_pt, &cache, &t, laddr, val=std::move(val)](
+       auto extent) mutable {
+       return extent->at_max_capacity() ?
+         split_entry(cache, t, laddr, insertion_pt, extent) :
+         insert_ertr::make_ready_future<LBANodeRef>(std::move(extent));
+      }).safe_then([&cache, &t, laddr, val=std::move(val)](
+                    LBANodeRef extent) mutable {
+       if (extent->depth == 0) {
+         auto mut_extent = cache.duplicate_for_write(t, extent);
+         extent = mut_extent->cast<LBANode>();
+       }
+       return extent->insert(cache, t, laddr, val);
+      });
+}
+
+LBAInternalNode::mutate_mapping_ret LBAInternalNode::mutate_mapping(
+  Cache &cache,
+  Transaction &t,
+  laddr_t laddr,
+  mutate_func_t &&f)
+{
+  return get_lba_btree_extent(
+    cache,
+    t,
+    depth-1,
+    get_containing_child(laddr)->get_val(),
+    get_paddr()
+  ).safe_then([this, &cache, &t, laddr](LBANodeRef extent) {
+    if (extent->at_min_capacity()) {
+      auto mut_this = cache.duplicate_for_write(
+       t, this)->cast<LBAInternalNode>();
+      return mut_this->merge_entry(
+       cache,
+       t,
+       laddr,
+       mut_this->get_containing_child(laddr),
+       extent);
+    } else {
+      return merge_ertr::make_ready_future<LBANodeRef>(
+       std::move(extent));
+    }
+  }).safe_then([&cache, &t, laddr, f=std::move(f)](LBANodeRef extent) mutable {
+    if (extent->depth == 0) {
+      auto mut_extent = cache.duplicate_for_write(
+       t, extent)->cast<LBANode>();
+      return mut_extent->mutate_mapping(cache, t, laddr, std::move(f));
+    } else {
+      return extent->mutate_mapping(cache, t, laddr, std::move(f));
+    }
+  });
+}
+
+LBAInternalNode::find_hole_ret LBAInternalNode::find_hole(
+  Cache &cache,
+  Transaction &t,
+  laddr_t min,
+  laddr_t max,
+  extent_len_t len)
+{
+  logger().debug(
+    "LBAInternalNode::find_hole min={}, max={}, len={}, *this={}",
+    min, max, len, *this);
+  auto bounds = bound(min, max);
+  return seastar::do_with(
+    bounds.first,
+    bounds.second,
+    L_ADDR_NULL,
+    [this, &cache, &t, len](auto &i, auto &e, auto &ret) {
+      return crimson::do_until(
+       [this, &cache, &t, &i, &e, &ret, len] {
+         if (i == e) {
+           return find_hole_ertr::make_ready_future<std::optional<laddr_t>>(
+             std::make_optional<laddr_t>(L_ADDR_NULL));
+         }
+         return get_lba_btree_extent(
+           cache,
+           t,
+           depth-1,
+           i->get_val(),
+           get_paddr()
+         ).safe_then([&cache, &t, &i, len](auto extent) mutable {
+           logger().debug(
+             "LBAInternalNode::find_hole extent {} lb {} ub {}",
+             *extent,
+             i->get_key(),
+             i->get_next_key_or_max());
+           return extent->find_hole(
+             cache,
+             t,
+             i->get_key(),
+             i->get_next_key_or_max(),
+             len);
+         }).safe_then([&i, &ret](auto addr) mutable {
+           i++;
+           if (addr != L_ADDR_NULL) {
+             ret = addr;
+           }
+           return find_hole_ertr::make_ready_future<std::optional<laddr_t>>(
+             addr == L_ADDR_NULL ? std::nullopt :
+             std::make_optional<laddr_t>(addr));
+         });
+       }).safe_then([&ret]() {
+         return ret;
+       });
+    });
+}
+
+
+void LBAInternalNode::resolve_relative_addrs(paddr_t base) {
+  for (auto i: *this) {
+    if (i->get_val().is_relative()) {
+      auto updated = base.add_relative(i->get_val());
+      logger().debug(
+       "LBAInternalNode::resolve_relative_addrs {} -> {}",
+       i->get_val(),
+       updated);
+      i->set_val(updated);
+    }
+  }
+}
+
+
+LBAInternalNode::split_ret
+LBAInternalNode::split_entry(
+  Cache &c, Transaction &t, laddr_t addr,
+  internal_iterator_t iter, LBANodeRef entry)
+{
+  ceph_assert(!at_max_capacity());
+  auto [left, right, pivot] = entry->make_split_children(c, t);
+
+  journal_remove(iter->get_key());
+  journal_insert(iter->get_key(), left->get_paddr());
+  journal_insert(pivot, right->get_paddr());
+
+  copy_from_local(iter + 1, iter, end());
+  iter->set_val(maybe_generate_relative(left->get_paddr()));
+  iter++;
+  iter->set_key(pivot);
+  iter->set_val(maybe_generate_relative(right->get_paddr()));
+  set_size(get_size() + 1);
+
+  c.retire_extent(t, entry);
+
+  logger().debug(
+    "LBAInternalNode::split_entry *this {} left {} right {}",
+    *this,
+    *left,
+    *right);
+
+  return split_ertr::make_ready_future<LBANodeRef>(
+    pivot > addr ? left : right
+  );
+}
+
+void LBAInternalNode::journal_remove(
+  laddr_t to_remove)
+{
+  // TODO
+}
+
+void LBAInternalNode::journal_insert(
+  laddr_t to_insert,
+  paddr_t val)
+{
+  // TODO
+}
+
+LBAInternalNode::merge_ret
+LBAInternalNode::merge_entry(
+  Cache &c, Transaction &t, laddr_t addr,
+  internal_iterator_t iter, LBANodeRef entry)
+{
+  logger().debug(
+    "LBAInternalNode: merge_entry: {}, {}",
+    *this,
+    *entry);
+  auto donor_is_left = (iter + 1) == end();
+  auto donor_iter = donor_is_left ? iter - 1 : iter + 1;
+  return get_lba_btree_extent(
+    c,
+    t,
+    depth - 1,
+    donor_iter->get_val(),
+    get_paddr()
+  ).safe_then([this, &c, &t, addr, iter, entry, donor_iter, donor_is_left](
+               auto donor) mutable {
+    auto [l, r] = donor_is_left ?
+      std::make_pair(donor, entry) : std::make_pair(entry, donor);
+    auto [liter, riter] = donor_is_left ?
+      std::make_pair(donor_iter, iter) : std::make_pair(iter, donor_iter);
+    if (donor->at_min_capacity()) {
+      auto replacement = l->make_full_merge(
+       c,
+       t,
+       r);
+
+      journal_remove(riter->get_key());
+      journal_remove(liter->get_key());
+      journal_insert(liter->get_key(), replacement->get_paddr());
+
+      liter->set_val(maybe_generate_relative(replacement->get_paddr()));
+      copy_from_local(riter, riter + 1, end());
+      set_size(get_size() - 1);
+
+      c.retire_extent(t, l);
+      c.retire_extent(t, r);
+      return split_ertr::make_ready_future<LBANodeRef>(replacement);
+    } else {
+      logger().debug(
+       "LBAInternalNode::merge_entry balanced l {} r {}",
+       *l,
+       *r);
+      auto [replacement_l, replacement_r, pivot] =
+       l->make_balanced(
+         c,
+         t,
+         r,
+         !donor_is_left);
+
+      journal_remove(liter->get_key());
+      journal_remove(riter->get_key());
+      journal_insert(liter->get_key(), replacement_l->get_paddr());
+      journal_insert(pivot, replacement_r->get_paddr());
+
+      liter->set_val(
+       maybe_generate_relative(replacement_l->get_paddr()));
+      riter->set_key(pivot);
+      riter->set_val(
+       maybe_generate_relative(replacement_r->get_paddr()));
+
+      c.retire_extent(t, l);
+      c.retire_extent(t, r);
+      return split_ertr::make_ready_future<LBANodeRef>(
+       addr >= pivot ? replacement_r : replacement_l
+      );
+    }
+  });
+}
+
+
+LBAInternalNode::internal_iterator_t
+LBAInternalNode::get_containing_child(laddr_t laddr)
+{
+  // TODO: binary search
+  for (auto i = begin(); i != end(); ++i) {
+    if (i.contains(laddr))
+      return i;
+  }
+  ceph_assert(0 == "invalid");
+  return end();
+}
+
+std::ostream &LBALeafNode::print_detail(std::ostream &out) const
+{
+  return out << ", size=" << get_size()
+            << ", depth=" << depth;
+}
+
+LBALeafNode::lookup_range_ret LBALeafNode::lookup_range(
+  Cache &cache,
+  Transaction &t,
+  laddr_t addr,
+  extent_len_t len)
+{
+  logger().debug(
+    "LBALeafNode::lookup_range {}~{}",
+    addr,
+    len);
+  auto ret = lba_pin_list_t();
+  auto [i, end] = get_leaf_entries(addr, len);
+  for (; i != end; ++i) {
+    auto val = (*i).get_val();
+    ret.emplace_back(
+      std::make_unique<BtreeLBAPin>(
+       val.paddr,
+       (*i).get_key(),
+       val.len));
+  }
+  return lookup_range_ertr::make_ready_future<lba_pin_list_t>(
+    std::move(ret));
+}
+
+LBALeafNode::insert_ret LBALeafNode::insert(
+  Cache &cache,
+  Transaction &transaction,
+  laddr_t laddr,
+  lba_map_val_t val)
+{
+  ceph_assert(!at_max_capacity());
+  auto insert_pt = upper_bound(laddr);
+  if (insert_pt != end()) {
+    copy_from_local(insert_pt + 1, insert_pt, end());
+  }
+  set_size(get_size() + 1);
+  insert_pt.set_key(laddr);
+  val.paddr = maybe_generate_relative(val.paddr);
+  logger().debug(
+    "LBALeafNode::insert: inserting {}~{} -> {}",
+    laddr,
+    val.len,
+    val.paddr);
+  insert_pt.set_val(val);
+  logger().debug(
+    "LBALeafNode::insert: inserted {}~{} -> {}",
+    insert_pt.get_key(),
+    insert_pt.get_val().len,
+    insert_pt.get_val().paddr);
+  journal_insertion(laddr, val);
+  return insert_ret(
+    insert_ertr::ready_future_marker{},
+    std::make_unique<BtreeLBAPin>(
+      val.paddr,
+      laddr,
+      val.len));
+}
+
+LBALeafNode::mutate_mapping_ret LBALeafNode::mutate_mapping(
+  Cache &cache,
+  Transaction &transaction,
+  laddr_t laddr,
+  mutate_func_t &&f)
+{
+  ceph_assert(!at_min_capacity());
+  auto mutation_pt = find(laddr);
+  if (mutation_pt == end()) {
+    ceph_assert(0 == "should be impossible");
+    return mutate_mapping_ret(
+      mutate_mapping_ertr::ready_future_marker{},
+      std::nullopt);
+  }
+
+  auto mutated = f(mutation_pt.get_val());
+  if (mutated) {
+    mutation_pt.set_val(*mutated);
+    journal_mutated(laddr, *mutated);
+    return mutate_mapping_ret(
+      mutate_mapping_ertr::ready_future_marker{},
+      mutated);
+  } else {
+    journal_removal(laddr);
+    copy_from_local(mutation_pt, mutation_pt + 1, end());
+    set_size(get_size() - 1);
+    return mutate_mapping_ret(
+      mutate_mapping_ertr::ready_future_marker{},
+      mutated);
+  }
+}
+
+void LBALeafNode::journal_mutated(
+  laddr_t laddr,
+  lba_map_val_t val)
+{
+  // TODO
+}
+
+void LBALeafNode::journal_insertion(
+  laddr_t laddr,
+  lba_map_val_t val)
+{
+  // TODO
+}
+
+void LBALeafNode::journal_removal(
+  laddr_t laddr)
+{
+  // TODO
+}
+
+LBALeafNode::find_hole_ret LBALeafNode::find_hole(
+  Cache &cache,
+  Transaction &t,
+  laddr_t min,
+  laddr_t max,
+  extent_len_t len)
+{
+  logger().debug(
+    "LBALeafNode::find_hole min={} max={}, len={}, *this={}",
+    min, max, len, *this);
+  for (auto i = begin(); i != end(); ++i) {
+    auto ub = i->get_key();
+    if (min + len <= ub) {
+      return find_hole_ret(
+       find_hole_ertr::ready_future_marker{},
+       min);
+    } else {
+      min = i->get_key() + i->get_val().len;
+    }
+  }
+  if (min + len <= max) {
+    return find_hole_ret(
+      find_hole_ertr::ready_future_marker{},
+      min);
+  } else {
+    return find_hole_ret(
+      find_hole_ertr::ready_future_marker{},
+      L_ADDR_MAX);
+  }
+}
+
+void LBALeafNode::resolve_relative_addrs(paddr_t base) {
+  for (auto i: *this) {
+    if (i->get_val().paddr.is_relative()) {
+      auto val = i->get_val();
+      val.paddr = base.add_relative(val.paddr);
+      logger().debug(
+       "LBALeafNode::resolve_relative_addrs {} -> {}",
+       i->get_val().paddr,
+       val.paddr);
+      i->set_val(val);
+    }
+  }
+}
+
+std::pair<LBALeafNode::internal_iterator_t, LBALeafNode::internal_iterator_t>
+LBALeafNode::get_leaf_entries(laddr_t addr, extent_len_t len)
+{
+  return bound(addr, addr + len);
+}
+
+Cache::get_extent_ertr::future<LBANodeRef> get_lba_btree_extent(
+  Cache &cache,
+  Transaction &t,
+  depth_t depth,
+  paddr_t offset,
+  paddr_t base) {
+  offset = offset.maybe_relative_to(base);
+  if (depth > 0) {
+    logger().debug(
+      "get_lba_btree_extent: reading internal at offset {}, depth {}",
+      offset,
+      depth);
+    return cache.get_extent<LBAInternalNode>(
+      t,
+      offset,
+      LBA_BLOCK_SIZE).safe_then([depth](auto ret) {
+       ret->set_depth(depth);
+       return LBANodeRef(ret.detach(), /* add_ref = */ false);
+      });
+
+  } else {
+    logger().debug(
+      "get_lba_btree_extent: reading leaf at offset {}, depth {}",
+      offset,
+      depth);
+    return cache.get_extent<LBALeafNode>(
+      t,
+      offset,
+      LBA_BLOCK_SIZE).safe_then([offset, depth](auto ret) {
+       logger().debug(
+         "get_lba_btree_extent: read leaf at offset {}",
+         offset);
+       ret->set_depth(depth);
+       return LBANodeRef(ret.detach(), /* add_ref = */ false);
+      });
+  }
+}
+
+}
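
A worked reading of LBALeafNode::find_hole() above, with hypothetical 4k extents (illustration only):

  // leaf entries: 0x0000~0x1000 and 0x3000~0x1000
  // call: find_hole(min=0x0000, max=0x10000, len=0x1000)
  //   entry at 0x0000: 0x0000 + 0x1000 <= 0x0000? no  -> min becomes 0x1000
  //   entry at 0x3000: 0x1000 + 0x1000 <= 0x3000? yes -> return 0x1000
  // i.e. the first gap large enough for len, scanning upward from min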
diff --git a/src/crimson/os/seastore/lba_manager/btree/lba_btree_node_impl.h b/src/crimson/os/seastore/lba_manager/btree/lba_btree_node_impl.h
new file mode 100644 (file)
index 0000000..49611d7
--- /dev/null
@@ -0,0 +1,465 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <sys/mman.h>
+#include <string.h>
+
+#include <memory>
+#include <string.h>
+
+#include "include/buffer.h"
+
+#include "crimson/common/fixed_kv_node_layout.h"
+#include "crimson/common/errorator.h"
+#include "crimson/os/seastore/lba_manager.h"
+#include "crimson/os/seastore/seastore_types.h"
+#include "crimson/os/seastore/cache.h"
+#include "crimson/os/seastore/cached_extent.h"
+#include "crimson/os/seastore/lba_manager/btree/lba_btree_node.h"
+
+namespace crimson::os::seastore::lba_manager::btree {
+
+constexpr size_t LBA_BLOCK_SIZE = 4096;
+
+/**
+ * LBAInternalNode
+ *
+ * Abstracts operations on and layout of internal nodes for the
+ * LBA Tree.
+ *
+ * Layout (4k):
+ *   size       : uint32_t[1]      (1*4)b
+ *   keys       : laddr_t[255]     (255*8)b
+ *   values     : paddr_t[255]     (255*8)b
+ *                                 = 4084
+
+ * TODO: make the above capacity calculation part of FixedKVNodeLayout
+ */
+constexpr size_t INTERNAL_NODE_CAPACITY = 255;
+struct LBAInternalNode
+  : LBANode,
+    common::FixedKVNodeLayout<
+      INTERNAL_NODE_CAPACITY,
+      laddr_t, laddr_le_t,
+      paddr_t, paddr_le_t> {
+  using internal_iterator_t = fixed_node_iter_t;
+  template <typename... T>
+  LBAInternalNode(T&&... t) :
+    LBANode(std::forward<T>(t)...),
+    FixedKVNodeLayout(get_bptr().c_str()) {}
+
+  static constexpr extent_types_t type = extent_types_t::LADDR_INTERNAL;
+
+  CachedExtentRef duplicate_for_write() final {
+    return CachedExtentRef(new LBAInternalNode(*this));
+  };
+
+  lookup_range_ret lookup_range(
+    Cache &cache,
+    Transaction &transaction,
+    laddr_t addr,
+    extent_len_t len) final;
+
+  insert_ret insert(
+    Cache &cache,
+    Transaction &transaction,
+    laddr_t laddr,
+    lba_map_val_t val) final;
+
+  mutate_mapping_ret mutate_mapping(
+    Cache &cache,
+    Transaction &transaction,
+    laddr_t laddr,
+    mutate_func_t &&f) final;
+
+  find_hole_ret find_hole(
+    Cache &cache,
+    Transaction &t,
+    laddr_t min,
+    laddr_t max,
+    extent_len_t len) final;
+
+  std::tuple<LBANodeRef, LBANodeRef, laddr_t>
+  make_split_children(Cache &cache, Transaction &t) final {
+    auto left = cache.alloc_new_extent<LBAInternalNode>(
+      t, LBA_BLOCK_SIZE);
+    auto right = cache.alloc_new_extent<LBAInternalNode>(
+      t, LBA_BLOCK_SIZE);
+    return std::make_tuple(
+      left,
+      right,
+      split_into(*left, *right));
+  }
+
+  LBANodeRef make_full_merge(
+    Cache &cache, Transaction &t, LBANodeRef &right) final {
+    auto replacement = cache.alloc_new_extent<LBAInternalNode>(
+      t, LBA_BLOCK_SIZE);
+    replacement->merge_from(*this, *right->cast<LBAInternalNode>());
+    return replacement;
+  }
+
+  std::tuple<LBANodeRef, LBANodeRef, laddr_t>
+  make_balanced(
+    Cache &cache, Transaction &t,
+    LBANodeRef &_right,
+    bool prefer_left) final {
+    ceph_assert(_right->get_type() == type);
+    auto &right = *_right->cast<LBAInternalNode>();
+    auto replacement_left = cache.alloc_new_extent<LBAInternalNode>(
+      t, LBA_BLOCK_SIZE);
+    auto replacement_right = cache.alloc_new_extent<LBAInternalNode>(
+      t, LBA_BLOCK_SIZE);
+
+    return std::make_tuple(
+      replacement_left,
+      replacement_right,
+      balance_into_new_nodes(
+       *this,
+       right,
+       prefer_left,
+       *replacement_left,
+       *replacement_right));
+  }
+
+  /**
+   * resolve_relative_addrs
+   *
+   * Internal relative addresses on read or in memory prior to commit
+   * are either record or block relative depending on whether this
+   * physical node is is_initial_pending() or just is_pending().
+   *
+   * User passes appropriate base depending on lifecycle and
+   * resolve_relative_addrs fixes up relative internal references
+   * based on base.
+   */
+  void resolve_relative_addrs(paddr_t base);
+
+  void on_delta_write(paddr_t record_block_offset) final {
+    // All in-memory relative addrs are necessarily record-relative
+    resolve_relative_addrs(record_block_offset);
+  }
+
+  void on_initial_write() final {
+    // All in-memory relative addrs are necessarily block-relative
+    resolve_relative_addrs(get_paddr());
+  }
+
+  void on_clean_read() final {
+    // From initial write of block, relative addrs are necessarily block-relative
+    resolve_relative_addrs(get_paddr());
+  }
+
+  extent_types_t get_type() const final {
+    return type;
+  }
+
+  std::ostream &print_detail(std::ostream &out) const final;
+
+  ceph::bufferlist get_delta() final {
+    // TODO
+    return ceph::bufferlist();
+  }
+
+  void apply_delta(paddr_t delta_base, ceph::bufferlist &bl) final {
+    ceph_assert(0 == "TODO");
+  }
+
+  bool at_max_capacity() const final {
+    return get_size() == get_capacity();
+  }
+
+  bool at_min_capacity() const {
+    return get_size() == get_capacity() / 2;
+  }
+
+  /// returns iterators containing [l, r)
+  std::pair<internal_iterator_t, internal_iterator_t> bound(
+    laddr_t l, laddr_t r) {
+    // TODO: inefficient
+    auto retl = begin();
+    for (; retl != end(); ++retl) {
+      if (retl->get_next_key_or_max() > l)
+       break;
+    }
+    auto retr = retl;
+    for (; retr != end(); ++retr) {
+      if (retr->get_key() >= r)
+       break;
+    }
+    return std::make_pair(retl, retr);
+  }
+
+  using split_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error
+    >;
+  using split_ret = split_ertr::future<LBANodeRef>;
+  split_ret split_entry(
+    Cache &c, Transaction &t, laddr_t addr,
+    internal_iterator_t,
+    LBANodeRef entry);
+
+  using merge_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error
+    >;
+  using merge_ret = merge_ertr::future<LBANodeRef>;
+  merge_ret merge_entry(
+    Cache &c, Transaction &t, laddr_t addr,
+    internal_iterator_t,
+    LBANodeRef entry);
+
+  /// returns iterator for subtree containing laddr
+  internal_iterator_t get_containing_child(laddr_t laddr);
+
+  // delta operations (TODO)
+  void journal_remove(
+    laddr_t to_remove);
+  void journal_insert(
+    laddr_t to_insert,
+    paddr_t val);
+};
+
+/**
+ * LBALeafNode
+ *
+ * Abstracts operations on and layout of leaf nodes for the
+ * LBA Tree.
+ *
+ * Layout (4k):
+ *   num_entries: uint32_t           4b
+ *   keys       : laddr_t[146]       (146*8)b
+ *   values     : lba_map_val_t[146] (146*20)b
+ *                                   = 4092
+ *
+ * TODO: update FixedKVNodeLayout to handle the above calculation
+ */
+constexpr size_t LEAF_NODE_CAPACITY = 146;
+
+/**
+ * lba_map_val_le_t
+ *
+ * On disk layout for lba_map_val_t.
+ */
+struct lba_map_val_le_t {
+  extent_len_le_t len = init_extent_len_le_t(0);
+  paddr_le_t paddr;
+  ceph_le32 refcount = init_le32(0);
+  ceph_le32 checksum = init_le32(0);
+
+  lba_map_val_le_t() = default;
+  lba_map_val_le_t(const lba_map_val_le_t &) = default;
+  explicit lba_map_val_le_t(const lba_map_val_t &val)
+    : len(init_extent_len_le_t(val.len)),
+      paddr(paddr_le_t(val.paddr)),
+      refcount(init_le32(val.refcount)),
+      checksum(init_le32(val.checksum)) {}
+
+  operator lba_map_val_t() const {
+    return lba_map_val_t{ len, paddr, refcount, checksum };
+  }
+};
+
+struct LBALeafNode
+  : LBANode,
+    common::FixedKVNodeLayout<
+      LEAF_NODE_CAPACITY,
+      laddr_t, laddr_le_t,
+      lba_map_val_t, lba_map_val_le_t> {
+  using internal_iterator_t = fixed_node_iter_t;
+  template <typename... T>
+  LBALeafNode(T&&... t) :
+    LBANode(std::forward<T>(t)...),
+    FixedKVNodeLayout(get_bptr().c_str()) {}
+
+  static constexpr extent_types_t type = extent_types_t::LADDR_LEAF;
+
+  CachedExtentRef duplicate_for_write() final {
+    return CachedExtentRef(new LBALeafNode(*this));
+  };
+
+  lookup_range_ret lookup_range(
+    Cache &cache,
+    Transaction &transaction,
+    laddr_t addr,
+    extent_len_t len) final;
+
+  insert_ret insert(
+    Cache &cache,
+    Transaction &transaction,
+    laddr_t laddr,
+    lba_map_val_t val) final;
+
+  mutate_mapping_ret mutate_mapping(
+    Cache &cache,
+    Transaction &transaction,
+    laddr_t laddr,
+    mutate_func_t &&f) final;
+
+  find_hole_ret find_hole(
+    Cache &cache,
+    Transaction &t,
+    laddr_t min,
+    laddr_t max,
+    extent_len_t len) final;
+
+  std::tuple<LBANodeRef, LBANodeRef, laddr_t>
+  make_split_children(Cache &cache, Transaction &t) final {
+    auto left = cache.alloc_new_extent<LBALeafNode>(
+      t, LBA_BLOCK_SIZE);
+    auto right = cache.alloc_new_extent<LBALeafNode>(
+      t, LBA_BLOCK_SIZE);
+    return std::make_tuple(
+      left,
+      right,
+      split_into(*left, *right));
+  }
+
+  LBANodeRef make_full_merge(
+    Cache &cache, Transaction &t, LBANodeRef &right) final {
+    auto replacement = cache.alloc_new_extent<LBALeafNode>(
+      t, LBA_BLOCK_SIZE);
+    replacement->merge_from(*this, *right->cast<LBALeafNode>());
+    return replacement;
+  }
+
+  std::tuple<LBANodeRef, LBANodeRef, laddr_t>
+  make_balanced(
+    Cache &cache, Transaction &t,
+    LBANodeRef &_right,
+    bool prefer_left) final {
+    ceph_assert(_right->get_type() == type);
+    auto &right = *_right->cast<LBALeafNode>();
+    auto replacement_left = cache.alloc_new_extent<LBALeafNode>(
+      t, LBA_BLOCK_SIZE);
+    auto replacement_right = cache.alloc_new_extent<LBALeafNode>(
+      t, LBA_BLOCK_SIZE);
+    return std::make_tuple(
+      replacement_left,
+      replacement_right,
+      balance_into_new_nodes(
+       *this,
+       right,
+       prefer_left,
+       *replacement_left,
+       *replacement_right));
+  }
+
+  // See LBAInternalNode, same concept
+  void resolve_relative_addrs(paddr_t base);
+
+  void on_delta_write(paddr_t record_block_offset) final {
+    resolve_relative_addrs(record_block_offset);
+  }
+
+  void on_initial_write() final {
+    resolve_relative_addrs(get_paddr());
+  }
+
+  void on_clean_read() final {
+    resolve_relative_addrs(get_paddr());
+  }
+
+  ceph::bufferlist get_delta() final {
+    // TODO
+    return ceph::bufferlist();
+  }
+
+  void apply_delta(paddr_t delta_base, ceph::bufferlist &bl) final {
+    ceph_assert(0 == "TODO");
+  }
+
+  extent_types_t get_type() const final {
+    return type;
+  }
+
+  std::ostream &print_detail(std::ostream &out) const final;
+
+  bool at_max_capacity() const final {
+    return get_size() == get_capacity();
+  }
+
+  bool at_min_capacity() const final {
+    return get_size() == get_capacity() / 2;
+  }
+
+  /// returns iterators <lb, ub> containing addresses [l, r)
+  std::pair<internal_iterator_t, internal_iterator_t> bound(
+    laddr_t l, laddr_t r) {
+    // TODO: inefficient
+    auto retl = begin();
+    for (; retl != end(); ++retl) {
+      if (retl->get_key() >= l || (retl->get_key() + retl->get_val().len) > l)
+       break;
+    }
+    auto retr = retl;
+    for (; retr != end(); ++retr) {
+      if (retr->get_key() >= r)
+       break;
+    }
+    return std::make_pair(retl, retr);
+  }
+  internal_iterator_t upper_bound(laddr_t l) {
+    auto ret = begin();
+    for (; ret != end(); ++ret) {
+      if (ret->get_key() > l)
+       break;
+    }
+    return ret;
+  }
+
+  std::pair<internal_iterator_t, internal_iterator_t>
+  get_leaf_entries(laddr_t addr, extent_len_t len);
+
+  // delta operations (TODO)
+  void journal_mutated(
+    laddr_t laddr,
+    lba_map_val_t val);
+  void journal_insertion(
+    laddr_t laddr,
+    lba_map_val_t val);
+  void journal_removal(
+    laddr_t laddr);
+};
+using LBALeafNodeRef = TCachedExtentRef<LBALeafNode>;
+
+/* BtreeLBAPin
+ *
+ * References leaf node
+ *
+ * TODO: does not at this time actually keep the relevant
+ * leaf resident in memory.  This is actually a bit tricky
+ * as we can mutate and therefore replace a leaf referenced
+ * by other, uninvolved but cached extents.  Will need to
+ * come up with some kind of pinning mechanism that handles
+ * that well.
+ */
+struct BtreeLBAPin : LBAPin {
+  paddr_t paddr;
+  laddr_t laddr = L_ADDR_NULL;
+  extent_len_t length = 0;
+  unsigned refcount = 0;
+
+public:
+  BtreeLBAPin(
+    paddr_t paddr,
+    laddr_t laddr,
+    extent_len_t length)
+    : paddr(paddr), laddr(laddr), length(length) {}
+
+  extent_len_t get_length() const final {
+    return length;
+  }
+  paddr_t get_paddr() const final {
+    return paddr;
+  }
+  laddr_t get_laddr() const final {
+    return laddr;
+  }
+  LBAPinRef duplicate() const final {
+    return LBAPinRef(new BtreeLBAPin(*this));
+  }
+};
+
+}
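
The layout comments above carry the capacity arithmetic by hand; a sketch of what the TODO about folding it into FixedKVNodeLayout might check (not present in this commit; field widths are taken from the layout comments):

  static_assert(4 + INTERNAL_NODE_CAPACITY * (8 + 8) <= LBA_BLOCK_SIZE);  // 4084 <= 4096
  static_assert(4 + LEAF_NODE_CAPACITY * (8 + 20) <= LBA_BLOCK_SIZE);     // 4092 <= 4096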
index b21820e36bfdce52d3f20972708c80d915d6e864..e98fa0c1d38449a8e1237036e125e6a8500ad1f4 100644 (file)
@@ -6,6 +6,7 @@
 #include <limits>
 #include <iostream>
 
+#include "include/byteorder.h"
 #include "include/denc.h"
 #include "include/buffer.h"
 #include "include/cmp.h"
@@ -148,6 +149,22 @@ constexpr paddr_t make_block_relative_paddr(segment_off_t off) {
   return paddr_t{BLOCK_REL_SEG_ID, off};
 }
 
+struct paddr_le_t {
+  ceph_le32 segment = init_le32(NULL_SEG_ID);
+  ceph_les32 offset = init_les32(NULL_SEG_OFF);
+
+  paddr_le_t() = default;
+  paddr_le_t(ceph_le32 segment, ceph_les32 offset)
+    : segment(segment), offset(offset) {}
+  paddr_le_t(segment_id_t segment, segment_off_t offset)
+    : segment(init_le32(segment)), offset(init_les32(offset)) {}
+  paddr_le_t(const paddr_t &addr) : paddr_le_t(addr.segment, addr.offset) {}
+
+  operator paddr_t() const {
+    return paddr_t{segment, offset};
+  }
+};
+
 std::ostream &operator<<(std::ostream &out, const paddr_t &rhs);
 
 // logical addr, see LBAManager, TransactionManager
@@ -158,11 +175,18 @@ constexpr laddr_t L_ADDR_NULL = std::numeric_limits<laddr_t>::max();
 constexpr laddr_t L_ADDR_ROOT = std::numeric_limits<laddr_t>::max() - 1;
 constexpr laddr_t L_ADDR_LBAT = std::numeric_limits<laddr_t>::max() - 2;
 
+using laddr_le_t = ceph_le64;
+
 // logical offset, see LBAManager, TransactionManager
 using extent_len_t = uint32_t;
 constexpr extent_len_t EXTENT_LEN_MAX =
   std::numeric_limits<extent_len_t>::max();
 
+using extent_len_le_t = ceph_le32;
+inline extent_len_le_t init_extent_len_le_t(extent_len_t len) {
+  return init_le32(len);
+}
+
 struct laddr_list_t : std::list<std::pair<laddr_t, extent_len_t>> {
   template <typename... T>
   laddr_list_t(T&&... args)
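
The little-endian helpers added above are the on-disk forms used by the btree node layouts; a round-trip sketch with hypothetical values (illustration only, not part of this commit):

  paddr_t addr{1, 4096};   // segment 1, offset 4096
  paddr_le_t le{addr};     // encoded form as stored in a node buffer
  paddr_t back = le;       // decoded via operator paddr_t()
  ceph_assert(back.segment == addr.segment && back.offset == addr.offset);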
index 8784f85ce29a5336da4ec2168b238e4b30deff2a..0a0863c66332a10b5b403e1e1d78dced8a11eee5 100644 (file)
@@ -1,3 +1,11 @@
+add_executable(unittest_btree_lba_manager
+  test_btree_lba_manager.cc
+  ../gtest_seastar.cc)
+add_ceph_unittest(unittest_btree_lba_manager)
+target_link_libraries(
+  unittest_btree_lba_manager
+  ${CMAKE_DL_LIBS}
+  crimson-seastore)
 
 add_executable(unittest_seastore_journal
   test_seastore_journal.cc
diff --git a/src/test/crimson/seastore/test_btree_lba_manager.cc b/src/test/crimson/seastore/test_btree_lba_manager.cc
new file mode 100644 (file)
index 0000000..c31c3f5
--- /dev/null
@@ -0,0 +1,326 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/crimson/gtest_seastar.h"
+
+#include "crimson/common/log.h"
+
+#include "crimson/os/seastore/journal.h"
+#include "crimson/os/seastore/cache.h"
+#include "crimson/os/seastore/segment_manager.h"
+#include "crimson/os/seastore/lba_manager/btree/btree_lba_manager.h"
+
+namespace {
+  [[maybe_unused]] seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_test);
+  }
+}
+
+using namespace crimson;
+using namespace crimson::os;
+using namespace crimson::os::seastore;
+using namespace crimson::os::seastore::lba_manager;
+using namespace crimson::os::seastore::lba_manager::btree;
+
+struct btree_lba_manager_test :
+  public seastar_test_suite_t, JournalSegmentProvider {
+  SegmentManagerRef segment_manager;
+  Journal journal;
+  Cache cache;
+  BtreeLBAManagerRef lba_manager;
+
+  const size_t block_size;
+
+  btree_lba_manager_test()
+    : segment_manager(create_ephemeral(segment_manager::DEFAULT_TEST_EPHEMERAL)),
+      journal(*segment_manager),
+      cache(*segment_manager),
+      lba_manager(new BtreeLBAManager(*segment_manager, cache)),
+      block_size(segment_manager->get_block_size())
+  {
+    journal.set_segment_provider(this);
+  }
+
+  segment_id_t next = 0;
+  get_segment_ret get_segment() final {
+    return get_segment_ret(
+      get_segment_ertr::ready_future_marker{},
+      next++);
+  }
+
+  void put_segment(segment_id_t segment) final {
+    return;
+  }
+
+  auto submit_transaction(TransactionRef t)
+  {
+    auto record = cache.try_construct_record(*t);
+    if (!record) {
+      ceph_assert(0 == "cannot fail");
+    }
+
+    return journal.submit_record(std::move(*record)).safe_then(
+      [this, t=std::move(t)](paddr_t addr) {
+       cache.complete_commit(*t, addr);
+      },
+      crimson::ct_error::all_same_way([](auto e) {
+       ceph_assert(0 == "Hit error submitting to journal");
+      }));
+  }
+
+  seastar::future<> set_up_fut() final {
+    return segment_manager->init(
+    ).safe_then([this] {
+      return journal.open_for_write();
+    }).safe_then([this] {
+      return seastar::do_with(
+       lba_manager->create_transaction(),
+       [this](auto &transaction) {
+         return cache.mkfs(*transaction
+         ).safe_then([this, &transaction] {
+           return lba_manager->mkfs(*transaction);
+         }).safe_then([this, &transaction] {
+           return submit_transaction(std::move(transaction));
+         });
+       });
+    }).handle_error(
+      crimson::ct_error::all_same_way([] {
+       ceph_assert(0 == "error");
+      })
+    );
+  }
+
+  seastar::future<> tear_down_fut() final {
+    return cache.close(
+    ).safe_then([this] {
+      return journal.close();
+    }).handle_error(
+      crimson::ct_error::all_same_way([] {
+       ASSERT_FALSE("Unable to close");
+      })
+    );
+  }
+
+
+  struct test_extent_t {
+    paddr_t addr;
+    size_t len = 0;
+    unsigned refcount = 0;
+  };
+  using test_lba_mapping_t = std::map<laddr_t, test_extent_t>;
+  test_lba_mapping_t test_lba_mappings;
+  struct test_transaction_t {
+    TransactionRef t;
+    test_lba_mapping_t mappings;
+  };
+
+  auto create_transaction() {
+    return test_transaction_t{
+      lba_manager->create_transaction(),
+      test_lba_mappings
+    };
+  }
+
+  void submit_test_transaction(test_transaction_t t) {
+    submit_transaction(std::move(t.t)).get0();
+    test_lba_mappings.swap(t.mappings);
+  }
+
+  auto get_overlap(test_transaction_t &t, laddr_t addr, size_t len) {
+    auto bottom = t.mappings.upper_bound(addr);
+    if (bottom != t.mappings.begin())
+      --bottom;
+    if (bottom != t.mappings.end() &&
+       bottom->first + bottom->second.len <= addr)
+      ++bottom;
+
+    auto top = t.mappings.upper_bound(addr + len);
+    return std::make_pair(
+      bottom,
+      top
+    );
+  }
+
+  auto alloc_mapping(
+    test_transaction_t &t,
+    laddr_t hint,
+    size_t len,
+    paddr_t paddr) {
+    auto ret = lba_manager->alloc_extent(*t.t, hint, len, paddr).unsafe_get0();
+    logger().debug("alloc'd: {}", *ret);
+    EXPECT_EQ(len, ret->get_length());
+    auto [b, e] = get_overlap(t, ret->get_laddr(), len);
+    EXPECT_EQ(b, e);
+    t.mappings.emplace(
+      std::make_pair(
+       ret->get_laddr(),
+       test_extent_t{
+         ret->get_paddr(),
+         ret->get_length(),
+         1
+        }
+      ));
+    return ret;
+  }
+
+  auto set_mapping(
+    test_transaction_t &t,
+    laddr_t addr,
+    size_t len,
+    paddr_t paddr) {
+    auto [b, e] = get_overlap(t, addr, len);
+    EXPECT_EQ(b, e);
+
+    auto ret = lba_manager->set_extent(*t.t, addr, len, paddr).unsafe_get0();
+    EXPECT_EQ(addr, ret->get_laddr());
+    EXPECT_EQ(len, ret->get_length());
+    EXPECT_EQ(paddr, ret->get_paddr());
+    t.mappings.emplace(
+      std::make_pair(
+       ret->get_laddr(),
+       test_extent_t{
+         ret->get_paddr(),
+         ret->get_length(),
+         1
+        }
+      ));
+    return ret;
+  }
+
+  auto decref_mapping(
+    test_transaction_t &t,
+    laddr_t addr) {
+    return decref_mapping(t, t.mappings.find(addr));
+  }
+
+  void decref_mapping(
+    test_transaction_t &t,
+    test_lba_mapping_t::iterator target) {
+    ceph_assert(target != t.mappings.end());
+    ceph_assert(target->second.refcount > 0);
+    target->second.refcount--;
+
+    auto refcnt = lba_manager->decref_extent(
+      *t.t,
+      target->first).unsafe_get0();
+    EXPECT_EQ(refcnt, target->second.refcount);
+    if (target->second.refcount == 0) {
+      t.mappings.erase(target);
+    }
+  }
+
+  void incref_mapping(
+    test_transaction_t &t,
+    test_lba_mapping_t::iterator target) {
+    ceph_assert(target->second.refcount > 0);
+    target->second.refcount++;
+    auto refcnt = lba_manager->incref_extent(
+      *t.t,
+      target->first).unsafe_get0();
+    EXPECT_EQ(refcnt, target->second.refcount);
+  }
+
+  std::vector<laddr_t> get_mapped_addresses() {
+    std::vector<laddr_t> addresses;
+    addresses.reserve(test_lba_mappings.size());
+    for (auto &i: test_lba_mappings) {
+      addresses.push_back(i.first);
+    }
+    return addresses;
+  }
+
+  void check_mappings() {
+    auto t = create_transaction();
+    check_mappings(t);
+  }
+
+  void check_mappings(test_transaction_t &t) {
+    for (auto &&i: t.mappings) {
+      auto ret_list = lba_manager->get_mapping(
+       *t.t, i.first, i.second.len
+      ).unsafe_get0();
+      EXPECT_EQ(ret_list.size(), 1);
+      auto &ret = *ret_list.begin();
+      EXPECT_EQ(i.second.addr, ret->get_paddr());
+      EXPECT_EQ(i.first, ret->get_laddr());
+      EXPECT_EQ(i.second.len, ret->get_length());
+    }
+  }
+};
+
+TEST_F(btree_lba_manager_test, basic)
+{
+  run_async([this] {
+    laddr_t laddr = 0x12345678 * block_size;
+    paddr_t paddr = { 1, static_cast<segment_off_t>(block_size * 10) };
+    {
+      // write initial mapping
+      auto t = create_transaction();
+      check_mappings(t);  // check in progress transaction sees mapping
+      check_mappings();   // check concurrent does not
+      auto ret = alloc_mapping(t, laddr, block_size, paddr);
+      submit_test_transaction(std::move(t));
+    }
+    check_mappings();     // check new transaction post commit sees it
+  });
+}
+
+TEST_F(btree_lba_manager_test, force_split)
+{
+  run_async([this] {
+    for (unsigned i = 0; i < 40; ++i) {
+      auto t = create_transaction();
+      logger().debug("opened transaction");
+      for (unsigned j = 0; j < 5; ++j) {
+       auto ret = alloc_mapping(t, 0, block_size, P_ADDR_MIN);
+       if ((i % 10 == 0) && (j == 3)) {
+         check_mappings(t);
+         check_mappings();
+       }
+      }
+      logger().debug("submitting transaction");
+      submit_test_transaction(std::move(t));
+      check_mappings();
+    }
+  });
+}
+
+TEST_F(btree_lba_manager_test, force_split_merge)
+{
+  run_async([this] {
+    for (unsigned i = 0; i < 80; ++i) {
+      auto t = create_transaction();
+      logger().debug("opened transaction");
+      for (unsigned j = 0; j < 5; ++j) {
+       auto ret = alloc_mapping(t, 0, block_size, P_ADDR_MIN);
+       // just to speed things up a bit
+       if ((i % 100 == 0) && (j == 3)) {
+         check_mappings(t);
+         check_mappings();
+       }
+      }
+      logger().debug("submitting transaction");
+      submit_test_transaction(std::move(t));
+      if (i % 50 == 0) {
+       check_mappings();
+      }
+    }
+    auto addresses = get_mapped_addresses();
+    auto t = create_transaction();
+    for (unsigned i = 0; i != addresses.size(); ++i) {
+      if (i % 2 == 0) {
+       decref_mapping(t, addresses[i]);
+      }
+      logger().debug("submitting transaction");
+      if (i % 7 == 0) {
+       submit_test_transaction(std::move(t));
+       t = create_transaction();
+      }
+      if (i % 13 == 0) {
+       check_mappings();
+       check_mappings(t);
+      }
+    }
+    submit_test_transaction(std::move(t));
+  });
+}
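
A note on the loop bounds above (reading aid, not part of the commit): with LEAF_NODE_CAPACITY = 146, force_split allocates 40 * 5 = 200 mappings, enough to guarantee at least one leaf split and the promotion of the root to an LBAInternalNode, while force_split_merge allocates 80 * 5 = 400 mappings and then decrefs every other one to zero (initial refcount is 1), shrinking leaves so that merge_entry, make_full_merge and make_balanced are exercised as well.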