From: myoungwon oh Date: Sat, 28 Feb 2026 04:38:16 +0000 (+0900) Subject: crimson/os/seastore, osd/PGLog: handle omap_iterate retry to avoid duplicate entries X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=5dbd9ae51741bde2abfb4eaad5cc80dd64edd950;p=ceph.git crimson/os/seastore, osd/PGLog: handle omap_iterate retry to avoid duplicate entries Seastore omap_iterate may retry internally on conflicts, which can cause PGLog to process the same entries multiple times when entries are handled directly in the iteration callback. Introduce a conflict hook in omap_iterate so callers can reset iteration state on retry. PGLog now buffers entries during iteration and applies process_entry() only after a successful pass, clearing the buffer on retry to avoid duplicates. Signed-off-by: Myoungwon Oh --- diff --git a/src/crimson/os/alienstore/alien_store.cc b/src/crimson/os/alienstore/alien_store.cc index 59f20964dfb..978fa111ba1 100644 --- a/src/crimson/os/alienstore/alien_store.cc +++ b/src/crimson/os/alienstore/alien_store.cc @@ -431,7 +431,8 @@ AlienStore::omap_iterate(CollectionRef ch, const ghobject_t &oid, ObjectStore::omap_iter_seek_t start_from, omap_iterate_cb_t callback, - uint32_t op_flags) + uint32_t op_flags, + omap_iterate_conf_t on_conflict) { logger().debug("{} with_start", __func__); assert(tp); diff --git a/src/crimson/os/alienstore/alien_store.h b/src/crimson/os/alienstore/alien_store.h index 1ec6432882b..e712ce74d4e 100644 --- a/src/crimson/os/alienstore/alien_store.h +++ b/src/crimson/os/alienstore/alien_store.h @@ -77,7 +77,8 @@ public: const ghobject_t &oid, ObjectStore::omap_iter_seek_t start_from, omap_iterate_cb_t callback, - uint32_t op_flags = 0) final; + uint32_t op_flags = 0, + omap_iterate_conf_t on_conflict = nullptr) final; seastar::future create_new_collection(const coll_t& cid) final; seastar::future open_collection(const coll_t& cid) final; diff --git a/src/crimson/os/cyanstore/cyan_store.cc b/src/crimson/os/cyanstore/cyan_store.cc index d96cef1e48e..7d5cfa46f5d 100644 --- a/src/crimson/os/cyanstore/cyan_store.cc +++ b/src/crimson/os/cyanstore/cyan_store.cc @@ -517,7 +517,8 @@ auto CyanStore::Shard::omap_iterate( const ghobject_t &oid, ObjectStore::omap_iter_seek_t start_from, omap_iterate_cb_t callback, - uint32_t op_flags) + uint32_t op_flags, + omap_iterate_conf_t on_conflict) -> CyanStore::Shard::read_errorator::future { assert(store_active); diff --git a/src/crimson/os/cyanstore/cyan_store.h b/src/crimson/os/cyanstore/cyan_store.h index 4d993d2f9d1..53ea2576638 100644 --- a/src/crimson/os/cyanstore/cyan_store.h +++ b/src/crimson/os/cyanstore/cyan_store.h @@ -79,7 +79,8 @@ public: const ghobject_t &oid, ObjectStore::omap_iter_seek_t start_from, omap_iterate_cb_t callback, - uint32_t op_flags = 0 + uint32_t op_flags = 0, + omap_iterate_conf_t on_conflict = nullptr ) final; get_attr_errorator::future omap_get_header( diff --git a/src/crimson/os/futurized_store.h b/src/crimson/os/futurized_store.h index dec0ffe356d..3dccd9509fd 100644 --- a/src/crimson/os/futurized_store.h +++ b/src/crimson/os/futurized_store.h @@ -125,6 +125,7 @@ public: * omap_iter_ret_t::NEXT means omap_iterate() reaches the end of omap tree */ using omap_iterate_cb_t = std::function; + using omap_iterate_conf_t = std::function; virtual read_errorator::future omap_iterate( CollectionRef c, ///< [in] collection const ghobject_t &oid, ///< [in] object @@ -132,7 +133,8 @@ public: omap_iterate_cb_t callback, ///< [in] the callback function for each OMAP entry after start_from till end of the OMAP or /// till the iteration is stopped by `STOP`. - uint32_t op_flags = 0 + uint32_t op_flags = 0, + omap_iterate_conf_t on_conflict = nullptr ) = 0; virtual get_attr_errorator::future omap_get_header( diff --git a/src/crimson/os/seastore/seastore.cc b/src/crimson/os/seastore/seastore.cc index c84583fc07e..773d7f7153c 100644 --- a/src/crimson/os/seastore/seastore.cc +++ b/src/crimson/os/seastore/seastore.cc @@ -1457,14 +1457,17 @@ SeaStore::Shard::omap_iterate( const ghobject_t &oid, ObjectStore::omap_iter_seek_t start_from, omap_iterate_cb_t callback, - uint32_t op_flags) + uint32_t op_flags, + omap_iterate_conf_t on_conflict) { assert(store_active); ++(shard_stats.read_num); ++(shard_stats.pending_read_num); return seastar::do_with( std::move(start_from), - [this, ch, &oid, callback, op_flags] (auto &start_from) + uint32_t(0), + [this, ch, &oid, callback, op_flags, on_conflict] ( + auto &start_from, auto &conflict_counter) { return repeat_with_onode( ch, @@ -1473,8 +1476,17 @@ SeaStore::Shard::omap_iterate( "omap_iterate", op_type_t::OMAP_ITERATE, op_flags, - [this, &start_from, callback](auto &t, auto &onode) + [this, &start_from, callback, on_conflict, &conflict_counter](auto &t, auto &onode) { + ceph_assert(conflict_counter < std::numeric_limits::max()); + conflict_counter++; + if (conflict_counter > 1 && on_conflict) { + // This means conflict occurs + auto ret = on_conflict(); + if (ret == ObjectStore::omap_iter_ret_t::STOP) { + return base_iertr::make_ready_future(ret); + } + } auto root = select_log_omap_root(onode); return omaptree_iterate( t, std::move(root), start_from, callback); diff --git a/src/crimson/os/seastore/seastore.h b/src/crimson/os/seastore/seastore.h index afeab407091..cc3f5aa3533 100644 --- a/src/crimson/os/seastore/seastore.h +++ b/src/crimson/os/seastore/seastore.h @@ -144,7 +144,8 @@ public: const ghobject_t &oid, ObjectStore::omap_iter_seek_t start_from, omap_iterate_cb_t callback, - uint32_t op_flags = 0) final; + uint32_t op_flags = 0, + omap_iterate_conf_t on_conflict = nullptr) final; get_attr_errorator::future omap_get_header( CollectionRef c, diff --git a/src/crimson/osd/osd_operations/scrub_events.cc b/src/crimson/osd/osd_operations/scrub_events.cc index 14627fef2b8..2403c3b9616 100644 --- a/src/crimson/osd/osd_operations/scrub_events.cc +++ b/src/crimson/osd/osd_operations/scrub_events.cc @@ -333,7 +333,8 @@ ScrubScan::ifut<> ScrubScan::deep_scan_object( obj, start_from, callback, - 0 + 0, + nullptr ).safe_then([FNAME, this, &obj, &progress, &entry, &pg](auto result) { assert(result == ObjectStore::omap_iter_ret_t::NEXT); DEBUGDPP("op: {}, obj: {}, progress: {} omap done", diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc index 8181f26383f..7f0e5a59293 100644 --- a/src/crimson/osd/pg_backend.cc +++ b/src/crimson/osd/pg_backend.cc @@ -1347,7 +1347,7 @@ maybe_do_omap_iterate( { if (oi.is_omap()) { return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>( - store, coll, ghobject_t{oi.soid}, start_from, callback, 0); + store, coll, ghobject_t{oi.soid}, start_from, callback, 0, nullptr); } else { return crimson::ct_error::enodata::make(); } diff --git a/src/crimson/osd/replicated_recovery_backend.cc b/src/crimson/osd/replicated_recovery_backend.cc index cb9ee87665f..00f1578cb29 100644 --- a/src/crimson/osd/replicated_recovery_backend.cc +++ b/src/crimson/osd/replicated_recovery_backend.cc @@ -758,7 +758,7 @@ ReplicatedRecoveryBackend::read_omap_for_push_op( co_await interruptor::make_interruptible( crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>( shard_services.get_store(pg.get_store_index()), - coll, ghobject_t{oid}, start_from, callback, 0 + coll, ghobject_t{oid}, start_from, callback, 0, nullptr ).safe_then([&new_progress](auto ret) { if (ret == ObjectStore::omap_iter_ret_t::NEXT) { new_progress.omap_complete = true; diff --git a/src/osd/PGLog.cc b/src/osd/PGLog.cc index 3e5a8dfea81..9c745dff964 100644 --- a/src/osd/PGLog.cc +++ b/src/osd/PGLog.cc @@ -1179,24 +1179,35 @@ namespace { ObjectStore::omap_iter_seek_t start_from{"", ObjectStore::omap_iter_seek_t::UPPER_BOUND}; + std::map kvs; std::function callback = - [this] (std::string_view key, std::string_view value) + [&kvs] (std::string_view key, std::string_view value) { - ceph::bufferlist bl; - bl.append(value); - process_entry(key, bl); + ceph::bufferlist bl; + bl.append(value); + kvs[std::string(key)] = std::move(bl); return ObjectStore::omap_iter_ret_t::NEXT; }; + std::function on_conflict = + [&kvs] () + { + kvs.clear(); + return ObjectStore::omap_iter_ret_t::NEXT; + }; co_await crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>( - store, - ch, pgmeta_oid, start_from, callback, 0 + store, + ch, pgmeta_oid, start_from, callback, 0, on_conflict ).safe_then([] (auto ret) { ceph_assert (ret == ObjectStore::omap_iter_ret_t::NEXT); }).handle_error( crimson::os::FuturizedStore::Shard::read_errorator::assert_all{} ); + for (auto &p : kvs) { + process_entry(p.first, p.second); + } + if (info.pgid.is_no_shard()) { // replicated pool pg does not persist this key ceph_assert(on_disk_rollback_info_trimmed_to == eversion_t()); diff --git a/src/osd/SnapMapper.cc b/src/osd/SnapMapper.cc index a865a142eae..df2da64b3e4 100644 --- a/src/osd/SnapMapper.cc +++ b/src/osd/SnapMapper.cc @@ -160,7 +160,7 @@ int OSDriver::get_next( }; return interruptor::green_get( crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>( - os, ch, hoid, start_from, callback, 0 + os, ch, hoid, start_from, callback, 0, nullptr ).safe_then([FNAME, key] (auto ret) { if (ret == ObjectStore::omap_iter_ret_t::NEXT) { DEBUG("key {} no more values", key);