]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd/pg_backend: replace omap_get_keys/vals() by omap_iterate()
authorchunmei liu <chunmei.liu@ibm.com>
Sat, 22 Mar 2025 04:58:20 +0000 (21:58 -0700)
committerchunmei liu <chunmei.liu@ibm.com>
Tue, 15 Jul 2025 02:58:30 +0000 (19:58 -0700)
needn't scan the entire omap tree.

Signed-off-by: chunmei liu <chunmei.liu@ibm.com>
src/crimson/osd/pg_backend.cc

index 42f5f82420218d97c9ad7ad875aa6d3e1eac1e03..b8dc88134a90f676575440da663a0abdd94b097d 100644 (file)
@@ -1306,17 +1306,21 @@ maybe_get_omap_vals_by_keys(
   }
 }
 
+using get_omap_iterate_ertr =
+  crimson::os::FuturizedStore::Shard::read_errorator::extend<
+    crimson::ct_error::enodata>;
+using omap_iterate_cb_t = crimson::os::FuturizedStore::Shard::omap_iterate_cb_t;
 static
-get_omap_iertr::future<
-  std::tuple<bool, crimson::os::FuturizedStore::Shard::omap_values_t>>
-maybe_get_omap_vals(
+get_omap_iterate_ertr::future<ObjectStore::omap_iter_ret_t>
+maybe_do_omap_iterate(
   crimson::os::FuturizedStore::Shard* store,
   const crimson::os::CollectionRef& coll,
   const object_info_t& oi,
-  const std::string& start_after)
+  ObjectStore::omap_iter_seek_t start_from,
+  omap_iterate_cb_t callback)
 {
   if (oi.is_omap()) {
-    return store->omap_get_values(coll, ghobject_t{oi.soid}, start_after);
+    return store->omap_iterate(coll, ghobject_t{oi.soid}, start_from, callback);
   } else {
     return crimson::ct_error::enodata::make();
   }
@@ -1369,7 +1373,7 @@ PGBackend::omap_get_keys(
 {
   if (!os.exists || os.oi.is_whiteout()) {
     logger().debug("{}: object does not exist: {}", os.oi.soid);
-    return crimson::ct_error::enoent::make();
+    co_await ll_read_ierrorator::future<>(crimson::ct_error::enoent::make());
   }
   std::string start_after;
   uint64_t max_return;
@@ -1380,43 +1384,49 @@ PGBackend::omap_get_keys(
   } catch (buffer::error&) {
     throw crimson::osd::invalid_argument{};
   }
+  uint64_t max_omap_entries = local_conf()->osd_max_omap_entries_per_request;
   max_return =
-    std::min(max_return, local_conf()->osd_max_omap_entries_per_request);
-
-
-  // TODO: truly chunk the reading
-  return maybe_get_omap_vals(store, coll, os.oi, start_after).safe_then_interruptible(
-    [=,&delta_stats, &osd_op](auto ret) {
-      ceph::bufferlist result;
-      bool truncated = false;
+    std::min(max_return, max_omap_entries);
+
+  ceph::bufferlist result;
+  uint32_t num = 0;
+  bool truncated = false;
+  ObjectStore::omap_iter_seek_t start_from{start_after, ObjectStore::omap_iter_seek_t::UPPER_BOUND};
+  omap_iterate_cb_t callback = [&result, &num, &truncated, max_return]
+    (std::string_view key, std::string_view value)
+  {
+    if (num >= max_return) {
+      truncated = true;
+      return ObjectStore::omap_iter_ret_t::STOP;
+    }
+    encode(key, result);
+    ++num;
+    return ObjectStore::omap_iter_ret_t::NEXT;
+  };
+
+  co_await maybe_do_omap_iterate(store, coll, os.oi, start_from, callback
+  ).safe_then([&delta_stats, &osd_op, &result, &num, &truncated](auto ret){
+    if (ret != ObjectStore::omap_iter_ret_t::STOP) {
+      logger().warn("omap_iterate not meet a stop condition");
+    }
+    encode(num, osd_op.outdata);
+    osd_op.outdata.claim_append(result);
+    encode(truncated, osd_op.outdata);
+    delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
+    delta_stats.num_rd++;
+  }).handle_error(
+    crimson::ct_error::enodata::handle([&osd_op] {
       uint32_t num = 0;
-      for (auto &[key, val] : std::get<1>(ret)) {
-        if (num >= max_return ||
-            result.length() >= local_conf()->osd_max_omap_bytes_per_request) {
-          truncated = true;
-          break;
-        }
-        encode(key, result);
-        ++num;
-      }
+      bool truncated = false;
       encode(num, osd_op.outdata);
-      osd_op.outdata.claim_append(result);
       encode(truncated, osd_op.outdata);
-      delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
-      delta_stats.num_rd++;
+      osd_op.rval = 0;
       return seastar::now();
-    }).handle_error_interruptible(
-      crimson::ct_error::enodata::handle([&osd_op] {
-        uint32_t num = 0;
-       bool truncated = false;
-       encode(num, osd_op.outdata);
-       encode(truncated, osd_op.outdata);
-       osd_op.rval = 0;
-       return seastar::now();
-      }),
-      ll_read_errorator::pass_further{}
-    );
+    }),
+    ll_read_errorator::pass_further{}
+  );
 }
+
 static
 PGBackend::omap_cmp_ertr::future<> do_omap_val_cmp(
   std::map<std::string, bufferlist, std::less<>> out,
@@ -1491,7 +1501,7 @@ PGBackend::omap_get_vals(
 {
   if (!os.exists || os.oi.is_whiteout()) {
     logger().debug("{}: object does not exist: {}", os.oi.soid);
-    return crimson::ct_error::enoent::make();
+    co_await ll_read_ierrorator::future<>(crimson::ct_error::enoent::make());
   }
   std::string start_after;
   uint64_t max_return;
@@ -1505,48 +1515,53 @@ PGBackend::omap_get_vals(
     throw crimson::osd::invalid_argument{};
   }
 
+  uint64_t max_omap_entries = local_conf()->osd_max_omap_entries_per_request;
   max_return = \
-    std::min(max_return, local_conf()->osd_max_omap_entries_per_request);
-  delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
-  delta_stats.num_rd++;
+    std::min(max_return, max_omap_entries);
+
+  ceph::bufferlist result;
+  uint32_t num = 0;
+  bool truncated = false;
+  ObjectStore::omap_iter_seek_t start_from = ObjectStore::omap_iter_seek_t::min_lower_bound();
+  start_from.seek_position = filter_prefix > start_after ? filter_prefix : start_after;
+  start_from.seek_type = filter_prefix > start_after ?
+                         ObjectStore::omap_iter_seek_t::LOWER_BOUND :
+                         ObjectStore::omap_iter_seek_t::UPPER_BOUND;
+  omap_iterate_cb_t callback = [filter_prefix, max_return, &result, &num, &truncated]
+    (std::string_view key, std::string_view value)
+  {
+    if (num >= max_return) {
+      truncated = true;
+    }
+    if (key.substr(0, filter_prefix.size()) != filter_prefix || truncated == true) {
+      return ObjectStore::omap_iter_ret_t::STOP;
+    }
 
-  // TODO: truly chunk the reading
-  return maybe_get_omap_vals(store, coll, os.oi, start_after)
-  .safe_then_interruptible(
-    [=, &osd_op] (auto&& ret) {
-      auto [done, vals] = std::move(ret);
-      assert(done);
-      ceph::bufferlist result;
-      bool truncated = false;
-      uint32_t num = 0;
-      auto iter = filter_prefix > start_after ? vals.lower_bound(filter_prefix)
-                                              : std::begin(vals);
-      for (; iter != std::end(vals); ++iter) {
-        const auto& [key, value] = *iter;
-        if (key.substr(0, filter_prefix.size()) != filter_prefix) {
-          break;
-        } else if (num >= max_return ||
-            result.length() >= local_conf()->osd_max_omap_bytes_per_request) {
-          truncated = true;
-          break;
-        }
-        encode(key, result);
-        encode(value, result);
-        ++num;
-      }
-      encode(num, osd_op.outdata);
-      osd_op.outdata.claim_append(result);
-      encode(truncated, osd_op.outdata);
+    encode(key, result);
+    encode(value, result);
+    ++num;
+    return ObjectStore::omap_iter_ret_t::NEXT;
+  };
+
+  co_await maybe_do_omap_iterate(store, coll, os.oi, start_from, callback
+  ).safe_then([&osd_op, &delta_stats, &result, &num, &truncated](auto ret) {
+    if (ret != ObjectStore::omap_iter_ret_t::STOP) {
+      logger().warn("omap_iterate not meet a stop condition");
+    }
+    encode(num, osd_op.outdata);
+    osd_op.outdata.claim_append(result);
+    encode(truncated, osd_op.outdata);
+    delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
+    delta_stats.num_rd++;
+  }).handle_error(
+    crimson::ct_error::enodata::handle([&osd_op] {
+      encode(uint32_t{0} /* num */, osd_op.outdata);
+      encode(bool{false} /* truncated */, osd_op.outdata);
+      osd_op.rval = 0;
       return ll_read_errorator::now();
-    }).handle_error_interruptible(
-      crimson::ct_error::enodata::handle([&osd_op] {
-        encode(uint32_t{0} /* num */, osd_op.outdata);
-        encode(bool{false} /* truncated */, osd_op.outdata);
-        osd_op.rval = 0;
-        return ll_read_errorator::now();
-      }),
-      ll_read_errorator::pass_further{}
-    );
+    }),
+    ll_read_errorator::pass_further{}
+  );
 }
 
 PGBackend::ll_read_ierrorator::future<>