]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: implement PG merge detection and orchestration in PGAdvanceMap
authorAishwarya Mathuria <amathuri@redhat.com>
Fri, 9 Jan 2026 10:48:16 +0000 (10:48 +0000)
committerAishwarya Mathuria <amathuri@redhat.com>
Thu, 11 Jun 2026 04:49:14 +0000 (10:19 +0530)
Integrate PG merge handling into the map advancement pipeline.

When pg_num shrinks between epochs, check_for_merges() returns a
merge_result_t describing whether this PG is a merge source, target, or
not involved. start() stops advancing through later epochs once a merge
is detected, then either finish_merge_advance() or the normal activate
path runs so complete_rctx() always happens in one place.

- check_for_merges(): detect pg_num shrink and dispatch merge_pg().
- merge_pg(): merge-only work — Seastore eligibility, source handoff
  setup, target rendezvous collection and PG::merge_from().
- finish_merge_advance(): commit rctx and complete the role-specific
  steps (source: complete_rctx, stop, register_merge_source; target:
  handle_advance_map, handle_activate_map, complete_rctx).

Signed-off-by: Aishwarya Mathuria <amathuri@redhat.com>
src/crimson/osd/osd_operations/pg_advance_map.cc
src/crimson/osd/osd_operations/pg_advance_map.h

index d4cc12847e2a9bae7d2127e8806bb1e8bc49d93f..ff412adc5db1f0a4372bb3fbaf4f5f71ac6a53ae 100644 (file)
@@ -82,20 +82,39 @@ seastar::future<> PGAdvanceMap::start()
     pg->handle_initialize(rctx);
     pg->handle_activate_map(rctx);
   }
-  for (epoch_t next_epoch = from + 1; next_epoch <= to; next_epoch++) {
-    DEBUG("{}: start: getting map {}",
-          *this, next_epoch);
+  ceph_assert(std::cmp_less_equal(from, to));
+
+  for (epoch_t current_epoch = from; current_epoch < to; ++current_epoch) {
+    epoch_t next_epoch = current_epoch + 1;
+    DEBUG("{}: start: getting map {}", *this, next_epoch);
     cached_map_t next_map = co_await shard_services.get_map(next_epoch);
-    DEBUG("{}: advancing map to {}",
-          *this, next_map->get_epoch());
-    // Use the current OSDMap epoch to check for splits consecutively.
-    epoch_t current_epoch = pg->get_osdmap_epoch();
+    merge_result_t merge_result =
+      co_await check_for_merges(current_epoch, next_map, rctx);
+    if (merge_result.role == merge_role_t::Source) {
+      // Merge source: hand the PG off to the target and finish.  The PG
+      // stops existing locally, so there is nothing left to advance here.
+      DEBUG("{}: merge source, handing off after e{}", *this, current_epoch);
+      co_await finish_merge_source(merge_result.parent, rctx);
+      co_await handle.complete();
+      exit_handle.cancel();
+      co_return;
+    }
+    // A successful merge target (merge already applied by check_for_merges)
+    // and the no-merge case both advance this epoch and keep iterating, so
+    // the PG advances all the way to `to`, matching the classic OSD instead
+    // of stalling at an intermediate epoch.
+    DEBUG("{}: advancing map to {}", *this, next_map->get_epoch());
+    // Baseline epoch for split detection: PG map epoch before advancing to next_map.
+    epoch_t split_epoch = pg->get_osdmap_epoch();
     pg->handle_advance_map(next_map, rctx);
     DEBUG("{}: checking for splits between {} and {}",
-          *this, current_epoch, next_map->get_epoch());
-    co_await check_for_splits(current_epoch, std::move(next_map));
+          *this, split_epoch, next_map->get_epoch());
+    co_await check_for_splits(split_epoch, std::move(next_map));
   }
 
+  // Normal completion, which also covers a completed merge target: the PG was
+  // fully advanced to `to` in the loop above, so just activate the final map
+  // and commit.
   pg->handle_activate_map(rctx);
   DEBUG("{}: map activated", *this);
   if (do_init) {
@@ -140,7 +159,6 @@ seastar::future<> PGAdvanceMap::check_for_splits(
   }
 }
 
-
 seastar::future<> PGAdvanceMap::split_pg(
     std::set<spg_t> split_children,
     cached_map_t next_map)
@@ -236,5 +254,79 @@ void PGAdvanceMap::split_stats(std::set<Ref<PG>> children_pgs,
   pg->finish_split_stats(*stat_iter, rctx.transaction);
 }
 
