]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: wire up async primary->replica pct updates
authorSamuel Just <sjust@redhat.com>
Wed, 24 Apr 2024 23:26:27 +0000 (16:26 -0700)
committerSamuel Just <sjust@redhat.com>
Mon, 21 Oct 2024 17:04:51 +0000 (17:04 +0000)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PGBackend.h
src/osd/PeeringState.h
src/osd/PrimaryLogPG.cc
src/osd/PrimaryLogPG.h
src/osd/ReplicatedBackend.cc
src/osd/ReplicatedBackend.h
src/osd/pg_features.h

index f5b75b726997f6e85c8dfe3e64fdf3a8781d0f97..5d7f67137ef93ff000ac9ad8dc2cc512b6957408 100644 (file)
@@ -504,6 +504,8 @@ void OSDService::shutdown_reserver()
 
 void OSDService::shutdown()
 {
+  pg_timer.stop();
+
   mono_timer.suspend();
 
   {
index 7c9aed7c6ba764f761430989ff2e3dc430f13352..a6cd03dc5185f7ee7d4ee5c247ff0b565b1b40fa 100644 (file)
@@ -48,6 +48,7 @@
 
 #include "include/unordered_map.h"
 
+#include "common/intrusive_timer.h"
 #include "common/shared_cache.hpp"
 #include "common/simple_cache.hpp"
 #include "messages/MOSDOp.h"
@@ -877,6 +878,8 @@ public:
   bool prepare_to_stop();
   void got_stop_ack();
 
+  // -- PG timer --
+  common::intrusive_timer pg_timer;
 
 #ifdef PG_DEBUG_REFS
   ceph::mutex pgid_lock = ceph::make_mutex("OSDService::pgid_lock");
@@ -1941,6 +1944,7 @@ private:
     case MSG_OSD_REP_SCRUBMAP:
     case MSG_OSD_PG_UPDATE_LOG_MISSING:
     case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
+    case MSG_OSD_PG_PCT:
     case MSG_OSD_PG_RECOVERY_DELETE:
     case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
     case MSG_OSD_PG_LEASE:
index a327c407e367b26e87ec7bd4fd40ccb7a92bbf8d..307651fd6272911b4057bb6b09eff53e57a520e4 100644 (file)
@@ -43,6 +43,7 @@
 #include "messages/MOSDECSubOpReadReply.h"
 #include "messages/MOSDPGUpdateLogMissing.h"
 #include "messages/MOSDPGUpdateLogMissingReply.h"
+#include "messages/MOSDPGPCT.h"
 #include "messages/MOSDBackoff.h"
 #include "messages/MOSDScrubReserve.h"
 #include "messages/MOSDRepOp.h"
@@ -2092,6 +2093,9 @@ bool PG::can_discard_request(OpRequestRef& op)
   case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
     return can_discard_replica_op<
       MOSDPGUpdateLogMissingReply, MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY>(op);
+  case MSG_OSD_PG_PCT:
+    return can_discard_replica_op<
+      MOSDPGPCT, MSG_OSD_PG_PCT>(op);
 
   case MSG_OSD_PG_SCAN:
     return can_discard_scan(op);
index 362226006babc2adbdc2935610e2d043ba3e1822..b87aa1da6771b72a685093d4a1780d0bb387f089 100644 (file)
@@ -20,6 +20,8 @@
 
 #include "ECCommon.h"
 #include "osd_types.h"
+#include "pg_features.h"
+#include "common/intrusive_timer.h"
 #include "common/WorkQueue.h"
 #include "include/Context.h"
 #include "os/ObjectStore.h"
@@ -136,6 +138,17 @@ typedef std::shared_ptr<const OSDMap> OSDMapRef;
        eversion_t v,
        Context *on_complete) = 0;
 
+     /**
+      * pg_lock, pg_unlock, pg_add_ref, pg_dec_ref
+      *
+      * Utilities for locking and manipulating refcounts on
+      * implementation.
+      */
+     virtual void pg_lock() = 0;
+     virtual void pg_unlock() = 0;
+     virtual void pg_add_ref() = 0;
+     virtual void pg_dec_ref() = 0;
+
      /**
       * Bless a context
       *
@@ -193,6 +206,7 @@ typedef std::shared_ptr<const OSDMap> OSDMapRef;
      virtual epoch_t pgb_get_osdmap_epoch() const = 0;
      virtual const pg_info_t &get_info() const = 0;
      virtual const pg_pool_t &get_pool() const = 0;
+     virtual eversion_t get_pg_committed_to() const = 0;
 
      virtual ObjectContextRef get_obc(
        const hobject_t &hoid,
@@ -240,6 +254,9 @@ typedef std::shared_ptr<const OSDMap> OSDMapRef;
      virtual void update_last_complete_ondisk(
        eversion_t lcod) = 0;
 
+     virtual void update_pct(
+       eversion_t pct) = 0;
+
      virtual void update_stats(
        const pg_stat_t &stat) = 0;
 
@@ -247,6 +264,8 @@ typedef std::shared_ptr<const OSDMap> OSDMapRef;
        GenContext<ThreadPool::TPHandle&> *c,
        uint64_t cost) = 0;
 
+     virtual common::intrusive_timer &get_pg_timer() = 0;
+
      virtual pg_shard_t whoami_shard() const = 0;
      int whoami() const {
        return whoami_shard().osd;
@@ -259,6 +278,7 @@ typedef std::shared_ptr<const OSDMap> OSDMapRef;
      virtual pg_shard_t primary_shard() const = 0;
      virtual uint64_t min_peer_features() const = 0;
      virtual uint64_t min_upacting_features() const = 0;
+     virtual pg_feature_vec_t get_pg_acting_features() const = 0;
      virtual hobject_t get_temp_recovery_object(const hobject_t& target,
                                                eversion_t version) = 0;
 
index de2a4cae042facdcb68608f1d0f52db78c2cd120..4b5285b18786f03cc456ac6b4a729699313a3b4a 100644 (file)
@@ -1941,6 +1941,16 @@ public:
     bool transaction_applied,
     bool async);
 
+  /**
+   * update_pct
+   *
+   * Updates pg_committed_to.  Generally invoked on replica on
+   * receipt of MODPGPCT from primary.
+   */
+  void update_pct(eversion_t pct) {
+    pg_committed_to = pct;
+  }
+
   /**
    * retrieve the min last_backfill among backfill targets
    */
index b58089904269b37a724ff9cc31526a456de121c9..14d2f85f40f038de683ed03d62ccdfea6f33a6bd 100644 (file)
@@ -543,6 +543,11 @@ void PrimaryLogPG::schedule_recovery_work(
     recovery_state.get_recovery_op_priority());
 }
 
