git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: per-PG rendezvous for cross-shard merge source handoff
author Aishwarya Mathuria <amathuri@redhat.com>
Thu, 8 Jan 2026 16:44:01 +0000 (16:44 +0000)
committer Aishwarya Mathuria <amathuri@redhat.com>
Thu, 11 Jun 2026 04:49:14 +0000 (10:19 +0530)
Add infrastructure so source PGs can be extracted from their birth
shard, moved to the target shard, and collected by the target PG before
merge proceeds.

Cross-shard safety: PGs are tied to their birth_shard for destruction.
register_merge_source() uses extract_pg() to detach the source,
seastar::foreign_ptr to hop cores, and
crimson::local_shared_foreign_ptr on the target so release routes
destruction back to the birth shard.

Synchronization: replace the per-shard ShardServices merge_info_t
registry (shared_promise waiters, ready_pgs staging, and cleanup hooks)
with merge state on the target PG itself. Source-side
register_merge_source() delivers PGs via PG::add_merge_source(); the
target waits in PG::collect_merge_sources(n) on a per-PG semaphore.
Duplicate source registrations are ignored. PG::stop() breaks the
semaphore so shutdown does not hang.

ShardServices::register_merge_source() and extract_pg() live in
shard_services; rendezvous types and methods live on PG.

Signed-off-by: Aishwarya Mathuria <amathuri@redhat.com>
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/shard_services.cc
src/crimson/osd/shard_services.h

index 09919ab05f726c6bbaf84e55087e7954a79b897e..837d0e0f9a350b1728324f5eb1ce2f83f3a941ab 100644 (file)
@@ -1690,6 +1690,11 @@ seastar::future<> PG::stop()
     clear_ready_to_merge();
   }
 
+  // Wake any coroutine parked in collect_merge_sources() so shutdown
+  // doesn't hang waiting for sources that will never arrive.
+  merge_rendezvous.arrivals.broken();
+  merge_rendezvous.sources.clear();
+
   cancel_local_background_io_reservation();
   cancel_remote_recovery_reservation();
   check_readable_timer.cancel();
@@ -2037,4 +2042,43 @@ void PG::PGLogEntryHandler::partial_write(pg_info_t *info,
                  __func__, info->partial_writes_last_complete);
 }
 
+void PG::add_merge_source(
+    spg_t source,
+    core_id_t birth_shard,
+    seastar::foreign_ptr<Ref<PG>> source_pg)
+{
+  LOG_PREFIX(PG::add_merge_source);
+  auto wrapped =
+    crimson::make_local_shared_foreign<Ref<PG>>(std::move(source_pg));
+  auto [_, inserted] = merge_rendezvous.sources.emplace(
+    source, std::make_pair(birth_shard, std::move(wrapped)));
+  if (inserted) {
+    DEBUG("target {} source {} arrived from shard {} ({} total)",
+          get_pgid(), source, birth_shard,
+          merge_rendezvous.sources.size());
+    merge_rendezvous.arrivals.signal(1);
+  } else {
+    DEBUG("target {} source {} already registered, ignoring",
+          get_pgid(), source);
+  }
+}
+
+// Consumer side of the per-PG merge rendezvous.
+//
+// Waits for `n` units on the arrivals semaphore (one unit is signaled per
+// distinct source by add_merge_source()), then moves the collected sources
+// out of the rendezvous state and returns them.
+//
+// Returns an empty map when the rendezvous is torn down instead of
+// completing: either the semaphore is broken while waiting, or the sources
+// were cleared after the wait was satisfied but before this coroutine ran.
+seastar::future<PG::merge_source_map_t>
+PG::collect_merge_sources(std::size_t n)
+{
+  LOG_PREFIX(PG::collect_merge_sources);
+  DEBUG("target {} waiting for {} sources ({} already arrived)",
+        get_pgid(), n, merge_rendezvous.sources.size());
+  try {
+    co_await merge_rendezvous.arrivals.wait(n);
+  } catch (const seastar::broken_semaphore&) {
+    DEBUG("target {} merge rendezvous broken, aborting wait", get_pgid());
+    co_return merge_source_map_t{};
+  }
+  // signal() only makes this coroutine runnable; PG::stop() may run before
+  // we are resumed and clear the sources. In that case no exception is
+  // delivered here, so treat "fewer than n" as a torn-down rendezvous
+  // rather than tripping the assert below during shutdown.
+  if (merge_rendezvous.sources.size() < n) {
+    DEBUG("target {} merge rendezvous cleared before collection "
+          "({} of {} sources present), aborting",
+          get_pgid(), merge_rendezvous.sources.size(), n);
+    co_return merge_source_map_t{};
+  }
+  // More sources than requested is still a protocol violation.
+  ceph_assert(merge_rendezvous.sources.size() == n);
+  auto sources = std::move(merge_rendezvous.sources);
+  // Leave the moved-from map in a well-defined empty state.
+  merge_rendezvous.sources.clear();
+  co_return sources;
+}
+
 }