+seastar::future<PGAdvanceMap::merge_result_t> PGAdvanceMap::check_for_merges(
+    epoch_t old_epoch,
+    cached_map_t next_map,
+    PeeringCtx &rctx)
+{
+  LOG_PREFIX(PGAdvanceMap::check_for_merges);
+  using cached_map_t = OSDMapService::cached_map_t;
+  cached_map_t old_map = co_await shard_services.get_map(old_epoch);
+  if (!old_map->have_pg_pool(pg->get_pgid().pool())) {
+    DEBUG("{} pool doesn't exist in epoch {}", pg->get_pgid(),
+       old_epoch);
+    co_return merge_result_t{};
+  }
+  auto old_pg_num = old_map->get_pg_num(pg->get_pgid().pool());
+  if (!next_map->have_pg_pool(pg->get_pgid().pool())) {
+    DEBUG("{} pool doesn't exist in epoch {}", pg->get_pgid(),
+        next_map->get_epoch());
+    co_return merge_result_t{};
+  }
+  auto new_pg_num = next_map->get_pg_num(pg->get_pgid().pool());
+  DEBUG("{} pg_num change in e{} {} -> {}", pg->get_pgid(), next_map->get_epoch(),
+                 old_pg_num, new_pg_num);
+  if (new_pg_num && new_pg_num < old_pg_num) {
+    co_return co_await merge_pg(next_map, new_pg_num, old_pg_num, rctx);
+  }
+  co_return merge_result_t{};
+}
+
+seastar::future<PGAdvanceMap::merge_result_t> PGAdvanceMap::merge_pg(
+    cached_map_t next_map,
+    unsigned new_pg_num,
+    unsigned old_pg_num,
+    PeeringCtx &rctx)
+{
+  LOG_PREFIX(PGAdvanceMap::merge_pg);
+  DEBUG("{}: start", *this);
+  spg_t parent;
+  std::set<spg_t> merge_sources;
+  if (pg->pgid.is_merge_source(old_pg_num,
+                               new_pg_num,
+                               &parent)) {
+    co_return merge_result_t{merge_role_t::Source, parent};
+  } else if (pg->pgid.is_merge_target(old_pg_num,
+                                       new_pg_num)) {
+    DEBUG("Target PG {} identified. Waiting for sources...", pg->get_pgid());
+    pg->pgid.is_split(new_pg_num, old_pg_num, &merge_sources);
+    // Block until all source PGs (potentially from other shards) arrive
+    // on this PG's rendezvous
+    auto sources = co_await pg->collect_merge_sources(merge_sources.size());
+    if (sources.empty()) {
+      co_return merge_result_t{};
+    }
+
+    unsigned split_bits = pg->get_pgid().get_split_bits(new_pg_num);
+    const auto& merge_meta =
+      next_map->get_pg_pool(pg->get_pgid().pool())->last_pg_merge_meta;
+    pg->merge_from(sources, rctx, split_bits, merge_meta);
+
+    co_return merge_result_t{merge_role_t::Target};
+  } else {
+    co_return merge_result_t{};
+  }
+}
+
+seastar::future<> PGAdvanceMap::finish_merge_source(
+    spg_t parent,
+    PeeringCtx &rctx)
+{
+  LOG_PREFIX(PGAdvanceMap::finish_merge_source);
+  DEBUG("{}: committing rctx before handoff to target {}", *this, parent);
+  co_await pg->complete_rctx(std::move(rctx));
+  co_await pg->stop();
+  co_await shard_services.register_merge_source(parent, pg->get_pgid());
+}
 
 }
index 6dbb4a974c2fb592ddb03b569561f4a9330b5091..07e625e69a49eb9b330278ea77481ff5df455c15 100644 (file)
@@ -52,8 +52,23 @@ public:
   PipelineHandle &get_handle() { return handle; }
 
   using cached_map_t = OSDMapService::cached_map_t;
+
+  enum class merge_role_t {
+    None,
+    Source,
+    Target,
+  };
+
+  struct merge_result_t {
+    merge_role_t role = merge_role_t::None;
+    spg_t parent;
+  };
+
   seastar::future<> check_for_splits(epoch_t old_epoch,
                                      cached_map_t next_map);
+  seastar::future<merge_result_t> check_for_merges(epoch_t old_epoch,
+                                                   cached_map_t next_map,
+                                                   PeeringCtx &rctx);
   seastar::future<> split_pg(std::set<spg_t> split_children,
                              cached_map_t next_map);
   void split_stats(std::set<Ref<PG>> child_pgs,
@@ -69,10 +84,17 @@ public:
   }
 
 private:
+  seastar::future<merge_result_t> merge_pg(cached_map_t next_map,
+                                           unsigned new_pg_num,
+                                           unsigned old_pg_num,
+                                           PeeringCtx &rctx);
   PGPeeringPipeline &peering_pp(PG &pg);
   seastar::future<Ref<PG>> handle_split_pg_creation(
     spg_t child_pgid,
     cached_map_t next_map);
+  seastar::future<> finish_merge_source(
+    spg_t parent,
+    PeeringCtx &rctx);
 };
 
 }