git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: reject Seastore PG merges across shards 66885/head
author Aishwarya Mathuria <amathuri@redhat.com>
Mon, 18 May 2026 13:37:26 +0000 (13:37 +0000)
committer Aishwarya Mathuria <amathuri@redhat.com>
Thu, 11 Jun 2026 04:55:49 +0000 (10:25 +0530)
Seastore cannot merge collections between reactor shards currently.
On cross-shard detection, tell the monitor the source PG is not ready
(via MOSDPGReadyToMerge{ ready=false }) so the unsafe pg_num decrement
is never proposed, then send MOSDPGStopMerge to clamp pg_num_target and
permanently disable further shrink for the pool.

Signed-off-by: Aishwarya Mathuria <amathuri@redhat.com>
12 files changed:
src/crimson/osd/osd_operations/pg_advance_map.cc
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/shard_services.cc
src/crimson/osd/shard_services.h
src/messages/MOSDPGStopMerge.h [new file with mode: 0644]
src/mon/Monitor.cc
src/mon/OSDMonitor.cc
src/mon/OSDMonitor.h
src/msg/Message.cc
src/msg/Message.h
src/msg/MessageRef.h

index ff412adc5db1f0a4372bb3fbaf4f5f71ac6a53ae..5ba9dc4dff5349283c3bf546b26a5595dcad18b9 100644 (file)
@@ -295,11 +295,21 @@ seastar::future<PGAdvanceMap::merge_result_t> PGAdvanceMap::merge_pg(
   if (pg->pgid.is_merge_source(old_pg_num,
                                new_pg_num,
                                &parent)) {
+    parent.is_split(new_pg_num, old_pg_num, &merge_sources);
+    if (!co_await shard_services.seastore_merge_shards_ok(
+          parent, merge_sources)) {
+      co_return merge_result_t{};
+    }
     co_return merge_result_t{merge_role_t::Source, parent};
   } else if (pg->pgid.is_merge_target(old_pg_num,
                                        new_pg_num)) {
     DEBUG("Target PG {} identified. Waiting for sources...", pg->get_pgid());
     pg->pgid.is_split(new_pg_num, old_pg_num, &merge_sources);
+
+    if (!co_await shard_services.seastore_merge_shards_ok(
+          pg->get_pgid(), merge_sources)) {
+      co_return merge_result_t{};
+    }
     // Block until all source PGs (potentially from other shards) arrive
     // on this PG's rendezvous
     auto sources = co_await pg->collect_merge_sources(merge_sources.size());
index a9506994971b845c49ea4195cdc909e5eee506f4..f79e8272d9947e3ef38b65b0efbcb9e291f38f40 100644 (file)
@@ -1690,10 +1690,7 @@ seastar::future<> PG::stop()
     clear_ready_to_merge();
   }
 
-  // Wake any coroutine parked in collect_merge_sources() so shutdown
-  // doesn't hang waiting for sources that will never arrive.
-  merge_rendezvous.arrivals.broken();
-  merge_rendezvous.sources.clear();
+  reset_merge_rendezvous();
 
   cancel_local_background_io_reservation();
   cancel_remote_recovery_reservation();
@@ -2081,6 +2078,14 @@ PG::collect_merge_sources(std::size_t n)
   co_return sources;
 }
 
