From: Aishwarya Mathuria Date: Wed, 7 Jan 2026 11:55:25 +0000 (+0000) Subject: crimson/osd: Add functions to notify mon when PGs are ready to merge X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fc009737396bbbdf142537bd91d873c70562cc0c;p=ceph-ci.git crimson/osd: Add functions to notify mon when PGs are ready to merge When a PG is in the pending merge state it is >= pg_num_pending and < pg_num. When this happens, IO is paused and once the PG peers we notify the mon that we are idle and safe to merge. Signed-off-by: Aishwarya Mathuria --- diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index c840648975d..c05b5e6900c 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -418,12 +418,31 @@ public: std::pair do_delete_work(ceph::os::Transaction &t, ghobject_t _next) final; - // merge/split not ready - void clear_ready_to_merge() final {} - void set_not_ready_to_merge_target(pg_t pgid, pg_t src) final {} - void set_not_ready_to_merge_source(pg_t pgid) final {} - void set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec) final {} - void set_ready_to_merge_source(eversion_t lu) final {} + void clear_ready_to_merge() final { + LOG_PREFIX(PG::clear_ready_to_merge); + SUBDEBUGDPP(osd, "", *this); + (void)shard_services.clear_ready_to_merge(pgid.pgid).discard_result(); + } + void set_not_ready_to_merge_target(pg_t pgid, pg_t src) final { + LOG_PREFIX(PG::set_not_ready_to_merge_target); + SUBDEBUGDPP(osd, "", *this); + (void)shard_services.set_not_ready_to_merge_target(pgid, src).discard_result(); + } + void set_not_ready_to_merge_source(pg_t pgid) final { + LOG_PREFIX(PG::set_not_ready_to_merge_source); + SUBDEBUGDPP(osd, "", *this); + (void)shard_services.set_not_ready_to_merge_source(pgid).discard_result(); + } + void set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec) final { + LOG_PREFIX(PG::set_ready_to_merge_target); + SUBDEBUGDPP(osd, "", *this); + (void)shard_services.set_ready_to_merge_target(pgid.pgid, lu, les, lec).discard_result(); + } + void set_ready_to_merge_source(eversion_t lu) final { + LOG_PREFIX(PG::set_ready_to_merge_source); + SUBDEBUGDPP(osd, "", *this); + (void)shard_services.set_ready_to_merge_source(pgid.pgid, lu).discard_result(); + } void on_active_actmap() final; void on_active_advmap(const OSDMapRef &osdmap) final; diff --git a/src/crimson/osd/shard_services.cc b/src/crimson/osd/shard_services.cc index 09da0412416..f53fb9f0a06 100644 --- a/src/crimson/osd/shard_services.cc +++ b/src/crimson/osd/shard_services.cc @@ -9,6 +9,7 @@ #include "messages/MOSDMap.h" #include "messages/MOSDPGCreated.h" #include "messages/MOSDPGTemp.h" +#include "messages/MOSDPGReadyToMerge.h" #include "osd/osd_perf_counters.h" #include "osd/PeeringState.h" @@ -307,6 +308,135 @@ void OSDSingletonState::prune_pg_created() } } +seastar::future<> OSDSingletonState::set_ready_to_merge_source(pg_t pgid, + eversion_t version) +{ + LOG_PREFIX(OSDSingletonState::set_ready_to_merge_source); + DEBUG("{}", pgid); + ready_to_merge_source[pgid] = version; + ceph_assert(not_ready_to_merge_source.count(pgid) == 0); + return send_ready_to_merge(); + +} + +seastar::future<> OSDSingletonState::set_ready_to_merge_target(pg_t pgid, + eversion_t version, + epoch_t last_epoch_started, + epoch_t last_epoch_clean) +{ + LOG_PREFIX(OSDSingletonState::set_ready_to_merge_target); + DEBUG("{}", pgid); + ready_to_merge_target.insert(std::make_pair(pgid, + std::make_tuple(version, + last_epoch_started, + last_epoch_clean))); + ceph_assert(not_ready_to_merge_target.count(pgid) == 0); + return send_ready_to_merge(); + +} + +seastar::future<> OSDSingletonState::set_not_ready_to_merge_source(pg_t source) +{ + LOG_PREFIX(OSDSingletonState::set_not_ready_to_merge_source); + DEBUG("{}", source); + not_ready_to_merge_source.insert(source); + ceph_assert(ready_to_merge_source.count(source) == 0); + return send_ready_to_merge(); + +} + +seastar::future<> OSDSingletonState::set_not_ready_to_merge_target(pg_t target, pg_t source) +{ + LOG_PREFIX(OSDSingletonState::set_not_ready_to_merge_target); + DEBUG("{} source {}", target, source); + not_ready_to_merge_target[target] = source; + ceph_assert(ready_to_merge_source.count(target) == 0); + return send_ready_to_merge(); + +} + +seastar::future<> OSDSingletonState::send_ready_to_merge() +{ + LOG_PREFIX(OSDSingletonState::send_ready_to_merge); + DEBUG(" ready_to_merge_source: {} not_ready_to_merge_source: {} \ + ready_to_merge_target: {} not_ready_to_merge_target: {} \ + sent_ready_to_merge_source {}", ready_to_merge_source, + not_ready_to_merge_source, ready_to_merge_target, not_ready_to_merge_target, + sent_ready_to_merge_source); + for (auto src : not_ready_to_merge_source) { + if (sent_ready_to_merge_source.count(src) == 0) { + return monc.send_message(crimson::make_message( + src, + eversion_t{}, eversion_t{}, 0, 0, + false, + osdmap->get_epoch())).then([this, src] { + sent_ready_to_merge_source.insert(src); + }); + } + } + for (auto p : not_ready_to_merge_target) { + if (sent_ready_to_merge_source.count(p.second) == 0) { + return monc.send_message(crimson::make_message( + p.second, + eversion_t{}, eversion_t{}, 0, 0, + false, + osdmap->get_epoch())).then([this, p] { + sent_ready_to_merge_source.insert(p.second); + }); + } + } + for (auto src : ready_to_merge_source) { + if (not_ready_to_merge_source.count(src.first) || + not_ready_to_merge_target.count(src.first.get_parent())) { + continue; + } + auto p = ready_to_merge_target.find(src.first.get_parent()); + if (p != ready_to_merge_target.end() && + sent_ready_to_merge_source.count(src.first) == 0) { + return monc.send_message(crimson::make_message( + src.first, // source pgid + src.second, // src version + std::get<0>(p->second), // target version + std::get<1>(p->second), // PG's last_epoch_started + std::get<2>(p->second), // PG's last_epoch_clean + true, + osdmap->get_epoch())).then([this, src] { + sent_ready_to_merge_source.insert(src.first); + }); + } + } + return seastar::now(); +} + +void OSDSingletonState::clear_ready_to_merge(pg_t pgid) +{ + ready_to_merge_source.erase(pgid); + ready_to_merge_target.erase(pgid); + not_ready_to_merge_source.erase(pgid); + not_ready_to_merge_target.erase(pgid); + sent_ready_to_merge_source.erase(pgid); +} + +void OSDSingletonState::clear_sent_ready_to_merge() +{ + sent_ready_to_merge_source.clear(); +} + +void OSDSingletonState::prune_sent_ready_to_merge(const OSDMapService::cached_map_t osdmap) +{ + LOG_PREFIX(OSDSingletonState::prune_sent_ready_to_merge); + auto source = sent_ready_to_merge_source.begin(); + while (source != sent_ready_to_merge_source.end()) { + if (!osdmap->pg_exists(*source)) { + DEBUG("{}", *source); + source = sent_ready_to_merge_source.erase(source); + } else { + DEBUG(" exist {}", *source); + ++source; + } + } +} + seastar::future<> OSDSingletonState::send_alive(const epoch_t want) { LOG_PREFIX(OSDSingletonState::send_alive); diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h index 8689a7da49a..e28717cfc40 100644 --- a/src/crimson/osd/shard_services.h +++ b/src/crimson/osd/shard_services.h @@ -351,6 +351,25 @@ private: seastar::future<> store_maps(ceph::os::Transaction& t, epoch_t start, Ref m); void trim_maps(ceph::os::Transaction& t, OSDSuperblock& superblock); + + // -- PG merging -- + std::map ready_to_merge_source; + std::map> ready_to_merge_target; + std::set not_ready_to_merge_source; + std::map not_ready_to_merge_target; + std::set sent_ready_to_merge_source; + seastar::future<> set_ready_to_merge_source(pg_t pgid, + eversion_t version); + seastar::future<> set_ready_to_merge_target(pg_t pgid, + eversion_t version, + epoch_t last_epoch_started, + epoch_t last_epoch_clean); + seastar::future<> set_not_ready_to_merge_source(pg_t source); + seastar::future<> set_not_ready_to_merge_target(pg_t target, pg_t source); + void clear_ready_to_merge(pg_t pgid); + seastar::future<> send_ready_to_merge(); + void clear_sent_ready_to_merge(); + void prune_sent_ready_to_merge(const cached_map_t osdmap); }; /** @@ -612,6 +631,15 @@ public: }); } + FORWARD_TO_OSD_SINGLETON(set_ready_to_merge_source) + FORWARD_TO_OSD_SINGLETON(set_ready_to_merge_target) + FORWARD_TO_OSD_SINGLETON(set_not_ready_to_merge_source) + FORWARD_TO_OSD_SINGLETON(set_not_ready_to_merge_target) + FORWARD_TO_OSD_SINGLETON(clear_ready_to_merge) + FORWARD_TO_OSD_SINGLETON(send_ready_to_merge) + FORWARD_TO_OSD_SINGLETON(clear_sent_ready_to_merge) + FORWARD_TO_OSD_SINGLETON(prune_sent_ready_to_merge) + FORWARD_TO_OSD_SINGLETON(get_pool_info) FORWARD(get_throttle, get_throttle, local_state.throttler)