]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: implement PG merge detection and orchestration in PGAdvanceMap
authorAishwarya Mathuria <amathuri@redhat.com>
Fri, 9 Jan 2026 10:48:16 +0000 (10:48 +0000)
committerAishwarya Mathuria <amathuri@redhat.com>
Fri, 16 Jan 2026 12:09:09 +0000 (17:39 +0530)
Integrate PG merge logic into the map advancement pipeline by implementing
helpers within PGAdvanceMap.

- check_for_merges(): Monitors pg_num changes between epochs to identify
  pending merges for the current PG.
- merge_pg(): Orchestrates the merge based on the PG's role. For sources,
  it stops the PG and registers it with the target shard. For targets,
  it waits for all sources to arrive and executes the merge.
- start(): Updated the main loop with a shared stop_flag and exception-based
  early exit to halt map advancement immediately once a merge is triggered.

Signed-off-by: Aishwarya Mathuria <amathuri@redhat.com>
src/crimson/osd/osd_operations/pg_advance_map.cc
src/crimson/osd/osd_operations/pg_advance_map.h

index 79426060ae3c2be7c8578160032d2848f055e6e7..7555d8123a4abee64fcbc76537cdbb528715e890 100644 (file)
@@ -91,24 +91,36 @@ seastar::future<> PGAdvanceMap::start()
   }
   ceph_assert(std::cmp_less_equal(*from, to));
 
+  // is_merge ensures we stop advancing maps immediately if a merge occurs.
+  // In merge_pg(), we take care of continuing advancing map for the target (parent).
+  bool is_merge = false;
   for (epoch_t next_epoch = *from + 1; next_epoch <= to; ++next_epoch) {
     DEBUG("{}: start: getting map {}", *this, next_epoch);
     cached_map_t next_map = co_await shard_services.get_map(next_epoch);
+    is_merge = co_await check_for_merges(*from, next_map, rctx);
+    if (is_merge) {
+      DEBUG("{}: stopping advance due to merge", *this);
+      break;
+    }
     DEBUG("{}: advancing map to {}", *this, next_map->get_epoch());
-    
     pg->handle_advance_map(next_map, rctx);
     co_await check_for_splits(*from, next_map);
   }
 
