clear_ready_to_merge();
}
+ // Wake any coroutine parked in collect_merge_sources() so shutdown
+ // doesn't hang waiting for sources that will never arrive.
+ merge_rendezvous.arrivals.broken();
+ merge_rendezvous.sources.clear();
+
cancel_local_background_io_reservation();
cancel_remote_recovery_reservation();
check_readable_timer.cancel();
__func__, info->partial_writes_last_complete);
}
+void PG::add_merge_source(
+ spg_t source,
+ core_id_t birth_shard,
+ seastar::foreign_ptr<Ref<PG>> source_pg)
+{
+ LOG_PREFIX(PG::add_merge_source);
+ auto wrapped =
+ crimson::make_local_shared_foreign<Ref<PG>>(std::move(source_pg));
+ auto [_, inserted] = merge_rendezvous.sources.emplace(
+ source, std::make_pair(birth_shard, std::move(wrapped)));
+ if (inserted) {
+ DEBUG("target {} source {} arrived from shard {} ({} total)",
+ get_pgid(), source, birth_shard,
+ merge_rendezvous.sources.size());
+ merge_rendezvous.arrivals.signal(1);
+ } else {
+ DEBUG("target {} source {} already registered, ignoring",
+ get_pgid(), source);
+ }
+}
+
+seastar::future<PG::merge_source_map_t>
+PG::collect_merge_sources(std::size_t n)
+{
+ LOG_PREFIX(PG::collect_merge_sources);
+ DEBUG("target {} waiting for {} sources ({} already arrived)",
+ get_pgid(), n, merge_rendezvous.sources.size());
+ try {
+ co_await merge_rendezvous.arrivals.wait(n);
+ } catch (const seastar::broken_semaphore&) {
+ DEBUG("target {} merge rendezvous broken, aborting wait", get_pgid());
+ co_return merge_source_map_t{};
+ }
+ ceph_assert(merge_rendezvous.sources.size() == n);
+ auto sources = std::move(merge_rendezvous.sources);
+ merge_rendezvous.sources.clear();
+ co_return sources;
+}
+
}
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include <seastar/core/future.hh>
#include <seastar/core/shared_future.hh>
+#include <seastar/core/semaphore.hh>
+#include <seastar/core/sharded.hh>
#include "common/dout.h"
#include "common/ostream_temp.h"
std::pair<ghobject_t, bool>
do_delete_work(ceph::os::Transaction &t, ghobject_t _next) final;
+ // Per-PG rendezvous used to collect source PGs converging on this PG
+ // during a merge. Producers (source-side coroutines on other shards)
+ // push entries via add_merge_source(); the target-side coroutine waits
+ // via collect_merge_sources(n).
+ using merge_source_entry_t =
+ std::pair<core_id_t, crimson::local_shared_foreign_ptr<Ref<PG>>>;
+ using merge_source_map_t = std::map<spg_t, merge_source_entry_t>;
+
+ // Producer side: called on the target's shard with the foreign PG
+ // already wrapped. The first registration for a given source signals
+ // the semaphore; duplicate registrations (e.g. from replay) are no-ops.
+ void add_merge_source(
+ spg_t source,
+ core_id_t birth_shard,
+ seastar::foreign_ptr<Ref<PG>> source_pg);
+
+ // Consumer side: wait until `n` distinct sources have arrived, then
+ // return them and clear the rendezvous state. Returns an empty map if
+ // reset_merge_rendezvous() breaks the wait (e.g. PG stop or merge cancel).
+ seastar::future<merge_source_map_t> collect_merge_sources(std::size_t n);
+
+ // Drop in-flight handoffs and reset the semaphore. Call on PG stop or
+ // after Seastore cross-shard cancel so a failed try cannot leave stale
+ // sources for the next epoch.
+ void reset_merge_rendezvous();
+
+ void merge_from(
+ merge_source_map_t& sources,
+ PeeringCtx &rctx,
+ unsigned split_bits,
+ const pg_merge_meta_t& last_pg_merge_meta);
+
void clear_ready_to_merge() final {
LOG_PREFIX(PG::clear_ready_to_merge);
SUBDEBUGDPP(osd, "", *this);
// continuations here.
bool stopping = false;
+ // Rendezvous state owned by the target PG of a pending merge.
+ // sources is keyed by source pgid so re-registration is naturally
+ // idempotent; arrivals is signaled exactly once per first insert.
+ struct merge_rendezvous_t {
+ merge_source_map_t sources;
+ seastar::semaphore arrivals{0};
+ };
+ merge_rendezvous_t merge_rendezvous;
+
// PeeringListener merge callbacks must remain void, but they trigger async
// mon notifies in ShardServices. Gate them here so failures are logged and
// PG::stop() waits for them to drain.
});
}
+seastar::future<Ref<PG>> ShardServices::extract_pg(spg_t pgid) {
+ auto pg = local_state.pg_map.get_pg(pgid);
+ ceph_assert(pg);
+ co_await remove_pg(pgid);
+ co_return pg;
+}
+
+seastar::future<> ShardServices::register_merge_source(
+ spg_t target,
+ spg_t source)
+{
+ LOG_PREFIX(ShardServices::register_merge_source);
+
+ core_id_t birth_shard = seastar::this_shard_id();
+ core_id_t target_core = co_await get_pg_mapping(target);
+
+ // Remove the source pg from pg_to_shard_mapping so that
+ // it no longer receives any messages
+ auto pg_to_move = co_await extract_pg(source);
+
+ // Wrap the PG in a foreign_ptr to cross cores safely. If the target
+ // happens to be on the same shard, invoke_on simply runs the lambda
+ // inline.
+ auto foreign_pg = seastar::make_foreign(std::move(pg_to_move));
+
+ DEBUG("Target {} on shard {} (from shard {}); handing off source {}",
+ target, target_core, birth_shard, source);
+
+ co_await container().invoke_on(
+ target_core,
+ [target, source, birth_shard, foreign_pg = std::move(foreign_pg)]
+ (ShardServices& target_svc) mutable {
+ auto target_pg = target_svc.local_state.pg_map.get_pg(target);
+ // PGAdvanceMap on the target shard is expected to have instantiated
+ // the target PG by this point in the merge protocol.
+ ceph_assert(target_pg);
+ target_pg->add_merge_source(
+ source, birth_shard, std::move(foreign_pg));
+ });
+}
+
Ref<PG> PerShardState::get_pg(spg_t pgid)
{
assert_core();
return pg_to_shard_mapping.get_or_create_pg_mapping(pgid, core, store_index);
}
+ seastar::future<core_id_t> get_pg_mapping(spg_t pgid) {
+ return pg_to_shard_mapping.get_or_create_pg_mapping(pgid).then(
+ [](std::pair<core_id_t, store_index_t> mapping) {
+ return mapping.first;
+ });
+ }
+
auto remove_pg(spg_t pgid) {
local_state.pg_map.remove_pg(pgid);
return pg_to_shard_mapping.remove_pg_mapping(pgid);
return local_state.ec_extent_cache_lru;
}
+ seastar::future<Ref<PG>> extract_pg(spg_t pgid);
+ // Hand the source PG off to the target PG's rendezvous, hopping shards
+ // if needed. The consumer side (target PG) waits via
+ // PG::collect_merge_sources(); cleanup happens when the target drops the
+ // local_shared_foreign_ptr it received.
+ seastar::future<> register_merge_source(spg_t target, spg_t source);
+
FORWARD_TO_OSD_SINGLETON(set_ready_to_merge_source)
FORWARD_TO_OSD_SINGLETON(set_ready_to_merge_target)
FORWARD_TO_OSD_SINGLETON(set_not_ready_to_merge_source)