]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd/replicated_recovery_backend: replace omap_get_values() by omap_iterate()
authorchunmei liu <chunmei.liu@ibm.com>
Sat, 22 Mar 2025 08:11:28 +0000 (01:11 -0700)
committerchunmei liu <chunmei.liu@ibm.com>
Tue, 15 Jul 2025 02:58:30 +0000 (19:58 -0700)
Signed-off-by: chunmei liu <chunmei.liu@ibm.com>
src/crimson/osd/replicated_recovery_backend.cc

index 71a1deef575be8188e616eb235de2f42f1994a69..998ec8fe128779cb72c0fb5261c6a114abe486b9 100644 (file)
@@ -716,42 +716,43 @@ ReplicatedRecoveryBackend::read_omap_for_push_op(
     PushOp* push_op)
 {
   if (progress.omap_complete) {
-    return seastar::make_ready_future<>();
+    co_return;
   }
-  return seastar::repeat([&new_progress, &max_len, push_op, &oid, this] {
-    return shard_services.get_store().omap_get_values(
-      coll, ghobject_t{oid}, nullopt_if_empty(new_progress.omap_recovered_to)
-    ).safe_then([&new_progress, &max_len, push_op](const auto& ret) {
-      const auto& [done, kvs] = ret;
-      bool stop = done;
-      // assuming "values.empty() only if done" holds here!
-      for (const auto& [key, value] : kvs) {
-        if (is_too_many_entries_per_chunk(push_op)) {
-          stop = true;
-          break;
-        }
-        if (const uint64_t entry_size = key.size() + value.length();
-            entry_size > max_len) {
-          stop = true;
-          break;
-        } else {
-          max_len -= std::min(max_len, entry_size);
-        }
-        push_op->omap_entries.emplace(key, value);
-      }
-      if (!push_op->omap_entries.empty()) {
-        // we iterate in order
-        new_progress.omap_recovered_to = std::rbegin(push_op->omap_entries)->first;
-      }
-      if (done) {
+
+  ObjectStore::omap_iter_seek_t start_from;
+  start_from.seek_position = new_progress.omap_recovered_to.empty() ?
+                             std::string{} : new_progress.omap_recovered_to;
+  start_from.seek_type = ObjectStore::omap_iter_seek_t::UPPER_BOUND;
+
+  std::function<ObjectStore::omap_iter_ret_t(std::string_view, std::string_view)> callback =
+    [&max_len, push_op] (std::string_view key, std::string_view value)
+  {
+    uint64_t entry_size = key.size() + value.length();
+    if (is_too_many_entries_per_chunk(push_op) || entry_size > max_len) {
+      return ObjectStore::omap_iter_ret_t::STOP;
+    }
+
+    max_len -= std::min(max_len, entry_size);
+    ceph::bufferlist bl;
+    bl.append(value);
+    push_op->omap_entries.emplace(key, bl);
+    return ObjectStore::omap_iter_ret_t::NEXT;
+  };
+
+  co_await interruptor::make_interruptible(
+    shard_services.get_store().omap_iterate(
+      coll, ghobject_t{oid}, start_from, callback
+    ).safe_then([&new_progress](auto ret) {
+      if (ret == ObjectStore::omap_iter_ret_t::NEXT) {
         new_progress.omap_complete = true;
+      } else {
+        new_progress.omap_complete = false;
       }
-      return seastar::make_ready_future<seastar::stop_iteration>(
-        stop ? seastar::stop_iteration::yes : seastar::stop_iteration::no
-      );
-    }, crimson::os::FuturizedStore::Shard::read_errorator::assert_all(fmt::format(
-         "{} ReplicatedRecoveryBackend::read_omap_for_push_op error with {}", pg, oid).c_str()));
-  });
+    }).handle_error(
+      crimson::os::FuturizedStore::Shard::read_errorator::assert_all(fmt::format(
+        "{} ReplicatedRecoveryBackend::read_omap_for_push_op error with {}", pg, oid).c_str())
+    )
+  );
 }
 
 std::vector<pg_shard_t>