+void PG::reset_merge_rendezvous()
+{
+  // Unblock any waiter in collect_merge_sources() with broken(); then
+  // replace the semaphore so the next merge attempt starts at zero signals.
+  merge_rendezvous.arrivals.broken();
+  merge_rendezvous = merge_rendezvous_t{};
+}
+
 void PG::merge_from(
     merge_source_map_t& sources,
     PeeringCtx &rctx,
index 2e8238fcaf872684ad23bfc9c2a39a426f002eaf..6e5e47c591c290b9251b714ac786e4794906f121 100644 (file)
@@ -586,11 +586,6 @@ public:
   // return them and clear the rendezvous state.  Returns an empty map if
   // reset_merge_rendezvous() breaks the wait (e.g. PG stop or merge cancel).
   seastar::future<merge_source_map_t> collect_merge_sources(std::size_t n);
-  void merge_from(
-      merge_source_map_t& sources,
-      PeeringCtx &rctx,
-      unsigned split_bits,
-      const pg_merge_meta_t& last_pg_merge_meta);
 
   // Drop in-flight handoffs and reset the semaphore.  Call on PG stop or
   // after Seastore cross-shard cancel so a failed try cannot leave stale
index 06a55649d3b05607fc76692b89e5b35f4e6c37f5..00b5dbabe0e11760d199e1659869cfc52a127f1b 100644 (file)
@@ -10,6 +10,7 @@
 #include "messages/MOSDPGCreated.h"
 #include "messages/MOSDPGTemp.h"
 #include "messages/MOSDPGReadyToMerge.h"
+#include "messages/MOSDPGStopMerge.h"
 
 #include "osd/osd_perf_counters.h"
 #include "osd/PeeringState.h"
@@ -148,6 +149,84 @@ seastar::future<> ShardServices::register_merge_source(
     });
 }
 
+bool ShardServices::is_seastore_objectstore()
+{
+  return crimson::common::local_conf().get_val<std::string>(
+    "osd_objectstore") == "seastore";
+}
+
+seastar::future<> ShardServices::reset_target_merge_rendezvous(spg_t target)
+{
+  const core_id_t target_core = co_await get_pg_mapping(target);
+  co_await container().invoke_on(
+    target_core,
+    [target](ShardServices& target_svc) {
+      if (auto target_pg = target_svc.local_state.pg_map.get_pg(target)) {
+        target_pg->reset_merge_rendezvous();
+      }
+    });
+}
+
+seastar::future<> ShardServices::send_stop_pool_merge(pg_t source_pgid)
+{
+  co_await with_singleton(
+    [pool = source_pgid.pool(), source_pgid](OSDSingletonState& singleton) {
+      return singleton.send_stop_pool_pg_merge(pool, source_pgid);
+    });
+}
+
+seastar::future<> ShardServices::abort_seastore_cross_shard_merge(
+  spg_t target,
+  pg_t source_pgid,
+  const std::set<spg_t>* all_sources,
+  bool reset_target_rendezvous)
+{
+  if (all_sources) {
+    for (const auto& src : *all_sources) {
+      co_await clear_ready_to_merge(src.pgid);
+    }
+  } else {
+    co_await clear_ready_to_merge(source_pgid);
+  }
+  co_await clear_ready_to_merge(target.pgid);
+
+  // Ensure the monitor backs off before it can commit a pg_num decrement
+  // for this source.  "Stop merge" is permanent policy; "not ready" prevents
+  // an already-in-flight decrement decision from being committed.
+  co_await set_not_ready_to_merge_source(source_pgid);
+  co_await send_stop_pool_merge(source_pgid);
+  if (reset_target_rendezvous) {
+    co_await reset_target_merge_rendezvous(target);
+  }
+}
+
+
+seastar::future<bool> ShardServices::seastore_merge_shards_ok(
+  spg_t target,
+  const std::set<spg_t>& merge_sources)
+{
+  LOG_PREFIX(ShardServices::seastore_merge_shards_ok);
+  if (!is_seastore_objectstore()) {
+    co_return true;
+  }
+  const core_id_t target_core = co_await get_pg_mapping(target);
+  std::optional<pg_t> cross_shard_source;
+  for (const auto& src : merge_sources) {
+    if (co_await get_pg_mapping(src) != target_core) {
+      cross_shard_source = src.pgid;
+      break;
+    }
+  }
+  if (!cross_shard_source) {
+    co_return true;
+  }
+  DEBUG("seastore: target {} has a cross-shard merge source {}; stop pool merge",
+        target, *cross_shard_source);
+  co_await abort_seastore_cross_shard_merge(
+    target, *cross_shard_source, &merge_sources, true);
+  co_return false;
+}
+
 Ref<PG> PerShardState::get_pg(spg_t pgid)
 {
   assert_core();
@@ -479,9 +558,49 @@ void OSDSingletonState::clear_sent_ready_to_merge()
   sent_ready_to_merge_source.clear();
 }
 
