]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os: add omap_iterate interface to futurized store
authorchunmei liu <chunmei.liu@ibm.com>
Wed, 19 Mar 2025 20:50:45 +0000 (13:50 -0700)
committerchunmei liu <chunmei.liu@ibm.com>
Tue, 15 Jul 2025 02:58:23 +0000 (19:58 -0700)
Signed-off-by: chunmei liu <chunmei.liu@ibm.com>
13 files changed:
src/crimson/os/alienstore/alien_store.cc
src/crimson/os/alienstore/alien_store.h
src/crimson/os/cyanstore/cyan_store.cc
src/crimson/os/cyanstore/cyan_store.h
src/crimson/os/futurized_store.h
src/crimson/os/seastore/omap_manager.h
src/crimson/os/seastore/omap_manager/btree/btree_omap_manager.cc
src/crimson/os/seastore/omap_manager/btree/btree_omap_manager.h
src/crimson/os/seastore/omap_manager/btree/omap_btree_node.h
src/crimson/os/seastore/omap_manager/btree/omap_btree_node_impl.cc
src/crimson/os/seastore/omap_manager/btree/omap_btree_node_impl.h
src/crimson/os/seastore/seastore.cc
src/crimson/os/seastore/seastore.h

index c44abe46ae95654af6cf97ab74b6612265837c61..9ac25d8034e5643f52520c9966dc7ba8c3b331dc 100644 (file)
@@ -465,6 +465,35 @@ auto AlienStore::omap_get_values(CollectionRef ch,
   });
 }
 
+AlienStore::read_errorator::future<ObjectStore::omap_iter_ret_t>
+AlienStore::omap_iterate(CollectionRef ch,
+                         const ghobject_t &oid,
+                         ObjectStore::omap_iter_seek_t start_from,
+                         omap_iterate_cb_t callback,
+                         uint32_t op_flags)
+{
+  logger().debug("{} with_start", __func__);
+  assert(tp);
+  return do_with_op_gate(oid, [ch, start_from, callback, this] (auto& oid) {
+    return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [ch, oid, start_from, callback, this] {
+      auto c = static_cast<AlienCollection*>(ch.get());
+      return store->omap_iterate(
+        c->collection, oid, start_from, callback);
+    }).then([] (int r)
+      -> read_errorator::future<ObjectStore::omap_iter_ret_t> {
+      if (r == -ENOENT) {
+        return crimson::ct_error::enoent::make();
+      } else {
+        if (r == 1) {
+          return read_errorator::make_ready_future<ObjectStore::omap_iter_ret_t>(ObjectStore::omap_iter_ret_t::STOP);
+        } else {
+          return read_errorator::make_ready_future<ObjectStore::omap_iter_ret_t>(ObjectStore::omap_iter_ret_t::NEXT);
+        }
+      }
+    });
+  });
+}
+
 seastar::future<> AlienStore::do_transaction_no_callbacks(
   CollectionRef ch,
   ceph::os::Transaction&& txn)
index 1d39411450e8beddc829530a8e1052536e4e4577..2c3d2b2d02cab4d351d848dc9008e04972b4ca21 100644 (file)
@@ -81,6 +81,13 @@ public:
     uint64_t limit,
     uint32_t op_flags = 0) const final;
 
+  read_errorator::future<ObjectStore::omap_iter_ret_t> omap_iterate(
+    CollectionRef c,
+    const ghobject_t &oid,
+    ObjectStore::omap_iter_seek_t start_from,
+    omap_iterate_cb_t callback,
+    uint32_t op_flags = 0) final;
+
   seastar::future<CollectionRef> create_new_collection(const coll_t& cid) final;
   seastar::future<CollectionRef> open_collection(const coll_t& cid) final;
   seastar::future<std::vector<coll_core_t>> list_collections() final;
index 41819fb5eb6ba9c450600b1e480fbc7e707853d7..73ec8bbc23404c11bcff7b0c6768a028325332db 100644 (file)
@@ -413,6 +413,33 @@ auto CyanStore::Shard::omap_get_values(
     std::make_tuple(true, std::move(values)));
 }
 