-  pg->handle_activate_map(rctx);
-  DEBUG("{}: map activated", *this);
+  if (is_merge) {
+    // If we merged, the merge_pg logic has already handled the activation
+    // and rctx completion. We exit here.
+    DEBUG("{}: skipping activation because merge is in progress", *this);
+  } else {
+    pg->handle_activate_map(rctx);
+    DEBUG("{}: map activated", *this);
 
-  if (do_init) {
-      shard_services.pg_created(pg->get_pgid(), pg);
-      INFO("PGAdvanceMap::start new pg {}", *pg);
+    if (do_init) {
+        shard_services.pg_created(pg->get_pgid(), pg);
+        INFO("PGAdvanceMap::start new pg {}", *pg);
+    }
+    co_await pg->complete_rctx(std::move(rctx));
   }
-  co_await pg->complete_rctx(std::move(rctx));
-
   DEBUG("{}: complete", *this);
   co_await handle.complete();
   co_return;
@@ -147,7 +159,6 @@ seastar::future<> PGAdvanceMap::check_for_splits(
   co_return;
 }
 
-
 seastar::future<> PGAdvanceMap::split_pg(
     std::set<spg_t> split_children,
     cached_map_t next_map)
@@ -243,5 +254,83 @@ void PGAdvanceMap::split_stats(std::set<Ref<PG>> children_pgs,
   pg->finish_split_stats(*stat_iter, rctx.transaction);
 }
 
+seastar::future<bool> PGAdvanceMap::check_for_merges(
+    epoch_t old_epoch,
+    cached_map_t next_map,
+    PeeringCtx &rctx)
+{
+  LOG_PREFIX(PGAdvanceMap::check_for_merges);
+  using cached_map_t = OSDMapService::cached_map_t;
+  cached_map_t old_map = co_await shard_services.get_map(old_epoch);
+  if (!old_map->have_pg_pool(pg->get_pgid().pool())) {
+    DEBUG("{} pool doesn't exist in epoch {}", pg->get_pgid(),
+       old_epoch);
+    co_return false;
+  }
+  auto old_pg_num = old_map->get_pg_num(pg->get_pgid().pool());
+  if (!next_map->have_pg_pool(pg->get_pgid().pool())) {
+    DEBUG("{} pool doesn't exist in epoch {}", pg->get_pgid(),
+        next_map->get_epoch());
+    co_return false;
+  }
+  auto new_pg_num = next_map->get_pg_num(pg->get_pgid().pool());
+  DEBUG("{} pg_num change in e{} {} -> {}", pg->get_pgid(), next_map->get_epoch(),
+                 old_pg_num, new_pg_num);
+  if (new_pg_num && new_pg_num < old_pg_num) {
+      bool merged = co_await merge_pg(next_map, new_pg_num, old_pg_num, rctx);
+      co_return merged;
+  }
+  co_return false;
+}
+
+seastar::future<bool> PGAdvanceMap::merge_pg(
+    cached_map_t next_map,
+    unsigned new_pg_num,
+    unsigned old_pg_num,
+    PeeringCtx &rctx)
+{
+  LOG_PREFIX(PGAdvanceMap::merge_pg);
+  DEBUG("{}: start", *this);
+  spg_t parent;
+  std::map<spg_t, Ref<PG>> source_pgs;
+  std::set<spg_t> merge_sources;
+  // The PG is a Source (to be merged into a parent)
+  if (pg->pgid.is_merge_source(old_pg_num,
+                              new_pg_num,
+                              &parent)) {
+    parent.is_split(new_pg_num, old_pg_num, &merge_sources);
+    DEBUG("Source pg {} shutdown to avoid any operations", pg->get_pgid());
+    co_await pg->stop();
+    DEBUG("pg {} is a merge source, register it", pg->get_pgid());
+    co_await shard_services.register_merge_source(parent, pg->get_pgid(),
+                                                 merge_sources.size());
+    co_return true;
+  // The PG is a Target
+  } else if (pg->pgid.is_merge_target(old_pg_num,
+                                     new_pg_num)) {
+    DEBUG("Target PG {} identified. Waiting for sources...", pg->get_pgid());
+    pg->pgid.is_split(new_pg_num, old_pg_num, &merge_sources);
+    
+    // Block until all source PGs (potentially from other shards) arrive
+    auto sources = co_await shard_services.wait_for_merge_sources(pg->get_pgid(), std::move(merge_sources));
+    
+    // Perform merge
+    unsigned split_bits = pg->get_pgid().get_split_bits(new_pg_num);
+    const auto& merge_meta = next_map->get_pg_pool(pg->get_pgid().pool())->last_pg_merge_meta;
+    pg->merge_from(sources, rctx, split_bits, merge_meta);
+
+    // Update PeeringState and activate the new map
+    pg->handle_advance_map(next_map, rctx);
+    pg->handle_activate_map(rctx); 
+
+    co_await pg->complete_rctx(std::move(rctx));
+    // Return sources to their home shards for destruction
+    co_await shard_services.perform_source_cleanup(pg->get_pgid());
+
+    co_return true;
+  } else {
+    co_return false;
+  }
+}
 
 }
index 67c56e62d2d3ede846d8082beaede6fd39020da8..4d5e97246331a6840bc45948f953e6d3a80591bf 100644 (file)
@@ -55,8 +55,15 @@ public:
   using cached_map_t = OSDMapService::cached_map_t;
   seastar::future<> check_for_splits(epoch_t old_epoch,
                                      cached_map_t next_map);
+  seastar::future<bool> check_for_merges(epoch_t old_epoch,
+                                         cached_map_t next_map, 
+                                        PeeringCtx &rctx);
   seastar::future<> split_pg(std::set<spg_t> split_children,
                              cached_map_t next_map);
+  seastar::future<bool> merge_pg(cached_map_t next_map,
+                            unsigned new_pg_num,
+                            unsigned old_pg_num,
+                            PeeringCtx &rctx);
   void split_stats(std::set<Ref<PG>> child_pgs,
                   const std::set<spg_t> &child_pgids);