From: Aishwarya Mathuria Date: Fri, 9 Jan 2026 10:48:16 +0000 (+0000) Subject: crimson/osd: implement PG merge detection and orchestration in PGAdvanceMap X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7c033ce1990937a850d3a8f07b063bd7da198cea;p=ceph.git crimson/osd: implement PG merge detection and orchestration in PGAdvanceMap Integrate PG merge handling into the map advancement pipeline. When pg_num shrinks between epochs, check_for_merges() returns a merge_result_t describing whether this PG is a merge source, target, or not involved. start() stops advancing through later epochs only when this PG is a merge source, in which case finish_merge_source() commits rctx and hands the PG off; a merge target and the no-merge case keep advancing through every epoch up to `to` and then take the normal activate path. - check_for_merges(): detect pg_num shrink and dispatch merge_pg(). - merge_pg(): merge-only work — Seastore eligibility, source handoff setup, target rendezvous collection and PG::merge_from(). - finish_merge_source(): commit rctx and complete the source-specific steps (complete_rctx, stop, register_merge_source); the target-specific steps (handle_advance_map, handle_activate_map and the final rctx commit) run in the regular start() loop and its normal completion path. 
Signed-off-by: Aishwarya Mathuria --- diff --git a/src/crimson/osd/osd_operations/pg_advance_map.cc b/src/crimson/osd/osd_operations/pg_advance_map.cc index d4cc12847e2..ff412adc5db 100644 --- a/src/crimson/osd/osd_operations/pg_advance_map.cc +++ b/src/crimson/osd/osd_operations/pg_advance_map.cc @@ -82,20 +82,39 @@ seastar::future<> PGAdvanceMap::start() pg->handle_initialize(rctx); pg->handle_activate_map(rctx); } - for (epoch_t next_epoch = from + 1; next_epoch <= to; next_epoch++) { - DEBUG("{}: start: getting map {}", - *this, next_epoch); + ceph_assert(std::cmp_less_equal(from, to)); + + for (epoch_t current_epoch = from; current_epoch < to; ++current_epoch) { + epoch_t next_epoch = current_epoch + 1; + DEBUG("{}: start: getting map {}", *this, next_epoch); cached_map_t next_map = co_await shard_services.get_map(next_epoch); - DEBUG("{}: advancing map to {}", - *this, next_map->get_epoch()); - // Use the current OSDMap epoch to check for splits consecutively. - epoch_t current_epoch = pg->get_osdmap_epoch(); + merge_result_t merge_result = + co_await check_for_merges(current_epoch, next_map, rctx); + if (merge_result.role == merge_role_t::Source) { + // Merge source: hand the PG off to the target and finish. The PG + // stops existing locally, so there is nothing left to advance here. + DEBUG("{}: merge source, handing off after e{}", *this, current_epoch); + co_await finish_merge_source(merge_result.parent, rctx); + co_await handle.complete(); + exit_handle.cancel(); + co_return; + } + // A successful merge target (merge already applied by check_for_merges) + // and the no-merge case both advance this epoch and keep iterating, so + // the PG advances all the way to `to`, matching the classic OSD instead + // of stalling at an intermediate epoch. + DEBUG("{}: advancing map to {}", *this, next_map->get_epoch()); + // Baseline epoch for split detection: PG map epoch before advancing to next_map. 
+ epoch_t split_epoch = pg->get_osdmap_epoch(); pg->handle_advance_map(next_map, rctx); DEBUG("{}: checking for splits between {} and {}", - *this, current_epoch, next_map->get_epoch()); - co_await check_for_splits(current_epoch, std::move(next_map)); + *this, split_epoch, next_map->get_epoch()); + co_await check_for_splits(split_epoch, std::move(next_map)); } + // Normal completion, which also covers a completed merge target: the PG was + // fully advanced to `to` in the loop above, so just activate the final map + // and commit. pg->handle_activate_map(rctx); DEBUG("{}: map activated", *this); if (do_init) { @@ -140,7 +159,6 @@ seastar::future<> PGAdvanceMap::check_for_splits( } } - seastar::future<> PGAdvanceMap::split_pg( std::set split_children, cached_map_t next_map) @@ -236,5 +254,79 @@ void PGAdvanceMap::split_stats(std::set> children_pgs, pg->finish_split_stats(*stat_iter, rctx.transaction); } +seastar::future PGAdvanceMap::check_for_merges( + epoch_t old_epoch, + cached_map_t next_map, + PeeringCtx &rctx) +{ + LOG_PREFIX(PGAdvanceMap::check_for_merges); + using cached_map_t = OSDMapService::cached_map_t; + cached_map_t old_map = co_await shard_services.get_map(old_epoch); + if (!old_map->have_pg_pool(pg->get_pgid().pool())) { + DEBUG("{} pool doesn't exist in epoch {}", pg->get_pgid(), + old_epoch); + co_return merge_result_t{}; + } + auto old_pg_num = old_map->get_pg_num(pg->get_pgid().pool()); + if (!next_map->have_pg_pool(pg->get_pgid().pool())) { + DEBUG("{} pool doesn't exist in epoch {}", pg->get_pgid(), + next_map->get_epoch()); + co_return merge_result_t{}; + } + auto new_pg_num = next_map->get_pg_num(pg->get_pgid().pool()); + DEBUG("{} pg_num change in e{} {} -> {}", pg->get_pgid(), next_map->get_epoch(), + old_pg_num, new_pg_num); + if (new_pg_num && new_pg_num < old_pg_num) { + co_return co_await merge_pg(next_map, new_pg_num, old_pg_num, rctx); + } + co_return merge_result_t{}; +} + +seastar::future PGAdvanceMap::merge_pg( + cached_map_t 
next_map, + unsigned new_pg_num, + unsigned old_pg_num, + PeeringCtx &rctx) +{ + LOG_PREFIX(PGAdvanceMap::merge_pg); + DEBUG("{}: start", *this); + spg_t parent; + std::set merge_sources; + if (pg->pgid.is_merge_source(old_pg_num, + new_pg_num, + &parent)) { + co_return merge_result_t{merge_role_t::Source, parent}; + } else if (pg->pgid.is_merge_target(old_pg_num, + new_pg_num)) { + DEBUG("Target PG {} identified. Waiting for sources...", pg->get_pgid()); + pg->pgid.is_split(new_pg_num, old_pg_num, &merge_sources); + // Block until all source PGs (potentially from other shards) arrive + // on this PG's rendezvous + auto sources = co_await pg->collect_merge_sources(merge_sources.size()); + if (sources.empty()) { + co_return merge_result_t{}; + } + + unsigned split_bits = pg->get_pgid().get_split_bits(new_pg_num); + const auto& merge_meta = + next_map->get_pg_pool(pg->get_pgid().pool())->last_pg_merge_meta; + pg->merge_from(sources, rctx, split_bits, merge_meta); + + co_return merge_result_t{merge_role_t::Target}; + } else { + co_return merge_result_t{}; + } +} + +seastar::future<> PGAdvanceMap::finish_merge_source( + spg_t parent, + PeeringCtx &rctx) +{ + LOG_PREFIX(PGAdvanceMap::finish_merge_source); + DEBUG("{}: committing rctx before handoff to target {}", *this, parent); + co_await pg->complete_rctx(std::move(rctx)); + co_await pg->stop(); + co_await shard_services.register_merge_source(parent, pg->get_pgid()); +} } diff --git a/src/crimson/osd/osd_operations/pg_advance_map.h b/src/crimson/osd/osd_operations/pg_advance_map.h index 6dbb4a974c2..07e625e69a4 100644 --- a/src/crimson/osd/osd_operations/pg_advance_map.h +++ b/src/crimson/osd/osd_operations/pg_advance_map.h @@ -52,8 +52,23 @@ public: PipelineHandle &get_handle() { return handle; } using cached_map_t = OSDMapService::cached_map_t; + + enum class merge_role_t { + None, + Source, + Target, + }; + + struct merge_result_t { + merge_role_t role = merge_role_t::None; + spg_t parent; + }; + seastar::future<> 
check_for_splits(epoch_t old_epoch, cached_map_t next_map); + seastar::future check_for_merges(epoch_t old_epoch, + cached_map_t next_map, + PeeringCtx &rctx); seastar::future<> split_pg(std::set split_children, cached_map_t next_map); void split_stats(std::set> child_pgs, @@ -69,10 +84,17 @@ public: } private: + seastar::future merge_pg(cached_map_t next_map, + unsigned new_pg_num, + unsigned old_pg_num, + PeeringCtx &rctx); PGPeeringPipeline &peering_pp(PG &pg); seastar::future> handle_split_pg_creation( spg_t child_pgid, cached_map_t next_map); + seastar::future<> finish_merge_source( + spg_t parent, + PeeringCtx &rctx); }; }