}
ceph_assert(std::cmp_less_equal(*from, to));
+ // is_merge ensures we stop advancing maps immediately if a merge occurs.
+ // In merge_pg(), we take care of continuing advancing map for the target (parent).
+ bool is_merge = false;
for (epoch_t next_epoch = *from + 1; next_epoch <= to; ++next_epoch) {
DEBUG("{}: start: getting map {}", *this, next_epoch);
cached_map_t next_map = co_await shard_services.get_map(next_epoch);
+ is_merge = co_await check_for_merges(*from, next_map, rctx);
+ if (is_merge) {
+ DEBUG("{}: stopping advance due to merge", *this);
+ break;
+ }
DEBUG("{}: advancing map to {}", *this, next_map->get_epoch());
-
pg->handle_advance_map(next_map, rctx);
co_await check_for_splits(*from, next_map);
}
- pg->handle_activate_map(rctx);
- DEBUG("{}: map activated", *this);
+ if (is_merge) {
+ // If we merged, the merge_pg logic has already handled the activation
+ // and rctx completion. We exit here.
+ DEBUG("{}: skipping activation because merge is in progress", *this);
+ } else {
+ pg->handle_activate_map(rctx);
+ DEBUG("{}: map activated", *this);
- if (do_init) {
- shard_services.pg_created(pg->get_pgid(), pg);
- INFO("PGAdvanceMap::start new pg {}", *pg);
+ if (do_init) {
+ shard_services.pg_created(pg->get_pgid(), pg);
+ INFO("PGAdvanceMap::start new pg {}", *pg);
+ }
+ co_await pg->complete_rctx(std::move(rctx));
}
- co_await pg->complete_rctx(std::move(rctx));
-
DEBUG("{}: complete", *this);
co_await handle.complete();
co_return;
co_return;
}
-
seastar::future<> PGAdvanceMap::split_pg(
std::set<spg_t> split_children,
cached_map_t next_map)
pg->finish_split_stats(*stat_iter, rctx.transaction);
}
+// Compares the pool's pg_num in the map at old_epoch with its pg_num in
+// next_map and, when it has shrunk to a non-zero value, delegates to
+// merge_pg(). Resolves to false when the pool is absent from either map or
+// pg_num did not decrease; otherwise resolves to whatever merge_pg() returns.
+seastar::future<bool> PGAdvanceMap::check_for_merges(
+  epoch_t old_epoch,
+  cached_map_t next_map,
+  PeeringCtx &rctx)
+{
+  LOG_PREFIX(PGAdvanceMap::check_for_merges);
+  using cached_map_t = OSDMapService::cached_map_t;
+  cached_map_t prev_map = co_await shard_services.get_map(old_epoch);
+
+  // pg_num can only be compared if the pool exists on both sides.
+  if (!prev_map->have_pg_pool(pg->get_pgid().pool())) {
+    DEBUG("{} pool doesn't exist in epoch {}", pg->get_pgid(),
+          old_epoch);
+    co_return false;
+  }
+  auto prev_pg_count = prev_map->get_pg_num(pg->get_pgid().pool());
+
+  if (!next_map->have_pg_pool(pg->get_pgid().pool())) {
+    DEBUG("{} pool doesn't exist in epoch {}", pg->get_pgid(),
+          next_map->get_epoch());
+    co_return false;
+  }
+  auto next_pg_count = next_map->get_pg_num(pg->get_pgid().pool());
+
+  DEBUG("{} pg_num change in e{} {} -> {}", pg->get_pgid(), next_map->get_epoch(),
+        prev_pg_count, next_pg_count);
+
+  // Only a decrease to a non-zero pg_num is treated as a merge.
+  const bool pg_num_shrunk = next_pg_count && next_pg_count < prev_pg_count;
+  if (!pg_num_shrunk) {
+    co_return false;
+  }
+  co_return co_await merge_pg(next_map, next_pg_count, prev_pg_count, rctx);
+}
+
+// Carries out this PG's part of a pg_num decrease (old_pg_num -> new_pg_num).
+//
+// - Merge source: the PG is stopped and then registered with shard_services
+//   under its parent, together with the number of sources the parent
+//   expects. Resolves to true. rctx is left untouched on this path.
+// - Merge target: waits for the sources, merges them into this PG, advances
+//   and activates next_map, completes rctx, then asks shard_services to
+//   clean up the sources. Resolves to true.
+// - Neither: resolves to false without side effects beyond logging.
+//
+// NOTE: on the target path rctx is moved into complete_rctx(), so the caller
+// must not use or complete it again when this resolves to true.
+seastar::future<bool> PGAdvanceMap::merge_pg(
+  cached_map_t next_map,
+  unsigned new_pg_num,
+  unsigned old_pg_num,
+  PeeringCtx &rctx)
+{
+  LOG_PREFIX(PGAdvanceMap::merge_pg);
+  DEBUG("{}: start", *this);
+  spg_t parent;
+  std::set<spg_t> merge_sources;
+
+  // The PG is a Source (to be merged into a parent)
+  if (pg->pgid.is_merge_source(old_pg_num,
+                               new_pg_num,
+                               &parent)) {
+    // Collect the parent's children so the parent knows how many sources
+    // to expect.
+    parent.is_split(new_pg_num, old_pg_num, &merge_sources);
+    DEBUG("Source pg {} shutdown to avoid any operations", pg->get_pgid());
+    co_await pg->stop();
+    DEBUG("pg {} is a merge source, register it", pg->get_pgid());
+    co_await shard_services.register_merge_source(parent, pg->get_pgid(),
+                                                  merge_sources.size());
+    co_return true;
+  }
+
+  // The PG is a Target
+  if (pg->pgid.is_merge_target(old_pg_num,
+                               new_pg_num)) {
+    DEBUG("Target PG {} identified. Waiting for sources...", pg->get_pgid());
+    pg->pgid.is_split(new_pg_num, old_pg_num, &merge_sources);
+
+    // Block until all source PGs (potentially from other shards) arrive
+    auto sources = co_await shard_services.wait_for_merge_sources(pg->get_pgid(), std::move(merge_sources));
+
+    // Perform merge
+    unsigned split_bits = pg->get_pgid().get_split_bits(new_pg_num);
+    const auto& merge_meta = next_map->get_pg_pool(pg->get_pgid().pool())->last_pg_merge_meta;
+    pg->merge_from(sources, rctx, split_bits, merge_meta);
+
+    // Update PeeringState and activate the new map
+    pg->handle_advance_map(next_map, rctx);
+    pg->handle_activate_map(rctx);
+
+    co_await pg->complete_rctx(std::move(rctx));
+    // Return sources to their home shards for destruction
+    co_await shard_services.perform_source_cleanup(pg->get_pgid());
+
+    co_return true;
+  }
+
+  // Neither a source nor a target of this pg_num change.
+  co_return false;
+}
}