+common::intrusive_timer &PrimaryLogPG::get_pg_timer()
+{
+  return osd->pg_timer;
+}
+
 void PrimaryLogPG::replica_clear_repop_obc(
   const vector<pg_log_entry_t> &logv,
   ObjectStore::Transaction &t)
index 9ee305165e38545b9ea8d129936c457245087e5b..f66b5c6e16aed2145a75fdd4fc0363fc654a09a8 100644 (file)
@@ -27,6 +27,7 @@
 #include "messages/MOSDOpReply.h"
 #include "common/admin_finisher.h"
 #include "common/Checksummer.h"
+#include "common/intrusive_timer.h"
 #include "common/sharedptr_registry.hpp"
 #include "common/shared_cache.hpp"
 #include "ReplicatedBackend.h"
@@ -349,6 +350,19 @@ public:
                             eversion_t v,
                             Context *on_complete) override;
 
+  void pg_lock() override {
+    lock();
+  }
+  void pg_unlock() override {
+    unlock();
+  }
+  void pg_add_ref() override {
+    intrusive_ptr_add_ref(this);
+  }
+  void pg_dec_ref() override {
+    intrusive_ptr_release(this);
+  }
+
   template<class T> class BlessedGenContext;
   template<class T> class UnlockedBlessedGenContext;
   class BlessedContext;
@@ -439,6 +453,9 @@ public:
   const pg_pool_t &get_pool() const override {
     return pool.info;
   }
+  eversion_t get_pg_committed_to() const override {
+    return recovery_state.get_pg_committed_to();
+  }
 
   ObjectContextRef get_obc(
     const hobject_t &hoid,
@@ -552,6 +569,10 @@ public:
     recovery_state.update_last_complete_ondisk(lcod);
   }
 