index f3913c919df2c0a0707086d32f16e83dbd9c82ce..6e5e47c591c290b9251b714ac786e4794906f121 100644 (file)
@@ -8,6 +8,8 @@
 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
 #include <seastar/core/future.hh>
 #include <seastar/core/shared_future.hh>
+#include <seastar/core/semaphore.hh>
+#include <seastar/core/sharded.hh>
 
 #include "common/dout.h"
 #include "common/ostream_temp.h"
@@ -564,6 +566,38 @@ public:
   std::pair<ghobject_t, bool>
   do_delete_work(ceph::os::Transaction &t, ghobject_t _next) final;
 
+  // Per-PG rendezvous used to collect source PGs converging on this PG
+  // during a merge. Producers (source-side coroutines, possibly on other
+  // shards) push entries via add_merge_source(); the target-side coroutine
+  // waits via collect_merge_sources(n).
+  using merge_source_entry_t =
+    std::pair<core_id_t, crimson::local_shared_foreign_ptr<Ref<PG>>>;
+  using merge_source_map_t = std::map<spg_t, merge_source_entry_t>;
+
+  // Producer side: called on the target's shard; wraps the incoming
+  // foreign_ptr in a local_shared_foreign_ptr. The first registration for a
+  // source signals the semaphore; duplicates (e.g. from replay) are no-ops.
+  void add_merge_source(
+    spg_t source,
+    core_id_t birth_shard,
+    seastar::foreign_ptr<Ref<PG>> source_pg);
+
+  // Consumer side: wait until `n` distinct sources have arrived, then
+  // return them and clear the rendezvous state.  Returns an empty map if
+  // the arrivals semaphore is broken while waiting (PG::stop() does this).
+  seastar::future<merge_source_map_t> collect_merge_sources(std::size_t n);
+
+  // Drop in-flight handoffs and reset the semaphore so a failed attempt
+  // (e.g. Seastore cross-shard cancel) leaves no stale sources for the next
+  // epoch. NOTE(review): no definition or caller is visible in this change.
+  void reset_merge_rendezvous();
+
+  void merge_from(
+      merge_source_map_t& sources,
+      PeeringCtx &rctx,
+      unsigned split_bits,
+      const pg_merge_meta_t& last_pg_merge_meta);
+
   void clear_ready_to_merge() final {
     LOG_PREFIX(PG::clear_ready_to_merge);
     SUBDEBUGDPP(osd, "", *this);
@@ -1188,6 +1222,15 @@ private:
   // continuations here.
   bool stopping = false;
 
+  // Rendezvous state owned by the target PG of a pending merge.  sources is
+  // keyed by source pgid, so re-registration is idempotent; arrivals is
+  // signaled once per first insert and broken by PG::stop() to wake waiters.
+  struct merge_rendezvous_t {
+    merge_source_map_t sources;
+    seastar::semaphore arrivals{0};
+  };
+  merge_rendezvous_t merge_rendezvous;
+
   // PeeringListener merge callbacks must remain void, but they trigger async
   // mon notifies in ShardServices. Gate them here so failures are logged and
   // PG::stop() waits for them to drain.
index e4ff7d02a423a2122c65e15eba450de7b344aae4..06a55649d3b05607fc76692b89e5b35f4e6c37f5 100644 (file)
@@ -107,6 +107,47 @@ seastar::future<> PerShardState::broadcast_map_to_pgs(
     });
 }
 