+auto CyanStore::Shard::omap_iterate(
+  CollectionRef ch,
+  const ghobject_t &oid,
+  ObjectStore::omap_iter_seek_t start_from,
+  omap_iterate_cb_t callback,
+  uint32_t op_flags)
+  -> CyanStore::Shard::read_errorator::future<ObjectStore::omap_iter_ret_t>
+{
+  auto c = static_cast<Collection*>(ch.get());
+  logger().debug("{} {} {}", __func__, c->get_cid(), oid);
+  auto o = c->get_object(oid);
+  if (!o) {
+    return crimson::ct_error::enoent::make();
+  }
+  auto ret = ObjectStore::omap_iter_ret_t::NEXT;
+  auto i = (start_from.seek_type == ObjectStore::omap_iter_seek_t::LOWER_BOUND) ?
+            o->omap.lower_bound(start_from.seek_position) :
+            o->omap.upper_bound(start_from.seek_position);
+  for (; i != o->omap.end(); ++i) {
+    ceph::bufferlist bl = i->second;
+    std::string result(bl.c_str(), bl.length());
+    ret = callback(i->first, result);
+    if (ret == ObjectStore::omap_iter_ret_t::STOP)
+      break;
+  }
+  return read_errorator::make_ready_future<ObjectStore::omap_iter_ret_t>(ret);
+}
 auto CyanStore::Shard::omap_get_header(
   CollectionRef ch,
   const ghobject_t& oid,
index 93d0d1443119877aaae18921d93ea54d2a2894e6..d5029d9e367540141e7651cf3050ff0d4084799f 100644 (file)
@@ -79,6 +79,14 @@ public:
       uint32_t op_flags = 0
       ) final;
 
