]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd/../scrub_events: replace omap_get_values() by omap_iterate()
authorchunmei liu <chunmei.liu@ibm.com>
Mon, 24 Mar 2025 07:30:25 +0000 (00:30 -0700)
committerchunmei liu <chunmei.liu@ibm.com>
Tue, 15 Jul 2025 02:58:30 +0000 (19:58 -0700)
Signed-off-by: chunmei liu <chunmei.liu@ibm.com>
src/crimson/osd/osd_operations/scrub_events.cc

index df404014db0c94c0ef35ee4fd38bf5485556defa..c2fd916ff714f4d51470e02f916e9b84304c6f5a 100644 (file)
@@ -7,6 +7,7 @@
 #include "crimson/osd/osd_connection_priv.h"
 #include "messages/MOSDRepScrubMap.h"
 #include "scrub_events.h"
+#include "crimson/os/futurized_store.h"
 
 SET_SUBSYS(osd);
 
@@ -231,127 +232,144 @@ ScrubScan::ifut<> ScrubScan::deep_scan_object(
   auto &entry = ret.objects[obj.hobj];
   auto progress_ref = std::make_unique<obj_scrub_progress_t>();
   auto &progress = *progress_ref;
-  return interruptor::repeat(
+
+  co_await interruptor::repeat(
     [FNAME, this, &progress, &obj, &entry, &pg]()
-    -> interruptible_future<seastar::stop_iteration> {
-      if (progress.offset) {
-       DEBUGDPP("op: {}, obj: {}, progress: {} scanning data",
-                pg, *this, obj, progress);
-       const auto stride = local_conf().get_val<Option::size_t>(
-         "osd_deep_scrub_stride");
-       return pg.shard_services.get_store().read(
-         pg.get_collection_ref(),
-         obj,
-         *(progress.offset),
-         stride
-       ).safe_then([this, FNAME, stride, &obj, &progress, &entry, &pg](auto bl) {
-         size_t offset = *progress.offset;
-         DEBUGDPP("op: {}, obj: {}, progress: {} got offset {}",
-                  pg, *this, obj, progress, offset);
-         progress.data_hash << bl;
-         if (bl.length() < stride) {
-           progress.offset = std::nullopt;
-           entry.digest = progress.data_hash.digest();
-           entry.digest_present = true;
-         } else {
-           ceph_assert(stride == bl.length());
-           *(progress.offset) += stride;
-         }
-       }).handle_error(
-         ct_error::all_same_way([&progress, &entry](auto e) {
-           entry.read_error = true;
-           progress.offset = std::nullopt;
-           return seastar::now();
-         })
-       ).then([] {
-         return interruptor::make_interruptible(
-           seastar::make_ready_future<seastar::stop_iteration>(
-             seastar::stop_iteration::no));
-       });
-      } else if (!progress.header_done) {
-       DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap header",
-                pg, *this, obj, progress);
-       return pg.shard_services.get_store().omap_get_header(
-         pg.get_collection_ref(),
-         obj
-       ).safe_then([&progress](auto bl) {
-         progress.omap_hash << bl;
-       }).handle_error(
-         ct_error::enodata::handle([] { return seastar::now(); }),
-         ct_error::all_same_way([&entry](auto e) {
-           entry.read_error = true;
-           return seastar::now();
-         })
-       ).then([&progress] {
-         progress.header_done = true;
-         return interruptor::make_interruptible(
-           seastar::make_ready_future<seastar::stop_iteration>(
-             seastar::stop_iteration::no));
-       });
-      } else if (!progress.keys_done) {
-       DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap keys",
-                pg, *this, obj, progress);
-       return pg.shard_services.get_store().omap_get_values(
-         pg.get_collection_ref(),
-         obj,
-         progress.next_key
-       ).safe_then([FNAME, this, &obj, &progress, &entry, &pg](auto result) {
-         const auto &[done, omap] = result;
-         DEBUGDPP("op: {}, obj: {}, progress: {} got {} keys",
-                  pg, *this, obj, progress, omap.size());
-         for (const auto &p : omap) {
-           bufferlist bl;
-           encode(p.first, bl);
-           encode(p.second, bl);
-           progress.omap_hash << bl;
-           entry.object_omap_keys++;
-           entry.object_omap_bytes += p.second.length();
-         }
-         if (done) {
-           DEBUGDPP("op: {}, obj: {}, progress: {} omap done",
-                    pg, *this, obj, progress);
-           progress.keys_done = true;
-           entry.omap_digest = progress.omap_hash.digest();
-           entry.omap_digest_present = true;
-
-           if ((entry.object_omap_keys >
-                local_conf().get_val<uint64_t>(
-                  "osd_deep_scrub_large_omap_object_key_threshold")) ||
-               (entry.object_omap_bytes >
-                local_conf().get_val<Option::size_t>(
-                  "osd_deep_scrub_large_omap_object_value_sum_threshold"))) {
-             entry.large_omap_object_found = true;
-             entry.large_omap_object_key_count = entry.object_omap_keys;
-             ret.has_large_omap_object_errors = true;
-           }
-         } else {
-           ceph_assert(!omap.empty()); // omap_get_values invariant
-           DEBUGDPP("op: {}, obj: {}, progress: {} omap not done, next {}",
-                    pg, *this, obj, progress, omap.crbegin()->first);
-           progress.next_key = omap.crbegin()->first;
-         }
-       }).handle_error(
-         ct_error::all_same_way([FNAME, this, &obj, &progress, &entry, &pg]
-                                (auto e) {
-           DEBUGDPP("op: {}, obj: {}, progress: {} error reading omap {}",
-                    pg, *this, obj, progress, e);
-           progress.keys_done = true;
-           entry.read_error = true;
-           return seastar::now();
-         })
-       ).then([] {
-         return interruptor::make_interruptible(
-           seastar::make_ready_future<seastar::stop_iteration>(
-             seastar::stop_iteration::no));
-       });
-      } else {
-       DEBUGDPP("op: {}, obj: {}, progress: {} done",
-                pg, *this, obj, progress);
-       return interruptor::make_interruptible(
-         seastar::make_ready_future<seastar::stop_iteration>(
-           seastar::stop_iteration::yes));
-      }
-    }).finally([progress_ref=std::move(progress_ref)] {});
+    -> interruptible_future<seastar::stop_iteration>
+  {
+    auto store_read = [FNAME, this, &progress, &obj, &entry, &pg]()
+      -> interruptible_future<seastar::stop_iteration>
+    {
+      DEBUGDPP("op: {}, obj: {}, progress: {} scanning data",
+                pg, *this, obj, progress);
+      const auto stride = local_conf().get_val<Option::size_t>(
+        "osd_deep_scrub_stride");
+      return pg.shard_services.get_store().read(
+        pg.get_collection_ref(),
+        obj,
+        *(progress.offset),
+        stride
+      ).safe_then([this, FNAME, stride, &obj, &progress, &entry, &pg](auto bl) {
+        size_t offset = *progress.offset;
+        DEBUGDPP("op: {}, obj: {}, progress: {} got offset {}",
+                  pg, *this, obj, progress, offset);
+        progress.data_hash << bl;
+        if (bl.length() < stride) {
+          progress.offset = std::nullopt;
+          entry.digest = progress.data_hash.digest();
+          entry.digest_present = true;
+        } else {
+          ceph_assert(stride == bl.length());
+          *(progress.offset) += stride;
+        }
+      }).handle_error(
+        ct_error::all_same_way([&progress, &entry](auto e) {
+          entry.read_error = true;
+          progress.offset = std::nullopt;
+          return seastar::now();
+        })
+      ).then([] {
+        return seastar::make_ready_future<seastar::stop_iteration>(
+               seastar::stop_iteration::no);
+      });
+    };
+
+    auto get_header = [FNAME, this, &progress, &obj, &entry, &pg]()
+      -> interruptible_future<seastar::stop_iteration>
+   {
+      DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap header",
+                pg, *this, obj, progress);
+      return pg.shard_services.get_store().omap_get_header(
+        pg.get_collection_ref(),
+        obj
+      ).safe_then([&progress](auto bl) {
+        progress.omap_hash << bl;
+      }).handle_error(
+        ct_error::enodata::handle([] { return seastar::now(); }),
+        ct_error::all_same_way([&entry](auto e) {
+          entry.read_error = true;
+          return seastar::now();
+        })
+      ).then([&progress] {
+        progress.header_done = true;
+        return seastar::make_ready_future<seastar::stop_iteration>(
+               seastar::stop_iteration::no);
+      });
+    };
+
+    ObjectStore::omap_iter_seek_t start_from;
+    start_from.seek_position =  progress.next_key.has_value() ?
+                                progress.next_key.value() : std::string{};
+    start_from.seek_type = ObjectStore::omap_iter_seek_t::UPPER_BOUND;
+
+    std::function<ObjectStore::omap_iter_ret_t(std::string_view, std::string_view)> callback =
+      [&progress, &entry] (std::string_view key, std::string_view value)
+    {
+      bufferlist bl;
+      encode(key, bl);
+      encode(value, bl);
+      progress.omap_hash << bl;
+      entry.object_omap_keys++;
+      entry.object_omap_bytes += value.length();
+      return ObjectStore::omap_iter_ret_t::NEXT;
+    };
+
+    auto get_keys = [FNAME, this, &progress, &obj, &entry, &pg, start_from, callback]()
+      -> interruptible_future<seastar::stop_iteration>
+    {
+      DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap keys",
+                pg, *this, obj, progress);
+      return pg.shard_services.get_store().omap_iterate(
+        pg.get_collection_ref(),
+        obj,
+        start_from,
+        callback
+      ).safe_then([FNAME, this, &obj, &progress, &entry, &pg](auto result) {
+        assert(result == ObjectStore::omap_iter_ret_t::NEXT);
+        DEBUGDPP("op: {}, obj: {}, progress: {} omap done",
+                  pg, *this, obj, progress);
+        progress.keys_done = true;
+        entry.omap_digest = progress.omap_hash.digest();
+        entry.omap_digest_present = true;
+
+        if ((entry.object_omap_keys >
+             local_conf().get_val<uint64_t>(
+             "osd_deep_scrub_large_omap_object_key_threshold")) ||
+            (entry.object_omap_bytes >
+             local_conf().get_val<Option::size_t>(
+             "osd_deep_scrub_large_omap_object_value_sum_threshold"))) {
+          entry.large_omap_object_found = true;
+          entry.large_omap_object_key_count = entry.object_omap_keys;
+          ret.has_large_omap_object_errors = true;
+        }
+      }).handle_error(
+        ct_error::all_same_way([FNAME, this, &obj, &progress, &entry, &pg]
+          (auto e)
+        {
+          DEBUGDPP("op: {}, obj: {}, progress: {} error reading omap {}",
+                    pg, *this, obj, progress, e);
+          progress.keys_done = true;
+          entry.read_error = true;
+          return seastar::now();
+        })
+      ).then([] {
+        return seastar::make_ready_future<seastar::stop_iteration>(
+               seastar::stop_iteration::no);
+      });
+    };
+
+    if (progress.offset) {
+      co_return co_await store_read();
+    } else if (!progress.header_done) {
+      co_return co_await get_header();
+    } else if (!progress.keys_done) {
+      co_return co_await get_keys();
+    } else {
+      DEBUGDPP("op: {}, obj: {}, progress: {} done",
+                pg, *this, obj, progress);
+      co_return seastar::stop_iteration::yes;
+    }
+  }).finally([progress_ref=std::move(progress_ref)] {});
 }
 
 template class ScrubAsyncOpT<ScrubScan>;