+// Detach `pgid` from this shard and hand the reference to the caller.
+// The PG must be present in the local pg map (asserted); it is then removed
+// via remove_pg() before the reference is returned.
+seastar::future<Ref<PG>> ShardServices::extract_pg(spg_t pgid) {
+  // Take our own reference first so the PG outlives its removal below.
+  Ref<PG> detached = local_state.pg_map.get_pg(pgid);
+  ceph_assert(detached);
+  co_await remove_pg(pgid);
+  co_return detached;
+}
+
+seastar::future<> ShardServices::register_merge_source(
+    spg_t target,
+    spg_t source)
+{
+  LOG_PREFIX(ShardServices::register_merge_source);
+
+  core_id_t birth_shard = seastar::this_shard_id();
+  core_id_t target_core = co_await get_pg_mapping(target);
+
+  // Remove the source pg from pg_to_shard_mapping so that
+  // it no longer receives any messages
+  auto pg_to_move = co_await extract_pg(source);
+
+  // Wrap the PG in a foreign_ptr to cross cores safely. If the target
+  // happens to be on the same shard, invoke_on simply runs the lambda
+  // inline.
+  auto foreign_pg = seastar::make_foreign(std::move(pg_to_move));
+
+  DEBUG("Target {} on shard {} (from shard {}); handing off source {}",
+        target, target_core, birth_shard, source);
+
+  co_await container().invoke_on(
+    target_core,
+    [target, source, birth_shard, foreign_pg = std::move(foreign_pg)]
+    (ShardServices& target_svc) mutable {
+      auto target_pg = target_svc.local_state.pg_map.get_pg(target);
+      // PGAdvanceMap on the target shard is expected to have instantiated
+      // the target PG by this point in the merge protocol.
+      ceph_assert(target_pg);
+      target_pg->add_merge_source(
+        source, birth_shard, std::move(foreign_pg));
+    });
+}
+
 Ref<PG> PerShardState::get_pg(spg_t pgid)
 {
   assert_core();
index 803ff092d79763cc340b5a040bd0f52b1f1f9063..80c26e8d2409616c25e3f288617654e464bd2c4f 100644 (file)
@@ -546,6 +546,13 @@ public:
     return pg_to_shard_mapping.get_or_create_pg_mapping(pgid, core, store_index);
   }
 
+  // Resolve the core recorded for `pgid`, discarding the store index.
+  // NOTE(review): this delegates to get_or_create_pg_mapping(), so despite
+  // the "get" name it presumably creates a mapping when none exists — confirm
+  // that is acceptable for callers that only want a lookup.
+  seastar::future<core_id_t> get_pg_mapping(spg_t pgid) {
+    return pg_to_shard_mapping.get_or_create_pg_mapping(pgid).then(
+      [](std::pair<core_id_t, store_index_t> core_and_store) {
+        return core_and_store.first;
+      });
+  }
+
   auto remove_pg(spg_t pgid) {
     local_state.pg_map.remove_pg(pgid);
     return pg_to_shard_mapping.remove_pg_mapping(pgid);
@@ -670,6 +677,13 @@ public:
     return local_state.ec_extent_cache_lru;
   }
 
+  seastar::future<Ref<PG>> extract_pg(spg_t pgid);
+  // Hand the source PG off to the target PG's rendezvous, hopping shards
+  // if needed. The consumer side (target PG) waits via
+  // PG::collect_merge_sources(); cleanup happens when the target drops the
+  // local_shared_foreign_ptr it received.
+  seastar::future<> register_merge_source(spg_t target, spg_t source);
+
   FORWARD_TO_OSD_SINGLETON(set_ready_to_merge_source)
   FORWARD_TO_OSD_SINGLETON(set_ready_to_merge_target)
   FORWARD_TO_OSD_SINGLETON(set_not_ready_to_merge_source)