pg->handle_initialize(rctx);
pg->handle_activate_map(rctx);
}
- for (epoch_t next_epoch = from + 1; next_epoch <= to; next_epoch++) {
- DEBUG("{}: start: getting map {}",
- *this, next_epoch);
+ ceph_assert(std::cmp_less_equal(from, to));
+
+ for (epoch_t current_epoch = from; current_epoch < to; ++current_epoch) {
+ epoch_t next_epoch = current_epoch + 1;
+ DEBUG("{}: start: getting map {}", *this, next_epoch);
cached_map_t next_map = co_await shard_services.get_map(next_epoch);
- DEBUG("{}: advancing map to {}",
- *this, next_map->get_epoch());
- // Use the current OSDMap epoch to check for splits consecutively.
- epoch_t current_epoch = pg->get_osdmap_epoch();
+ merge_result_t merge_result =
+ co_await check_for_merges(current_epoch, next_map, rctx);
+ if (merge_result.role == merge_role_t::Source) {
+ // Merge source: hand the PG off to the target and finish. The PG
+ // stops existing locally, so there is nothing left to advance here.
+ DEBUG("{}: merge source, handing off after e{}", *this, current_epoch);
+ co_await finish_merge_source(merge_result.parent, rctx);
+ co_await handle.complete();
+ exit_handle.cancel();
+ co_return;
+ }
+ // A successful merge target (merge already applied by check_for_merges)
+ // and the no-merge case both advance this epoch and keep iterating, so
+ // the PG advances all the way to `to`, matching the classic OSD instead
+ // of stalling at an intermediate epoch.
+ DEBUG("{}: advancing map to {}", *this, next_map->get_epoch());
+ // Baseline epoch for split detection: PG map epoch before advancing to next_map.
+ epoch_t split_epoch = pg->get_osdmap_epoch();
pg->handle_advance_map(next_map, rctx);
DEBUG("{}: checking for splits between {} and {}",
- *this, current_epoch, next_map->get_epoch());
- co_await check_for_splits(current_epoch, std::move(next_map));
+ *this, split_epoch, next_map->get_epoch());
+ co_await check_for_splits(split_epoch, std::move(next_map));
}
+ // Normal completion, which also covers a completed merge target: the PG was
+ // fully advanced to `to` in the loop above, so just activate the final map
+ // and commit.
pg->handle_activate_map(rctx);
DEBUG("{}: map activated", *this);
if (do_init) {
}
}
-
seastar::future<> PGAdvanceMap::split_pg(
std::set<spg_t> split_children,
cached_map_t next_map)
pg->finish_split_stats(*stat_iter, rctx.transaction);
}
+seastar::future<PGAdvanceMap::merge_result_t> PGAdvanceMap::check_for_merges(
+ epoch_t old_epoch,
+ cached_map_t next_map,
+ PeeringCtx &rctx)
+{
+ LOG_PREFIX(PGAdvanceMap::check_for_merges);
+ using cached_map_t = OSDMapService::cached_map_t;
+ cached_map_t old_map = co_await shard_services.get_map(old_epoch);
+ if (!old_map->have_pg_pool(pg->get_pgid().pool())) {
+ DEBUG("{} pool doesn't exist in epoch {}", pg->get_pgid(),
+ old_epoch);
+ co_return merge_result_t{};
+ }
+ auto old_pg_num = old_map->get_pg_num(pg->get_pgid().pool());
+ if (!next_map->have_pg_pool(pg->get_pgid().pool())) {
+ DEBUG("{} pool doesn't exist in epoch {}", pg->get_pgid(),
+ next_map->get_epoch());
+ co_return merge_result_t{};
+ }
+ auto new_pg_num = next_map->get_pg_num(pg->get_pgid().pool());
+ DEBUG("{} pg_num change in e{} {} -> {}", pg->get_pgid(), next_map->get_epoch(),
+ old_pg_num, new_pg_num);
+ if (new_pg_num && new_pg_num < old_pg_num) {
+ co_return co_await merge_pg(next_map, new_pg_num, old_pg_num, rctx);
+ }
+ co_return merge_result_t{};
+}
+
+seastar::future<PGAdvanceMap::merge_result_t> PGAdvanceMap::merge_pg(
+ cached_map_t next_map,
+ unsigned new_pg_num,
+ unsigned old_pg_num,
+ PeeringCtx &rctx)
+{
+ LOG_PREFIX(PGAdvanceMap::merge_pg);
+ DEBUG("{}: start", *this);
+ spg_t parent;
+ std::set<spg_t> merge_sources;
+ if (pg->pgid.is_merge_source(old_pg_num,
+ new_pg_num,
+ &parent)) {
+ co_return merge_result_t{merge_role_t::Source, parent};
+ } else if (pg->pgid.is_merge_target(old_pg_num,
+ new_pg_num)) {
+ DEBUG("Target PG {} identified. Waiting for sources...", pg->get_pgid());
+ pg->pgid.is_split(new_pg_num, old_pg_num, &merge_sources);
+ // Block until all source PGs (potentially from other shards) arrive
+ // on this PG's rendezvous
+ auto sources = co_await pg->collect_merge_sources(merge_sources.size());
+ if (sources.empty()) {
+ co_return merge_result_t{};
+ }
+
+ unsigned split_bits = pg->get_pgid().get_split_bits(new_pg_num);
+ const auto& merge_meta =
+ next_map->get_pg_pool(pg->get_pgid().pool())->last_pg_merge_meta;
+ pg->merge_from(sources, rctx, split_bits, merge_meta);
+
+ co_return merge_result_t{merge_role_t::Target};
+ } else {
+ co_return merge_result_t{};
+ }
+}
+
+seastar::future<> PGAdvanceMap::finish_merge_source(
+ spg_t parent,
+ PeeringCtx &rctx)
+{
+ LOG_PREFIX(PGAdvanceMap::finish_merge_source);
+ DEBUG("{}: committing rctx before handoff to target {}", *this, parent);
+ co_await pg->complete_rctx(std::move(rctx));
+ co_await pg->stop();
+ co_await shard_services.register_merge_source(parent, pg->get_pgid());
+}
}