+seastar::future<> OSDSingletonState::send_stop_pool_pg_merge(int64_t pool, pg_t pgid)
+{
+  LOG_PREFIX(OSDSingletonState::send_stop_pool_pg_merge);
+  const pg_pool_t *pi = osdmap->get_pg_pool(pool);
+  if (!pi || !pi->is_crimson()) {
+    co_return;
+  }
+  if (!pi->has_flag(pg_pool_t::FLAG_CRIMSON_ALLOW_PG_MERGE)) {
+    pools_merge_stopped_reported.insert(pool);
+    co_return;
+  }
+  // Claim the pool before co_await: otherwise concurrent callers can all pass
+  // the check and each send MOSDPGStopMerge while yielded on send_message().
+  if (!pools_merge_stopped_reported.insert(pool).second) {
+    co_return;
+  }
+  DEBUG("seastore: asking monitor to stop PG merge for pool {} (pg {})",
+        pool, pgid);
+  co_await monc.send_message(crimson::make_message<MOSDPGStopMerge>(
+    pool, pgid, MOSDPGStopMerge::REASON_CROSS_SHARD, osdmap->get_epoch()));
+}
+
+void OSDSingletonState::prune_pools_merge_stopped_reported()
+{
+  // send_stop_pool_pg_merge() inserts a pool after one MOSDPGStopMerge so we
+  // do not spam the mon on every cross-shard source/target pair.  Keep that
+  // entry while the map still has merge disabled; forget it if the pool
+  // disappears or crimson_allow_pg_merge is set again.
+  auto pool = pools_merge_stopped_reported.begin();
+  while (pool != pools_merge_stopped_reported.end()) {
+    const pg_pool_t *pi = osdmap->get_pg_pool(*pool);
+    if (!pi || pi->has_flag(pg_pool_t::FLAG_CRIMSON_ALLOW_PG_MERGE)) {
+      pool = pools_merge_stopped_reported.erase(pool);
+    } else {
+      ++pool;
+    }
+  }
+}
+
 void OSDSingletonState::prune_sent_ready_to_merge()
 {
   LOG_PREFIX(OSDSingletonState::prune_sent_ready_to_merge);
+  prune_pools_merge_stopped_reported();
   auto source = sent_ready_to_merge_source.begin();
   while (source != sent_ready_to_merge_source.end()) {
     if (!osdmap->pg_exists(*source)) {
index 80c26e8d2409616c25e3f288617654e464bd2c4f..430f41502bf5843ca66773cfc2423d3a1af311d8 100644 (file)
@@ -378,6 +378,7 @@ private:
   std::set<pg_t> not_ready_to_merge_source;
   std::map<pg_t,pg_t> not_ready_to_merge_target;
   std::set<pg_t> sent_ready_to_merge_source;
+  std::set<int64_t> pools_merge_stopped_reported;
   seastar::future<> set_ready_to_merge_source(pg_t pgid,
                                  eversion_t version);
   seastar::future<> set_ready_to_merge_target(pg_t pgid,
@@ -388,7 +389,9 @@ private:
   seastar::future<> set_not_ready_to_merge_target(pg_t target, pg_t source);
   void clear_ready_to_merge(pg_t pgid);
   seastar::future<> send_ready_to_merge();
+  seastar::future<> send_stop_pool_pg_merge(int64_t pool, pg_t pgid);
   void clear_sent_ready_to_merge();
+  void prune_pools_merge_stopped_reported();
   void prune_sent_ready_to_merge();
 };
 
@@ -684,6 +687,14 @@ public:
   // local_shared_foreign_ptr it received.
   seastar::future<> register_merge_source(spg_t target, spg_t source);
 
+  static bool is_seastore_objectstore();
+
+  // True if every merge source is co-located on the target PG's shard
+  // (Seastore only). Uses the full sibling set so one source cannot commit
+  // while another would abort.
+  seastar::future<bool> seastore_merge_shards_ok(
+    spg_t target, const std::set<spg_t>& merge_sources);
+
   FORWARD_TO_OSD_SINGLETON(set_ready_to_merge_source)
   FORWARD_TO_OSD_SINGLETON(set_ready_to_merge_target)
   FORWARD_TO_OSD_SINGLETON(set_not_ready_to_merge_source)
@@ -854,6 +865,15 @@ public:
       invoke_context_on_core(seastar::this_shard_id(), on_reserved));
   }
 
+private:
+  seastar::future<> reset_target_merge_rendezvous(spg_t target);
+  seastar::future<> send_stop_pool_merge(pg_t source_pgid);
+  seastar::future<> abort_seastore_cross_shard_merge(
+    spg_t target,
+    pg_t source_pgid,
+    const std::set<spg_t>* all_sources,
+    bool reset_target_rendezvous);
+
 #undef FORWARD_CONST
 #undef FORWARD
 #undef FORWARD_TO_OSD_SINGLETON
diff --git a/src/messages/MOSDPGStopMerge.h b/src/messages/MOSDPGStopMerge.h
new file mode 100644 (file)
index 0000000..fa0fa5c
--- /dev/null
@@ -0,0 +1,53 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+#pragma once
+
+#include "osd/osd_types.h"
+#include "messages/PaxosServiceMessage.h"
+
+/// OSD -> mon: permanently stop PG merge shrink for a Crimson pool.
+class MOSDPGStopMerge : public PaxosServiceMessage {
+public:
+  static constexpr uint8_t REASON_CROSS_SHARD = 1;
+
+  int64_t pool = -1;
+  pg_t pgid;
+  uint8_t reason = REASON_CROSS_SHARD;
+
+  MOSDPGStopMerge()
+    : PaxosServiceMessage{MSG_OSD_PG_STOP_MERGE, 0}
+  {}
+  MOSDPGStopMerge(int64_t pool, pg_t pgid, uint8_t reason, epoch_t epoch)
+    : PaxosServiceMessage{MSG_OSD_PG_STOP_MERGE, epoch},
+      pool(pool),
+      pgid(pgid),
+      reason(reason)
+  {}
+  void encode_payload(uint64_t features) override {
+    using ceph::encode;
+    paxos_encode();
+    encode(pool, payload);
+    encode(pgid, payload);
+    encode(reason, payload);
+  }
+  void decode_payload() override {
+    using ceph::decode;
+    auto p = payload.cbegin();
+    paxos_decode(p);
+    decode(pool, p);
+    decode(pgid, p);
+    decode(reason, p);
+  }
+  std::string_view get_type_name() const override { return "osd_pg_stop_merge"; }
+  void print(std::ostream &out) const {
+    out << get_type_name()
+        << "(pool " << pool
+        << " pg " << pgid
+        << " reason " << (unsigned)reason
+        << " v" << version << ")";
+  }
+private:
+  template<class T, typename... Args>
+  friend boost::intrusive_ptr<T> ceph::make_message(Args&&... args);
+};
index 80bca9ca4b9a390f8c73fc12ce857aa1a3a48eec..205fb23c7ceaeb8739f63131aa369e736ab00e77 100644 (file)
@@ -4782,6 +4782,7 @@ void Monitor::dispatch_op(MonOpRequestRef op)
     case MSG_REMOVE_SNAPS:
     case MSG_MON_GET_PURGED_SNAPS:
     case MSG_OSD_PG_READY_TO_MERGE:
+    case MSG_OSD_PG_STOP_MERGE:
       paxos_service[PAXOS_OSDMAP]->dispatch(op);
       return;
 
index b256899e3a4022965b0fa833b3a58d14098e8465..8121ea2523cdb4b0d2aa65f6a7f2f793e6d73381 100644 (file)
@@ -54,6 +54,7 @@
 #include "messages/MOSDPGCreated.h"
 #include "messages/MOSDPGTemp.h"
 #include "messages/MOSDPGReadyToMerge.h"
+#include "messages/MOSDPGStopMerge.h"
 #include "messages/MMonCommand.h"
 #include "messages/MRemoveSnaps.h"
 #include "messages/MRoute.h"
@@ -2771,6 +2772,8 @@ bool OSDMonitor::preprocess_query(MonOpRequestRef op)
     return preprocess_pg_created(op);
   case MSG_OSD_PG_READY_TO_MERGE:
     return preprocess_pg_ready_to_merge(op);
+  case MSG_OSD_PG_STOP_MERGE:
+    return preprocess_pg_stop_merge(op);
   case MSG_OSD_PGTEMP:
     return preprocess_pgtemp(op);
   case MSG_OSD_BEACON:
@@ -2817,6 +2820,8 @@ bool OSDMonitor::prepare_update(MonOpRequestRef op)
     return prepare_pgtemp(op);
   case MSG_OSD_PG_READY_TO_MERGE:
     return prepare_pg_ready_to_merge(op);
+  case MSG_OSD_PG_STOP_MERGE:
+    return prepare_pg_stop_merge(op);
   case MSG_OSD_BEACON:
     return prepare_beacon(op);
 
@@ -4100,6 +4105,16 @@ bool OSDMonitor::prepare_pg_ready_to_merge(MonOpRequestRef op)
     p.last_change = pending_inc.epoch;
   } else {
     // back off the merge attempt!
+    if (!m->ready && !p.has_flag(pg_pool_t::FLAG_CRIMSON)) {
+      mon.clog->warn() << "osd." << m->get_orig_source().num()
+                       << " reported pg " << m->pgid
+                       << " not ready to merge; backing off pg_num decrease"
+                       << " for pool '"
+                       << osdmap.get_pool_name(m->pgid.pool()) << "'";
+      dout(1) << __func__ << " osd." << m->get_orig_source().num()
+              << " pg " << m->pgid << " not ready to merge, backing off"
+              << dendl;
+    }
     p.set_pg_num_pending(p.get_pg_num());
   }
 
