From: chunmei liu Date: Wed, 19 Mar 2025 20:50:45 +0000 (-0700) Subject: crimson/os: add omap_iterate interface to futurized store X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=e696da88f968335afaf692cf448239032545022a;p=ceph.git crimson/os: add omap_iterate interface to futurized store Signed-off-by: chunmei liu --- diff --git a/src/crimson/os/alienstore/alien_store.cc b/src/crimson/os/alienstore/alien_store.cc index c44abe46ae956..9ac25d8034e56 100644 --- a/src/crimson/os/alienstore/alien_store.cc +++ b/src/crimson/os/alienstore/alien_store.cc @@ -465,6 +465,35 @@ auto AlienStore::omap_get_values(CollectionRef ch, }); } +AlienStore::read_errorator::future +AlienStore::omap_iterate(CollectionRef ch, + const ghobject_t &oid, + ObjectStore::omap_iter_seek_t start_from, + omap_iterate_cb_t callback, + uint32_t op_flags) +{ + logger().debug("{} with_start", __func__); + assert(tp); + return do_with_op_gate(oid, [ch, start_from, callback, this] (auto& oid) { + return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [ch, oid, start_from, callback, this] { + auto c = static_cast(ch.get()); + return store->omap_iterate( + c->collection, oid, start_from, callback); + }).then([] (int r) + -> read_errorator::future { + if (r == -ENOENT) { + return crimson::ct_error::enoent::make(); + } else { + if (r == 1) { + return read_errorator::make_ready_future(ObjectStore::omap_iter_ret_t::STOP); + } else { + return read_errorator::make_ready_future(ObjectStore::omap_iter_ret_t::NEXT); + } + } + }); + }); +} + seastar::future<> AlienStore::do_transaction_no_callbacks( CollectionRef ch, ceph::os::Transaction&& txn) diff --git a/src/crimson/os/alienstore/alien_store.h b/src/crimson/os/alienstore/alien_store.h index 1d39411450e8b..2c3d2b2d02cab 100644 --- a/src/crimson/os/alienstore/alien_store.h +++ b/src/crimson/os/alienstore/alien_store.h @@ -81,6 +81,13 @@ public: uint64_t limit, uint32_t op_flags = 0) const final; + read_errorator::future omap_iterate( + CollectionRef c, + const ghobject_t &oid, + ObjectStore::omap_iter_seek_t start_from, + omap_iterate_cb_t callback, + uint32_t op_flags = 0) final; + seastar::future create_new_collection(const coll_t& cid) final; seastar::future open_collection(const coll_t& cid) final; seastar::future> list_collections() final; diff --git a/src/crimson/os/cyanstore/cyan_store.cc b/src/crimson/os/cyanstore/cyan_store.cc index 41819fb5eb6ba..73ec8bbc23404 100644 --- a/src/crimson/os/cyanstore/cyan_store.cc +++ b/src/crimson/os/cyanstore/cyan_store.cc @@ -413,6 +413,33 @@ auto CyanStore::Shard::omap_get_values( std::make_tuple(true, std::move(values))); } +auto CyanStore::Shard::omap_iterate( + CollectionRef ch, + const ghobject_t &oid, + ObjectStore::omap_iter_seek_t start_from, + omap_iterate_cb_t callback, + uint32_t op_flags) + -> CyanStore::Shard::read_errorator::future +{ + auto c = static_cast(ch.get()); + logger().debug("{} {} {}", __func__, c->get_cid(), oid); + auto o = c->get_object(oid); + if (!o) { + return crimson::ct_error::enoent::make(); + } + auto ret = ObjectStore::omap_iter_ret_t::NEXT; + auto i = (start_from.seek_type == ObjectStore::omap_iter_seek_t::LOWER_BOUND) ? + o->omap.lower_bound(start_from.seek_position) : + o->omap.upper_bound(start_from.seek_position); + for (; i != o->omap.end(); ++i) { + ceph::bufferlist bl = i->second; + std::string result(bl.c_str(), bl.length()); + ret = callback(i->first, result); + if (ret == ObjectStore::omap_iter_ret_t::STOP) + break; + } + return read_errorator::make_ready_future(ret); +} auto CyanStore::Shard::omap_get_header( CollectionRef ch, const ghobject_t& oid, diff --git a/src/crimson/os/cyanstore/cyan_store.h b/src/crimson/os/cyanstore/cyan_store.h index 93d0d14431198..d5029d9e36754 100644 --- a/src/crimson/os/cyanstore/cyan_store.h +++ b/src/crimson/os/cyanstore/cyan_store.h @@ -79,6 +79,14 @@ public: uint32_t op_flags = 0 ) final; + read_errorator::future omap_iterate( + CollectionRef c, + const ghobject_t &oid, + ObjectStore::omap_iter_seek_t start_from, + omap_iterate_cb_t callback, + uint32_t op_flags = 0 + ) final; + get_attr_errorator::future omap_get_header( CollectionRef c, const ghobject_t& oid, diff --git a/src/crimson/os/futurized_store.h b/src/crimson/os/futurized_store.h index e7d4c8546de09..c19d893fb39a7 100644 --- a/src/crimson/os/futurized_store.h +++ b/src/crimson/os/futurized_store.h @@ -17,6 +17,7 @@ #include "include/buffer_fwd.h" #include "include/uuid.h" #include "osd/osd_types.h" +#include "os/ObjectStore.h" namespace ceph::os { class Transaction; @@ -95,6 +96,37 @@ public: uint32_t op_flags = 0 ) = 0; ///< @return values.empty() only if done + /** + * Iterate over object map with user-provided callable + * + * Warning! f cannot block or perform IO and must not wait on a future. + * + * @param c collection + * @param oid object + * @param start_from where the iterator should point to at + * the beginning + * @param f callable that takes OMAP key and corresponding + * value as string_views and controls iteration + * by the return. It is executed for every object's + * OMAP entry from `start_from` till end of the + * object's OMAP or till the iteration is stopped + * by `STOP`. Please note that if there is no such + * entry, `visitor` will be called 0 times. + * @return omap_iter_ret_t on success + * omap_iter_ret_t::STOP means omap_iterate() is stopped by f, + * omap_iter_ret_t::NEXT means omap_iterate() reaches the end of omap tree + */ + using omap_iterate_cb_t = std::function; + virtual read_errorator::future omap_iterate( + CollectionRef c, ///< [in] collection + const ghobject_t &oid, ///< [in] object + ObjectStore::omap_iter_seek_t start_from, ///< [in] where the iterator should point to at the beginning + omap_iterate_cb_t callback, + ///< [in] the callback function for each OMAP entry after start_from till end of the OMAP or + /// till the iteration is stopped by `STOP`. + uint32_t op_flags = 0 + ) = 0; + virtual get_attr_errorator::future omap_get_header( CollectionRef c, const ghobject_t& oid, diff --git a/src/crimson/os/seastore/omap_manager.h b/src/crimson/os/seastore/omap_manager.h index 85a3f89a96f75..a8d6b3722ef9d 100644 --- a/src/crimson/os/seastore/omap_manager.h +++ b/src/crimson/os/seastore/omap_manager.h @@ -4,6 +4,7 @@ #pragma once #include +#include #include #include @@ -13,6 +14,8 @@ #include "crimson/osd/exceptions.h" #include "crimson/os/seastore/seastore_types.h" #include "crimson/os/seastore/transaction_manager.h" +#include "os/ObjectStore.h" // for ObjectStore::omap_iter_seek_t + //TODO: calculate the max key and value sizes the current layout supports, // and return errors during insert if the max is exceeded. @@ -20,6 +23,22 @@ #define OMAP_LEAF_BLOCK_SIZE 65536 #define LOG_LEAF_BLOCK_SIZE 16384 +template <> +struct fmt::formatter { + constexpr auto parse(format_parse_context& ctx) const { + return ctx.begin(); + } + + auto format(const ObjectStore::omap_iter_seek_t& seek, format_context& ctx) const { + return fmt::format_to( + ctx.out(), + "omap_iter_seek_t{{seek_position='{}', seek_type={}}}", + seek.seek_position, + seek.seek_type == ObjectStore::omap_iter_seek_t::LOWER_BOUND ? "LOWER_BOUND" : "UPPER_BOUND" + ); + } +}; + namespace crimson::os::seastore { std::ostream &operator<<(std::ostream &out, const std::list &rhs); @@ -98,10 +117,35 @@ public: Transaction &t, const std::string &key) = 0; + /** + * omap_iterate + * + * scan key/value pairs and give key/value from start_from.seek_poistion to function f + * + * @param omap_root: omap btree root information + * @param t: current transaction + * @param start_from: seek start key and seek type + * Upon transaction conflict, start_from will be updated to the + * upper bound of the last key of the last visited omap leaf node. + * @param f: function is called for each seeked key/value pair + * + */ + using omap_iterate_cb_t = std::function; + using omap_iterate_iertr = base_iertr; + using omap_iterate_ret = omap_iterate_iertr::future; + virtual omap_iterate_ret omap_iterate( + const omap_root_t &omap_root, + Transaction &t, + ObjectStore::omap_iter_seek_t &start_from, + omap_iterate_cb_t callback + ) = 0; + /** * omap_list * * Scans key/value pairs in order. + * // TODO: omap_list() is deprecated in favor of omap_iterate() + * // TODO: omap_rm_key_range() that omap_list_config_t needs to be dropped from that interface * * @param omap_root: omap btree root information * @param t: current transaction diff --git a/src/crimson/os/seastore/omap_manager/btree/btree_omap_manager.cc b/src/crimson/os/seastore/omap_manager/btree/btree_omap_manager.cc index 5f5bd3b50f678..ade8d986bf468 100644 --- a/src/crimson/os/seastore/omap_manager/btree/btree_omap_manager.cc +++ b/src/crimson/os/seastore/omap_manager/btree/btree_omap_manager.cc @@ -268,6 +268,26 @@ BtreeOMapManager::omap_rm_key_range( }); } +BtreeOMapManager::omap_iterate_ret +BtreeOMapManager::omap_iterate( + const omap_root_t &omap_root, + Transaction &t, + ObjectStore::omap_iter_seek_t &start_from, + omap_iterate_cb_t callback) +{ + LOG_PREFIX(BtreeOMapManager::omap_iterate); + DEBUGT("{}, {}", t, omap_root, start_from); + return get_omap_root( + get_omap_context(t, omap_root), + omap_root + ).si_then([this, &t, &start_from, callback, &omap_root](auto extent) { + return extent->iterate( + get_omap_context(t, omap_root), + start_from, + callback); + }); +} + BtreeOMapManager::omap_list_ret BtreeOMapManager::omap_list( const omap_root_t &omap_root, diff --git a/src/crimson/os/seastore/omap_manager/btree/btree_omap_manager.h b/src/crimson/os/seastore/omap_manager/btree/btree_omap_manager.h index 6de3ade3f7b76..9522a4bcd35ff 100644 --- a/src/crimson/os/seastore/omap_manager/btree/btree_omap_manager.h +++ b/src/crimson/os/seastore/omap_manager/btree/btree_omap_manager.h @@ -96,6 +96,12 @@ public: const std::string &last, omap_list_config_t config) final; + omap_iterate_ret omap_iterate( + const omap_root_t &omap_root, + Transaction &t, + ObjectStore::omap_iter_seek_t &start_from, + omap_iterate_cb_t callback) final; + omap_list_ret omap_list( const omap_root_t &omap_root, Transaction &t, diff --git a/src/crimson/os/seastore/omap_manager/btree/omap_btree_node.h b/src/crimson/os/seastore/omap_manager/btree/omap_btree_node.h index 52dd2869eb8c6..140513beb9f9a 100644 --- a/src/crimson/os/seastore/omap_manager/btree/omap_btree_node.h +++ b/src/crimson/os/seastore/omap_manager/btree/omap_btree_node.h @@ -80,6 +80,14 @@ struct OMapNode : LogicalChildNode { omap_context_t oc, const std::string &key) = 0; + using iterate_iertr = base_iertr; + using iterate_ret = OMapManager::omap_iterate_ret; + using omap_iterate_cb_t = OMapManager::omap_iterate_cb_t; + virtual iterate_ret iterate( + omap_context_t oc, + ObjectStore::omap_iter_seek_t &start_from, + omap_iterate_cb_t callback) = 0; + using omap_list_config_t = OMapManager::omap_list_config_t; using list_iertr = base_iertr; using list_bare_ret = OMapManager::omap_list_bare_ret; diff --git a/src/crimson/os/seastore/omap_manager/btree/omap_btree_node_impl.cc b/src/crimson/os/seastore/omap_manager/btree/omap_btree_node_impl.cc index 346977b2c9d56..3b26c75b413c5 100644 --- a/src/crimson/os/seastore/omap_manager/btree/omap_btree_node_impl.cc +++ b/src/crimson/os/seastore/omap_manager/btree/omap_btree_node_impl.cc @@ -223,6 +223,50 @@ OMapInnerNode::rm_key(omap_context_t oc, const std::string &key) }); } +OMapInnerNode::iterate_ret +OMapInnerNode::iterate( + omap_context_t oc, + ObjectStore::omap_iter_seek_t &start_from, + omap_iterate_cb_t callback) +{ + LOG_PREFIX(OMapInnerNode::iterate); + DEBUGT("{}, this: {}", oc.t, start_from, *this); + + auto start_iter = get_containing_child(start_from.seek_position); + return seastar::do_with( + iter_t(start_iter), + ObjectStore::omap_iter_ret_t{ObjectStore::omap_iter_ret_t::NEXT}, + [this, &start_from, oc, callback] + (auto &iter, auto &ret) + { + return trans_intr::repeat( + [&start_from, &iter, &ret, oc, callback, this]() + -> iterate_iertr::future + { + if (iter == iter_cend()) { + return iterate_iertr::make_ready_future( + seastar::stop_iteration::yes); + } + return get_child_node(oc, iter + ).si_then([&start_from, &iter, &ret, callback, oc] (auto &&extent) { + return extent->iterate(oc, start_from, callback + ).si_then([&ret, &iter](auto &&child_ret) mutable { + ret = child_ret; + if (child_ret == ObjectStore::omap_iter_ret_t::STOP) { + return iterate_iertr::make_ready_future( + seastar::stop_iteration::yes); + } + ++iter; + return iterate_iertr::make_ready_future( + seastar::stop_iteration::no); + }); + }); + }).si_then([&ret, ref = OMapNodeRef(this)] { + return iterate_iertr::make_ready_future(std::move(ret)); + }); + }); +} + OMapInnerNode::list_ret OMapInnerNode::list( omap_context_t oc, @@ -749,6 +793,38 @@ OMapLeafNode::rm_key(omap_context_t oc, const std::string &key) } +OMapLeafNode::iterate_ret +OMapLeafNode::iterate( + omap_context_t oc, + ObjectStore::omap_iter_seek_t &start_from, + omap_iterate_cb_t callback) +{ + LOG_PREFIX(OMapLeafNode::iterate); + DEBUGT("{}, this: {}", oc.t, start_from, *this); + + auto ret = ObjectStore::omap_iter_ret_t::NEXT; + auto iter = start_from.seek_type == ObjectStore::omap_iter_seek_t::LOWER_BOUND ? + string_lower_bound(start_from.seek_position) : + string_upper_bound(start_from.seek_position); + + std::string key; + for(; iter != iter_end(); iter++) { + ceph::bufferlist bl = iter->get_val(); + std::string result(bl.c_str(), bl.length()); + key = iter->get_key(); + ret = callback(key, result); + if (ret == ObjectStore::omap_iter_ret_t::STOP) { + break; + } + } + if (!key.empty()) { + start_from.seek_position = key; + } + start_from.seek_type = ObjectStore::omap_iter_seek_t::UPPER_BOUND; + + return iterate_iertr::make_ready_future(std::move(ret)); +} + OMapLeafNode::list_ret OMapLeafNode::list( omap_context_t oc, diff --git a/src/crimson/os/seastore/omap_manager/btree/omap_btree_node_impl.h b/src/crimson/os/seastore/omap_manager/btree/omap_btree_node_impl.h index c42030a657631..31e1fdc197a8b 100644 --- a/src/crimson/os/seastore/omap_manager/btree/omap_btree_node_impl.h +++ b/src/crimson/os/seastore/omap_manager/btree/omap_btree_node_impl.h @@ -172,6 +172,11 @@ struct OMapInnerNode omap_context_t oc, const std::string &key) final; + iterate_ret iterate( + omap_context_t oc, + ObjectStore::omap_iter_seek_t &start_from, + omap_iterate_cb_t callback) final; + list_ret list( omap_context_t oc, const std::optional &first, @@ -413,6 +418,11 @@ struct OMapLeafNode rm_key_ret rm_key( omap_context_t oc, const std::string &key) final; + iterate_ret iterate( + omap_context_t oc, + ObjectStore::omap_iter_seek_t &start_from, + omap_iterate_cb_t callback) final; + list_ret list( omap_context_t oc, const std::optional &first, diff --git a/src/crimson/os/seastore/seastore.cc b/src/crimson/os/seastore/seastore.cc index 0fc5e3bcaf080..f24dd51f054a8 100644 --- a/src/crimson/os/seastore/seastore.cc +++ b/src/crimson/os/seastore/seastore.cc @@ -65,6 +65,9 @@ template <> struct fmt::formatter case op_type_t::OMAP_GET_VALUES2: name = "omap_get_values2"; break; + case op_type_t::OMAP_ITERATE: + name = "omap_iterate"; + break; case op_type_t::MAX: name = "unknown"; break; @@ -164,6 +167,7 @@ void SeaStore::Shard::register_metrics() {op_type_t::STAT, sm::label_instance("latency", "STAT")}, {op_type_t::OMAP_GET_VALUES, sm::label_instance("latency", "OMAP_GET_VALUES")}, {op_type_t::OMAP_GET_VALUES2, sm::label_instance("latency", "OMAP_GET_VALUES2")}, + {op_type_t::OMAP_ITERATE, sm::label_instance("latency", "OMAP_ITERATE")}, }; for (auto& [op_type, label] : labels_by_op_type) { @@ -1454,6 +1458,39 @@ SeaStore::Shard::omap_get_values( }); } +SeaStore::Shard::read_errorator::future +SeaStore::Shard::omap_iterate( + CollectionRef ch, + const ghobject_t &oid, + ObjectStore::omap_iter_seek_t start_from, + omap_iterate_cb_t callback, + uint32_t op_flags) +{ + ++(shard_stats.read_num); + ++(shard_stats.pending_read_num); + return seastar::do_with( + std::move(start_from), + [this, ch, &oid, callback, op_flags] (auto &start_from) + { + return repeat_with_onode( + ch, + oid, + Transaction::src_t::READ, + "omap_iterate", + op_type_t::OMAP_ITERATE, + op_flags, + [this, &start_from, callback](auto &t, auto &onode) + { + auto root = select_log_omap_root(onode); + return omaptree_iterate( + t, std::move(root), start_from, callback); + }).finally([this] { + assert(shard_stats.pending_read_num); + --(shard_stats.pending_read_num); + }); + }); +} + SeaStore::base_iertr::future SeaStore::Shard::_fiemap( Transaction &t, @@ -2583,6 +2620,29 @@ SeaStore::Shard::omaptree_get_values( }); } +SeaStore::Shard::omaptree_iterate_ret +SeaStore::Shard::omaptree_iterate( + Transaction& t, + omap_root_t&& root, + ObjectStore::omap_iter_seek_t &start_from, + omap_iterate_cb_t callback) +{ + LOG_PREFIX(SeaStoreS::omaptree_iterate); + auto type = root.get_type(); + DEBUGT("{}, {} ", t, type, start_from); + if (root.is_null()) { + DEBUGT("{} root is null", t, type); + return seastar::make_ready_future(ObjectStore::omap_iter_ret_t::NEXT); + } + return seastar::do_with( + BtreeOMapManager(*transaction_manager), + std::move(root), + [&t, &start_from, callback](auto &manager, auto &root) + { + return manager.omap_iterate(root, t, start_from, callback); + }); +} + SeaStore::Shard::omaptree_list_ret SeaStore::Shard::omaptree_list( Transaction& t, diff --git a/src/crimson/os/seastore/seastore.h b/src/crimson/os/seastore/seastore.h index 98a3095a68d77..e49533eff35a5 100644 --- a/src/crimson/os/seastore/seastore.h +++ b/src/crimson/os/seastore/seastore.h @@ -43,6 +43,7 @@ enum class op_type_t : uint8_t { STAT, OMAP_GET_VALUES, OMAP_GET_VALUES2, + OMAP_ITERATE, MAX }; @@ -147,6 +148,13 @@ public: uint32_t op_flags = 0 ) final; ///< @return values.empty() iff done + read_errorator::future omap_iterate( + CollectionRef c, + const ghobject_t &oid, + ObjectStore::omap_iter_seek_t start_from, + omap_iterate_cb_t callback, + uint32_t op_flags = 0) final; + get_attr_errorator::future omap_get_header( CollectionRef c, const ghobject_t& oid, @@ -493,6 +501,14 @@ public: const std::optional& start, OMapManager::omap_list_config_t config) const; + using omaptree_iterate_ret = OMapManager::omap_iterate_ret; + omaptree_iterate_ret omaptree_iterate( + Transaction& t, + omap_root_t&& root, + ObjectStore::omap_iter_seek_t &start_from, + omap_iterate_cb_t callback + ); + base_iertr::future omaptree_get_values( Transaction& t, omap_root_t&& root,