From: Aishwarya Mathuria Date: Mon, 18 May 2026 13:37:26 +0000 (+0000) Subject: crimson/osd: reject Seastore PG merges across shards X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F66885%2Fhead;p=ceph.git crimson/osd: reject Seastore PG merges across shards Seastore cannot merge collections between reactor shards currently. On cross-shard detection, tell the monitor the source PG is not ready (via MOSDPGReadyToMerge{ ready=false }) so the unsafe pg_num decrement is never proposed, then send MOSDPGStopMerge to clamp pg_num_target and permanently disable further shrink for the pool. Signed-off-by: Aishwarya Mathuria --- diff --git a/src/crimson/osd/osd_operations/pg_advance_map.cc b/src/crimson/osd/osd_operations/pg_advance_map.cc index ff412adc5db..5ba9dc4dff5 100644 --- a/src/crimson/osd/osd_operations/pg_advance_map.cc +++ b/src/crimson/osd/osd_operations/pg_advance_map.cc @@ -295,11 +295,21 @@ seastar::future PGAdvanceMap::merge_pg( if (pg->pgid.is_merge_source(old_pg_num, new_pg_num, &parent)) { + parent.is_split(new_pg_num, old_pg_num, &merge_sources); + if (!co_await shard_services.seastore_merge_shards_ok( + parent, merge_sources)) { + co_return merge_result_t{}; + } co_return merge_result_t{merge_role_t::Source, parent}; } else if (pg->pgid.is_merge_target(old_pg_num, new_pg_num)) { DEBUG("Target PG {} identified. 
Waiting for sources...", pg->get_pgid()); pg->pgid.is_split(new_pg_num, old_pg_num, &merge_sources); + + if (!co_await shard_services.seastore_merge_shards_ok( + pg->get_pgid(), merge_sources)) { + co_return merge_result_t{}; + } // Block until all source PGs (potentially from other shards) arrive // on this PG's rendezvous auto sources = co_await pg->collect_merge_sources(merge_sources.size()); diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index a9506994971..f79e8272d99 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -1690,10 +1690,7 @@ seastar::future<> PG::stop() clear_ready_to_merge(); } - // Wake any coroutine parked in collect_merge_sources() so shutdown - // doesn't hang waiting for sources that will never arrive. - merge_rendezvous.arrivals.broken(); - merge_rendezvous.sources.clear(); + reset_merge_rendezvous(); cancel_local_background_io_reservation(); cancel_remote_recovery_reservation(); @@ -2081,6 +2078,14 @@ PG::collect_merge_sources(std::size_t n) co_return sources; } +void PG::reset_merge_rendezvous() +{ + // Unblock any waiter in collect_merge_sources() with broken(); then + // replace the semaphore so the next merge attempt starts at zero signals. + merge_rendezvous.arrivals.broken(); + merge_rendezvous = merge_rendezvous_t{}; +} + void PG::merge_from( merge_source_map_t& sources, PeeringCtx &rctx, diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 2e8238fcaf8..6e5e47c591c 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -586,11 +586,6 @@ public: // return them and clear the rendezvous state. Returns an empty map if // reset_merge_rendezvous() breaks the wait (e.g. PG stop or merge cancel). seastar::future collect_merge_sources(std::size_t n); - void merge_from( - merge_source_map_t& sources, - PeeringCtx &rctx, - unsigned split_bits, - const pg_merge_meta_t& last_pg_merge_meta); // Drop in-flight handoffs and reset the semaphore. 
Call on PG stop or // after Seastore cross-shard cancel so a failed try cannot leave stale diff --git a/src/crimson/osd/shard_services.cc b/src/crimson/osd/shard_services.cc index 06a55649d3b..00b5dbabe0e 100644 --- a/src/crimson/osd/shard_services.cc +++ b/src/crimson/osd/shard_services.cc @@ -10,6 +10,7 @@ #include "messages/MOSDPGCreated.h" #include "messages/MOSDPGTemp.h" #include "messages/MOSDPGReadyToMerge.h" +#include "messages/MOSDPGStopMerge.h" #include "osd/osd_perf_counters.h" #include "osd/PeeringState.h" @@ -148,6 +149,84 @@ seastar::future<> ShardServices::register_merge_source( }); } +bool ShardServices::is_seastore_objectstore() +{ + return crimson::common::local_conf().get_val( + "osd_objectstore") == "seastore"; +} + +seastar::future<> ShardServices::reset_target_merge_rendezvous(spg_t target) +{ + const core_id_t target_core = co_await get_pg_mapping(target); + co_await container().invoke_on( + target_core, + [target](ShardServices& target_svc) { + if (auto target_pg = target_svc.local_state.pg_map.get_pg(target)) { + target_pg->reset_merge_rendezvous(); + } + }); +} + +seastar::future<> ShardServices::send_stop_pool_merge(pg_t source_pgid) +{ + co_await with_singleton( + [pool = source_pgid.pool(), source_pgid](OSDSingletonState& singleton) { + return singleton.send_stop_pool_pg_merge(pool, source_pgid); + }); +} + +seastar::future<> ShardServices::abort_seastore_cross_shard_merge( + spg_t target, + pg_t source_pgid, + const std::set* all_sources, + bool reset_target_rendezvous) +{ + if (all_sources) { + for (const auto& src : *all_sources) { + co_await clear_ready_to_merge(src.pgid); + } + } else { + co_await clear_ready_to_merge(source_pgid); + } + co_await clear_ready_to_merge(target.pgid); + + // Ensure the monitor backs off before it can commit a pg_num decrement + // for this source. "Stop merge" is permanent policy; "not ready" prevents + // an already-in-flight decrement decision from being committed. 
+ co_await set_not_ready_to_merge_source(source_pgid); + co_await send_stop_pool_merge(source_pgid); + if (reset_target_rendezvous) { + co_await reset_target_merge_rendezvous(target); + } +} + + +seastar::future ShardServices::seastore_merge_shards_ok( + spg_t target, + const std::set& merge_sources) +{ + LOG_PREFIX(ShardServices::seastore_merge_shards_ok); + if (!is_seastore_objectstore()) { + co_return true; + } + const core_id_t target_core = co_await get_pg_mapping(target); + std::optional cross_shard_source; + for (const auto& src : merge_sources) { + if (co_await get_pg_mapping(src) != target_core) { + cross_shard_source = src.pgid; + break; + } + } + if (!cross_shard_source) { + co_return true; + } + DEBUG("seastore: target {} has a cross-shard merge source {}; stop pool merge", + target, *cross_shard_source); + co_await abort_seastore_cross_shard_merge( + target, *cross_shard_source, &merge_sources, true); + co_return false; +} + Ref PerShardState::get_pg(spg_t pgid) { assert_core(); @@ -479,9 +558,49 @@ void OSDSingletonState::clear_sent_ready_to_merge() sent_ready_to_merge_source.clear(); } +seastar::future<> OSDSingletonState::send_stop_pool_pg_merge(int64_t pool, pg_t pgid) +{ + LOG_PREFIX(OSDSingletonState::send_stop_pool_pg_merge); + const pg_pool_t *pi = osdmap->get_pg_pool(pool); + if (!pi || !pi->is_crimson()) { + co_return; + } + if (!pi->has_flag(pg_pool_t::FLAG_CRIMSON_ALLOW_PG_MERGE)) { + pools_merge_stopped_reported.insert(pool); + co_return; + } + // Claim the pool before co_await: otherwise concurrent callers can all pass + // the check and each send MOSDPGStopMerge while yielded on send_message(). 
+ if (!pools_merge_stopped_reported.insert(pool).second) { + co_return; + } + DEBUG("seastore: asking monitor to stop PG merge for pool {} (pg {})", + pool, pgid); + co_await monc.send_message(crimson::make_message( + pool, pgid, MOSDPGStopMerge::REASON_CROSS_SHARD, osdmap->get_epoch())); +} + +void OSDSingletonState::prune_pools_merge_stopped_reported() +{ + // send_stop_pool_pg_merge() records a pool before sending its single + // MOSDPGStopMerge (or on finding merge already disabled) so the mon is not + // spammed per cross-shard pair. Keep the entry while merge stays disabled; + // forget it if the pool disappears or crimson_allow_pg_merge is set again. + auto pool = pools_merge_stopped_reported.begin(); + while (pool != pools_merge_stopped_reported.end()) { + const pg_pool_t *pi = osdmap->get_pg_pool(*pool); + if (!pi || pi->has_flag(pg_pool_t::FLAG_CRIMSON_ALLOW_PG_MERGE)) { + pool = pools_merge_stopped_reported.erase(pool); + } else { + ++pool; + } + } +} + void OSDSingletonState::prune_sent_ready_to_merge() { LOG_PREFIX(OSDSingletonState::prune_sent_ready_to_merge); + prune_pools_merge_stopped_reported(); auto source = sent_ready_to_merge_source.begin(); while (source != sent_ready_to_merge_source.end()) { if (!osdmap->pg_exists(*source)) { diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h index 80c26e8d240..430f41502bf 100644 --- a/src/crimson/osd/shard_services.h +++ b/src/crimson/osd/shard_services.h @@ -378,6 +378,7 @@ private: std::set not_ready_to_merge_source; std::map not_ready_to_merge_target; std::set sent_ready_to_merge_source; + std::set pools_merge_stopped_reported; seastar::future<> set_ready_to_merge_source(pg_t pgid, eversion_t version); seastar::future<> set_ready_to_merge_target(pg_t pgid, @@ -388,7 +389,9 @@ private: seastar::future<> set_not_ready_to_merge_target(pg_t target, pg_t source); void clear_ready_to_merge(pg_t pgid); seastar::future<> send_ready_to_merge(); + seastar::future<> send_stop_pool_pg_merge(int64_t pool, pg_t 
pgid); void clear_sent_ready_to_merge(); + void prune_pools_merge_stopped_reported(); void prune_sent_ready_to_merge(); }; @@ -684,6 +687,14 @@ public: // local_shared_foreign_ptr it received. seastar::future<> register_merge_source(spg_t target, spg_t source); + static bool is_seastore_objectstore(); + + // Resolves true when the merge may proceed: the objectstore is not + // Seastore, or every merge source is on the target PG's shard. Otherwise + // aborts the merge (source marked not ready, mon asked to stop merging the + // pool) and resolves false. Uses the full sibling set so one source cannot + // commit while another would abort. + seastar::future seastore_merge_shards_ok( + spg_t target, const std::set& merge_sources); + FORWARD_TO_OSD_SINGLETON(set_ready_to_merge_source) FORWARD_TO_OSD_SINGLETON(set_ready_to_merge_target) FORWARD_TO_OSD_SINGLETON(set_not_ready_to_merge_source) @@ -854,6 +865,15 @@ public: invoke_context_on_core(seastar::this_shard_id(), on_reserved)); } +private: + seastar::future<> reset_target_merge_rendezvous(spg_t target); + seastar::future<> send_stop_pool_merge(pg_t source_pgid); + seastar::future<> abort_seastore_cross_shard_merge( + spg_t target, + pg_t source_pgid, + const std::set* all_sources, + bool reset_target_rendezvous); + #undef FORWARD_CONST #undef FORWARD #undef FORWARD_TO_OSD_SINGLETON diff --git a/src/messages/MOSDPGStopMerge.h b/src/messages/MOSDPGStopMerge.h new file mode 100644 index 00000000000..fa0fa5ce446 --- /dev/null +++ b/src/messages/MOSDPGStopMerge.h @@ -0,0 +1,53 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 sts=2 expandtab + +#pragma once + +#include "osd/osd_types.h" +#include "messages/PaxosServiceMessage.h" + +/// OSD -> mon: permanently stop PG merge shrink for a Crimson pool. 
+class MOSDPGStopMerge : public PaxosServiceMessage { +public: + static constexpr uint8_t REASON_CROSS_SHARD = 1; + + int64_t pool = -1; + pg_t pgid; + uint8_t reason = REASON_CROSS_SHARD; + + MOSDPGStopMerge() + : PaxosServiceMessage{MSG_OSD_PG_STOP_MERGE, 0} + {} + MOSDPGStopMerge(int64_t pool, pg_t pgid, uint8_t reason, epoch_t epoch) + : PaxosServiceMessage{MSG_OSD_PG_STOP_MERGE, epoch}, + pool(pool), + pgid(pgid), + reason(reason) + {} + void encode_payload(uint64_t features) override { + using ceph::encode; + paxos_encode(); + encode(pool, payload); + encode(pgid, payload); + encode(reason, payload); + } + void decode_payload() override { + using ceph::decode; + auto p = payload.cbegin(); + paxos_decode(p); + decode(pool, p); + decode(pgid, p); + decode(reason, p); + } + std::string_view get_type_name() const override { return "osd_pg_stop_merge"; } + void print(std::ostream &out) const { + out << get_type_name() + << "(pool " << pool + << " pg " << pgid + << " reason " << (unsigned)reason + << " v" << version << ")"; + } +private: + template + friend boost::intrusive_ptr ceph::make_message(Args&&... 
args); +}; diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 80bca9ca4b9..205fb23c7ce 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -4782,6 +4782,7 @@ void Monitor::dispatch_op(MonOpRequestRef op) case MSG_REMOVE_SNAPS: case MSG_MON_GET_PURGED_SNAPS: case MSG_OSD_PG_READY_TO_MERGE: + case MSG_OSD_PG_STOP_MERGE: paxos_service[PAXOS_OSDMAP]->dispatch(op); return; diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index b256899e3a4..8121ea2523c 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -54,6 +54,7 @@ #include "messages/MOSDPGCreated.h" #include "messages/MOSDPGTemp.h" #include "messages/MOSDPGReadyToMerge.h" +#include "messages/MOSDPGStopMerge.h" #include "messages/MMonCommand.h" #include "messages/MRemoveSnaps.h" #include "messages/MRoute.h" @@ -2771,6 +2772,8 @@ bool OSDMonitor::preprocess_query(MonOpRequestRef op) return preprocess_pg_created(op); case MSG_OSD_PG_READY_TO_MERGE: return preprocess_pg_ready_to_merge(op); + case MSG_OSD_PG_STOP_MERGE: + return preprocess_pg_stop_merge(op); case MSG_OSD_PGTEMP: return preprocess_pgtemp(op); case MSG_OSD_BEACON: @@ -2817,6 +2820,8 @@ bool OSDMonitor::prepare_update(MonOpRequestRef op) return prepare_pgtemp(op); case MSG_OSD_PG_READY_TO_MERGE: return prepare_pg_ready_to_merge(op); + case MSG_OSD_PG_STOP_MERGE: + return prepare_pg_stop_merge(op); case MSG_OSD_BEACON: return prepare_beacon(op); @@ -4100,6 +4105,16 @@ bool OSDMonitor::prepare_pg_ready_to_merge(MonOpRequestRef op) p.last_change = pending_inc.epoch; } else { // back off the merge attempt! + if (!m->ready && !p.has_flag(pg_pool_t::FLAG_CRIMSON)) { + mon.clog->warn() << "osd." << m->get_orig_source().num() + << " reported pg " << m->pgid + << " not ready to merge; backing off pg_num decrease" + << " for pool '" + << osdmap.get_pool_name(m->pgid.pool()) << "'"; + dout(1) << __func__ << " osd." 
<< m->get_orig_source().num() + << " pg " << m->pgid << " not ready to merge, backing off" + << dendl; + } p.set_pg_num_pending(p.get_pg_num()); } @@ -4129,6 +4144,79 @@ bool OSDMonitor::prepare_pg_ready_to_merge(MonOpRequestRef op) return true; } +bool OSDMonitor::preprocess_pg_stop_merge(MonOpRequestRef op) +{ + op->mark_osdmon_event(__func__); + auto m = op->get_req(); + dout(10) << __func__ << " " << *m << dendl; + auto session = op->get_session(); + if (!session) { + dout(10) << __func__ << ": no monitor session!" << dendl; + goto ignore; + } + if (!session->is_capable("osd", MON_CAP_X)) { + derr << __func__ << " received from entity " + << "with insufficient privileges " << session->caps << dendl; + goto ignore; + } + if (!osdmap.get_pg_pool(m->pool)) { + derr << __func__ << " pool " << m->pool << " dne" << dendl; + goto ignore; + } + return false; + + ignore: + mon.no_reply(op); + return true; +} + +bool OSDMonitor::prepare_pg_stop_merge(MonOpRequestRef op) +{ + op->mark_osdmon_event(__func__); + auto m = op->get_req(); + dout(10) << __func__ << " " << *m << dendl; + + pg_pool_t p; + if (pending_inc.new_pools.count(m->pool)) + p = pending_inc.new_pools[m->pool]; + else + p = *osdmap.get_pg_pool(m->pool); + + if (!p.is_crimson()) { + dout(10) << __func__ << " pool " << m->pool << " is not crimson, ignoring" + << dendl; + wait_for_finished_proposal(op, new C_ReplyMap(this, op, m->version)); + return false; + } + + const char *reason_str = "unknown"; + if (m->reason == MOSDPGStopMerge::REASON_CROSS_SHARD) { + reason_str = "cross-shard PG merge not supported on Seastore"; + } + + mon.clog->warn() << "osd." << m->get_orig_source().num() + << " stopped PG merge for pool '" + << osdmap.get_pool_name(m->pool) + << "' (" << reason_str << ", source pg " << m->pgid + << "); no further pg_num decrease will be attempted"; + + // Cancel any in-flight shrink; keep current pg_num as-is. 
+ p.set_pg_num_pending(p.get_pg_num()); + if (p.get_pg_num_target() < p.get_pg_num()) { + p.set_pg_num_target(p.get_pg_num()); + } + if (p.get_pgp_num_target() < p.get_pgp_num()) { + p.set_pgp_num_target(p.get_pgp_num()); + } + p.unset_flag(pg_pool_t::FLAG_CRIMSON_ALLOW_PG_MERGE); + p.last_pg_merge_meta = pg_merge_meta_t{}; + p.last_change = pending_inc.epoch; + p.last_force_op_resend_prenautilus = pending_inc.epoch; + + pending_inc.new_pools[m->pool] = p; + wait_for_finished_proposal(op, new C_ReplyMap(this, op, m->version)); + return true; +} // ------------- // pg_temp changes diff --git a/src/mon/OSDMonitor.h b/src/mon/OSDMonitor.h index d0209fbc522..9000fc46de4 100644 --- a/src/mon/OSDMonitor.h +++ b/src/mon/OSDMonitor.h @@ -470,6 +470,9 @@ private: bool preprocess_pg_ready_to_merge(MonOpRequestRef op); bool prepare_pg_ready_to_merge(MonOpRequestRef op); + bool preprocess_pg_stop_merge(MonOpRequestRef op); + bool prepare_pg_stop_merge(MonOpRequestRef op); + int _check_remove_pool(int64_t pool_id, const pg_pool_t &pool, std::ostream *ss); bool _check_become_tier( int64_t tier_pool_id, const pg_pool_t *tier_pool, diff --git a/src/msg/Message.cc b/src/msg/Message.cc index e6600cda451..bbfa159345f 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -101,6 +101,7 @@ #include "messages/MOSDPGRecoveryDelete.h" #include "messages/MOSDPGRecoveryDeleteReply.h" #include "messages/MOSDPGReadyToMerge.h" +#include "messages/MOSDPGStopMerge.h" #include "messages/MRemoveSnaps.h" @@ -647,6 +648,9 @@ Message *decode_message(CephContext *cct, case MSG_OSD_PG_READY_TO_MERGE: m = make_message(); break; + case MSG_OSD_PG_STOP_MERGE: + m = make_message(); + break; case MSG_OSD_EC_WRITE: m = make_message(); break; diff --git a/src/msg/Message.h b/src/msg/Message.h index ec90687072e..071e6010381 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -152,6 +152,7 @@ #define MSG_OSD_SCRUB2 121 #define MSG_OSD_PG_READY_TO_MERGE 122 +#define MSG_OSD_PG_STOP_MERGE 124 #define 
MSG_OSD_PG_LEASE 133 #define MSG_OSD_PG_LEASE_ACK 134 diff --git a/src/msg/MessageRef.h b/src/msg/MessageRef.h index 416b58f8df7..c8f9f81aceb 100644 --- a/src/msg/MessageRef.h +++ b/src/msg/MessageRef.h @@ -157,6 +157,7 @@ class MOSDPGPush; class MOSDPGPushReply; class MOSDPGQuery; class MOSDPGReadyToMerge; +class MOSDPGStopMerge; class MOSDPGRecoveryDelete; class MOSDPGRecoveryDeleteReply; class MOSDPGRemove;