@@ -4129,6 +4144,79 @@ bool OSDMonitor::prepare_pg_ready_to_merge(MonOpRequestRef op)
   return true;
 }
 
+bool OSDMonitor::preprocess_pg_stop_merge(MonOpRequestRef op)
+{
+  op->mark_osdmon_event(__func__);
+  auto m = op->get_req<MOSDPGStopMerge>();
+  dout(10) << __func__ << " " << *m << dendl;
+  auto session = op->get_session();
+  if (!session) {
+    dout(10) << __func__ << ": no monitor session!" << dendl;
+    goto ignore;
+  }
+  if (!session->is_capable("osd", MON_CAP_X)) {
+    derr << __func__ << " received from entity "
+         << "with insufficient privileges " << session->caps << dendl;
+    goto ignore;
+  }
+  if (!osdmap.get_pg_pool(m->pool)) {
+    derr << __func__ << " pool " << m->pool << " dne" << dendl;
+    goto ignore;
+  }
+  return false;
+
+ ignore:
+  mon.no_reply(op);
+  return true;
+}
+
+bool OSDMonitor::prepare_pg_stop_merge(MonOpRequestRef op)
+{
+  op->mark_osdmon_event(__func__);
+  auto m = op->get_req<MOSDPGStopMerge>();
+  dout(10) << __func__ << " " << *m << dendl;
+
+  pg_pool_t p;
+  if (pending_inc.new_pools.count(m->pool))
+    p = pending_inc.new_pools[m->pool];
+  else
+    p = *osdmap.get_pg_pool(m->pool);
+
+  if (!p.is_crimson()) {
+    dout(10) << __func__ << " pool " << m->pool << " is not crimson, ignoring"
+            << dendl;
+    wait_for_finished_proposal(op, new C_ReplyMap(this, op, m->version));
+    return false;
+  }
+
+  const char *reason_str = "unknown";
+  if (m->reason == MOSDPGStopMerge::REASON_CROSS_SHARD) {
+    reason_str = "cross-shard PG merge not supported on Seastore";
+  }
+
+  mon.clog->warn() << "osd." << m->get_orig_source().num()
+                   << " stopped PG merge for pool '"
+                   << osdmap.get_pool_name(m->pool)
+                   << "' (" << reason_str << ", source pg " << m->pgid
+                   << "); no further pg_num decrease will be attempted";
+
+  // Cancel any in-flight shrink; keep current pg_num as-is.
+  p.set_pg_num_pending(p.get_pg_num());
+  if (p.get_pg_num_target() < p.get_pg_num()) {
+    p.set_pg_num_target(p.get_pg_num());
+  }
+  if (p.get_pgp_num_target() < p.get_pgp_num()) {
+    p.set_pgp_num_target(p.get_pgp_num());
+  }
+  p.unset_flag(pg_pool_t::FLAG_CRIMSON_ALLOW_PG_MERGE);
+  p.last_pg_merge_meta = pg_merge_meta_t{};
+  p.last_change = pending_inc.epoch;
+  p.last_force_op_resend_prenautilus = pending_inc.epoch;
+
+  pending_inc.new_pools[m->pool] = p;
+  wait_for_finished_proposal(op, new C_ReplyMap(this, op, m->version));
+  return true;
+}
 
 // -------------
 // pg_temp changes