+    read_errorator::future<ObjectStore::omap_iter_ret_t> omap_iterate(
+      CollectionRef c,
+      const ghobject_t &oid,
+      ObjectStore::omap_iter_seek_t start_from,
+      omap_iterate_cb_t callback,
+      uint32_t op_flags = 0
+    ) final;
+
     get_attr_errorator::future<ceph::bufferlist> omap_get_header(
       CollectionRef c,
       const ghobject_t& oid,
index e7d4c8546de098581c1c22eea57a5995b0ce6d22..c19d893fb39a7d8813d03477fe81f4508c70f8de 100644 (file)
@@ -17,6 +17,7 @@
 #include "include/buffer_fwd.h"
 #include "include/uuid.h"
 #include "osd/osd_types.h"
+#include "os/ObjectStore.h"
 
 namespace ceph::os {
 class Transaction;
@@ -95,6 +96,37 @@ public:
       uint32_t op_flags = 0
       ) = 0; ///< @return <done, values> values.empty() only if done
 
+    /**
+     * Iterate over object map with user-provided callable
+     *
+     * Warning! f cannot block or perform IO and must not wait on a future.
+     *
+     * @param c collection
+     * @param oid object
+     * @param start_from where the iterator should point to at
+     *                   the beginning
+     * @param f callable that takes OMAP key and corresponding
+     *          value as string_views and controls iteration
+     *          by the return. It is executed for every object's
+     *          OMAP entry from `start_from` till end of the
+     *          object's OMAP or till the iteration is stopped
+     *          by `STOP`. Please note that if there is no such
+     *          entry, `visitor` will be called 0 times.
+     * @return omap_iter_ret_t on success
+     *         omap_iter_ret_t::STOP means omap_iterate() is stopped by f,
+     *         omap_iter_ret_t::NEXT means omap_iterate() reaches the end of omap tree
+     */
+    using omap_iterate_cb_t = std::function<ObjectStore::omap_iter_ret_t(std::string_view, std::string_view)>;
+    virtual read_errorator::future<ObjectStore::omap_iter_ret_t> omap_iterate(
+      CollectionRef c,   ///< [in] collection
+      const ghobject_t &oid, ///< [in] object
+      ObjectStore::omap_iter_seek_t start_from, ///< [in] where the iterator should point to at the beginning
+      omap_iterate_cb_t callback,
+      ///< [in] the callback function for each OMAP entry after start_from till end of the OMAP or
+      /// till the iteration is stopped by `STOP`.
+      uint32_t op_flags = 0
+      ) = 0;
+
     virtual get_attr_errorator::future<bufferlist> omap_get_header(
       CollectionRef c,
       const ghobject_t& oid,
index 85a3f89a96f75a77a75d788f5de6c90f25c3664e..a8d6b3722ef9d4de600b04b9dbe2036151bcec00 100644 (file)
@@ -4,6 +4,7 @@
 #pragma once
 
 #include <iostream>
+#include <fmt/format.h>
 
 #include <boost/intrusive_ptr.hpp>
 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
@@ -13,6 +14,8 @@
 #include "crimson/osd/exceptions.h"
 #include "crimson/os/seastore/seastore_types.h"
 #include "crimson/os/seastore/transaction_manager.h"
+#include "os/ObjectStore.h" // for ObjectStore::omap_iter_seek_t
+
 
 //TODO: calculate the max key and value sizes the current layout supports,
 //     and return errors during insert if the max is exceeded.
 #define OMAP_LEAF_BLOCK_SIZE 65536
 #define LOG_LEAF_BLOCK_SIZE 16384
 
+template <>
+struct fmt::formatter<ObjectStore::omap_iter_seek_t> {
+  constexpr auto parse(format_parse_context& ctx) const {
+    return ctx.begin();
+  }
+
+  auto format(const ObjectStore::omap_iter_seek_t& seek, format_context& ctx) const {
+    return fmt::format_to(
+      ctx.out(),
+      "omap_iter_seek_t{{seek_position='{}', seek_type={}}}",
+      seek.seek_position,
+      seek.seek_type == ObjectStore::omap_iter_seek_t::LOWER_BOUND ? "LOWER_BOUND" : "UPPER_BOUND"
+    );
+  }
+};
+
 namespace crimson::os::seastore {
 
 std::ostream &operator<<(std::ostream &out, const std::list<std::string> &rhs);
@@ -98,10 +117,35 @@ public:
     Transaction &t,
     const std::string &key) = 0;
 
+  /**
+   * omap_iterate
+   *
+   * scan key/value pairs and give key/value from start_from.seek_poistion to function f
+   *
+   * @param omap_root: omap btree root information
+   * @param t: current transaction
+   * @param start_from: seek start key and seek type
+   *                    Upon transaction conflict, start_from will be updated to the
+   *                    upper bound of the last key of the last visited omap leaf node.
+   * @param f: function is called for each seeked key/value pair
+   *
+   */
+   using omap_iterate_cb_t = std::function<ObjectStore::omap_iter_ret_t(std::string_view, std::string_view)>;
+   using omap_iterate_iertr = base_iertr;
+   using omap_iterate_ret = omap_iterate_iertr::future<ObjectStore::omap_iter_ret_t>;
+   virtual omap_iterate_ret omap_iterate(
+     const omap_root_t &omap_root,
+     Transaction &t,
+     ObjectStore::omap_iter_seek_t &start_from,
+     omap_iterate_cb_t callback
+   ) = 0;
+
   /**
    * omap_list
    *
    * Scans key/value pairs in order.
+   * // TODO: omap_list() is deprecated in favor of omap_iterate()
+   * // TODO: omap_rm_key_range() that omap_list_config_t needs to be dropped from that interface
    *
    * @param omap_root: omap btree root information
    * @param t: current transaction
index 5f5bd3b50f678a843552096a9198dc49605cace9..ade8d986bf468dff74d9f4aede105756c1d35736 100644 (file)
@@ -268,6 +268,26 @@ BtreeOMapManager::omap_rm_key_range(
   });
 }
 
+BtreeOMapManager::omap_iterate_ret
+BtreeOMapManager::omap_iterate(
+  const omap_root_t &omap_root,
+  Transaction &t,
+  ObjectStore::omap_iter_seek_t &start_from,
+  omap_iterate_cb_t callback)
+{
+  LOG_PREFIX(BtreeOMapManager::omap_iterate);
+  DEBUGT("{}, {}", t, omap_root, start_from);
+  return get_omap_root(
+    get_omap_context(t, omap_root),
+    omap_root
+  ).si_then([this, &t, &start_from, callback, &omap_root](auto extent) {
+    return extent->iterate(
+      get_omap_context(t, omap_root),
+      start_from,
+      callback);
+  });
+}
+
 BtreeOMapManager::omap_list_ret
 BtreeOMapManager::omap_list(
   const omap_root_t &omap_root,
index 6de3ade3f7b765538e71a4ca93eaa083301d091b..9522a4bcd35fff2ccc51fe45bf9c98a85019d3f5 100644 (file)
@@ -96,6 +96,12 @@ public:
     const std::string &last,
     omap_list_config_t config) final;
 
+  omap_iterate_ret omap_iterate(
+    const omap_root_t &omap_root,
+    Transaction &t,
+    ObjectStore::omap_iter_seek_t &start_from,
+    omap_iterate_cb_t callback) final;
+
   omap_list_ret omap_list(
     const omap_root_t &omap_root,
     Transaction &t,
index 52dd2869eb8c6a99171ec37a209f7a984fe2b852..140513beb9f9a665ca7919e911395deba3625639 100644 (file)
@@ -80,6 +80,14 @@ struct OMapNode : LogicalChildNode {
     omap_context_t oc,
     const std::string &key) = 0;
 
+  using iterate_iertr = base_iertr;
+  using iterate_ret = OMapManager::omap_iterate_ret;
+  using omap_iterate_cb_t = OMapManager::omap_iterate_cb_t;
+  virtual iterate_ret iterate(
+    omap_context_t oc,
+    ObjectStore::omap_iter_seek_t &start_from,
+    omap_iterate_cb_t callback) = 0;
+
   using omap_list_config_t = OMapManager::omap_list_config_t;
   using list_iertr = base_iertr;
   using list_bare_ret = OMapManager::omap_list_bare_ret;
index 346977b2c9d5645371ba8eb71340ce08ef7f3ef1..3b26c75b413c5e7d73c07bef10058fe10743436b 100644 (file)
@@ -223,6 +223,50 @@ OMapInnerNode::rm_key(omap_context_t oc, const std::string &key)
   });
 }
 
+OMapInnerNode::iterate_ret
+OMapInnerNode::iterate(
+  omap_context_t oc,
+  ObjectStore::omap_iter_seek_t &start_from,
+  omap_iterate_cb_t callback)
+{
+  LOG_PREFIX(OMapInnerNode::iterate);
+  DEBUGT("{}, this: {}", oc.t, start_from, *this);
+
+  auto start_iter = get_containing_child(start_from.seek_position);
+  return seastar::do_with(
+    iter_t(start_iter),
+    ObjectStore::omap_iter_ret_t{ObjectStore::omap_iter_ret_t::NEXT},
+    [this, &start_from, oc, callback]
+    (auto &iter, auto &ret)
+  {
+    return trans_intr::repeat(
+      [&start_from, &iter, &ret, oc, callback, this]()
+      -> iterate_iertr::future<seastar::stop_iteration>
+    {
+      if (iter == iter_cend()) {
+        return iterate_iertr::make_ready_future<seastar::stop_iteration>(
+               seastar::stop_iteration::yes);
+      }
+      return get_child_node(oc, iter
+      ).si_then([&start_from, &iter, &ret, callback, oc] (auto &&extent) {
+        return extent->iterate(oc, start_from, callback
+        ).si_then([&ret, &iter](auto &&child_ret) mutable {
+          ret = child_ret;
+          if (child_ret == ObjectStore::omap_iter_ret_t::STOP) {
+            return iterate_iertr::make_ready_future<seastar::stop_iteration>(
+                   seastar::stop_iteration::yes);
+          }
+          ++iter;
+          return iterate_iertr::make_ready_future<seastar::stop_iteration>(
+                 seastar::stop_iteration::no);
+        });
+      });
+    }).si_then([&ret, ref = OMapNodeRef(this)] {
+      return iterate_iertr::make_ready_future<ObjectStore::omap_iter_ret_t>(std::move(ret));
+    });
+  });
+}
+
 OMapInnerNode::list_ret
 OMapInnerNode::list(
   omap_context_t oc,
@@ -749,6 +793,38 @@ OMapLeafNode::rm_key(omap_context_t oc, const std::string &key)
 
 }
 
+OMapLeafNode::iterate_ret
+OMapLeafNode::iterate(
+  omap_context_t oc,
+  ObjectStore::omap_iter_seek_t &start_from,
+  omap_iterate_cb_t callback)
+{
+  LOG_PREFIX(OMapLeafNode::iterate);
+  DEBUGT("{}, this: {}", oc.t, start_from, *this);
+
+  auto ret = ObjectStore::omap_iter_ret_t::NEXT;
+  auto iter = start_from.seek_type == ObjectStore::omap_iter_seek_t::LOWER_BOUND ?
+                string_lower_bound(start_from.seek_position) :
+                string_upper_bound(start_from.seek_position);
+
+  std::string key;
+  for(; iter != iter_end(); iter++) {
+    ceph::bufferlist bl = iter->get_val();
+    std::string result(bl.c_str(), bl.length());
+    key = iter->get_key();
+    ret = callback(key, result);
+    if (ret == ObjectStore::omap_iter_ret_t::STOP) {
+      break;
+    }
+  }
+  if (!key.empty()) {
+    start_from.seek_position = key;
+  }
+  start_from.seek_type = ObjectStore::omap_iter_seek_t::UPPER_BOUND;
+
+  return iterate_iertr::make_ready_future<ObjectStore::omap_iter_ret_t>(std::move(ret));
+}
+
 OMapLeafNode::list_ret
 OMapLeafNode::list(
   omap_context_t oc,
index c42030a6576315e8630ec554f8e36add6b255d4d..31e1fdc197a8b6d5c6c0b1d4c01853e590766840 100644 (file)
@@ -172,6 +172,11 @@ struct OMapInnerNode
     omap_context_t oc,
     const std::string &key) final;
 
+  iterate_ret iterate(
+    omap_context_t oc,
+    ObjectStore::omap_iter_seek_t &start_from,
+    omap_iterate_cb_t callback) final;
+
   list_ret list(
     omap_context_t oc,
     const std::optional<std::string> &first,
@@ -413,6 +418,11 @@ struct OMapLeafNode
   rm_key_ret rm_key(
     omap_context_t oc, const std::string &key) final;
 
+  iterate_ret iterate(
+    omap_context_t oc,
+    ObjectStore::omap_iter_seek_t &start_from,
+    omap_iterate_cb_t callback) final;
+
   list_ret list(
     omap_context_t oc,
     const std::optional<std::string> &first,
index 0fc5e3bcaf0806b6fb3072f8c6270d604383e794..f24dd51f054a83416cbd5a03d89de5752be87fe7 100644 (file)
@@ -65,6 +65,9 @@ template <> struct fmt::formatter<crimson::os::seastore::op_type_t>
     case op_type_t::OMAP_GET_VALUES2:
       name = "omap_get_values2";
       break;
+    case op_type_t::OMAP_ITERATE:
+      name = "omap_iterate";
+      break;
     case op_type_t::MAX:
       name = "unknown";
       break;
@@ -164,6 +167,7 @@ void SeaStore::Shard::register_metrics()
     {op_type_t::STAT,             sm::label_instance("latency", "STAT")},
     {op_type_t::OMAP_GET_VALUES,  sm::label_instance("latency", "OMAP_GET_VALUES")},
     {op_type_t::OMAP_GET_VALUES2, sm::label_instance("latency", "OMAP_GET_VALUES2")},
+    {op_type_t::OMAP_ITERATE,     sm::label_instance("latency", "OMAP_ITERATE")},
   };
 
   for (auto& [op_type, label] : labels_by_op_type) {
@@ -1454,6 +1458,39 @@ SeaStore::Shard::omap_get_values(
   });
 }
 
+SeaStore::Shard::read_errorator::future<ObjectStore::omap_iter_ret_t>
+SeaStore::Shard::omap_iterate(
+  CollectionRef ch,
+  const ghobject_t &oid,
+  ObjectStore::omap_iter_seek_t start_from,
+  omap_iterate_cb_t callback,
+  uint32_t op_flags)
+{
+  ++(shard_stats.read_num);
+  ++(shard_stats.pending_read_num);
+  return seastar::do_with(
+    std::move(start_from),
+    [this, ch, &oid, callback, op_flags] (auto &start_from)
+  {
+    return repeat_with_onode<ObjectStore::omap_iter_ret_t>(
+      ch,
+      oid,
+      Transaction::src_t::READ,
+      "omap_iterate",
+      op_type_t::OMAP_ITERATE,
+      op_flags,
+      [this, &start_from, callback](auto &t, auto &onode)
+    {
+      auto root = select_log_omap_root(onode);
+      return omaptree_iterate(
+        t, std::move(root), start_from, callback);
+    }).finally([this] {
+      assert(shard_stats.pending_read_num);
+      --(shard_stats.pending_read_num);
+    });
+  });
+}
+
 SeaStore::base_iertr::future<SeaStore::Shard::fiemap_ret_t>
 SeaStore::Shard::_fiemap(
   Transaction &t,
@@ -2583,6 +2620,29 @@ SeaStore::Shard::omaptree_get_values(
   });
 }
 
+SeaStore::Shard::omaptree_iterate_ret
+SeaStore::Shard::omaptree_iterate(
+  Transaction& t,
+  omap_root_t&& root,
+  ObjectStore::omap_iter_seek_t &start_from,
+  omap_iterate_cb_t callback)
+{
+  LOG_PREFIX(SeaStoreS::omaptree_iterate);
+  auto type = root.get_type();
+  DEBUGT("{}, {} ", t, type, start_from);
+  if (root.is_null()) {
+    DEBUGT("{} root is null", t, type);
+    return seastar::make_ready_future<ObjectStore::omap_iter_ret_t>(ObjectStore::omap_iter_ret_t::NEXT);
+  }
+  return seastar::do_with(
+    BtreeOMapManager(*transaction_manager),
+    std::move(root),
+    [&t, &start_from, callback](auto &manager, auto &root)
+  {
+    return manager.omap_iterate(root, t, start_from, callback);
+  });
+}
+
 SeaStore::Shard::omaptree_list_ret
 SeaStore::Shard::omaptree_list(
   Transaction& t,
index 98a3095a68d77d1490a4699f1e8fbf7b84d64788..e49533eff35a53879c3113db2e100deaa8c3b4da 100644 (file)
@@ -43,6 +43,7 @@ enum class op_type_t : uint8_t {
     STAT,
     OMAP_GET_VALUES,
     OMAP_GET_VALUES2,
+    OMAP_ITERATE,
     MAX
 };
 
@@ -147,6 +148,13 @@ public:
       uint32_t op_flags = 0
       ) final; ///< @return <done, values> values.empty() iff done
 
+    read_errorator::future<ObjectStore::omap_iter_ret_t> omap_iterate(
+      CollectionRef c,
+      const ghobject_t &oid,
+      ObjectStore::omap_iter_seek_t start_from,
+      omap_iterate_cb_t callback,
+      uint32_t op_flags = 0) final;
+
     get_attr_errorator::future<bufferlist> omap_get_header(
       CollectionRef c,
       const ghobject_t& oid,
@@ -493,6 +501,14 @@ public:
       const std::optional<std::string>& start,
       OMapManager::omap_list_config_t config) const;
 
+    using omaptree_iterate_ret = OMapManager::omap_iterate_ret;
+    omaptree_iterate_ret omaptree_iterate(
+      Transaction& t,
+      omap_root_t&& root,
+      ObjectStore::omap_iter_seek_t &start_from,
+      omap_iterate_cb_t callback
+      );
+
     base_iertr::future<omap_values_t> omaptree_get_values(
       Transaction& t,
       omap_root_t&& root,