git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore, osd/PGLog: handle omap_iterate retry to avoid duplicate entries
author: myoungwon oh <ohmyoungwon@gmail.com>
Sat, 28 Feb 2026 04:38:16 +0000 (13:38 +0900)
committer: myoungwon oh <ohmyoungwon@gmail.com>
Tue, 17 Mar 2026 08:38:07 +0000 (17:38 +0900)
SeaStore's omap_iterate may retry internally on conflicts, which can
cause PGLog to process the same entries multiple times when entries
are handled directly in the iteration callback.

Introduce a conflict hook in omap_iterate (an optional on_conflict
callback, which SeaStore invokes before each retried pass) so callers can
reset iteration state on retry. PGLog now buffers entries during iteration
and applies process_entry() only after a successful pass, clearing the
buffer on retry to avoid duplicates.

Signed-off-by: Myoungwon Oh <ohmyoungwon@gmail.com>
12 files changed:
src/crimson/os/alienstore/alien_store.cc
src/crimson/os/alienstore/alien_store.h
src/crimson/os/cyanstore/cyan_store.cc
src/crimson/os/cyanstore/cyan_store.h
src/crimson/os/futurized_store.h
src/crimson/os/seastore/seastore.cc
src/crimson/os/seastore/seastore.h
src/crimson/osd/osd_operations/scrub_events.cc
src/crimson/osd/pg_backend.cc
src/crimson/osd/replicated_recovery_backend.cc
src/osd/PGLog.cc
src/osd/SnapMapper.cc

index 59f20964dfbb16b80085e8c57e3b9a8c0543e8ee..978fa111ba104b1728770de44378001d540ebe7c 100644 (file)
@@ -431,7 +431,8 @@ AlienStore::omap_iterate(CollectionRef ch,
                          const ghobject_t &oid,
                          ObjectStore::omap_iter_seek_t start_from,
                          omap_iterate_cb_t callback,
-                         uint32_t op_flags)
+                         uint32_t op_flags,
+                        omap_iterate_conf_t on_conflict)
 {
   logger().debug("{} with_start", __func__);
   assert(tp);
index 1ec6432882b981b030d3fe651f9de5c69c5d37f1..e712ce74d4e3e0755437f8b1a0539b64cfb10e71 100644 (file)
@@ -77,7 +77,8 @@ public:
     const ghobject_t &oid,
     ObjectStore::omap_iter_seek_t start_from,
     omap_iterate_cb_t callback,
-    uint32_t op_flags = 0) final;
+    uint32_t op_flags = 0,
+    omap_iterate_conf_t on_conflict = nullptr) final;
 
   seastar::future<CollectionRef> create_new_collection(const coll_t& cid) final;
   seastar::future<CollectionRef> open_collection(const coll_t& cid) final;