+  void update_pct(eversion_t pct) override {
+    recovery_state.update_pct(pct);
+  }
+
   void update_stats(
     const pg_stat_t &stat) override {
     recovery_state.update_stats(
@@ -565,6 +586,8 @@ public:
     GenContext<ThreadPool::TPHandle&> *c,
     uint64_t cost) override;
 
+  common::intrusive_timer &get_pg_timer() override;
+
   pg_shard_t whoami_shard() const override {
     return pg_whoami;
   }
@@ -580,6 +603,9 @@ public:
   uint64_t min_upacting_features() const override {
     return recovery_state.get_min_upacting_features();
   }
+  pg_feature_vec_t get_pg_acting_features() const override {
+    return recovery_state.get_pg_acting_features();
+  }
   void send_message_osd_cluster(
     int peer, Message *m, epoch_t from_epoch) override {
     osd->send_message_osd_cluster(peer, m, from_epoch);
index beb379ca0594602987e6b31552359257cb36e6e2..7ce8fbcd2102bc805e215b9772ee165fdff36099 100644 (file)
@@ -14,6 +14,7 @@
 #include "common/errno.h"
 #include "ReplicatedBackend.h"
 #include "messages/MOSDOp.h"
+#include "messages/MOSDPGPCT.h"
 #include "messages/MOSDRepOp.h"
 #include "messages/MOSDRepOpReply.h"
 #include "messages/MOSDPGPush.h"
@@ -124,7 +125,9 @@ ReplicatedBackend::ReplicatedBackend(
   ObjectStore::CollectionHandle &c,
   ObjectStore *store,
   CephContext *cct) :
-  PGBackend(cct, pg, store, coll, c) {}
+  PGBackend(cct, pg, store, coll, c),
+  pct_callback(this)
+{}
 
 void ReplicatedBackend::run_recovery_op(
   PGBackend::RecoveryHandle *_h,
@@ -229,6 +232,10 @@ bool ReplicatedBackend::_handle_message(
     return true;
   }
 
+  case MSG_OSD_PG_PCT:
+    do_pct(op);
+    return true;
+
   default:
     break;
   }
@@ -261,6 +268,7 @@ void ReplicatedBackend::on_change()
   }
   in_progress_ops.clear();
   clear_recovery_state();
+  cancel_pct_update();
 }
 
 int ReplicatedBackend::objects_read_sync(
@@ -462,6 +470,79 @@ void generate_transaction(
     });
 }
 
+void ReplicatedBackend::do_pct(OpRequestRef op)
+{
+  const MOSDPGPCT *m = static_cast<const MOSDPGPCT*>(op->get_req());
+  dout(10) << __func__ << ": received pct update to "
+          << m->pg_committed_to << dendl;
+  parent->update_pct(m->pg_committed_to);
+}
+
+void ReplicatedBackend::send_pct_update()
+{
+  dout(10) << __func__ << ": sending pct update" << dendl;
+  ceph_assert(
+    PG_HAVE_FEATURE(parent->get_pg_acting_features(), PCT));
+  for (const auto &i: parent->get_acting_shards()) {
+    if (i == parent->whoami_shard()) continue;
+
+    auto *pct_update = new MOSDPGPCT(
+      spg_t(parent->whoami_spg_t().pgid, i.shard),
+      get_osdmap_epoch(), parent->get_interval_start_epoch(),
+      parent->get_pg_committed_to()
+    );
+
+    dout(10) << __func__ << ": sending pct update to i " << i
+            << ", i.osd " << i.osd << dendl;
+    parent->send_message_osd_cluster(
+      i.osd, pct_update, get_osdmap_epoch());
+  }
+  dout(10) << __func__ << ": sending pct update complete" << dendl;
+}
+
+void ReplicatedBackend::maybe_kick_pct_update()
+{
+  if (!in_progress_ops.empty()) {
+    dout(20) << __func__ << ": not scheduling pct update, "
+            << in_progress_ops.size() << " ops pending" << dendl;
+    return;
+  }
+
+  if (!PG_HAVE_FEATURE(parent->get_pg_acting_features(), PCT)) {
+    dout(20) << __func__ << ": not scheduling pct update, PCT feature not"
+            << " supported" << dendl;
+    return;
+  }
+
+  if (pct_callback.is_scheduled()) {
+    derr << __func__
+        << ": pct_callback is already scheduled, this should be impossible"
+        << dendl;
+    return;
+  }
+
+  int64_t pct_delay;
+  if (!parent->get_pool().opts.get(
+       pool_opts_t::PCT_UPDATE_DELAY, &pct_delay)) {
+    dout(20) << __func__ << ": not scheduling pct update, PCT_UPDATE_DELAY not"
+            << " set" << dendl;
+    return;
+  }
+
+  dout(10) << __func__ << ": scheduling pct update after "
+          << pct_delay << " seconds" << dendl;
+  parent->get_pg_timer().schedule_after(
+    pct_callback, std::chrono::seconds(pct_delay));
+}
+
+void ReplicatedBackend::cancel_pct_update()
+{
+  if (pct_callback.is_scheduled()) {
+    dout(10) << __func__ << ": canceling pct update" << dendl;
+    parent->get_pg_timer().cancel(pct_callback);
+  }
+}
+
 void ReplicatedBackend::submit_transaction(
   const hobject_t &soid,
   const object_stat_sum_t &delta_stats,
@@ -476,6 +557,8 @@ void ReplicatedBackend::submit_transaction(
   osd_reqid_t reqid,
   OpRequestRef orig_op)
 {
+  cancel_pct_update();
+
   parent->apply_stats(
     soid,
     delta_stats);
@@ -572,6 +655,7 @@ void ReplicatedBackend::op_commit(const ceph::ref_t<InProgressOp>& op)
     op->on_commit = 0;
     in_progress_ops.erase(op->tid);
   }
+  maybe_kick_pct_update();
 }
 
 void ReplicatedBackend::do_repop_reply(OpRequestRef op)
