git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: Add functions to notify mon when PGs are ready to merge
author Aishwarya Mathuria <amathuri@redhat.com>
Wed, 7 Jan 2026 11:55:25 +0000 (11:55 +0000)
committer Aishwarya Mathuria <amathuri@redhat.com>
Mon, 12 Jan 2026 07:52:43 +0000 (07:52 +0000)
A PG is in the pending merge state when its id is >= pg_num_pending and <
pg_num. While a PG is in that state its IO is paused, and once the PG has
peered the OSD notifies the mon that the PG is idle and safe to merge.

Signed-off-by: Aishwarya Mathuria <amathuri@redhat.com>
src/crimson/osd/pg.h
src/crimson/osd/shard_services.cc
src/crimson/osd/shard_services.h

index c840648975d13ffde2c6bda8f59c5c872b94f9e2..c05b5e6900cacfb626d9f6f9f54b54ec5d786df0 100644 (file)
@@ -418,12 +418,31 @@ public:
   std::pair<ghobject_t, bool>
   do_delete_work(ceph::os::Transaction &t, ghobject_t _next) final;
 
-  // merge/split not ready
-  void clear_ready_to_merge() final {}
-  void set_not_ready_to_merge_target(pg_t pgid, pg_t src) final {}
-  void set_not_ready_to_merge_source(pg_t pgid) final {}
-  void set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec) final {}
-  void set_ready_to_merge_source(eversion_t lu) final {}
+  void clear_ready_to_merge() final {  // ask shard_services to drop the merge-readiness state kept for this PG
+    LOG_PREFIX(PG::clear_ready_to_merge);
+    SUBDEBUGDPP(osd, "", *this);
+    (void)shard_services.clear_ready_to_merge(pgid.pgid).discard_result();  // fire-and-forget: the returned future is not awaited
+  }
+  void set_not_ready_to_merge_target(pg_t pgid, pg_t src) final {  // NB: the parameter `pgid` shadows the member `pgid` used by the sibling methods
+    LOG_PREFIX(PG::set_not_ready_to_merge_target);
+    SUBDEBUGDPP(osd, "", *this);
+    (void)shard_services.set_not_ready_to_merge_target(pgid, src).discard_result();  // forwards both arguments unchanged; the returned future is not awaited
+  }
+  void set_not_ready_to_merge_source(pg_t pgid) final {  // NB: the parameter `pgid` shadows the member `pgid`
+    LOG_PREFIX(PG::set_not_ready_to_merge_source);
+    SUBDEBUGDPP(osd, "", *this);
+    (void)shard_services.set_not_ready_to_merge_source(pgid).discard_result();  // forwards the argument unchanged; the returned future is not awaited
+  }
+  void set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec) final {  // report this PG as a ready merge target
+    LOG_PREFIX(PG::set_ready_to_merge_target);
+    SUBDEBUGDPP(osd, "", *this);
+    (void)shard_services.set_ready_to_merge_target(pgid.pgid, lu, les, lec).discard_result();  // keyed by this PG's own pg_t; the returned future is not awaited
+  }
+  void set_ready_to_merge_source(eversion_t lu) final {  // report this PG as a ready merge source
+    LOG_PREFIX(PG::set_ready_to_merge_source);
+    SUBDEBUGDPP(osd, "", *this);
+    (void)shard_services.set_ready_to_merge_source(pgid.pgid, lu).discard_result();  // keyed by this PG's own pg_t; the returned future is not awaited
+  }
 
   void on_active_actmap() final;
   void on_active_advmap(const OSDMapRef &osdmap) final;
index 09da0412416335767ef79faa0e563ed975fe2455..f53fb9f0a063fda004fcc797e3828e65b56dfd7d 100644 (file)
@@ -9,6 +9,7 @@
 #include "messages/MOSDMap.h"
 #include "messages/MOSDPGCreated.h"
 #include "messages/MOSDPGTemp.h"
+#include "messages/MOSDPGReadyToMerge.h"
 
 #include "osd/osd_perf_counters.h"
 #include "osd/PeeringState.h"
@@ -307,6 +308,135 @@ void OSDSingletonState::prune_pg_created()
   }
 }
 