index d0209fbc52279824f79325480215d07f4f71ad48..9000fc46de4e1bd91b6275b9180b7528983f6c58 100644 (file)
@@ -470,6 +470,9 @@ private:
   bool preprocess_pg_ready_to_merge(MonOpRequestRef op);
   bool prepare_pg_ready_to_merge(MonOpRequestRef op);
 
+  bool preprocess_pg_stop_merge(MonOpRequestRef op);
+  bool prepare_pg_stop_merge(MonOpRequestRef op);
+
   int _check_remove_pool(int64_t pool_id, const pg_pool_t &pool, std::ostream *ss);
   bool _check_become_tier(
       int64_t tier_pool_id, const pg_pool_t *tier_pool,
index e6600cda451ac52eacd2e5b647f9c7b3f533c17f..bbfa159345f3446301b78fe1c1d7a97c276a2e07 100644 (file)
 #include "messages/MOSDPGRecoveryDelete.h"
 #include "messages/MOSDPGRecoveryDeleteReply.h"
 #include "messages/MOSDPGReadyToMerge.h"
+#include "messages/MOSDPGStopMerge.h"
 
 #include "messages/MRemoveSnaps.h"
 
@@ -647,6 +648,9 @@ Message *decode_message(CephContext *cct,
   case MSG_OSD_PG_READY_TO_MERGE:
     m = make_message<MOSDPGReadyToMerge>();
     break;
+  case MSG_OSD_PG_STOP_MERGE:
+    m = make_message<MOSDPGStopMerge>();
+    break;
   case MSG_OSD_EC_WRITE:
     m = make_message<MOSDECSubOpWrite>();
     break;
index ec90687072ec7144c3421e57de7589a2a30af2f2..071e60103813a6c259c9399a628c04af7251bbfb 100644 (file)
 #define MSG_OSD_SCRUB2          121
 
 #define MSG_OSD_PG_READY_TO_MERGE 122
+#define MSG_OSD_PG_STOP_MERGE     124
 
 #define MSG_OSD_PG_LEASE        133
 #define MSG_OSD_PG_LEASE_ACK    134
index 416b58f8df7f41e0c5dfddaa2ad65751302163ac..c8f9f81aceb2239bb816a4144f7ba327fe02590c 100644 (file)
@@ -157,6 +157,7 @@ class MOSDPGPush;
 class MOSDPGPushReply;
 class MOSDPGQuery;
 class MOSDPGReadyToMerge;
+class MOSDPGStopMerge;
 class MOSDPGRecoveryDelete;
 class MOSDPGRecoveryDeleteReply;
 class MOSDPGRemove;