]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os/seastore/omap_manager: rework to be null safe and adjust interfaces
authorSamuel Just <sjust@redhat.com>
Tue, 16 Feb 2021 06:51:36 +0000 (06:51 +0000)
committerSamuel Just <sjust@redhat.com>
Fri, 26 Feb 2021 01:59:34 +0000 (17:59 -0800)
Adjusts a few things in omap_manager/btree:
- fix to be null safe, simplify/rename iterator interface
- update interface methods to match futurized_store.h
- misc whitespace fixes

Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/os/seastore/omap_manager.h
src/crimson/os/seastore/omap_manager/btree/btree_omap_manager.cc
src/crimson/os/seastore/omap_manager/btree/btree_omap_manager.h
src/crimson/os/seastore/omap_manager/btree/omap_btree_node.h
src/crimson/os/seastore/omap_manager/btree/omap_btree_node_impl.cc
src/crimson/os/seastore/omap_manager/btree/omap_btree_node_impl.h
src/crimson/os/seastore/omap_manager/btree/omap_types.h
src/crimson/os/seastore/omap_manager/btree/string_kv_node_layout.h
src/test/crimson/seastore/test_omap_manager.cc

index e9ca56e673c594326410de68619bdac2feb750cd..721103e8302d213c71369edf05fe481643bc7f83 100644 (file)
 
 namespace crimson::os::seastore {
 
-struct list_keys_result_t {
-  std::vector<std::string> keys;
-  std::optional<std::string> next;  // if return std::nullopt, means list all keys in omap tree.
-};
-
-struct list_kvs_result_t {
-  std::vector<std::pair<std::string, std::string>> kvs;
-  std::optional<std::string> next;  // if return std::nullopt, means list all kv mapping in omap tree.
-};
 constexpr size_t MAX_SIZE = std::numeric_limits<size_t>::max();
 std::ostream &operator<<(std::ostream &out, const std::list<std::string> &rhs);
 std::ostream &operator<<(std::ostream &out, const std::map<std::string, std::string> &rhs);
@@ -58,9 +49,12 @@ public:
    * @retval return string key->string value mapping pair.
    */
   using omap_get_value_ertr = base_ertr;
-  using omap_get_value_ret = omap_get_value_ertr::future<std::pair<std::string, std::string>>;
-  virtual omap_get_value_ret omap_get_value(const omap_root_t &omap_root, Transaction &t,
-                                           const std::string &key) = 0;
+  using omap_get_value_ret = omap_get_value_ertr::future<
+    std::optional<bufferlist>>;
+  virtual omap_get_value_ret omap_get_value(
+    const omap_root_t &omap_root,
+    Transaction &t,
+    const std::string &key) = 0;
 
   /**
    * set key value mapping in omap
@@ -72,8 +66,11 @@ public:
    */
   using omap_set_key_ertr = base_ertr;
   using omap_set_key_ret = omap_set_key_ertr::future<>;
-  virtual omap_set_key_ret omap_set_key(omap_root_t &omap_root, Transaction &t,
-                                       const std::string &key, const std::string &value) = 0;
+  virtual omap_set_key_ret omap_set_key(
+    omap_root_t &omap_root,
+    Transaction &t,
+    const std::string &key,
+    const ceph::bufferlist &value) = 0;
 
   /**
    * remove key value mapping in omap tree
@@ -84,42 +81,32 @@ public:
    */
   using omap_rm_key_ertr = base_ertr;
   using omap_rm_key_ret = omap_rm_key_ertr::future<>;
-  virtual omap_rm_key_ret omap_rm_key(omap_root_t &omap_root, Transaction &t,
-                                                   const std::string &key) = 0;
+  virtual omap_rm_key_ret omap_rm_key(
+    omap_root_t &omap_root,
+    Transaction &t,
+    const std::string &key) = 0;
 
   /**
-   * get all keys or partial keys in omap tree
+   * Ordered scan of key-> value mapping in omap tree
    *
-   * @param omap_root_t &omap_root,  omap btree root information
-   * @param Transaction &t,  current transaction
-   * @param string &start, the list keys range begin from start,
-   *        if start is "", list from the first omap key
-   * @param max_result_size, the number of list keys,
-   *        it it is not set, list all keys after string start
-   * @retval list_keys_result_t, listed keys and next key
-   */
-  using omap_list_keys_ertr = base_ertr;
-  using omap_list_keys_ret = omap_list_keys_ertr::future<list_keys_result_t>;
-  virtual omap_list_keys_ret omap_list_keys(const omap_root_t &omap_root, Transaction &t,
-                             std::string &start,
-                             size_t max_result_size = MAX_SIZE) = 0;
-
-  /**
-   * Get all or partial key-> value mapping in omap tree
-   *
-   * @param omap_root_t &omap_root,  omap btree root information
-   * @param Transaction &t,  current transaction
-   * @param string &start, the list keys range begin from start,
-   *        if start is "" , list from the first omap key
-   * @param max_result_size, the number of list keys,
+   * @param omap_root: omap btree root information
+   * @param t: current transaction
+   * @param start: the list keys range begin > start if present,
+   *        at beginning if std::nullopt
+   * @param max_result_size: the number of list keys,
    *        it it is not set, list all keys after string start.
-   * @retval list_kvs_result_t, listed key->value mapping and next key.
+   * @retval listed key->value mapping and next key
    */
   using omap_list_ertr = base_ertr;
-  using omap_list_ret = omap_list_ertr::future<list_kvs_result_t>;
-  virtual omap_list_ret omap_list(const omap_root_t &omap_root, Transaction &t,
-                                  std::string &start,
-                                  size_t max_result_size = MAX_SIZE) = 0;
+  using omap_list_bare_ret = std::pair<
+    bool,
+    std::map<std::string, bufferlist>>;
+  using omap_list_ret = omap_list_ertr::future<omap_list_bare_ret>;
+  virtual omap_list_ret omap_list(
+    const omap_root_t &omap_root,
+    Transaction &t,
+    const std::optional<std::string> &start,
+    size_t max_result_size = MAX_SIZE) = 0;
 
   /**
    * clear all omap tree key->value mapping
index a28bb009287bce37deda7bf4b33e2c0c7de71252..75b8f30b385f0fafd07d112cfd7b434619136ae2 100644 (file)
@@ -31,7 +31,7 @@ BtreeOMapManager::initialize_omap(Transaction &t)
       root_extent->set_size(0);
       omap_node_meta_t meta{1};
       root_extent->set_meta(meta);
-      omap_root_t omap_root = omap_root_t(1, root_extent->get_laddr());
+      omap_root_t omap_root = omap_root_t(root_extent->get_laddr(), 1);
       return initialize_omap_ertr::make_ready_future<omap_root_t>(omap_root);
   });
 }
@@ -59,7 +59,7 @@ BtreeOMapManager::handle_root_split(
                                 "", nroot->maybe_get_delta_buffer());
     nroot->journal_inner_insert(nroot->iter_begin() + 1, right->get_laddr(),
                                 pivot, nroot->maybe_get_delta_buffer());
-    omap_root.update(nroot->get_laddr(), omap_root.depth += 1);
+    omap_root.update(nroot->get_laddr(), omap_root.get_depth() + 1);
     return seastar::now();
   });
 }
@@ -73,7 +73,7 @@ BtreeOMapManager::handle_root_merge(
   auto root = *(mresult.need_merge);
   auto iter = root->cast<OMapInnerNode>()->iter_begin();
   omap_root.update(
-    iter->get_node_key().laddr,
+    iter->get_val(),
     omap_root.depth -= 1);
   return oc.tm.dec_ref(oc.t, root->get_laddr()
   ).safe_then([](auto &&ret) -> handle_root_merge_ret {
@@ -87,8 +87,10 @@ BtreeOMapManager::handle_root_merge(
 }
 
 BtreeOMapManager::omap_get_value_ret
-BtreeOMapManager::omap_get_value(const omap_root_t &omap_root, Transaction &t,
-                                 const std::string &key)
+BtreeOMapManager::omap_get_value(
+  const omap_root_t &omap_root,
+  Transaction &t,
+  const std::string &key)
 {
   logger().debug("{}: {}", __func__, key);
   return get_omap_root(
@@ -97,7 +99,6 @@ BtreeOMapManager::omap_get_value(const omap_root_t &omap_root, Transaction &t,
   ).safe_then([this, &t, &key](auto&& extent) {
     return extent->get_value(get_omap_context(t), key);
   }).safe_then([](auto &&e) {
-    logger().debug("{}: {} -> {}", __func__, e.first, e.second);
     return omap_get_value_ret(
         omap_get_value_ertr::ready_future_marker{},
         std::move(e));
@@ -105,8 +106,11 @@ BtreeOMapManager::omap_get_value(const omap_root_t &omap_root, Transaction &t,
 }
 
 BtreeOMapManager::omap_set_key_ret
-BtreeOMapManager::omap_set_key(omap_root_t &omap_root, Transaction &t,
-                             const std::string &key, const std::string &value)
+BtreeOMapManager::omap_set_key(
+  omap_root_t &omap_root,
+  Transaction &t,
+  const std::string &key,
+  const ceph::bufferlist &value)
 {
   logger().debug("{}: {} -> {}", __func__, key, value);
   return get_omap_root(
@@ -125,7 +129,10 @@ BtreeOMapManager::omap_set_key(omap_root_t &omap_root, Transaction &t,
 }
 
 BtreeOMapManager::omap_rm_key_ret
-BtreeOMapManager::omap_rm_key(omap_root_t &omap_root, Transaction &t, const std::string &key)
+BtreeOMapManager::omap_rm_key(
+  omap_root_t &omap_root,
+  Transaction &t,
+  const std::string &key)
 {
   logger().debug("{}: {}", __func__, key);
   return get_omap_root(
@@ -152,47 +159,26 @@ BtreeOMapManager::omap_rm_key(omap_root_t &omap_root, Transaction &t, const std:
 
 }
 
-BtreeOMapManager::omap_list_keys_ret
-BtreeOMapManager::omap_list_keys(const omap_root_t &omap_root, Transaction &t,
-                                 std::string &start, size_t max_result_size)
-{
-  logger().debug("{}", __func__);
-  return get_omap_root(
-    get_omap_context(t),
-    omap_root
-  ).safe_then([this, &t, &start,
-    max_result_size] (auto extent) {
-    return extent->list_keys(get_omap_context(t), start, max_result_size)
-      .safe_then([](auto &&result) {
-      return omap_list_keys_ret(
-             omap_list_keys_ertr::ready_future_marker{},
-             std::move(result));
-    });
-  });
-
-}
-
 BtreeOMapManager::omap_list_ret
-BtreeOMapManager::omap_list(const omap_root_t &omap_root, Transaction &t,
-                            std::string &start, size_t max_result_size)
+BtreeOMapManager::omap_list(
+  const omap_root_t &omap_root,
+  Transaction &t,
+  const std::optional<std::string> &start,
+  size_t max_result_size = MAX_SIZE)
 {
   logger().debug("{}", __func__);
   return get_omap_root(
     get_omap_context(t),
     omap_root
-  ).safe_then([this, &t, &start, max_result_size]
-    (auto extent) {
-    return extent->list(get_omap_context(t), start, max_result_size)
-      .safe_then([](auto &&result) {
-      return omap_list_ret(
-             omap_list_ertr::ready_future_marker{},
-             std::move(result));
-    });
+  ).safe_then([this, &t, &start, max_result_size](auto extent) {
+    return extent->list(get_omap_context(t), start, max_result_size);
   });
 }
 
 BtreeOMapManager::omap_clear_ret
-BtreeOMapManager::omap_clear(omap_root_t &omap_root, Transaction &t)
+BtreeOMapManager::omap_clear(
+  omap_root_t &omap_root,
+  Transaction &t)
 {
   logger().debug("{}", __func__);
   return get_omap_root(
index 131ccdd5ddb9417099dd3bf2d2c88df85be3cd2f..6762bdd661b095b23a64a541ddf704f9ad645c89 100644 (file)
@@ -67,24 +67,30 @@ public:
 
   initialize_omap_ret initialize_omap(Transaction &t) final;
 
-  omap_get_value_ret omap_get_value(const omap_root_t &omap_root, Transaction &t,
-                                    const std::string &key) final;
+  omap_get_value_ret omap_get_value(
+    const omap_root_t &omap_root,
+    Transaction &t,
+    const std::string &key) final;
 
-  omap_set_key_ret omap_set_key(omap_root_t &omap_root, Transaction &t,
-                                const std::string &key, const std::string &value) final;
-
-  omap_rm_key_ret omap_rm_key(omap_root_t &omap_root, Transaction &t,
-                              const std::string &key) final;
+  omap_set_key_ret omap_set_key(
+    omap_root_t &omap_root,
+    Transaction &t,
+    const std::string &key, const ceph::bufferlist &value) final;
 
-  omap_list_keys_ret omap_list_keys(const omap_root_t &omap_root, Transaction &t,
-                                    std::string &start,
-                                    size_t max_result_size = MAX_SIZE) final;
+  omap_rm_key_ret omap_rm_key(
+    omap_root_t &omap_root,
+    Transaction &t,
+    const std::string &key) final;
 
-  omap_list_ret omap_list(const omap_root_t &omap_root, Transaction &t,
-                          std::string &start,
-                          size_t max_result_size = MAX_SIZE) final;
+  omap_list_ret omap_list(
+    const omap_root_t &omap_root,
+    Transaction &t,
+    const std::optional<std::string> &start,
+    size_t max_result_size) final;
 
-  omap_clear_ret omap_clear(omap_root_t &omap_root, Transaction &t) final;
+  omap_clear_ret omap_clear(
+    omap_root_t &omap_root,
+    Transaction &t) final;
 
 };
 using BtreeOMapManagerRef = std::unique_ptr<BtreeOMapManager>;
index e26746995b61dc8a8e3b7b5eeec9494c4c7a8a2b..d508887f3dd7e1f1c1ddea8e0c2b20150cec0b9e 100644 (file)
@@ -53,24 +53,30 @@ struct OMapNode : LogicalCachedExtent {
 
   using get_value_ertr = base_ertr;
   using get_value_ret = OMapManager::omap_get_value_ret;
-  virtual get_value_ret get_value(omap_context_t oc, const std::string &key) = 0;
+  virtual get_value_ret get_value(
+    omap_context_t oc,
+    const std::string &key) = 0;
 
   using insert_ertr = base_ertr;
   using insert_ret = insert_ertr::future<mutation_result_t>;
-  virtual insert_ret insert(omap_context_t oc, const std::string &key, const std::string &value) = 0;
+  virtual insert_ret insert(
+    omap_context_t oc,
+    const std::string &key,
+    const ceph::bufferlist &value) = 0;
 
   using rm_key_ertr = base_ertr;
   using rm_key_ret = rm_key_ertr::future<mutation_result_t>;
-  virtual rm_key_ret rm_key(omap_context_t oc, const std::string &key) = 0;
-
-  using list_keys_ertr = OMapManager::omap_list_keys_ertr;
-  using list_keys_ret = OMapManager::omap_list_keys_ret;
-  virtual list_keys_ret list_keys(omap_context_t oc, std::string &start,
-                                  size_t max_result_size) = 0;
+  virtual rm_key_ret rm_key(
+    omap_context_t oc,
+    const std::string &key) = 0;
 
   using list_ertr = base_ertr;
+  using list_bare_ret = OMapManager::omap_list_bare_ret;
   using list_ret = OMapManager::omap_list_ret;
-  virtual list_ret list(omap_context_t oc, std::string &start, size_t max_result_size) = 0;
+  virtual list_ret list(
+    omap_context_t oc,
+    const std::optional<std::string> &start,
+    size_t max_result_size) = 0;
 
   using clear_ertr = base_ertr;
   using clear_ret = clear_ertr::future<>;
@@ -78,15 +84,21 @@ struct OMapNode : LogicalCachedExtent {
 
   using full_merge_ertr = base_ertr;
   using full_merge_ret = full_merge_ertr::future<OMapNodeRef>;
-  virtual full_merge_ret make_full_merge(omap_context_t oc, OMapNodeRef right) = 0;
+  virtual full_merge_ret make_full_merge(
+    omap_context_t oc,
+    OMapNodeRef right) = 0;
 
   using make_balanced_ertr = base_ertr;
   using make_balanced_ret = make_balanced_ertr::future
           <std::tuple<OMapNodeRef, OMapNodeRef, std::string>>;
-  virtual make_balanced_ret make_balanced(omap_context_t oc, OMapNodeRef _right) = 0;
+  virtual make_balanced_ret make_balanced(
+    omap_context_t oc,
+    OMapNodeRef _right) = 0;
 
   virtual omap_node_meta_t get_node_meta() const = 0;
-  virtual bool extent_will_overflow(size_t ksize, std::optional<size_t> vsize) const = 0;
+  virtual bool extent_will_overflow(
+    size_t ksize,
+    std::optional<size_t> vsize) const = 0;
   virtual bool extent_is_below_min() const = 0;
   virtual uint32_t get_node_size() = 0;
 
index 01197cfedeada48b6b26efb7f2cbd65220f12559..1e94e1bb4f61a0a01f1e9b8a460786e538af1c1c 100644 (file)
@@ -27,7 +27,7 @@ std::ostream &operator<<(std::ostream &out, const omap_inner_key_t &rhs)
 std::ostream &operator<<(std::ostream &out, const omap_leaf_key_t &rhs)
 {
   return out << "omap_leaf_key_t (" << rhs.key_off<< " - " << rhs.key_len
-             << " "<< rhs.val_off<<" - " << rhs.val_len << ")";
+             << " - " << rhs.val_len << ")";
 }
 
 std::ostream &OMapInnerNode::print_detail_l(std::ostream &out) const
@@ -55,8 +55,11 @@ dec_ref_ret dec_ref(omap_context_t oc, T&& addr) {
  * will result in a split outcome encoded in the returned mutation_result_t
  */
 OMapInnerNode::make_split_insert_ret
-OMapInnerNode::make_split_insert(omap_context_t oc, internal_iterator_t iter,
-                                 std::string key, laddr_t laddr)
+OMapInnerNode::make_split_insert(
+  omap_context_t oc,
+  internal_iterator_t iter,
+  std::string key,
+  laddr_t laddr)
 {
   return make_split_children(oc).safe_then([=] (auto tuple) {
     auto [left, right, pivot] = tuple;
@@ -78,8 +81,10 @@ OMapInnerNode::make_split_insert(omap_context_t oc, internal_iterator_t iter,
 
 
 OMapInnerNode::handle_split_ret
-OMapInnerNode::handle_split(omap_context_t oc, internal_iterator_t iter,
-                               mutation_result_t mresult)
+OMapInnerNode::handle_split(
+  omap_context_t oc,
+  internal_iterator_t iter,
+  mutation_result_t mresult)
 {
   logger().debug("OMapInnerNode: {}",  __func__);
   if (!is_pending()) {
@@ -90,7 +95,7 @@ OMapInnerNode::handle_split(omap_context_t oc, internal_iterator_t iter,
   auto [left, right, pivot] = *(mresult.split_tuple);
   //update operation will not cause node overflow, so we can do it first.
   journal_inner_update(iter, left->get_laddr(), maybe_get_delta_buffer());
-  bool overflow = extent_will_overflow(pivot.size() + 1, std::nullopt);
+  bool overflow = extent_will_overflow(pivot.size(), std::nullopt);
   if (!overflow) {
     journal_inner_insert(iter + 1, right->get_laddr(), pivot,
                          maybe_get_delta_buffer());
@@ -111,11 +116,14 @@ OMapInnerNode::handle_split(omap_context_t oc, internal_iterator_t iter,
 }
 
 OMapInnerNode::get_value_ret
-OMapInnerNode::get_value(omap_context_t oc, const std::string &key)
+OMapInnerNode::get_value(
+  omap_context_t oc,
+  const std::string &key)
 {
   logger().debug("OMapInnerNode: {} key = {}",  __func__, key);
   auto child_pt = get_containing_child(key);
-  auto laddr = child_pt->get_node_key().laddr;
+  assert(child_pt != iter_end());
+  auto laddr = child_pt->get_val();
   return omap_load_extent(oc, laddr, get_meta().depth - 1).safe_then(
     [oc, &key] (auto extent) {
     return extent->get_value(oc, key);
@@ -123,12 +131,15 @@ OMapInnerNode::get_value(omap_context_t oc, const std::string &key)
 }
 
 OMapInnerNode::insert_ret
-OMapInnerNode::insert(omap_context_t oc, const std::string &key, const std::string &value)
+OMapInnerNode::insert(
+  omap_context_t oc,
+  const std::string &key,
+  const ceph::bufferlist &value)
 {
   logger().debug("OMapInnerNode: {}  {}->{}",  __func__, key, value);
   auto child_pt = get_containing_child(key);
   assert(child_pt != iter_end());
-  auto laddr = child_pt->get_node_key().laddr;
+  auto laddr = child_pt->get_val();
   return omap_load_extent(oc, laddr, get_meta().depth - 1).safe_then(
     [oc, &key, &value] (auto extent) {
     return extent->insert(oc, key, value);
@@ -150,7 +161,8 @@ OMapInnerNode::rm_key(omap_context_t oc, const std::string &key)
 {
   logger().debug("OMapInnerNode: {}", __func__);
   auto child_pt = get_containing_child(key);
-  auto laddr = child_pt->get_node_key().laddr;
+  assert(child_pt != iter_end());
+  auto laddr = child_pt->get_val();
   return omap_load_extent(oc, laddr, get_meta().depth - 1).safe_then(
     [this, oc, &key, child_pt] (auto extent) {
     return extent->rm_key(oc, key)
@@ -177,78 +189,60 @@ OMapInnerNode::rm_key(omap_context_t oc, const std::string &key)
   });
 }
 
-OMapInnerNode::list_keys_ret
-OMapInnerNode::list_keys(omap_context_t oc, std::string &start, size_t max_result_size)
-{
-  logger().debug("OMapInnerNode: {}", __func__);
-  auto  child_iter = get_containing_child(start);
-
-  return seastar::do_with(child_iter, iter_end(), list_keys_result_t(), [=, &start]
-    (auto &biter, auto &eiter, auto &result) {
-    result.next = start;
-    return crimson::do_until([=, &biter, &eiter, &result] ()
-       -> list_keys_ertr::future<bool> {
-      if (biter == eiter  || result.keys.size() == max_result_size)
-        return list_keys_ertr::make_ready_future<bool>(true);
-
-      auto laddr = biter->get_node_key().laddr;
-      return omap_load_extent(oc, laddr, get_meta().depth - 1).safe_then(
-        [=, &biter, &eiter, &result] (auto &&extent) {
-        assert(result.next != std::nullopt);
-        return extent->list_keys(oc, result.next.value(), max_result_size - result.keys.size())
-          .safe_then([&biter, &eiter, &result] (auto &&child_result) mutable {
-          result.keys.insert(result.keys.end(), child_result.keys.begin(),
-                             child_result.keys.end());
-          biter++;
-          if (child_result.next == std::nullopt && biter != eiter)
-            result.next = biter->get_node_val();
-          else
-            result.next = child_result.next;
-
-          return list_keys_ertr::make_ready_future<bool>(false);
-        });
-      });
-    }).safe_then([&result, ref = OMapNodeRef(this)] {
-      return list_keys_ertr::make_ready_future<list_keys_result_t>(std::move(result));
-    });
-  });
-}
-
 OMapInnerNode::list_ret
-OMapInnerNode::list(omap_context_t oc, std::string &start, size_t max_result_size)
+OMapInnerNode::list(
+  omap_context_t oc,
+  const std::optional<std::string> &start,
+  size_t max_result_size)
 {
   logger().debug("OMapInnerNode: {}", __func__);
-  auto child_iter = get_containing_child(start);
-
-  return seastar::do_with(child_iter, iter_end(), list_kvs_result_t(), [=, &start]
-    (auto &biter, auto &eiter, auto &result) {
-    result.next = start;
-    return crimson::do_until([=, &biter, &eiter, &result] ()
-      -> list_ertr::future<bool> {
-      if (biter == eiter  || result.kvs.size() == max_result_size)
-        return list_ertr::make_ready_future<bool>(true);
-
-      auto laddr = biter->get_node_key().laddr;
-      return omap_load_extent(oc, laddr, get_meta().depth - 1).safe_then(
-        [=, &biter, &eiter, &result] (auto &&extent) {
-        assert(result.next != std::nullopt);
-        return extent->list(oc, result.next.value(), max_result_size - result.kvs.size())
-          .safe_then([&biter, &eiter, &result] (auto &&child_result) mutable {
-          result.kvs.insert(result.kvs.end(), child_result.kvs.begin(),
-                              child_result.kvs.end());
-          biter++;
-          if (child_result.next == std::nullopt && biter != eiter)
-            result.next = biter->get_node_val();
-          else
-            result.next = child_result.next;
 
-          return list_ertr::make_ready_future<bool>(false);
-        });
-      });
-    }).safe_then([&result, ref = OMapNodeRef(this)] {
-      return list_ertr::make_ready_future<list_kvs_result_t>(std::move(result));
+  auto child_iter = start ?
+    get_containing_child(*start) :
+    iter_cbegin();
+  assert(child_iter != iter_cend());
+
+  return seastar::do_with(
+    child_iter,
+    iter_cend(),
+    list_bare_ret(false, {}),
+    [=, &start](auto &biter, auto &eiter, auto &ret) {
+      auto &[complete, result] = ret;
+      return crimson::do_until(
+       [&, max_result_size, oc, this]() -> list_ertr::future<bool> {
+         if (biter == eiter  || result.size() == max_result_size) {
+           complete = biter == eiter;
+           return list_ertr::make_ready_future<bool>(true);
+         }
+         auto laddr = biter->get_val();
+         return omap_load_extent(
+           oc, laddr,
+           get_meta().depth - 1
+         ).safe_then([&, max_result_size, oc, this] (auto &&extent) {
+           return extent->list(
+             oc,
+             start,
+             max_result_size - result.size()
+           ).safe_then([&, max_result_size, this](auto &&child_ret) mutable {
+             auto &[child_complete, child_result] = child_ret;
+             if (result.size() && child_result.size()) {
+               assert(child_result.begin()->first > result.rbegin()->first);
+             }
+             if (child_result.size() && start) {
+               assert(child_result.begin()->first > *start);
+             }
+             result.insert(
+               child_result.begin(),
+               child_result.end());
+             biter++;
+             assert(child_complete || result.size() == max_result_size);
+             return list_ertr::make_ready_future<bool>(false);
+           });
+         });
+       }).safe_then([&ret, ref = OMapNodeRef(this)] {
+         return list_ertr::make_ready_future<list_bare_ret>(std::move(ret));
+       });
     });
-  });
 }
 
 OMapInnerNode::clear_ret
@@ -256,7 +250,7 @@ OMapInnerNode::clear(omap_context_t oc)
 {
   logger().debug("OMapInnerNode: {}", __func__);
   return crimson::do_for_each(iter_begin(), iter_end(), [this, oc] (auto iter) {
-    auto laddr = iter->get_node_key().laddr;
+    auto laddr = iter->get_val();
     return omap_load_extent(oc, laddr, get_meta().depth - 1).safe_then(
       [oc] (auto &&extent) {
       return extent->clear(oc);
@@ -314,7 +308,10 @@ OMapInnerNode::make_balanced(omap_context_t oc, OMapNodeRef _right)
 }
 
 OMapInnerNode::merge_entry_ret
-OMapInnerNode::merge_entry(omap_context_t oc, internal_iterator_t iter, OMapNodeRef entry)
+OMapInnerNode::merge_entry(
+  omap_context_t oc,
+  internal_iterator_t iter,
+  OMapNodeRef entry)
 {
   logger().debug("OMapInnerNode: {}", __func__);
   if (!is_pending()) {
@@ -324,7 +321,7 @@ OMapInnerNode::merge_entry(omap_context_t oc, internal_iterator_t iter, OMapNode
   }
   auto is_left = (iter + 1) == iter_end();
   auto donor_iter = is_left ? iter - 1 : iter + 1;
-  return omap_load_extent(oc, donor_iter->get_node_key().laddr,  get_meta().depth - 1)
+  return omap_load_extent(oc, donor_iter->get_val(), get_meta().depth - 1)
     .safe_then([=] (auto &&donor) mutable {
     auto [l, r] = is_left ?
       std::make_pair(donor, entry) : std::make_pair(entry, donor);
@@ -359,10 +356,14 @@ OMapInnerNode::merge_entry(omap_context_t oc, internal_iterator_t iter, OMapNode
         auto [replacement_l, replacement_r, replacement_pivot] = tuple;
         //update operation will not cuase node overflow, so we can do it first
         journal_inner_update(liter, replacement_l->get_laddr(), maybe_get_delta_buffer());
-        bool overflow = extent_will_overflow(replacement_pivot.size() + 1, std::nullopt);
+        bool overflow = extent_will_overflow(replacement_pivot.size(), std::nullopt);
         if (!overflow) {
-          journal_inner_replace(riter, replacement_r->get_laddr(),
-                                replacement_pivot, maybe_get_delta_buffer());
+          journal_inner_remove(riter, maybe_get_delta_buffer());
+          journal_inner_insert(
+           riter,
+           replacement_r->get_laddr(),
+           replacement_pivot,
+           maybe_get_delta_buffer());
           std::vector<laddr_t> dec_laddrs{l->get_laddr(), r->get_laddr()};
           return dec_ref(oc, dec_laddrs).safe_then([] {
             return merge_entry_ret(
@@ -411,22 +412,25 @@ OMapLeafNode::get_value(omap_context_t oc, const std::string &key)
   logger().debug("OMapLeafNode: {} key = {}", __func__, key);
   auto ite = find_string_key(key);
   if (ite != iter_end()) {
-    auto value = ite->get_string_val();
+    auto value = ite->get_val();
     return get_value_ret(
       get_value_ertr::ready_future_marker{},
-      std::make_pair(key, value));
+      value);
   } else {
     return get_value_ret(
       get_value_ertr::ready_future_marker{},
-      std::make_pair(key, ""));
+      std::nullopt);
   }
 }
 
 OMapLeafNode::insert_ret
-OMapLeafNode::insert(omap_context_t oc, const std::string &key, const std::string &value)
+OMapLeafNode::insert(
+  omap_context_t oc,
+  const std::string &key,
+  const ceph::bufferlist &value)
 {
   logger().debug("OMapLeafNode: {}, {} -> {}", __func__, key, value);
-  bool overflow = extent_will_overflow(key.size() + 1, value.size() + 1);
+  bool overflow = extent_will_overflow(key.size(), value.length());
   if (!overflow) {
     if (!is_pending()) {
       auto mut = oc.tm.get_mutable_extent(oc.t, this)->cast<OMapLeafNode>();
@@ -440,10 +444,8 @@ OMapLeafNode::insert(omap_context_t oc, const std::string &key, const std::strin
       journal_leaf_insert(insert_pt, key, value, maybe_get_delta_buffer());
 
       logger().debug(
-        "{}: {} inserted {}->{} {}"," OMapLeafNode",  __func__,
-        insert_pt.get_node_key(),
-        insert_pt.get_node_val(),
-        insert_pt.get_string_val());
+        "{}: {} inserted {}"," OMapLeafNode",  __func__,
+        insert_pt.get_key());
     }
     return insert_ret(
            insert_ertr::ready_future_marker{},
@@ -492,63 +494,50 @@ OMapLeafNode::rm_key(omap_context_t oc, const std::string &key)
   auto rm_pt = find_string_key(key);
   if (rm_pt != iter_end()) {
     journal_leaf_remove(rm_pt, maybe_get_delta_buffer());
-    logger().debug(
-      "{}: removed {}->{} {}", __func__,
-      rm_pt->get_node_key(),
-      rm_pt->get_node_val(),
-      rm_pt->get_string_val());
-      if (extent_is_below_min()) {
-        return rm_key_ret(
-               rm_key_ertr::ready_future_marker{},
-               mutation_result_t(mutation_status_t::NEED_MERGE, std::nullopt,
-                                 this->cast<OMapNode>()));
-      } else {
-        return rm_key_ret(
-               rm_key_ertr::ready_future_marker{},
-               mutation_result_t(mutation_status_t::SUCCESS, std::nullopt, std::nullopt));
-      }
+    if (extent_is_below_min()) {
+      return rm_key_ret(
+       rm_key_ertr::ready_future_marker{},
+       mutation_result_t(mutation_status_t::NEED_MERGE, std::nullopt,
+                         this->cast<OMapNode>()));
+    } else {
+      return rm_key_ret(
+       rm_key_ertr::ready_future_marker{},
+       mutation_result_t(mutation_status_t::SUCCESS, std::nullopt, std::nullopt));
+    }
   } else {
     return rm_key_ret(
-           rm_key_ertr::ready_future_marker{},
-           mutation_result_t(mutation_status_t::FAIL, std::nullopt, std::nullopt));
+      rm_key_ertr::ready_future_marker{},
+      mutation_result_t(mutation_status_t::FAIL, std::nullopt, std::nullopt));
   }
 
 }
 
-OMapLeafNode::list_keys_ret
-OMapLeafNode::list_keys(omap_context_t oc, std::string &start, size_t max_result_size)
-{
-  logger().debug("OMapLeafNode: {}", __func__);
-  auto result = list_keys_result_t();
-  iterator  iter = start == "" ?  iter_begin() : string_lower_bound(start);
-  // two stop condition, reach the end of leaf or size > required size(max_result_size)
-  for (; iter != iter_end() && result.keys.size() <= max_result_size; iter++) {
-    result.keys.push_back(iter->get_node_val());
-  }
-  if (iter == iter_end())
-   result.next = std::nullopt;   //have searched all items in the leaf
-  else
-   result.next = iter->get_node_val();
-
-  return list_keys_ertr::make_ready_future<list_keys_result_t>(std::move(result));
-
-}
-
 OMapLeafNode::list_ret
-OMapLeafNode::list(omap_context_t oc, std::string &start, size_t max_result_size)
+OMapLeafNode::list(
+  omap_context_t oc,
+  const std::optional<std::string> &start,
+  size_t max_result_size)
 {
-  logger().debug("OMapLeafNode: {}", __func__);
-  auto result = list_kvs_result_t();
-  iterator  iter = start == "" ? iter_begin() : string_lower_bound(start);
-  for (; iter != iter_end() && result.kvs.size() <= max_result_size; iter++) {
-    result.kvs.push_back({iter->get_node_val(), iter->get_string_val()});
+  logger().debug(
+    "OMapLeafNode::{} start {} max_result_size {}",
+    __func__,
+    start ? start->c_str() : "",
+    max_result_size
+  );
+  auto ret = list_bare_ret(false, {});
+  auto &[complete, result] = ret;
+  auto iter = start ?
+    string_upper_bound(*start) : 
+    iter_begin();
+
+  for (; iter != iter_end() && result.size() < max_result_size; iter++) {
+    result.emplace(std::make_pair(iter->get_key(), iter->get_val()));
   }
-  if (iter == iter_end())
-   result.next = std::nullopt;  //have searched all items in the lead
-  else
-   result.next = iter->get_node_val();
 
-  return list_ertr::make_ready_future<list_kvs_result_t>(std::move(result));
+  complete = (iter == iter_end());
+
+  return list_ertr::make_ready_future<list_bare_ret>(
+    std::move(ret));
 }
 
 OMapLeafNode::clear_ret
index 524933f513ee971c9c4abc193a772576e130999f..fdd6f0a0c94c90c3c05964382eecc603f0605a9b 100644 (file)
@@ -57,13 +57,19 @@ struct OMapInnerNode
 
   get_value_ret get_value(omap_context_t oc, const std::string &key) final;
 
-  insert_ret insert(omap_context_t oc, const std::string &key, const std::string &value) final;
+  insert_ret insert(
+    omap_context_t oc,
+    const std::string &key,
+    const ceph::bufferlist &value) final;
 
-  rm_key_ret rm_key(omap_context_t oc, const std::string &key) final;
+  rm_key_ret rm_key(
+    omap_context_t oc,
+    const std::string &key) final;
 
-  list_keys_ret list_keys(omap_context_t oc, std::string &start, size_t max_result_size) final;
-
-  list_ret list(omap_context_t oc, std::string &start, size_t max_result_size) final;
+  list_ret list(
+    omap_context_t oc,
+    const std::optional<std::string> &start,
+    size_t max_result_size) final;
 
   clear_ret clear(omap_context_t oc) final;
 
@@ -72,25 +78,29 @@ struct OMapInnerNode
           <std::tuple<OMapInnerNodeRef, OMapInnerNodeRef, std::string>>;
   split_children_ret make_split_children(omap_context_t oc);
 
-  full_merge_ret make_full_merge(omap_context_t oc, OMapNodeRef right) final;
+  full_merge_ret make_full_merge(
+    omap_context_t oc, OMapNodeRef right) final;
 
-  make_balanced_ret
-    make_balanced(omap_context_t oc, OMapNodeRef right) final;
+  make_balanced_ret make_balanced(
+    omap_context_t oc, OMapNodeRef right) final;
 
   using make_split_insert_ertr = base_ertr; 
   using make_split_insert_ret = make_split_insert_ertr::future<mutation_result_t>;
-  make_split_insert_ret make_split_insert(omap_context_t oc, internal_iterator_t iter,
-                                          std::string key, laddr_t laddr);
+  make_split_insert_ret make_split_insert(
+    omap_context_t oc, internal_iterator_t iter,
+    std::string key, laddr_t laddr);
 
   using merge_entry_ertr = base_ertr;
   using merge_entry_ret = merge_entry_ertr::future<mutation_result_t>;
-  merge_entry_ret merge_entry(omap_context_t oc,
-                              internal_iterator_t iter, OMapNodeRef entry);
+  merge_entry_ret merge_entry(
+    omap_context_t oc,
+    internal_iterator_t iter, OMapNodeRef entry);
 
   using handle_split_ertr = base_ertr;
   using handle_split_ret = handle_split_ertr::future<mutation_result_t>;
-  handle_split_ret handle_split(omap_context_t oc, internal_iterator_t iter,
-                                      mutation_result_t mresult);
+  handle_split_ret handle_split(
+    omap_context_t oc, internal_iterator_t iter,
+    mutation_result_t mresult);
 
   std::ostream &print_detail_l(std::ostream &out) const final;
 
@@ -115,9 +125,9 @@ struct OMapInnerNode
   }
 
   internal_iterator_t get_containing_child(const std::string &key);
-
 };
 using OMapInnerNodeRef = OMapInnerNode::OMapInnerNodeRef;
+
 /**
  * OMapLeafNode
  *
@@ -142,7 +152,8 @@ struct OMapLeafNode
   static constexpr extent_types_t type = extent_types_t::OMAP_LEAF;
 
   omap_node_meta_t get_node_meta() const final { return get_meta(); }
-  bool extent_will_overflow(size_t ksize, std::optional<size_t> vsize) const {
+  bool extent_will_overflow(
+    size_t ksize, std::optional<size_t> vsize) const {
     return is_overflow(ksize, *vsize);
   }
   bool extent_is_below_min() const { return below_min(); }
@@ -158,26 +169,38 @@ struct OMapLeafNode
     return is_mutation_pending() ? &delta_buffer : nullptr;
   }
 
-  get_value_ret get_value(omap_context_t oc, const std::string &key) final;
-
-  insert_ret insert(omap_context_t oc, const std::string &key, const std::string &value) final;
+  get_value_ret get_value(
+    omap_context_t oc, const std::string &key) final;
 
-  rm_key_ret rm_key(omap_context_t oc, const std::string &key) final;
+  insert_ret insert(
+    omap_context_t oc,
+    const std::string &key,
+    const ceph::bufferlist &value) final;
 
-  list_keys_ret list_keys(omap_context_t oc, std::string &start, size_t max_result_size) final;
+  rm_key_ret rm_key(
+    omap_context_t oc, const std::string &key) final;
 
-  list_ret list(omap_context_t oc, std::string &start, size_t max_result_size) final;
+  list_ret list(
+    omap_context_t oc,
+    const std::optional<std::string> &start,
+    size_t max_result_size) final;
 
-  clear_ret clear(omap_context_t oc) final;
+  clear_ret clear(
+    omap_context_t oc) final;
 
   using split_children_ertr = base_ertr;
   using split_children_ret = split_children_ertr::future
           <std::tuple<OMapLeafNodeRef, OMapLeafNodeRef, std::string>>;
-  split_children_ret make_split_children(omap_context_t oc);
+  split_children_ret make_split_children(
+    omap_context_t oc);
 
-  full_merge_ret make_full_merge(omap_context_t oc, OMapNodeRef right) final;
+  full_merge_ret make_full_merge(
+    omap_context_t oc,
+    OMapNodeRef right) final;
 
-  make_balanced_ret make_balanced(omap_context_t oc, OMapNodeRef _right) final;
+  make_balanced_ret make_balanced(
+    omap_context_t oc,
+    OMapNodeRef _right) final;
 
   extent_types_t get_type() const final {
     return type;
index 1f2c770dc89e6c4c23a357445605c7ff914bd576..e187a1cad903a3f5d77953b2b20c907cdfb36c86 100644 (file)
@@ -44,12 +44,12 @@ struct omap_node_meta_le_t {
 };
 
 struct omap_inner_key_t {
-  int16_t key_off = 0;
-  int16_t key_len = 0;
+  uint16_t key_off = 0;
+  uint16_t key_len = 0;
   laddr_t laddr = 0;
 
   omap_inner_key_t() = default;
-  omap_inner_key_t(int16_t off, int16_t len, laddr_t addr)
+  omap_inner_key_t(uint16_t off, uint16_t len, laddr_t addr)
   : key_off(off), key_len(len), laddr(addr) {}
 
   inline bool operator==(const omap_inner_key_t b) const {
@@ -68,24 +68,24 @@ struct omap_inner_key_t {
 };
 
 struct omap_inner_key_le_t {
-  ceph_les16 key_off = init_les16(0);
-  ceph_les16 key_len = init_les16(0);
+  ceph_le16 key_off = init_le16(0);
+  ceph_le16 key_len = init_le16(0);
   laddr_le_t laddr = laddr_le_t(0);
 
   omap_inner_key_le_t() = default;
   omap_inner_key_le_t(const omap_inner_key_le_t &) = default;
   explicit omap_inner_key_le_t(const omap_inner_key_t &key)
-    : key_off(init_les16(key.key_off)),
-      key_len(init_les16(key.key_len)),
+    : key_off(init_le16(key.key_off)),
+      key_len(init_le16(key.key_len)),
       laddr(laddr_le_t(key.laddr)) {}
 
   operator omap_inner_key_t() const {
-    return omap_inner_key_t{int16_t(key_off), int16_t(key_len), laddr_t(laddr)};
+    return omap_inner_key_t{uint16_t(key_off), uint16_t(key_len), laddr_t(laddr)};
   }
 
   omap_inner_key_le_t& operator=(omap_inner_key_t key) {
-    key_off = init_les16(key.key_off);
-    key_len = init_les16(key.key_len);
+    key_off = init_le16(key.key_off);
+    key_len = init_le16(key.key_len);
     laddr = laddr_le_t(key.laddr);
     return *this;
   }
@@ -96,64 +96,59 @@ struct omap_inner_key_le_t {
 };
 
 struct omap_leaf_key_t {
-  int16_t key_off = 0;
-  int16_t key_len = 0;
-  int16_t val_off = 0;
-  int16_t val_len = 0;
+  uint16_t key_off = 0;
+  uint16_t key_len = 0;
+  uint16_t val_len = 0;
 
   omap_leaf_key_t() = default;
-  omap_leaf_key_t(int16_t k_off, int16_t k_len, int16_t v_off, int16_t v_len)
-  : key_off(k_off), key_len(k_len), val_off(v_off), val_len(v_len) {}
+  omap_leaf_key_t(uint16_t k_off, uint16_t k_len, uint16_t v_len)
+  : key_off(k_off), key_len(k_len), val_len(v_len) {}
 
   inline bool operator==(const omap_leaf_key_t b) const {
     return key_off == b.key_off && key_len == b.key_len &&
-           val_off == b.val_off && val_len == b.val_len;
+           val_len == b.val_len;
   }
   inline bool operator!=(const omap_leaf_key_t b) const {
     return key_off != b.key_off || key_len != b.key_len ||
-           val_off != b.val_off || val_len != b.val_len;
+           val_len != b.val_len;
   }
 
   DENC(omap_leaf_key_t, v, p) {
     DENC_START(1, 1, p);
     denc(v.key_off, p);
     denc(v.key_len, p);
-    denc(v.val_off, p);
     denc(v.val_len, p);
     DENC_FINISH(p);
   }
 };
 
 struct omap_leaf_key_le_t {
-  ceph_les16 key_off = init_les16(0);
-  ceph_les16 key_len = init_les16(0);
-  ceph_les16 val_off = init_les16(0);
-  ceph_les16 val_len = init_les16(0);
+  ceph_le16 key_off = init_le16(0);
+  ceph_le16 key_len = init_le16(0);
+  ceph_le16 val_len = init_le16(0);
 
   omap_leaf_key_le_t() = default;
   omap_leaf_key_le_t(const omap_leaf_key_le_t &) = default;
   explicit omap_leaf_key_le_t(const omap_leaf_key_t &key)
-    : key_off(init_les16(key.key_off)),
-      key_len(init_les16(key.key_len)),
-      val_off(init_les16(key.val_off)),
-      val_len(init_les16(key.val_len)) {}
+    : key_off(init_le16(key.key_off)),
+      key_len(init_le16(key.key_len)),
+      val_len(init_le16(key.val_len)) {}
 
   operator omap_leaf_key_t() const {
-    return omap_leaf_key_t{int16_t(key_off), int16_t(key_len),
-                           int16_t(val_off), int16_t(val_len)};
+    return omap_leaf_key_t{uint16_t(key_off), uint16_t(key_len),
+                           uint16_t(val_len)};
   }
 
   omap_leaf_key_le_t& operator=(omap_leaf_key_t key) {
-    key_off = init_les16(key.key_off);
-    key_len = init_les16(key.key_len);
-    val_off = init_les16(key.val_off);
-    val_len = init_les16(key.val_len);
+    key_off = init_le16(key.key_off);
+    key_len = init_le16(key.key_len);
+    val_len = init_le16(key.val_len);
     return *this;
   }
 
   inline bool operator==(const omap_leaf_key_le_t b) const {
     return key_off == b.key_off && key_len == b.key_len &&
-           val_off == b.val_off && val_len == b.val_len;
+           val_len == b.val_len;
   }
 };
 
index ef24f7eea6cc866dd12faaf6dc49d66d6cd64120..aea7487ec6e24c0c48557019d9d72348d963231e 100644 (file)
@@ -19,16 +19,15 @@ namespace crimson::os::seastore::omap_manager {
 class StringKVInnerNodeLayout;
 class StringKVLeafNodeLayout;
 
-
 /**
 * copy_from_foreign
 *
 * Copy from another node entries to this node.
 * [from_src, to_src) is another node entry range.
 * tgt is this node entry to copy to.
 * tgt and from_src must be from different nodes.
 * from_src and to_src must be in the same node.
 */
+ * copy_from_foreign
+ *
+ * Copy from another node entries to this node.
+ * [from_src, to_src) is another node entry range.
+ * tgt is this node entry to copy to.
+ * tgt and from_src must be from different nodes.
+ * from_src and to_src must be in the same node.
+ */
 template <typename iterator, typename const_iterator>
 static void copy_from_foreign(
   iterator tgt,
@@ -55,7 +54,9 @@ static void copy_from_foreign(
     i->update_offset(offset_diff);
   }
 }
-/* copy_from_local
+
+/**
+ * copy_from_local
  *
  * Copies entries from [from_src, to_src) to tgt.
  * tgt, from_src, and to_src must be from the same node.
@@ -89,14 +90,14 @@ struct delta_inner_t {
     UPDATE,
     REMOVE,
   } op;
-  omap_inner_key_t key;
-  std::string val;
+  std::string key;
+  laddr_t addr;
 
   DENC(delta_inner_t, v, p) {
     DENC_START(1, 1, p);
     denc(v.op, p);
     denc(v.key, p);
-    denc(v.val, p);
+    denc(v.addr, p);
     DENC_FINISH(p);
   }
 
@@ -104,7 +105,7 @@ struct delta_inner_t {
   bool operator==(const delta_inner_t &rhs) const {
     return op == rhs.op &&
            key == rhs.key &&
-           val == rhs.val;
+           addr == rhs.addr;
   }
 };
 }
@@ -118,7 +119,7 @@ struct delta_leaf_t {
     REMOVE,
   } op;
   std::string key;
-  std::string val;
+  ceph::bufferlist val;
 
   DENC(delta_leaf_t, v, p) {
     DENC_START(1, 1, p);
@@ -146,35 +147,31 @@ public:
     return buffer.empty();
   }
   void insert(
-    const omap_inner_key_t key,
-    std::string_view val) {
-    omap_inner_key_le_t k;
-    k = key;
+    const std::string &key,
+    laddr_t addr) {
     buffer.push_back(
       delta_inner_t{
         delta_inner_t::op_t::INSERT,
-        k,
-        val.data()
+        key,
+        addr
       });
   }
   void update(
-    const omap_inner_key_t key,
-    std::string_view val) {
-    omap_inner_key_le_t k;
-    k = key;
+    const std::string &key,
+    laddr_t addr) {
     buffer.push_back(
       delta_inner_t{
-        delta_inner_t::op_t::UPDATE,
-        k,
-        val.data()
+       delta_inner_t::op_t::UPDATE,
+       key,
+       addr
       });
   }
-  void remove(std::string_view val) {
+  void remove(const std::string &key) {
     buffer.push_back(
       delta_inner_t{
-        delta_inner_t::op_t::REMOVE,
-        omap_inner_key_le_t(),
-        val.data()
+       delta_inner_t::op_t::REMOVE,
+       key,
+       L_ADDR_NULL
       });
   }
 
@@ -183,13 +180,7 @@ public:
       i.replay(node);
     }
   }
-  size_t get_bytes() const {
-    size_t size = 0;
-    for (auto &i: buffer) {
-      size += sizeof(i.op) + sizeof(i.key) + i.val.size();
-    }
-    return size;
-  }
+
   void clear() {
     buffer.clear();
   }
@@ -215,31 +206,31 @@ public:
     return buffer.empty();
   }
   void insert(
-    std::string_view key,
-    std::string_view val) {
+    const std::string &key,
+    const ceph::bufferlist &val) {
     buffer.push_back(
       delta_leaf_t{
         delta_leaf_t::op_t::INSERT,
-        key.data(),
-        val.data()
+        key,
+        val
       });
   }
   void update(
-    std::string_view key,
-    std::string_view val) {
+    const std::string &key,
+    const ceph::bufferlist &val) {
     buffer.push_back(
       delta_leaf_t{
         delta_leaf_t::op_t::UPDATE,
-        key.data(),
-        val.data()
+       key,
+       val
       });
   }
-  void remove(std::string_view key) {
+  void remove(const std::string &key) {
     buffer.push_back(
       delta_leaf_t{
-        delta_leaf_t::op_t::REMOVE,
-        key.data(),
-        ""
+       delta_leaf_t::op_t::REMOVE,
+       key,
+       bufferlist()
       });
   }
 
@@ -248,13 +239,7 @@ public:
       i.replay(node);
     }
   }
-  size_t get_bytes() const {
-    size_t size = 0;
-    for (auto &i: buffer) {
-      size += sizeof(i.op) + i.key.size() + i.val.size();
-    }
-    return size;
-  }
+
   void clear() {
     buffer.clear();
   }
@@ -276,9 +261,6 @@ namespace crimson::os::seastore::omap_manager {
 /**
  * StringKVInnerNodeLayout
  *
- * Reusable implementation of a fixed size key mapping
- * omap_inner_key_t(fixed) -> V(string) with internal representations omap_inner_key_le_t.
- *
  * Uses absl::container_internal::Layout for the actual key memory layout.
  *
  * The primary interface exposed is centered on the iterator
@@ -294,7 +276,7 @@ class StringKVInnerNodeLayout {
   friend class delta_inner_t;
 public:
   template <bool is_const>
-  struct iter_t : public std::iterator<std::input_iterator_tag, StringKVInnerNodeLayout> {
+  class iter_t : public std::iterator<std::input_iterator_tag, StringKVInnerNodeLayout> {
     friend class StringKVInnerNodeLayout;
 
     template <typename iterator, typename const_iterator>
@@ -304,13 +286,14 @@ public:
 
     using parent_t = typename crimson::common::maybe_const_t<StringKVInnerNodeLayout, is_const>::type;
 
-    parent_t node;
+    mutable parent_t node;
     uint16_t index;
 
     iter_t(
       parent_t parent,
       uint16_t index) : node(parent), index(index) {}
 
+  public:
     iter_t(const iter_t &) = default;
     iter_t(iter_t &&) = default;
     iter_t &operator=(const iter_t &) = default;
@@ -335,20 +318,29 @@ public:
       return *this;
     }
 
+    iter_t operator--(int) {
+      auto ret = *this;
+      assert(index > 0);
+      --index;
+      return ret;
+    }
+
+    iter_t &operator--() {
+      assert(index > 0);
+      --index;
+      return *this;
+    }
+
     uint16_t operator-(const iter_t &rhs) const {
       assert(rhs.node == node);
       return index - rhs.index;
     }
 
     iter_t operator+(uint16_t off) const {
-      return iter_t(
-                  node,
-                  index + off);
+      return iter_t(node, index + off);
     }
     iter_t operator-(uint16_t off) const {
-      return iter_t(
-                  node,
-                  index - off);
+      return iter_t(node, index - off);
     }
 
     uint16_t operator<(const iter_t &rhs) const {
@@ -370,135 +362,104 @@ public:
       return !(*this == rhs);
     }
 
+  private:
     omap_inner_key_t get_node_key() const {
       omap_inner_key_le_t kint = node->get_node_key_ptr()[index];
       return omap_inner_key_t(kint);
     }
-
-    char *get_node_val_ptr() {
-      auto tail = node->buf + OMAP_BLOCK_SIZE;
-      if (*this == node->iter_end())
-        return tail;
-      else {
-        return tail - static_cast<uint32_t>(get_node_key().key_off);
-      }
+    auto get_node_key_ptr() const {
+      return reinterpret_cast<
+       typename crimson::common::maybe_const_t<char, is_const>::type>(
+         node->get_node_key_ptr() + index);
     }
 
-    const char *get_node_val_ptr() const {
+    uint32_t get_node_val_offset() const {
+      return get_node_key().key_off;
+    }
+    auto get_node_val_ptr() const {
       auto tail = node->buf + OMAP_BLOCK_SIZE;
-      if ( *this == node->iter_end())
+      if (*this == node->iter_end())
         return tail;
       else {
-        return tail - static_cast<uint32_t>(get_node_key().key_off);
+        return tail - get_node_val_offset();
       }
     }
 
-    void set_node_val(const std::string &val) {
-      static_assert(!is_const);
-      std::strcpy((char*)get_node_val_ptr(), val.c_str()); //copy char* to char* include "\0"
-    }
-
-    std::string get_node_val(){
-     std::string s(get_node_val_ptr());
-     return s;
-    }
-    std::string get_node_val() const{
-      std::string s(get_node_val_ptr());
-      return s;
-    }
-
-    bool contains(std::string_view key) const {
-      auto next = *this + 1;
-      if (next == node->iter_end())
-        return get_node_val() <= key;
-
-      return (get_node_val() <= key) && (next->get_node_val() > key);
-    }
-
-    uint16_t get_index() const {
-      return index;
-    }
-
-  private:
-    int get_right_offset() const {
-      return get_node_key().key_off;
-    }
-
     int get_right_offset_end() const {
       if (index == 0)
         return 0;
       else
-        return (*this - 1)->get_right_offset();
+        return (*this - 1)->get_node_val_offset();
     }
-
-    char *get_right_ptr() {
-      return node->buf + OMAP_BLOCK_SIZE - get_right_offset();
-    }
-
-    const char *get_right_ptr() const {
-      static_assert(!is_const);
-      return node->buf + OMAP_BLOCK_SIZE - get_right_offset();
-    }
-
-    char *get_right_ptr_end() {
-      if (index == 0)
-        return node->buf + OMAP_BLOCK_SIZE;
-      else
-        return (*this - 1)->get_right_ptr();
-    }
-
-    const char *get_right_ptr_end() const {
-      if (index == 0)
-        return node->buf + OMAP_BLOCK_SIZE;
-      else
-        return (*this - 1)->get_right_ptr();
+    auto get_right_ptr_end() const {
+      return node->buf + OMAP_BLOCK_SIZE - get_right_offset_end();
     }
 
     void update_offset(int offset) {
+      static_assert(!is_const);
       auto key = get_node_key();
       assert(offset + key.key_off >= 0);
       key.key_off += offset;
       set_node_key(key);
     }
 
-    void set_node_key(omap_inner_key_t _lb) const {
+    void set_node_key(omap_inner_key_t _lb) {
       static_assert(!is_const);
       omap_inner_key_le_t lb;
       lb = _lb;
       node->get_node_key_ptr()[index] = lb;
     }
 
-    typename crimson::common::maybe_const_t<char, is_const>::type get_node_key_ptr() const {
-      return reinterpret_cast<
-        typename crimson::common::maybe_const_t<char, is_const>::type>(
-               node->get_node_key_ptr() + index);
+    void set_node_val(const std::string &str) {
+      static_assert(!is_const);
+      assert(str.size() == get_node_key().key_len);
+      assert(get_node_key().key_off >= str.size());
+      assert(get_node_key().key_off < OMAP_BLOCK_SIZE);
+      assert(str.size() < OMAP_BLOCK_SIZE);
+      ::memcpy(get_node_val_ptr(), str.data(), str.size());
+    }
+
+  public:
+    uint16_t get_index() const {
+      return index;
+    }
+
+    std::string get_key() const {
+      return std::string(
+       get_node_val_ptr(),
+       get_node_key().key_len);
     }
 
+    laddr_t get_val() const {
+      return get_node_key().laddr;
+    }
+
+    bool contains(std::string_view key) const {
+      assert(*this != node->iter_end());
+      auto next = *this + 1;
+      if (next == node->iter_end()) {
+        return get_key() <= key;
+      } else {
+       return (get_key() <= key) && (next->get_key() > key);
+      }
+    }
   };
   using const_iterator = iter_t<true>;
   using iterator = iter_t<false>;
 
-
 public:
-
   void journal_inner_insert(
     const_iterator _iter,
     const laddr_t laddr,
     const std::string &key,
     delta_inner_buffer_t *recorder) {
     auto iter = iterator(this, _iter.index);
-    omap_inner_key_t node_key;
-    node_key.laddr = laddr;
-    node_key.key_len = key.size() + 1;
-    node_key.key_off = iter.get_index() == 0 ?
-                       node_key.key_len :
-                       (iter - 1).get_node_key().key_off + node_key.key_len;
     if (recorder) {
       recorder->insert(
-        node_key,
-        key);
+       key,
+       laddr);
     }
-    inner_insert(iter, node_key, key);
+    inner_insert(iter, key, laddr);
   }
 
   void journal_inner_update(
@@ -506,31 +467,11 @@ public:
     const laddr_t laddr,
     delta_inner_buffer_t *recorder) {
     auto iter = iterator(this, _iter.index);
-    auto node_key = iter.get_node_key();
-    node_key.laddr = laddr;
-    if (recorder) {
-      recorder->update(node_key, iter->get_node_val());
-    }
-    inner_update(iter, node_key);
-  }
-
-  void journal_inner_replace(
-    const_iterator _iter,
-    const laddr_t laddr,
-    const std::string &key,
-    delta_inner_buffer_t *recorder) {
-    auto iter = iterator(this, _iter.index);
-    omap_inner_key_t node_key;
-    node_key.laddr = laddr;
-    node_key.key_len = key.size() + 1;
-    node_key.key_off = iter.get_index() == 0?
-                       node_key.key_len :
-                       (iter - 1).get_node_key().key_off + node_key.key_len;
+    auto key = iter->get_key();
     if (recorder) {
-      recorder->remove(iter->get_node_val());
-      recorder->insert(node_key, key);
+      recorder->update(key, laddr);
     }
-    inner_replace(iter, node_key, key);
+    inner_update(iter, laddr);
   }
 
   void journal_inner_remove(
@@ -538,7 +479,7 @@ public:
     delta_inner_buffer_t *recorder) {
     auto iter = iterator(this, _iter.index);
     if (recorder) {
-      recorder->remove(iter->get_node_val());
+      recorder->remove(iter->get_key());
     }
     inner_remove(iter);
   }
@@ -562,17 +503,23 @@ public:
     *layout.template Pointer<0>(buf) = s;
   }
 
-  const_iterator iter_begin() const {
+  const_iterator iter_cbegin() const {
     return const_iterator(
       this,
       0);
   }
+  const_iterator iter_begin() const {
+    return iter_cbegin();
+  }
 
-  const_iterator iter_end() const {
+  const_iterator iter_cend() const {
     return const_iterator(
       this,
       get_size());
   }
+  const_iterator iter_end() const {
+    return iter_cend();
+  }
 
   iterator iter_begin() {
     return iterator(
@@ -597,7 +544,7 @@ public:
     while (start != end) {
       unsigned mid = (start + end) / 2;
       const_iterator iter(this, mid);
-      std::string s = iter->get_node_val();
+      std::string s = iter->get_key();
       if (s < str) {
         start = ++mid;
       } else if ( s > str) {
@@ -617,7 +564,7 @@ public:
   const_iterator string_upper_bound(std::string_view str) const {
     auto ret = iter_begin();
     for (; ret != iter_end(); ++ret) {
-     std::string s = ret->get_node_val();
+      std::string s = ret->get_key();
       if (s > str)
         break;
     }
@@ -632,19 +579,21 @@ public:
   const_iterator find_string_key(std::string_view str) const {
     auto ret = iter_begin();
     for (; ret != iter_end(); ++ret) {
-     std::string s = ret->get_node_val();
+     std::string s = ret->get_key();
       if (s == str)
         break;
     }
     return ret;
   }
+
   iterator find_string_key(std::string_view str) {
     const auto &tref = *this;
     return iterator(this, tref.find_string_key(str).index);
   }
 
   const_iterator get_split_pivot() const {
-    uint32_t total_size = omap_inner_key_t(get_node_key_ptr()[get_size()-1]).key_off;
+    uint32_t total_size = omap_inner_key_t(
+      get_node_key_ptr()[get_size()-1]).key_off;
     uint32_t pivot_size = total_size / 2;
     uint32_t size = 0;
     for (auto ite = iter_begin(); ite < iter_end(); ite++) {
@@ -707,8 +656,8 @@ public:
     auto iter = iter_begin();
     auto iter2 = rhs.iter_begin();
     while (iter != iter_end()) {
-      if (iter->get_node_key() != iter2->get_node_key() ||
-          iter->get_node_val() != iter2->get_node_val()) {
+      if (iter->get_key() != iter2->get_key() ||
+          iter->get_val() != iter2->get_val()) {
           return false;
       }
       iter++;
@@ -738,7 +687,7 @@ public:
     left.set_meta(lmeta);
     right.set_meta(rmeta);
 
-    return piviter->get_node_val();
+    return piviter->get_key();
   }
 
   /**
@@ -807,8 +756,8 @@ public:
     }
 
     auto replacement_pivot = pivot_idx >= left.get_size() ?
-      right.iter_idx(pivot_idx - left.get_size())->get_node_val() :
-      left.iter_idx(pivot_idx)->get_node_val();
+      right.iter_idx(pivot_idx - left.get_size())->get_key() :
+      left.iter_idx(pivot_idx)->get_key();
 
     if (pivot_size < left_size) {
       copy_from_foreign(
@@ -858,44 +807,42 @@ public:
 private:
   void inner_insert(
     iterator iter,
-    const omap_inner_key_t key,
-    const std::string &val) {
+    const std::string &key,
+    laddr_t val) {
     if (iter != iter_begin()) {
-      assert((iter - 1)->get_node_val() < val);
+      assert((iter - 1)->get_key() < key);
     }
     if (iter != iter_end()) {
-      assert(iter->get_node_val() > val);
+      assert(iter->get_key() > key);
+    }
+    assert(!is_overflow(key.size()));
+
+    if (iter != iter_end()) {
+      copy_from_local(key.size(), iter + 1, iter, iter_end());
     }
-    assert(is_overflow(val.size() + 1) == false);
-    if (get_size() != 0 && iter != iter_end())
-      copy_from_local(key.key_len, iter + 1, iter, iter_end());
 
-    iter->set_node_key(key);
+    omap_inner_key_t nkey;
+    nkey.key_len = key.size();
+    nkey.laddr = val;
+    if (iter != iter_begin()) {
+      auto pkey = (iter - 1).get_node_key();
+      nkey.key_off = nkey.key_len + pkey.key_off;
+    } else {
+      nkey.key_off = nkey.key_len;
+    }
+
+    iter->set_node_key(nkey);
     set_size(get_size() + 1);
-    iter->set_node_val(val);
+    iter->set_node_val(key);
   }
 
   void inner_update(
     iterator iter,
-    omap_inner_key_t key ) {
-    assert(iter != iter_end());
-    iter->set_node_key(key);
-  }
-
-  void inner_replace(
-    iterator iter,
-    const omap_inner_key_t key,
-    const std::string &val) {
+    laddr_t addr) {
     assert(iter != iter_end());
-    if (iter != iter_begin()) {
-      assert((iter - 1)->get_node_val() < val);
-    }
-    if ((iter + 1) != iter_end()) {
-      assert((iter + 1)->get_node_val() > val);
-    }
-    assert(is_overflow(val.size() + 1) == false);
-    inner_remove(iter);
-    inner_insert(iter, key, val);
+    auto node_key = iter->get_node_key();
+    node_key.laddr = addr;
+    iter->set_node_key(node_key);
   }
 
   void inner_remove(iterator iter) {
@@ -928,7 +875,7 @@ class StringKVLeafNodeLayout {
 
 public:
   template <bool is_const>
-  struct iter_t {
+  class iter_t {
     friend class StringKVLeafNodeLayout;
     using parent_t = typename crimson::common::maybe_const_t<StringKVLeafNodeLayout, is_const>::type;
 
@@ -944,6 +891,7 @@ public:
       parent_t parent,
       uint16_t index) : node(parent), index(index) {}
 
+  public:
     iter_t(const iter_t &) = default;
     iter_t(iter_t &&) = default;
     iter_t &operator=(const iter_t &) = default;
@@ -1004,120 +952,43 @@ public:
       return index != rhs.index;
     }
 
+  private:
     omap_leaf_key_t get_node_key() const {
       omap_leaf_key_le_t kint = node->get_node_key_ptr()[index];
       return omap_leaf_key_t(kint);
     }
-
-    char *get_node_val_ptr() {
-      auto tail = node->buf + OMAP_BLOCK_SIZE;
-      if ( *this == node->iter_end())
-        return tail;
-      else
-        return tail - static_cast<int>(get_node_key().key_off);
-    }
-
-    const char *get_node_val_ptr() const {
-      auto tail = node->buf + OMAP_BLOCK_SIZE;
-      if ( *this == node->iter_end())
-        return tail;
-      else
-        return tail - static_cast<int>(get_node_key().key_off);
+    auto get_node_key_ptr() const {
+      return reinterpret_cast<
+       typename crimson::common::maybe_const_t<char, is_const>::type>(
+         node->get_node_key_ptr() + index);
     }
 
-    char *get_string_val_ptr() {
-      auto tail = node->buf + OMAP_BLOCK_SIZE;
-      return tail - static_cast<int>(get_node_key().val_off);
+    uint32_t get_node_val_offset() const {
+      return get_node_key().key_off;
     }
-
-    const char *get_string_val_ptr() const {
+    auto get_node_val_ptr() const {
       auto tail = node->buf + OMAP_BLOCK_SIZE;
-      return tail - static_cast<int>(get_node_key().val_off);
-    }
-
-    void set_node_val(const std::string &val) const {
-      static_assert(!is_const);
-      std::strcpy((char*)get_node_val_ptr(), val.c_str()); //copy char* to char* include "\0"
-    }
-
-    std::string get_node_val() {
-      std::string s(get_node_val_ptr());
-      return s;
-    }
-    std::string get_node_val() const{
-      std::string s(get_node_val_ptr());
-      return s;
-    }
-
-    void set_string_val(const std::string &val) {
-      static_assert(!is_const);
-      std::strcpy((char*)get_string_val_ptr(), val.c_str()); //copy char* to char* include "\0"
-    }
-
-    std::string get_string_val() const {
-      std::string s(get_string_val_ptr());
-      return s;
-    }
-
-    bool contains(std::string_view key) const {
-      auto next = *this + 1;
-      if (*this == node->iter_begin()){
-        if (next->get_node_val() > key)
-          return true;
-        else
-          return false;
+      if (*this == node->iter_end())
+        return tail;
+      else {
+        return tail - get_node_val_offset();
       }
-      if (next == node->iter_end())
-        return get_node_val() <= key;
-
-      return (get_node_val() <= key) && (next->get_node_val() > key);
-    }
-
-    uint16_t get_index() const {
-      return index;
-    }
-
-  private:
-    int get_right_offset() const {
-      return get_node_key().key_off;
     }
 
     int get_right_offset_end() const {
       if (index == 0)
-             return 0;
-      else
-             return (*this - 1)->get_right_offset();
-    }
-
-    char *get_right_ptr() {
-      return node->buf + OMAP_BLOCK_SIZE - get_right_offset();
-    }
-
-    const char *get_right_ptr() const {
-      static_assert(!is_const);
-      return node->buf + OMAP_BLOCK_SIZE - get_right_offset();
-    }
-
-    char *get_right_ptr_end() {
-      if (index == 0)
-             return node->buf + OMAP_BLOCK_SIZE;
+        return 0;
       else
-             return (*this - 1)->get_right_ptr();
+        return (*this - 1)->get_node_val_offset();
     }
-
-    const char *get_right_ptr_end() const {
-      if (index == 0)
-             return node->buf + OMAP_BLOCK_SIZE;
-      else
-             return (*this - 1)->get_right_ptr();
+    auto get_right_ptr_end() const {
+      return node->buf + OMAP_BLOCK_SIZE - get_right_offset_end();
     }
 
     void update_offset(int offset) {
       auto key = get_node_key();
       assert(offset + key.key_off >= 0);
-      assert(offset + key.val_off >= 0);
       key.key_off += offset;
-      key.val_off += offset;
       set_node_key(key);
     }
 
@@ -1128,22 +999,52 @@ public:
       node->get_node_key_ptr()[index] = lb;
     }
 
-    typename crimson::common::maybe_const_t<char, is_const>::type get_node_key_ptr() const {
-      return reinterpret_cast<
-        typename crimson::common::maybe_const_t<char, is_const>::type>(
-        node->get_node_key_ptr() + index);
+    void set_node_val(const std::string &key, const ceph::bufferlist &val) {
+      static_assert(!is_const);
+      auto node_key = get_node_key();
+      assert(key.size() == node_key.key_len);
+      assert(val.length() == node_key.val_len);
+      ::memcpy(get_node_val_ptr(), key.data(), key.size());
+      auto bliter = val.begin();
+      bliter.copy(node_key.val_len, get_node_val_ptr() + node_key.key_len);
+    }
+
+  public:
+    uint16_t get_index() const {
+      return index;
+    }
+
+    std::string get_key() const {
+      return std::string(
+       get_node_val_ptr(),
+       get_node_key().key_len);
+    }
+
+    std::string get_str_val() const {
+      auto node_key = get_node_key();
+      return std::string(
+       get_node_val_ptr() + node_key.key_len,
+       get_node_key().val_len);
+    }
+
+    ceph::bufferlist get_val() const {
+      auto node_key = get_node_key();
+      ceph::bufferlist bl;
+      ceph::bufferptr bptr(
+       get_node_val_ptr() + node_key.key_len,
+       get_node_key().val_len);
+      bl.append(bptr);
+      return bl;
     }
   };
   using const_iterator = iter_t<true>;
   using iterator = iter_t<false>;
 
-
 public:
-
   void journal_leaf_insert(
     const_iterator _iter,
     const std::string &key,
-    const std::string &val,
+    const ceph::bufferlist &val,
     delta_leaf_buffer_t *recorder) {
     auto iter = iterator(this, _iter.index);
     if (recorder) {
@@ -1157,23 +1058,22 @@ public:
   void journal_leaf_update(
     const_iterator _iter,
     const std::string &key,
-    const std::string &val,
+    const ceph::bufferlist &val,
     delta_leaf_buffer_t *recorder) {
     auto iter = iterator(this, _iter.index);
     if (recorder) {
-      recorder->remove(iter->get_node_val());
+      recorder->remove(iter->get_key());
       recorder->insert(key, val);
     }
     leaf_update(iter, key, val);
   }
 
-
   void journal_leaf_remove(
     const_iterator _iter,
     delta_leaf_buffer_t *recorder) {
     auto iter = iterator(this, _iter.index);
     if (recorder) {
-      recorder->remove(iter->get_node_val());
+      recorder->remove(iter->get_key());
     }
     leaf_remove(iter);
   }
@@ -1216,7 +1116,7 @@ public:
     while (start != end) {
       unsigned mid = (start + end) / 2;
       const_iterator iter(this, mid);
-      std::string s = iter->get_node_val();
+      std::string s = iter->get_key();
       if (s < str) {
         start = ++mid;
       } else if (s > str) {
@@ -1236,7 +1136,7 @@ public:
   const_iterator string_upper_bound(std::string_view str) const {
     auto ret = iter_begin();
     for (; ret != iter_end(); ++ret) {
-      std::string s = ret->get_node_val();
+      std::string s = ret->get_key();
       if (s > str)
         break;
     }
@@ -1251,7 +1151,7 @@ public:
   const_iterator find_string_key(std::string_view str) const {
     auto ret = iter_begin();
     for (; ret != iter_end(); ++ret) {
-      std::string s = ret->get_node_val();
+      std::string s = ret->get_key();
       if (s == str)
         break;
     }
@@ -1341,10 +1241,9 @@ public:
     auto iter = iter_begin();
     auto iter2 = rhs.iter_begin();
     while (iter != iter_end()) {
-      if(iter->get_node_key() != iter2->get_node_key() ||
-              iter->get_node_val() != iter2->get_node_val() ||
-         iter->get_string_val() != iter2->get_string_val()){
-             return false;
+      if(iter->get_key() != iter2->get_key() ||
+         iter->get_val() != iter2->get_val()) {
+       return false;
       }
       iter++;
       iter2++;
@@ -1373,7 +1272,7 @@ public:
     left.set_meta(lmeta);
     right.set_meta(rmeta);
 
-    return piviter->get_node_val();
+    return piviter->get_key();
   }
 
   /**
@@ -1442,8 +1341,8 @@ public:
     }
 
     auto replacement_pivot = pivot_idx >= left.get_size() ?
-      right.iter_idx(pivot_idx - left.get_size())->get_node_val() :
-      left.iter_idx(pivot_idx)->get_node_val();
+      right.iter_idx(pivot_idx - left.get_size())->get_key() :
+      left.iter_idx(pivot_idx)->get_key();
 
     if (pivot_size < left_size) {
       copy_from_foreign(
@@ -1494,42 +1393,40 @@ private:
   void leaf_insert(
     iterator iter,
     const std::string &key,
-    const std::string &val) {
+    const bufferlist &val) {
     if (iter != iter_begin()) {
-      assert((iter - 1)->get_node_val() < key);
+      assert((iter - 1)->get_key() < key);
     }
     if (iter != iter_end()) {
-      assert(iter->get_node_val() > key);
+      assert(iter->get_key() > key);
     }
-    assert(is_overflow(key.size() + 1, val.size() + 1) == false);
+    assert(!is_overflow(key.size(), val.length()));
     omap_leaf_key_t node_key;
     if (iter == iter_begin()) {
-      node_key.key_off = key.size() + 1 + val.size() + 1;
-      node_key.key_len = key.size() + 1;
-      node_key.val_off = val.size() + 1;
-      node_key.val_len = val.size() + 1;
+      node_key.key_off = key.size() + val.length();
+      node_key.key_len = key.size();
+      node_key.val_len = val.length();
     } else {
-      node_key.key_off = (iter - 1)->get_node_key().key_off + (key.size() + 1 + val.size() + 1);
-      node_key.key_len = key.size() + 1;
-      node_key.val_off = (iter - 1)->get_node_key().key_off + (val.size() + 1);
-      node_key.val_len = val.size() + 1;
+      node_key.key_off = (iter - 1)->get_node_key().key_off +
+       (key.size() + val.length());
+      node_key.key_len = key.size();
+      node_key.val_len = val.length();
     }
     if (get_size() != 0 && iter != iter_end())
       copy_from_local(node_key.key_len + node_key.val_len, iter + 1, iter, iter_end());
 
     iter->set_node_key(node_key);
     set_size(get_size() + 1);
-    iter->set_node_val(key);
-    iter->set_string_val(val);
+    iter->set_node_val(key, val);
   }
 
   void leaf_update(
     iterator iter,
     const std::string &key,
-    const std::string &val) {
+    const ceph::bufferlist &val) {
     assert(iter != iter_end());
-    assert(is_overflow(0, val.size() + 1) == false);
     leaf_remove(iter);
+    assert(!is_overflow(key.size(), val.length()));
     leaf_insert(iter, key, val);
   }
 
@@ -1559,17 +1456,17 @@ private:
 inline void delta_inner_t::replay(StringKVInnerNodeLayout &l) {
   switch (op) {
     case op_t::INSERT: {
-      l.inner_insert(l.string_lower_bound(val), key, val);
+      l.inner_insert(l.string_lower_bound(key), key, addr);
       break;
     }
     case op_t::UPDATE: {
-      auto iter = l.find_string_key(val);
+      auto iter = l.find_string_key(key);
       assert(iter != l.iter_end());
-      l.inner_update(iter, key);
+      l.inner_update(iter, addr);
       break;
     }
     case op_t::REMOVE: {
-      auto iter = l.find_string_key(val);
+      auto iter = l.find_string_key(key);
       assert(iter != l.iter_end());
       l.inner_remove(iter);
       break;
index ba9dd002206c1debdc670e1fba835e6dab277f8a..eacaf4b939a483379d210cd7954f6b598c620d95 100644 (file)
@@ -23,6 +23,28 @@ namespace {
   }
 }
 
+const int STR_LEN = 50;
+
+std::string rand_name(const int len)
+{
+  std::string ret;
+  ret.reserve(len);
+  for (int i = 0; i < len; ++i) {
+    ret.append(1, (char)(rand() % ('z' - '0')) + '0');
+  }
+  return ret;
+}
+
+bufferlist rand_buffer(const int len) {
+  bufferptr ptr(len);
+  for (auto i = ptr.c_str(); i < ptr.c_str() + len; ++i) {
+    *i = (char)rand();
+  }
+  bufferlist bl;
+  bl.append(ptr);
+  return bl;
+}
+
 struct omap_manager_test_t :
   public seastar_test_suite_t,
   TMTestState {
@@ -45,25 +67,54 @@ struct omap_manager_test_t :
     });
   }
 
-  using test_omap_t = std::map<std::string, std::string>;
+  using test_omap_t = std::map<std::string, ceph::bufferlist>;
   test_omap_t test_omap_mappings;
 
   void set_key(
     omap_root_t &omap_root,
     Transaction &t,
-    string &key,
-    string &val) {
+    const string &key,
+    const bufferlist &val) {
     omap_manager->omap_set_key(omap_root, t, key, val).unsafe_get0();
     test_omap_mappings[key] = val;
   }
 
-  std::pair<string, string> get_value(
+  void set_key(
+    omap_root_t &omap_root,
+    Transaction &t,
+    const string &key,
+    const string &val) {
+    bufferlist bl;
+    bl.append(val);
+    set_key(omap_root, t, key, bl);
+  }
+
+  std::string set_random_key(
+    omap_root_t &omap_root,
+    Transaction &t) {
+    auto key = rand_name(STR_LEN);
+    set_key(
+      omap_root,
+      t,
+      key,
+      rand_buffer(STR_LEN));
+    return key;
+  }
+
+  void get_value(
     omap_root_t &omap_root,
     Transaction &t,
     const string &key) {
     auto ret = omap_manager->omap_get_value(omap_root, t, key).unsafe_get0();
-    EXPECT_EQ(key, ret.first);
-    return ret;
+    auto iter = test_omap_mappings.find(key);
+    if (iter == test_omap_mappings.end()) {
+      EXPECT_FALSE(ret);
+    } else {
+      EXPECT_TRUE(ret);
+      if (ret) {
+       EXPECT_TRUE(*ret == iter->second);
+      }
+    }
   }
 
   void rm_key(
@@ -74,67 +125,38 @@ struct omap_manager_test_t :
     test_omap_mappings.erase(test_omap_mappings.find(key));
   }
 
-  list_keys_result_t list_keys(
-    omap_root_t &omap_root,
+  void list(
+    const omap_root_t &omap_root,
     Transaction &t,
-    std::string &start,
+    const std::optional<std::string> &start,
     size_t max = MAX_SIZE) {
-    auto ret = omap_manager->omap_list_keys(omap_root, t, start, max).unsafe_get0();
-    if (start == "" && max == MAX_SIZE) {
-      EXPECT_EQ(test_omap_mappings.size(), ret.keys.size());
-      for ( auto &i : ret.keys) {
-        auto it = test_omap_mappings.find(i);
-        EXPECT_NE(it, test_omap_mappings.end());
-        EXPECT_EQ(i, it->first);
-      }
-      EXPECT_EQ(ret.next, std::nullopt);
+
+    if (start) {
+      logger().debug("list on {}", *start);
     } else {
-      size_t i =0;
-      auto it = test_omap_mappings.find(start);
-      for (; it != test_omap_mappings.end() && i < max; it++) {
-        EXPECT_EQ(ret.keys[i], it->first);
-        i++;
-      }
-      if (it == test_omap_mappings.end()) {
-        EXPECT_EQ(ret.next, std::nullopt);
-      } else {
-        EXPECT_EQ(ret.keys.size(), max);
-        EXPECT_EQ(ret.next, it->first);
-      }
+      logger().debug("list on start");
     }
-    return ret;
-  }
 
-  list_kvs_result_t list(
-    omap_root_t &omap_root,
-    Transaction &t,
-    std::string &start,
-    size_t max = MAX_SIZE) {
-    auto ret = omap_manager->omap_list(omap_root, t, start, max).unsafe_get0();
-    if (start == "" && max == MAX_SIZE) {
-      EXPECT_EQ(test_omap_mappings.size(), ret.kvs.size());
-      for ( auto &i : ret.kvs) {
-        auto it = test_omap_mappings.find(i.first);
-        EXPECT_NE(it, test_omap_mappings.end());
-        EXPECT_EQ(i.second, it->second);
-      }
-      EXPECT_EQ(ret.next, std::nullopt);
+    auto [complete, results] = omap_manager->omap_list(
+      omap_root, t, start, max
+    ).unsafe_get0();
+
+    auto it = start ?
+      test_omap_mappings.upper_bound(*start) :
+      test_omap_mappings.begin();
+    for (auto &&[k, v]: results) {
+      EXPECT_NE(it, test_omap_mappings.end());
+      if (it == test_omap_mappings.end())
+       return;
+      EXPECT_EQ(k, it->first);
+      EXPECT_EQ(v, it->second);
+      it++;
+    }
+    if (it == test_omap_mappings.end()) {
+      EXPECT_TRUE(complete);
     } else {
-      size_t i = 0;
-      auto it = test_omap_mappings.find(start);
-      for (; it != test_omap_mappings.end() && i < max; it++) {
-        EXPECT_EQ(ret.kvs[i].first, it->first);
-        i++;
-      }
-      if (it == test_omap_mappings.end()) {
-        EXPECT_EQ(ret.next, std::nullopt);
-      } else {
-        EXPECT_EQ(ret.kvs.size(), max);
-        EXPECT_EQ(ret.next, it->first);
-      }
+      EXPECT_EQ(results.size(), max);
     }
-
-    return ret;
   }
 
   void clear(
@@ -146,9 +168,7 @@ struct omap_manager_test_t :
 
   void check_mappings(omap_root_t &omap_root, Transaction &t) {
     for (const auto &i: test_omap_mappings){
-      auto ret = get_value(omap_root, t, i.first);
-      EXPECT_EQ(i.first, ret.first);
-      EXPECT_EQ(i.second, ret.second);
+      get_value(omap_root, t, i.first);
     }
   }
 
@@ -165,26 +185,6 @@ struct omap_manager_test_t :
   }
 };
 
-char* rand_string(char* str, const int len)
-{
-  int i;
-  for (i = 0; i < len; ++i) {
-    switch (rand() % 3) {
-      case 1:
-        str[i] = 'A' + rand() % 26;
-        break;
-      case 2:
-        str[i] = 'a' +rand() % 26;
-        break;
-      case 0:
-        str[i] = '0' + rand() % 10;
-        break;
-    }
-  }
-  str[len] = '\0';
-  return str;
-}
-
 TEST_F(omap_manager_test_t, basic)
 {
   run_async([this] {
@@ -197,27 +197,26 @@ TEST_F(omap_manager_test_t, basic)
 
     string key = "owner";
     string val = "test";
+
     {
       auto t = tm->create_transaction();
       logger().debug("first transaction");
       set_key(omap_root, *t, key, val);
-      [[maybe_unused]] auto getret = get_value(omap_root, *t, key);
+      get_value(omap_root, *t, key);
       tm->submit_transaction(std::move(t)).unsafe_get();
     }
     {
       auto t = tm->create_transaction();
       logger().debug("second transaction");
-      [[maybe_unused]] auto getret = get_value(omap_root, *t, key);
+      get_value(omap_root, *t, key);
       rm_key(omap_root, *t, key);
-      [[maybe_unused]] auto getret2 = get_value(omap_root, *t, key);
-      EXPECT_EQ(getret2.second, "");
+      get_value(omap_root, *t, key);
       tm->submit_transaction(std::move(t)).unsafe_get();
     }
     {
       auto t = tm->create_transaction();
       logger().debug("third transaction");
-      [[maybe_unused]] auto getret = get_value(omap_root, *t, key);
-      EXPECT_EQ(getret.second, "");
+      get_value(omap_root, *t, key);
       tm->submit_transaction(std::move(t)).unsafe_get();
     }
   });
@@ -232,15 +231,11 @@ TEST_F(omap_manager_test_t, force_leafnode_split)
       omap_root = omap_manager->initialize_omap(*t).unsafe_get0();
       tm->submit_transaction(std::move(t)).unsafe_get();
     }
-    const int STR_LEN = 50;
-    char str[STR_LEN + 1];
     for (unsigned i = 0; i < 40; i++) {
       auto t = tm->create_transaction();
       logger().debug("opened transaction");
       for (unsigned j = 0; j < 10; ++j) {
-        string key(rand_string(str, rand() % STR_LEN));
-        string val(rand_string(str, rand() % STR_LEN));
-        set_key(omap_root, *t, key, val);
+        set_random_key(omap_root, *t);
         if ((i % 20 == 0) && (j == 5)) {
           check_mappings(omap_root, *t);
         }
@@ -261,16 +256,12 @@ TEST_F(omap_manager_test_t, force_leafnode_split_merge)
       omap_root = omap_manager->initialize_omap(*t).unsafe_get0();
       tm->submit_transaction(std::move(t)).unsafe_get();
     }
-    const int STR_LEN = 50;
-    char str[STR_LEN + 1];
 
     for (unsigned i = 0; i < 80; i++) {
       auto t = tm->create_transaction();
       logger().debug("opened split_merge transaction");
       for (unsigned j = 0; j < 5; ++j) {
-        string key(rand_string(str, rand() % STR_LEN));
-        string val(rand_string(str, rand() % STR_LEN));
-        set_key(omap_root, *t, key, val);
+        set_random_key(omap_root, *t);
         if ((i % 10 == 0) && (j == 3)) {
           check_mappings(omap_root, *t);
         }
@@ -314,16 +305,12 @@ TEST_F(omap_manager_test_t, force_leafnode_split_merge_fullandbalanced)
       omap_root = omap_manager->initialize_omap(*t).unsafe_get0();
       tm->submit_transaction(std::move(t)).unsafe_get();
     }
-    const int STR_LEN = 50;
-    char str[STR_LEN + 1];
 
     for (unsigned i = 0; i < 50; i++) {
       auto t = tm->create_transaction();
       logger().debug("opened split_merge transaction");
       for (unsigned j = 0; j < 5; ++j) {
-        string key(rand_string(str, rand() % STR_LEN));
-        string val(rand_string(str, rand() % STR_LEN));
-        set_key(omap_root, *t, key, val);
+        set_random_key(omap_root, *t);
         if ((i % 10 == 0) && (j == 3)) {
           check_mappings(omap_root, *t);
         }
@@ -362,7 +349,6 @@ TEST_F(omap_manager_test_t, force_leafnode_split_merge_fullandbalanced)
   });
 }
 
-
 TEST_F(omap_manager_test_t, force_split_listkeys_list_clear)
 {
   run_async([this] {
@@ -372,16 +358,12 @@ TEST_F(omap_manager_test_t, force_split_listkeys_list_clear)
       omap_root = omap_manager->initialize_omap(*t).unsafe_get0();
       tm->submit_transaction(std::move(t)).unsafe_get();
     }
-    const int STR_LEN = 300;
-    char str[STR_LEN + 1];
     string temp;
     for (unsigned i = 0; i < 40; i++) {
       auto t = tm->create_transaction();
       logger().debug("opened transaction");
       for (unsigned j = 0; j < 10; ++j) {
-        string key(rand_string(str, rand() % STR_LEN));
-        string val(rand_string(str, rand() % STR_LEN));
-        set_key(omap_root, *t, key, val);
+        auto key = set_random_key(omap_root, *t);
         if (i == 10)
           temp = key;
         if ((i % 20 == 0) && (j == 5)) {
@@ -392,27 +374,22 @@ TEST_F(omap_manager_test_t, force_split_listkeys_list_clear)
       tm->submit_transaction(std::move(t)).unsafe_get();
       check_mappings(omap_root);
     }
-    std::string empty = "";
-    auto t = tm->create_transaction();
-    [[maybe_unused]] auto keys = list_keys(omap_root, *t, empty);
-    tm->submit_transaction(std::move(t)).unsafe_get();
-
-    t = tm->create_transaction();
-    keys = list_keys(omap_root, *t, temp, 100);
-    tm->submit_transaction(std::move(t)).unsafe_get();
 
-    t = tm->create_transaction();
-    [[maybe_unused]] auto ls = list(omap_root, *t, empty);
-    tm->submit_transaction(std::move(t)).unsafe_get();
-
-    t = tm->create_transaction();
-    ls = list(omap_root, *t, temp, 100);
-    tm->submit_transaction(std::move(t)).unsafe_get();
+    {
+      auto t = tm->create_transaction();
+      list(omap_root, *t, std::nullopt);
+    }
 
-    t = tm->create_transaction();
-    clear(omap_root, *t);
-    tm->submit_transaction(std::move(t)).unsafe_get();
+    {
+      auto t = tm->create_transaction();
+      list(omap_root, *t, temp, 100);
+    }
 
+    {
+      auto t = tm->create_transaction();
+      clear(omap_root, *t);
+      tm->submit_transaction(std::move(t)).unsafe_get();
+    }
   });
 }
 
@@ -425,16 +402,12 @@ TEST_F(omap_manager_test_t, internal_force_split)
       omap_root = omap_manager->initialize_omap(*t).unsafe_get0();
       tm->submit_transaction(std::move(t)).unsafe_get();
     }
-    const int STR_LEN = 300;
-    char str[STR_LEN + 1];
     for (unsigned i = 0; i < 10; i++) {
       logger().debug("opened split transaction");
       auto t = tm->create_transaction();
 
       for (unsigned j = 0; j < 80; ++j) {
-        string key(rand_string(str, rand() % STR_LEN));
-        string val(rand_string(str, rand() % STR_LEN));
-        set_key(omap_root, *t, key, val);
+        set_random_key(omap_root, *t);
         if ((i % 2 == 0) && (j % 50 == 0)) {
           check_mappings(omap_root, *t);
         }
@@ -455,17 +428,13 @@ TEST_F(omap_manager_test_t, internal_force_merge_fullandbalanced)
       omap_root = omap_manager->initialize_omap(*t).unsafe_get0();
       tm->submit_transaction(std::move(t)).unsafe_get();
     }
-    const int STR_LEN = 300;
-    char str[STR_LEN + 1];
 
     for (unsigned i = 0; i < 8; i++) {
       logger().debug("opened split transaction");
       auto t = tm->create_transaction();
 
       for (unsigned j = 0; j < 80; ++j) {
-        string key(rand_string(str, rand() % STR_LEN));
-        string val(rand_string(str, rand() % STR_LEN));
-        set_key(omap_root, *t, key, val);
+        set_random_key(omap_root, *t);
         if ((i % 2 == 0) && (j % 50 == 0)) {
           check_mappings(omap_root, *t);
         }
@@ -507,17 +476,13 @@ TEST_F(omap_manager_test_t, replay)
       tm->submit_transaction(std::move(t)).unsafe_get();
       replay();
     }
-    const int STR_LEN = 300;
-    char str[STR_LEN + 1];
 
     for (unsigned i = 0; i < 8; i++) {
       logger().debug("opened split transaction");
       auto t = tm->create_transaction();
 
       for (unsigned j = 0; j < 80; ++j) {
-        string key(rand_string(str, rand() % STR_LEN));
-        string val(rand_string(str, rand() % STR_LEN));
-        set_key(omap_root, *t, key, val);
+        set_random_key(omap_root, *t);
         if ((i % 2 == 0) && (j % 50 == 0)) {
           check_mappings(omap_root, *t);
         }
@@ -564,31 +529,22 @@ TEST_F(omap_manager_test_t, internal_force_split_to_root)
       omap_root = omap_manager->initialize_omap(*t).unsafe_get0();
       tm->submit_transaction(std::move(t)).unsafe_get();
     }
-    const int STR_LEN = 300;
-    char str[STR_LEN + 1];
 
     logger().debug("set big keys");
     for (unsigned i = 0; i < 53; i++) {
       auto t = tm->create_transaction();
 
       for (unsigned j = 0; j < 8; ++j) {
-        string key(rand_string(str, STR_LEN));
-        string val(rand_string(str, STR_LEN));
-        set_key(omap_root, *t, key, val);
+        set_random_key(omap_root, *t);
       }
       logger().debug("submitting transaction i = {}", i);
       tm->submit_transaction(std::move(t)).unsafe_get();
     }
      logger().debug("set small keys");
-     const int STR_LEN_2 = 100;
-     char str_2[STR_LEN_2 + 1];
      for (unsigned i = 0; i < 100; i++) {
        auto t = tm->create_transaction();
-
        for (unsigned j = 0; j < 8; ++j) {
-         string key(rand_string(str_2, STR_LEN_2));
-         string val(rand_string(str_2, STR_LEN_2));
-         set_key(omap_root, *t, key, val);
+         set_random_key(omap_root, *t);
        }
       logger().debug("submitting transaction last");
       tm->submit_transaction(std::move(t)).unsafe_get();