From 9d102213d0f6d5da494a597705c7f7227f249c03 Mon Sep 17 00:00:00 2001 From: chunmei liu Date: Fri, 21 Mar 2025 21:58:20 -0700 Subject: [PATCH] crimson/osd/pg_backend: replace omap_get_keys/vals() by omap_iterate() With omap_iterate(), there is no need to scan the entire omap tree. Signed-off-by: chunmei liu --- src/crimson/osd/pg_backend.cc | 169 ++++++++++++++++++---------------- 1 file changed, 92 insertions(+), 77 deletions(-) diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc index 42f5f82420218..b8dc88134a90f 100644 --- a/src/crimson/osd/pg_backend.cc +++ b/src/crimson/osd/pg_backend.cc @@ -1306,17 +1306,21 @@ maybe_get_omap_vals_by_keys( } } +using get_omap_iterate_ertr = + crimson::os::FuturizedStore::Shard::read_errorator::extend< + crimson::ct_error::enodata>; +using omap_iterate_cb_t = crimson::os::FuturizedStore::Shard::omap_iterate_cb_t; static -get_omap_iertr::future< - std::tuple> -maybe_get_omap_vals( +get_omap_iterate_ertr::future +maybe_do_omap_iterate( crimson::os::FuturizedStore::Shard* store, const crimson::os::CollectionRef& coll, const object_info_t& oi, - const std::string& start_after) + ObjectStore::omap_iter_seek_t start_from, + omap_iterate_cb_t callback) { if (oi.is_omap()) { - return store->omap_get_values(coll, ghobject_t{oi.soid}, start_after); + return store->omap_iterate(coll, ghobject_t{oi.soid}, start_from, callback); } else { return crimson::ct_error::enodata::make(); } @@ -1369,7 +1373,7 @@ PGBackend::omap_get_keys( { if (!os.exists || os.oi.is_whiteout()) { logger().debug("{}: object does not exist: {}", os.oi.soid); - return crimson::ct_error::enoent::make(); + co_await ll_read_ierrorator::future<>(crimson::ct_error::enoent::make()); } std::string start_after; uint64_t max_return; @@ -1380,43 +1384,49 @@ PGBackend::omap_get_keys( } catch (buffer::error&) { throw crimson::osd::invalid_argument{}; } + uint64_t max_omap_entries = local_conf()->osd_max_omap_entries_per_request; max_return = - std::min(max_return, 
local_conf()->osd_max_omap_entries_per_request); - - - // TODO: truly chunk the reading - return maybe_get_omap_vals(store, coll, os.oi, start_after).safe_then_interruptible( - [=,&delta_stats, &osd_op](auto ret) { - ceph::bufferlist result; - bool truncated = false; + std::min(max_return, max_omap_entries); + + ceph::bufferlist result; + uint32_t num = 0; + bool truncated = false; + ObjectStore::omap_iter_seek_t start_from{start_after, ObjectStore::omap_iter_seek_t::UPPER_BOUND}; + omap_iterate_cb_t callback = [&result, &num, &truncated, max_return] + (std::string_view key, std::string_view value) + { + if (num >= max_return) { + truncated = true; + return ObjectStore::omap_iter_ret_t::STOP; + } + encode(key, result); + ++num; + return ObjectStore::omap_iter_ret_t::NEXT; + }; + + co_await maybe_do_omap_iterate(store, coll, os.oi, start_from, callback + ).safe_then([&delta_stats, &osd_op, &result, &num, &truncated](auto ret){ + if (ret != ObjectStore::omap_iter_ret_t::STOP) { + logger().warn("omap_iterate not meet a stop condition"); + } + encode(num, osd_op.outdata); + osd_op.outdata.claim_append(result); + encode(truncated, osd_op.outdata); + delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10); + delta_stats.num_rd++; + }).handle_error( + crimson::ct_error::enodata::handle([&osd_op] { uint32_t num = 0; - for (auto &[key, val] : std::get<1>(ret)) { - if (num >= max_return || - result.length() >= local_conf()->osd_max_omap_bytes_per_request) { - truncated = true; - break; - } - encode(key, result); - ++num; - } + bool truncated = false; encode(num, osd_op.outdata); - osd_op.outdata.claim_append(result); encode(truncated, osd_op.outdata); - delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10); - delta_stats.num_rd++; + osd_op.rval = 0; return seastar::now(); - }).handle_error_interruptible( - crimson::ct_error::enodata::handle([&osd_op] { - uint32_t num = 0; - bool truncated = false; - encode(num, osd_op.outdata); - encode(truncated, 
osd_op.outdata); - osd_op.rval = 0; - return seastar::now(); - }), - ll_read_errorator::pass_further{} - ); + }), + ll_read_errorator::pass_further{} + ); } + static PGBackend::omap_cmp_ertr::future<> do_omap_val_cmp( std::map> out, @@ -1491,7 +1501,7 @@ PGBackend::omap_get_vals( { if (!os.exists || os.oi.is_whiteout()) { logger().debug("{}: object does not exist: {}", os.oi.soid); - return crimson::ct_error::enoent::make(); + co_await ll_read_ierrorator::future<>(crimson::ct_error::enoent::make()); } std::string start_after; uint64_t max_return; @@ -1505,48 +1515,53 @@ PGBackend::omap_get_vals( throw crimson::osd::invalid_argument{}; } + uint64_t max_omap_entries = local_conf()->osd_max_omap_entries_per_request; max_return = \ - std::min(max_return, local_conf()->osd_max_omap_entries_per_request); - delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10); - delta_stats.num_rd++; + std::min(max_return, max_omap_entries); + + ceph::bufferlist result; + uint32_t num = 0; + bool truncated = false; + ObjectStore::omap_iter_seek_t start_from = ObjectStore::omap_iter_seek_t::min_lower_bound(); + start_from.seek_position = filter_prefix > start_after ? filter_prefix : start_after; + start_from.seek_type = filter_prefix > start_after ? 
+ ObjectStore::omap_iter_seek_t::LOWER_BOUND : + ObjectStore::omap_iter_seek_t::UPPER_BOUND; + omap_iterate_cb_t callback = [filter_prefix, max_return, &result, &num, &truncated] + (std::string_view key, std::string_view value) + { + if (num >= max_return) { + truncated = true; + } + if (key.substr(0, filter_prefix.size()) != filter_prefix || truncated == true) { + return ObjectStore::omap_iter_ret_t::STOP; + } - // TODO: truly chunk the reading - return maybe_get_omap_vals(store, coll, os.oi, start_after) - .safe_then_interruptible( - [=, &osd_op] (auto&& ret) { - auto [done, vals] = std::move(ret); - assert(done); - ceph::bufferlist result; - bool truncated = false; - uint32_t num = 0; - auto iter = filter_prefix > start_after ? vals.lower_bound(filter_prefix) - : std::begin(vals); - for (; iter != std::end(vals); ++iter) { - const auto& [key, value] = *iter; - if (key.substr(0, filter_prefix.size()) != filter_prefix) { - break; - } else if (num >= max_return || - result.length() >= local_conf()->osd_max_omap_bytes_per_request) { - truncated = true; - break; - } - encode(key, result); - encode(value, result); - ++num; - } - encode(num, osd_op.outdata); - osd_op.outdata.claim_append(result); - encode(truncated, osd_op.outdata); + encode(key, result); + encode(value, result); + ++num; + return ObjectStore::omap_iter_ret_t::NEXT; + }; + + co_await maybe_do_omap_iterate(store, coll, os.oi, start_from, callback + ).safe_then([&osd_op, &delta_stats, &result, &num, &truncated](auto ret) { + if (ret != ObjectStore::omap_iter_ret_t::STOP) { + logger().warn("omap_iterate not meet a stop condition"); + } + encode(num, osd_op.outdata); + osd_op.outdata.claim_append(result); + encode(truncated, osd_op.outdata); + delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10); + delta_stats.num_rd++; + }).handle_error( + crimson::ct_error::enodata::handle([&osd_op] { + encode(uint32_t{0} /* num */, osd_op.outdata); + encode(bool{false} /* truncated */, osd_op.outdata); + 
osd_op.rval = 0; return ll_read_errorator::now(); - }).handle_error_interruptible( - crimson::ct_error::enodata::handle([&osd_op] { - encode(uint32_t{0} /* num */, osd_op.outdata); - encode(bool{false} /* truncated */, osd_op.outdata); - osd_op.rval = 0; - return ll_read_errorator::now(); - }), - ll_read_errorator::pass_further{} - ); + }), + ll_read_errorator::pass_further{} + ); } PGBackend::ll_read_ierrorator::future<> -- 2.39.5