index d96cef1e48e79ec979769d732b119f22150b22f5..7d5cfa46f5d77db1b454ded0d2e96a2fc75d45e7 100644 (file)
@@ -517,7 +517,8 @@ auto CyanStore::Shard::omap_iterate(
   const ghobject_t &oid,
   ObjectStore::omap_iter_seek_t start_from,
   omap_iterate_cb_t callback,
-  uint32_t op_flags)
+  uint32_t op_flags,
+  omap_iterate_conf_t on_conflict)
   -> CyanStore::Shard::read_errorator::future<ObjectStore::omap_iter_ret_t>
 {
   assert(store_active);
index 4d993d2f9d1619f4842cd45eac55d96c62c5bbc2..53ea2576638b4ea1a47ce996bb1f41bf1181d73c 100644 (file)
@@ -79,7 +79,8 @@ public:
       const ghobject_t &oid,
       ObjectStore::omap_iter_seek_t start_from,
       omap_iterate_cb_t callback,
-      uint32_t op_flags = 0
+      uint32_t op_flags = 0,
+      omap_iterate_conf_t on_conflict = nullptr
     ) final;
 
     get_attr_errorator::future<ceph::bufferlist> omap_get_header(
index dec0ffe356dae01b9b689553882be502aa8a1b3b..3dccd9509fddba2f25b4a737b552d9036f42cca2 100644 (file)
@@ -125,6 +125,7 @@ public:
      *         omap_iter_ret_t::NEXT means omap_iterate() reaches the end of omap tree
      */
     using omap_iterate_cb_t = std::function<ObjectStore::omap_iter_ret_t(std::string_view, std::string_view)>;
+    using omap_iterate_conf_t = std::function<ObjectStore::omap_iter_ret_t()>;
     virtual read_errorator::future<ObjectStore::omap_iter_ret_t> omap_iterate(
       CollectionRef c,   ///< [in] collection
       const ghobject_t &oid, ///< [in] object
@@ -132,7 +133,8 @@ public:
       omap_iterate_cb_t callback,
       ///< [in] the callback function for each OMAP entry after start_from till end of the OMAP or
       /// till the iteration is stopped by `STOP`.
-      uint32_t op_flags = 0
+      uint32_t op_flags = 0,
+      omap_iterate_conf_t on_conflict = nullptr
       ) = 0;
 
     virtual get_attr_errorator::future<bufferlist> omap_get_header(
index c84583fc07e674599065a6a4f2c2acbc99e1a99a..773d7f7153cd0b7617c1d869c407ff3a8d724938 100644 (file)
@@ -1457,14 +1457,17 @@ SeaStore::Shard::omap_iterate(
   const ghobject_t &oid,
   ObjectStore::omap_iter_seek_t start_from,
   omap_iterate_cb_t callback,
-  uint32_t op_flags)
+  uint32_t op_flags,
+  omap_iterate_conf_t on_conflict)
 {
   assert(store_active);
   ++(shard_stats.read_num);
   ++(shard_stats.pending_read_num);
   return seastar::do_with(
     std::move(start_from),
-    [this, ch, &oid, callback, op_flags] (auto &start_from)
+    uint32_t(0),
+    [this, ch, &oid, callback, op_flags, on_conflict] (
+    auto &start_from, auto &conflict_counter)
   {
     return repeat_with_onode<ObjectStore::omap_iter_ret_t>(
       ch,
@@ -1473,8 +1476,17 @@ SeaStore::Shard::omap_iterate(
       "omap_iterate",
       op_type_t::OMAP_ITERATE,
       op_flags,
-      [this, &start_from, callback](auto &t, auto &onode)
+      [this, &start_from, callback, on_conflict, &conflict_counter](auto &t, auto &onode)
     {
+      ceph_assert(conflict_counter < std::numeric_limits<uint32_t>::max());
+      conflict_counter++;
+      if (conflict_counter > 1 && on_conflict) {
+       // This means conflict occurs
+       auto ret = on_conflict();
+       if (ret == ObjectStore::omap_iter_ret_t::STOP) {
+         return base_iertr::make_ready_future<ObjectStore::omap_iter_ret_t>(ret);
+       }
+      }
       auto root = select_log_omap_root(onode);
       return omaptree_iterate(
         t, std::move(root), start_from, callback);
index afeab407091b53451fc021070f38e07867e114ba..cc3f5aa3533d731e24669d97cb80e1b4e9b84c4d 100644 (file)
@@ -144,7 +144,8 @@ public:
       const ghobject_t &oid,
       ObjectStore::omap_iter_seek_t start_from,
       omap_iterate_cb_t callback,
-      uint32_t op_flags = 0) final;
+      uint32_t op_flags = 0,
+      omap_iterate_conf_t on_conflict = nullptr) final;
 
     get_attr_errorator::future<bufferlist> omap_get_header(
       CollectionRef c,
index 14627fef2b8ae14276ba2ba7fe72997c3b35359b..2403c3b961607eeb64ff8853eaadbbc9069ef359 100644 (file)
@@ -333,7 +333,8 @@ ScrubScan::ifut<> ScrubScan::deep_scan_object(
         obj,
         start_from,
         callback,
-        0
+        0,
+       nullptr
       ).safe_then([FNAME, this, &obj, &progress, &entry, &pg](auto result) {
         assert(result == ObjectStore::omap_iter_ret_t::NEXT);
         DEBUGDPP("op: {}, obj: {}, progress: {} omap done",
index 8181f26383f2ccfa1e8a4c3cbf14961f9e63eb87..7f0e5a59293cecbbadf191605f9c8f30e38630f5 100644 (file)
@@ -1347,7 +1347,7 @@ maybe_do_omap_iterate(
 {
   if (oi.is_omap()) {
     return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
-      store, coll, ghobject_t{oi.soid}, start_from, callback, 0);
+      store, coll, ghobject_t{oi.soid}, start_from, callback, 0, nullptr);
   } else {
     return crimson::ct_error::enodata::make();
   }
index cb9ee87665fd5629bde3ba2949cd841eb8b8167a..00f1578cb29effddb4a3bb5e1539b684f1b43fe0 100644 (file)
@@ -758,7 +758,7 @@ ReplicatedRecoveryBackend::read_omap_for_push_op(
   co_await interruptor::make_interruptible(
     crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
       shard_services.get_store(pg.get_store_index()),
-      coll, ghobject_t{oid}, start_from, callback, 0
+      coll, ghobject_t{oid}, start_from, callback, 0, nullptr
     ).safe_then([&new_progress](auto ret) {
       if (ret == ObjectStore::omap_iter_ret_t::NEXT) {
         new_progress.omap_complete = true;
index 3e5a8dfea812073dd421f6af8896a9620d0fee69..9c745dff964c071c804fc7064c66701dff58f73f 100644 (file)
@@ -1179,24 +1179,35 @@ namespace {
 
       ObjectStore::omap_iter_seek_t start_from{"", ObjectStore::omap_iter_seek_t::UPPER_BOUND};
 
+      std::map<std::string, ceph::bufferlist> kvs;
       std::function<ObjectStore::omap_iter_ret_t(std::string_view, std::string_view)> callback =
-        [this] (std::string_view key, std::string_view value)
+        [&kvs] (std::string_view key, std::string_view value)
       {
-        ceph::bufferlist bl;
-        bl.append(value);
-        process_entry(key, bl);
+       ceph::bufferlist bl;
+       bl.append(value);
+       kvs[std::string(key)] = std::move(bl);
         return ObjectStore::omap_iter_ret_t::NEXT;
       };
+      std::function<ObjectStore::omap_iter_ret_t()> on_conflict =
+        [&kvs] ()
+      {
+       kvs.clear();
+       return ObjectStore::omap_iter_ret_t::NEXT;
+      };
 
       co_await crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
-        store,
-        ch, pgmeta_oid, start_from, callback, 0
+       store,
+        ch, pgmeta_oid, start_from, callback, 0, on_conflict
       ).safe_then([] (auto ret) {
         ceph_assert (ret == ObjectStore::omap_iter_ret_t::NEXT);
       }).handle_error(
         crimson::os::FuturizedStore::Shard::read_errorator::assert_all{}
       );
 
+      for (auto &p : kvs) {
+        process_entry(p.first, p.second);
+      }
+
       if (info.pgid.is_no_shard()) {
         // replicated pool pg does not persist this key
         ceph_assert(on_disk_rollback_info_trimmed_to == eversion_t());
index a865a142eae8d9e07eb272dfeb39e4df1d847764..df2da64b3e499830d3d4dfc1c4d49d2544ce53a8 100644 (file)
@@ -160,7 +160,7 @@ int OSDriver::get_next(
   };
   return interruptor::green_get(
     crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
-      os, ch, hoid, start_from, callback, 0
+      os, ch, hoid, start_from, callback, 0, nullptr
     ).safe_then([FNAME, key] (auto ret) {
       if (ret == ObjectStore::omap_iter_ret_t::NEXT) {
         DEBUG("key {} no more values", key);