+seastar::future<> OSDSingletonState::set_ready_to_merge_source(pg_t pgid,
+                                                  eversion_t version)
+{  // Record `version` for merge source `pgid`, then run send_ready_to_merge().
+  LOG_PREFIX(OSDSingletonState::set_ready_to_merge_source);
+  DEBUG("{}", pgid);
+  ready_to_merge_source[pgid] = version;  // replaces any version recorded earlier for pgid
+  ceph_assert(not_ready_to_merge_source.count(pgid) == 0);  // pgid must not also be flagged as a not-ready source
+  return send_ready_to_merge();
+
+}
+
+seastar::future<> OSDSingletonState::set_ready_to_merge_target(pg_t pgid,
+                                           eversion_t version,
+                                           epoch_t last_epoch_started,
+                                           epoch_t last_epoch_clean)
+{
+  LOG_PREFIX(OSDSingletonState::set_ready_to_merge_target);
+  DEBUG("{}", pgid);
+  ready_to_merge_target.insert(std::make_pair(pgid,
+                                         std::make_tuple(version,
+                                                    last_epoch_started,
+                                                    last_epoch_clean)));
+  ceph_assert(not_ready_to_merge_target.count(pgid) == 0);
+  return send_ready_to_merge();
+
+}
+
+seastar::future<> OSDSingletonState::set_not_ready_to_merge_source(pg_t source)
+{  // Flag merge source `source` as not ready, then run send_ready_to_merge().
+  LOG_PREFIX(OSDSingletonState::set_not_ready_to_merge_source);
+  DEBUG("{}", source);
+  not_ready_to_merge_source.insert(source);
+  ceph_assert(ready_to_merge_source.count(source) == 0);  // source must not also be recorded as ready
+  return send_ready_to_merge();
+
+}
+
+// Flag merge target `target` as not ready, remembering the merge source
+// `source` it belongs to, then run send_ready_to_merge().
+seastar::future<> OSDSingletonState::set_not_ready_to_merge_target(pg_t target, pg_t source)
+{
+  LOG_PREFIX(OSDSingletonState::set_not_ready_to_merge_target);
+  DEBUG("{} source {}", target, source);
+  not_ready_to_merge_target[target] = source;
+  // A target cannot be ready and not-ready at once. Check the *target* map
+  // here, mirroring set_ready_to_merge_target(); ready_to_merge_source is
+  // keyed by source pgid and says nothing about this target.
+  ceph_assert(ready_to_merge_target.count(target) == 0);
+  return send_ready_to_merge();
+}
+
+seastar::future<> OSDSingletonState::send_ready_to_merge()
+{
+  LOG_PREFIX(OSDSingletonState::send_ready_to_merge);
+  DEBUG(" ready_to_merge_source: {} not_ready_to_merge_source: {} \
+          ready_to_merge_target: {} not_ready_to_merge_target: {} \
+          sent_ready_to_merge_source {}", ready_to_merge_source,
+          not_ready_to_merge_source, ready_to_merge_target, not_ready_to_merge_target,
+          sent_ready_to_merge_source);
+  for (auto src : not_ready_to_merge_source) {
+    if (sent_ready_to_merge_source.count(src) == 0) {
+      return monc.send_message(crimson::make_message<MOSDPGReadyToMerge>(
+                               src,
+                               eversion_t{}, eversion_t{}, 0, 0,
+                               false,
+                               osdmap->get_epoch())).then([this, src] {
+        sent_ready_to_merge_source.insert(src);
+      });
+    }
+  }
+  for (auto p : not_ready_to_merge_target) {
+    if (sent_ready_to_merge_source.count(p.second) == 0) {
+      return monc.send_message(crimson::make_message<MOSDPGReadyToMerge>(
+                               p.second,
+                               eversion_t{}, eversion_t{}, 0, 0,
+                               false,
+                               osdmap->get_epoch())).then([this, p] {
+      sent_ready_to_merge_source.insert(p.second);
+      });
+    }
+  }
+  for (auto src : ready_to_merge_source) {
+    if (not_ready_to_merge_source.count(src.first) ||
+        not_ready_to_merge_target.count(src.first.get_parent())) {
+      continue;
+    }
+    auto p = ready_to_merge_target.find(src.first.get_parent());
+    if (p != ready_to_merge_target.end() &&
+        sent_ready_to_merge_source.count(src.first) == 0) {
+      return monc.send_message(crimson::make_message<MOSDPGReadyToMerge>(
+                               src.first,           // source pgid
+                               src.second,          // src version
+                               std::get<0>(p->second), // target version
+                               std::get<1>(p->second), // PG's last_epoch_started
+                               std::get<2>(p->second), // PG's last_epoch_clean
+                               true,
+                               osdmap->get_epoch())).then([this, src] {
+      sent_ready_to_merge_source.insert(src.first);
+      });
+    }
+  }
+  return seastar::now();
+}
+
+void OSDSingletonState::clear_ready_to_merge(pg_t pgid)
+{  // Erase `pgid` from all five PG-merge tracking containers.
+  ready_to_merge_source.erase(pgid);
+  ready_to_merge_target.erase(pgid);
+  not_ready_to_merge_source.erase(pgid);
+  not_ready_to_merge_target.erase(pgid);  // erases by key, i.e. the entry where pgid is the target
+  sent_ready_to_merge_source.erase(pgid);
+}
+
+void OSDSingletonState::clear_sent_ready_to_merge()
+{  // Forget which sources were already notified, so send_ready_to_merge() treats them as unsent again.
+  sent_ready_to_merge_source.clear();
+}
+
+// Drop from sent_ready_to_merge_source every source PG that no longer
+// exists in the given osdmap; entries whose PG still exists are kept.
+void OSDSingletonState::prune_sent_ready_to_merge(const OSDMapService::cached_map_t osdmap)
+{
+  LOG_PREFIX(OSDSingletonState::prune_sent_ready_to_merge);
+  for (auto it = sent_ready_to_merge_source.begin();
+       it != sent_ready_to_merge_source.end(); ) {
+    if (osdmap->pg_exists(*it)) {
+      DEBUG(" exist {}", *it);
+      ++it;
+      continue;
+    }
+    DEBUG("{}", *it);
+    // erase() hands back the iterator following the removed element.
+    it = sent_ready_to_merge_source.erase(it);
+  }
+}
+
 seastar::future<> OSDSingletonState::send_alive(const epoch_t want)
 {
   LOG_PREFIX(OSDSingletonState::send_alive);
index 8689a7da49a58dc89fa5de56a526cd44f82a9a09..e28717cfc40f74ca7f84a4c202668305a24db7b5 100644 (file)
@@ -351,6 +351,25 @@ private:
   seastar::future<> store_maps(ceph::os::Transaction& t,
                                epoch_t start, Ref<MOSDMap> m);
   void trim_maps(ceph::os::Transaction& t, OSDSuperblock& superblock);
+
+  // -- PG merging --
+  std::map<pg_t, eversion_t> ready_to_merge_source;  // source pgid -> version
+  std::map<pg_t,std::tuple<eversion_t,epoch_t,epoch_t>> ready_to_merge_target;  // target pgid -> (version, last_epoch_started, last_epoch_clean)
+  std::set<pg_t> not_ready_to_merge_source;  // sources flagged as not ready
+  std::map<pg_t,pg_t> not_ready_to_merge_target;  // not-ready target pgid -> its source pgid
+  std::set<pg_t> sent_ready_to_merge_source;  // sources for which a MOSDPGReadyToMerge was already sent
+  seastar::future<> set_ready_to_merge_source(pg_t pgid,
+                                 eversion_t version);  // record a ready source, then send_ready_to_merge()
+  seastar::future<> set_ready_to_merge_target(pg_t pgid,
+                                 eversion_t version,
+                                 epoch_t last_epoch_started,
+                                 epoch_t last_epoch_clean);  // record a ready target, then send_ready_to_merge()
+  seastar::future<> set_not_ready_to_merge_source(pg_t source);  // flag a source not ready, then send_ready_to_merge()
+  seastar::future<> set_not_ready_to_merge_target(pg_t target, pg_t source);  // flag a target not ready, then send_ready_to_merge()
+  void clear_ready_to_merge(pg_t pgid);  // erase pgid from all of the containers above
+  seastar::future<> send_ready_to_merge();  // notifies the mon about merge sources not yet recorded as sent
+  void clear_sent_ready_to_merge();  // empty sent_ready_to_merge_source
+  void prune_sent_ready_to_merge(const cached_map_t osdmap);  // drop sent entries whose PG no longer exists in osdmap
 };
 
 /**
@@ -612,6 +631,15 @@ public:
       });
   }
 
+  FORWARD_TO_OSD_SINGLETON(set_ready_to_merge_source)  // PG-merge API; presumably each line generates a same-named forwarder to OSDSingletonState — TODO confirm against the macro definition
+  FORWARD_TO_OSD_SINGLETON(set_ready_to_merge_target)
+  FORWARD_TO_OSD_SINGLETON(set_not_ready_to_merge_source)
+  FORWARD_TO_OSD_SINGLETON(set_not_ready_to_merge_target)
+  FORWARD_TO_OSD_SINGLETON(clear_ready_to_merge)
+  FORWARD_TO_OSD_SINGLETON(send_ready_to_merge)
+  FORWARD_TO_OSD_SINGLETON(clear_sent_ready_to_merge)
+  FORWARD_TO_OSD_SINGLETON(prune_sent_ready_to_merge)
+
   FORWARD_TO_OSD_SINGLETON(get_pool_info)
   FORWARD(get_throttle, get_throttle, local_state.throttler)