@@ -628,6 +712,7 @@ void ReplicatedBackend::do_repop_reply(OpRequestRef op)
       in_progress_ops.erase(iter);
     }
   }
+  maybe_kick_pct_update();
 }
 
 int ReplicatedBackend::be_deep_scrub(
index 2f3c1ea2509e9d8fa5dc5cb7439bf8c2d37d496a..3dcae20605941f9dd23ee6627dda29dd9b0a1828 100644 (file)
@@ -341,6 +341,40 @@ private:
        op(op), v(v) {}
   };
   std::map<ceph_tid_t, ceph::ref_t<InProgressOp>> in_progress_ops;
+
+  /// Invoked by pct_callback to update PCT after a pause in IO
+  void send_pct_update();
+
+  /// Handle MOSDPGPCT message
+  void do_pct(OpRequestRef op);
+
+  /// Kick pct timer if repop_queue is empty
+  void maybe_kick_pct_update();
+
+  /// Kick pct timer if repop_queue is empty
+  void cancel_pct_update();
+
+  struct pct_callback_t final : public common::intrusive_timer::callback_t {
+    ReplicatedBackend *backend;
+
+    pct_callback_t(ReplicatedBackend *backend) : backend(backend) {}
+
+    void lock() override {
+      return backend->parent->pg_lock();
+    }
+    void unlock() override {
+      return backend->parent->pg_unlock();
+    }
+    void add_ref() override {
+      return backend->parent->pg_add_ref();
+    }
+    void dec_ref() override {
+      return backend->parent->pg_dec_ref();
+    }
+    void invoke() override {
+      return backend->send_pct_update();
+    }
+  } pct_callback;
 public:
   friend class C_OSD_OnOpCommit;
 
index 1205f8f3ba737b47651a7465987401f663cc3194..e601c84ee688717599a0dd0a17406babf88ae88d 100644 (file)
@@ -18,6 +18,9 @@ static constexpr pg_feature_vec_t PG_FEATURE_INCARNATION_1 = 0ull;
 #define PG_HAVE_FEATURE(x, name)                               \
   (((x) & (PG_FEATUREMASK_##name)) == (PG_FEATUREMASK_##name))
 
+DEFINE_PG_FEATURE(0, 1, PCT)
+
 static constexpr pg_feature_vec_t PG_FEATURE_NONE = 0ull;
-static constexpr pg_feature_vec_t PG_FEATURE_CLASSIC_ALL = 0ull;
 static constexpr pg_feature_vec_t PG_FEATURE_CRIMSON_ALL = 0ull;
+static constexpr pg_feature_vec_t PG_FEATURE_CLASSIC_ALL =
+  PG_FEATURE_PCT;