From: Aishwarya Mathuria Date: Thu, 8 Jan 2026 16:44:01 +0000 (+0000) Subject: crimson/osd: per-PG rendezvous for cross-shard merge source handoff X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e12454b5fedf880abee8364c5bad2e6aacc7b1cb;p=ceph.git crimson/osd: per-PG rendezvous for cross-shard merge source handoff Add infrastructure so source PGs can be extracted from their birth shard, moved to the target shard, and collected by the target PG before merge proceeds. Cross-shard safety: PGs are tied to their birth_shard for destruction. register_merge_source() uses extract_pg() to detach the source, seastar::foreign_ptr to hop cores, and crimson::local_shared_foreign_ptr on the target so release routes destruction back to the birth shard. Synchronization: replace the per-shard ShardServices merge_info_t registry (shared_promise waiters, ready_pgs staging, and cleanup hooks) with merge state on the target PG itself. Source-side register_merge_source() delivers PGs via PG::add_merge_source(); the target waits in PG::collect_merge_sources(n) on a per-PG semaphore. Duplicate source registrations are ignored. PG::stop() breaks the semaphore so shutdown does not hang. ShardServices::register_merge_source() and extract_pg() live in shard_services; rendezvous types and methods live on PG. Signed-off-by: Aishwarya Mathuria --- diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 09919ab05f7..837d0e0f9a3 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -1690,6 +1690,11 @@ seastar::future<> PG::stop() clear_ready_to_merge(); } + // Wake any coroutine parked in collect_merge_sources() so shutdown + // doesn't hang waiting for sources that will never arrive. + merge_rendezvous.arrivals.broken(); + merge_rendezvous.sources.clear(); + cancel_local_background_io_reservation(); cancel_remote_recovery_reservation(); check_readable_timer.cancel(); @@ -2037,4 +2042,43 @@ void PG::PGLogEntryHandler::partial_write(pg_info_t *info, __func__, info->partial_writes_last_complete); } +void PG::add_merge_source( + spg_t source, + core_id_t birth_shard, + seastar::foreign_ptr> source_pg) +{ + LOG_PREFIX(PG::add_merge_source); + auto wrapped = + crimson::make_local_shared_foreign>(std::move(source_pg)); + auto [_, inserted] = merge_rendezvous.sources.emplace( + source, std::make_pair(birth_shard, std::move(wrapped))); + if (inserted) { + DEBUG("target {} source {} arrived from shard {} ({} total)", + get_pgid(), source, birth_shard, + merge_rendezvous.sources.size()); + merge_rendezvous.arrivals.signal(1); + } else { + DEBUG("target {} source {} already registered, ignoring", + get_pgid(), source); + } +} + +seastar::future +PG::collect_merge_sources(std::size_t n) +{ + LOG_PREFIX(PG::collect_merge_sources); + DEBUG("target {} waiting for {} sources ({} already arrived)", + get_pgid(), n, merge_rendezvous.sources.size()); + try { + co_await merge_rendezvous.arrivals.wait(n); + } catch (const seastar::broken_semaphore&) { + DEBUG("target {} merge rendezvous broken, aborting wait", get_pgid()); + co_return merge_source_map_t{}; + } + ceph_assert(merge_rendezvous.sources.size() == n); + auto sources = std::move(merge_rendezvous.sources); + merge_rendezvous.sources.clear(); + co_return sources; +} + } diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index f3913c919df..6e5e47c591c 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -8,6 +8,8 @@ #include #include #include +#include +#include #include "common/dout.h" #include "common/ostream_temp.h" @@ -564,6 +566,38 @@ public: std::pair do_delete_work(ceph::os::Transaction &t, ghobject_t _next) final; + // Per-PG rendezvous used to collect source PGs converging on this PG + // during a merge. Producers (source-side coroutines on other shards) + // push entries via add_merge_source(); the target-side coroutine waits + // via collect_merge_sources(n). + using merge_source_entry_t = + std::pair>>; + using merge_source_map_t = std::map; + + // Producer side: called on the target's shard with the foreign PG + // already wrapped. The first registration for a given source signals + // the semaphore; duplicate registrations (e.g. from replay) are no-ops. + void add_merge_source( + spg_t source, + core_id_t birth_shard, + seastar::foreign_ptr> source_pg); + + // Consumer side: wait until `n` distinct sources have arrived, then + // return them and clear the rendezvous state. Returns an empty map if + // reset_merge_rendezvous() breaks the wait (e.g. PG stop or merge cancel). + seastar::future collect_merge_sources(std::size_t n); + + // Drop in-flight handoffs and reset the semaphore. Call on PG stop or + // after Seastore cross-shard cancel so a failed try cannot leave stale + // sources for the next epoch. + void reset_merge_rendezvous(); + + void merge_from( + merge_source_map_t& sources, + PeeringCtx &rctx, + unsigned split_bits, + const pg_merge_meta_t& last_pg_merge_meta); + void clear_ready_to_merge() final { LOG_PREFIX(PG::clear_ready_to_merge); SUBDEBUGDPP(osd, "", *this); @@ -1188,6 +1222,15 @@ private: // continuations here. bool stopping = false; + // Rendezvous state owned by the target PG of a pending merge. + // sources is keyed by source pgid so re-registration is naturally + // idempotent; arrivals is signaled exactly once per first insert. + struct merge_rendezvous_t { + merge_source_map_t sources; + seastar::semaphore arrivals{0}; + }; + merge_rendezvous_t merge_rendezvous; + // PeeringListener merge callbacks must remain void, but they trigger async // mon notifies in ShardServices. Gate them here so failures are logged and // PG::stop() waits for them to drain. diff --git a/src/crimson/osd/shard_services.cc b/src/crimson/osd/shard_services.cc index e4ff7d02a42..06a55649d3b 100644 --- a/src/crimson/osd/shard_services.cc +++ b/src/crimson/osd/shard_services.cc @@ -107,6 +107,47 @@ seastar::future<> PerShardState::broadcast_map_to_pgs( }); } +seastar::future> ShardServices::extract_pg(spg_t pgid) { + auto pg = local_state.pg_map.get_pg(pgid); + ceph_assert(pg); + co_await remove_pg(pgid); + co_return pg; +} + +seastar::future<> ShardServices::register_merge_source( + spg_t target, + spg_t source) +{ + LOG_PREFIX(ShardServices::register_merge_source); + + core_id_t birth_shard = seastar::this_shard_id(); + core_id_t target_core = co_await get_pg_mapping(target); + + // Remove the source pg from pg_to_shard_mapping so that + // it no longer receives any messages + auto pg_to_move = co_await extract_pg(source); + + // Wrap the PG in a foreign_ptr to cross cores safely. If the target + // happens to be on the same shard, invoke_on simply runs the lambda + // inline. + auto foreign_pg = seastar::make_foreign(std::move(pg_to_move)); + + DEBUG("Target {} on shard {} (from shard {}); handing off source {}", + target, target_core, birth_shard, source); + + co_await container().invoke_on( + target_core, + [target, source, birth_shard, foreign_pg = std::move(foreign_pg)] + (ShardServices& target_svc) mutable { + auto target_pg = target_svc.local_state.pg_map.get_pg(target); + // PGAdvanceMap on the target shard is expected to have instantiated + // the target PG by this point in the merge protocol. + ceph_assert(target_pg); + target_pg->add_merge_source( + source, birth_shard, std::move(foreign_pg)); + }); +} + Ref PerShardState::get_pg(spg_t pgid) { assert_core(); diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h index 803ff092d79..80c26e8d240 100644 --- a/src/crimson/osd/shard_services.h +++ b/src/crimson/osd/shard_services.h @@ -546,6 +546,13 @@ public: return pg_to_shard_mapping.get_or_create_pg_mapping(pgid, core, store_index); } + seastar::future get_pg_mapping(spg_t pgid) { + return pg_to_shard_mapping.get_or_create_pg_mapping(pgid).then( + [](std::pair mapping) { + return mapping.first; + }); + } + auto remove_pg(spg_t pgid) { local_state.pg_map.remove_pg(pgid); return pg_to_shard_mapping.remove_pg_mapping(pgid); @@ -670,6 +677,13 @@ public: return local_state.ec_extent_cache_lru; } + seastar::future> extract_pg(spg_t pgid); + // Hand the source PG off to the target PG's rendezvous, hopping shards + // if needed. The consumer side (target PG) waits via + // PG::collect_merge_sources(); cleanup happens when the target drops the + // local_shared_foreign_ptr it received. + seastar::future<> register_merge_source(spg_t target, spg_t source); + FORWARD_TO_OSD_SINGLETON(set_ready_to_merge_source) FORWARD_TO_OSD_SINGLETON(set_ready_to_merge_target) FORWARD_TO_OSD_SINGLETON(set_not_ready_to_merge_source)