From: Samuel Just Date: Wed, 24 Apr 2024 23:26:27 +0000 (-0700) Subject: osd: wire up async primary->replica pct updates X-Git-Tag: v20.0.0~707^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8ab313fb699d81dc12a4e442356ccb85e1988a71;p=ceph.git osd: wire up async primary->replica pct updates Signed-off-by: Samuel Just --- diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index f5b75b726997..5d7f67137ef9 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -504,6 +504,8 @@ void OSDService::shutdown_reserver() void OSDService::shutdown() { + pg_timer.stop(); + mono_timer.suspend(); { diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 7c9aed7c6ba7..a6cd03dc5185 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -48,6 +48,7 @@ #include "include/unordered_map.h" +#include "common/intrusive_timer.h" #include "common/shared_cache.hpp" #include "common/simple_cache.hpp" #include "messages/MOSDOp.h" @@ -877,6 +878,8 @@ public: bool prepare_to_stop(); void got_stop_ack(); + // -- PG timer -- + common::intrusive_timer pg_timer; #ifdef PG_DEBUG_REFS ceph::mutex pgid_lock = ceph::make_mutex("OSDService::pgid_lock"); @@ -1941,6 +1944,7 @@ private: case MSG_OSD_REP_SCRUBMAP: case MSG_OSD_PG_UPDATE_LOG_MISSING: case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY: + case MSG_OSD_PG_PCT: case MSG_OSD_PG_RECOVERY_DELETE: case MSG_OSD_PG_RECOVERY_DELETE_REPLY: case MSG_OSD_PG_LEASE: diff --git a/src/osd/PG.cc b/src/osd/PG.cc index a327c407e367..307651fd6272 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -43,6 +43,7 @@ #include "messages/MOSDECSubOpReadReply.h" #include "messages/MOSDPGUpdateLogMissing.h" #include "messages/MOSDPGUpdateLogMissingReply.h" +#include "messages/MOSDPGPCT.h" #include "messages/MOSDBackoff.h" #include "messages/MOSDScrubReserve.h" #include "messages/MOSDRepOp.h" @@ -2092,6 +2093,9 @@ bool PG::can_discard_request(OpRequestRef& op) case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY: return can_discard_replica_op< MOSDPGUpdateLogMissingReply, MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY>(op); + case MSG_OSD_PG_PCT: + return can_discard_replica_op< + MOSDPGPCT, MSG_OSD_PG_PCT>(op); case MSG_OSD_PG_SCAN: return can_discard_scan(op); diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h index 362226006bab..b87aa1da6771 100644 --- a/src/osd/PGBackend.h +++ b/src/osd/PGBackend.h @@ -20,6 +20,8 @@ #include "ECCommon.h" #include "osd_types.h" +#include "pg_features.h" +#include "common/intrusive_timer.h" #include "common/WorkQueue.h" #include "include/Context.h" #include "os/ObjectStore.h" @@ -136,6 +138,17 @@ typedef std::shared_ptr OSDMapRef; eversion_t v, Context *on_complete) = 0; + /** + * pg_lock, pg_unlock, pg_add_ref, pg_dec_ref + * + * Utilities for locking and manipulating refcounts on + * implementation. + */ + virtual void pg_lock() = 0; + virtual void pg_unlock() = 0; + virtual void pg_add_ref() = 0; + virtual void pg_dec_ref() = 0; + /** * Bless a context * @@ -193,6 +206,7 @@ typedef std::shared_ptr OSDMapRef; virtual epoch_t pgb_get_osdmap_epoch() const = 0; virtual const pg_info_t &get_info() const = 0; virtual const pg_pool_t &get_pool() const = 0; + virtual eversion_t get_pg_committed_to() const = 0; virtual ObjectContextRef get_obc( const hobject_t &hoid, @@ -240,6 +254,9 @@ typedef std::shared_ptr OSDMapRef; virtual void update_last_complete_ondisk( eversion_t lcod) = 0; + virtual void update_pct( + eversion_t pct) = 0; + virtual void update_stats( const pg_stat_t &stat) = 0; @@ -247,6 +264,8 @@ typedef std::shared_ptr OSDMapRef; GenContext *c, uint64_t cost) = 0; + virtual common::intrusive_timer &get_pg_timer() = 0; + virtual pg_shard_t whoami_shard() const = 0; int whoami() const { return whoami_shard().osd; @@ -259,6 +278,7 @@ typedef std::shared_ptr OSDMapRef; virtual pg_shard_t primary_shard() const = 0; virtual uint64_t min_peer_features() const = 0; virtual uint64_t min_upacting_features() const = 0; + virtual pg_feature_vec_t get_pg_acting_features() const = 0; virtual hobject_t get_temp_recovery_object(const hobject_t& target, eversion_t version) = 0; diff --git a/src/osd/PeeringState.h b/src/osd/PeeringState.h index de2a4cae042f..4b5285b18786 100644 --- a/src/osd/PeeringState.h +++ b/src/osd/PeeringState.h @@ -1941,6 +1941,16 @@ public: bool transaction_applied, bool async); + /** + * update_pct + * + * Updates pg_committed_to. Generally invoked on replica on + * receipt of MODPGPCT from primary. + */ + void update_pct(eversion_t pct) { + pg_committed_to = pct; + } + /** * retrieve the min last_backfill among backfill targets */ diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index b58089904269..14d2f85f40f0 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -543,6 +543,11 @@ void PrimaryLogPG::schedule_recovery_work( recovery_state.get_recovery_op_priority()); } +common::intrusive_timer &PrimaryLogPG::get_pg_timer() +{ + return osd->pg_timer; +} + void PrimaryLogPG::replica_clear_repop_obc( const vector &logv, ObjectStore::Transaction &t) diff --git a/src/osd/PrimaryLogPG.h b/src/osd/PrimaryLogPG.h index 9ee305165e38..f66b5c6e16ae 100644 --- a/src/osd/PrimaryLogPG.h +++ b/src/osd/PrimaryLogPG.h @@ -27,6 +27,7 @@ #include "messages/MOSDOpReply.h" #include "common/admin_finisher.h" #include "common/Checksummer.h" +#include "common/intrusive_timer.h" #include "common/sharedptr_registry.hpp" #include "common/shared_cache.hpp" #include "ReplicatedBackend.h" @@ -349,6 +350,19 @@ public: eversion_t v, Context *on_complete) override; + void pg_lock() override { + lock(); + } + void pg_unlock() override { + unlock(); + } + void pg_add_ref() override { + intrusive_ptr_add_ref(this); + } + void pg_dec_ref() override { + intrusive_ptr_release(this); + } + template class BlessedGenContext; template class UnlockedBlessedGenContext; class BlessedContext; @@ -439,6 +453,9 @@ public: const pg_pool_t &get_pool() const override { return pool.info; } + eversion_t get_pg_committed_to() const override { + return recovery_state.get_pg_committed_to(); + } ObjectContextRef get_obc( const hobject_t &hoid, @@ -552,6 +569,10 @@ public: recovery_state.update_last_complete_ondisk(lcod); } + void update_pct(eversion_t pct) override { + recovery_state.update_pct(pct); + } + void update_stats( const pg_stat_t &stat) override { recovery_state.update_stats( @@ -565,6 +586,8 @@ public: GenContext *c, uint64_t cost) override; + common::intrusive_timer &get_pg_timer() override; + pg_shard_t whoami_shard() const override { return pg_whoami; } @@ -580,6 +603,9 @@ public: uint64_t min_upacting_features() const override { return recovery_state.get_min_upacting_features(); } + pg_feature_vec_t get_pg_acting_features() const override { + return recovery_state.get_pg_acting_features(); + } void send_message_osd_cluster( int peer, Message *m, epoch_t from_epoch) override { osd->send_message_osd_cluster(peer, m, from_epoch); diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc index beb379ca0594..7ce8fbcd2102 100644 --- a/src/osd/ReplicatedBackend.cc +++ b/src/osd/ReplicatedBackend.cc @@ -14,6 +14,7 @@ #include "common/errno.h" #include "ReplicatedBackend.h" #include "messages/MOSDOp.h" +#include "messages/MOSDPGPCT.h" #include "messages/MOSDRepOp.h" #include "messages/MOSDRepOpReply.h" #include "messages/MOSDPGPush.h" @@ -124,7 +125,9 @@ ReplicatedBackend::ReplicatedBackend( ObjectStore::CollectionHandle &c, ObjectStore *store, CephContext *cct) : - PGBackend(cct, pg, store, coll, c) {} + PGBackend(cct, pg, store, coll, c), + pct_callback(this) +{} void ReplicatedBackend::run_recovery_op( PGBackend::RecoveryHandle *_h, @@ -229,6 +232,10 @@ bool ReplicatedBackend::_handle_message( return true; } + case MSG_OSD_PG_PCT: + do_pct(op); + return true; + default: break; } @@ -261,6 +268,7 @@ void ReplicatedBackend::on_change() } in_progress_ops.clear(); clear_recovery_state(); + cancel_pct_update(); } int ReplicatedBackend::objects_read_sync( @@ -462,6 +470,79 @@ void generate_transaction( }); } +void ReplicatedBackend::do_pct(OpRequestRef op) +{ + const MOSDPGPCT *m = static_cast(op->get_req()); + dout(10) << __func__ << ": received pct update to " + << m->pg_committed_to << dendl; + parent->update_pct(m->pg_committed_to); +} + +void ReplicatedBackend::send_pct_update() +{ + dout(10) << __func__ << ": sending pct update" << dendl; + ceph_assert( + PG_HAVE_FEATURE(parent->get_pg_acting_features(), PCT)); + for (const auto &i: parent->get_acting_shards()) { + if (i == parent->whoami_shard()) continue; + + auto *pct_update = new MOSDPGPCT( + spg_t(parent->whoami_spg_t().pgid, i.shard), + get_osdmap_epoch(), parent->get_interval_start_epoch(), + parent->get_pg_committed_to() + ); + + dout(10) << __func__ << ": sending pct update to i " << i + << ", i.osd " << i.osd << dendl; + parent->send_message_osd_cluster( + i.osd, pct_update, get_osdmap_epoch()); + } + dout(10) << __func__ << ": sending pct update complete" << dendl; +} + +void ReplicatedBackend::maybe_kick_pct_update() +{ + if (!in_progress_ops.empty()) { + dout(20) << __func__ << ": not scheduling pct update, " + << in_progress_ops.size() << " ops pending" << dendl; + return; + } + + if (!PG_HAVE_FEATURE(parent->get_pg_acting_features(), PCT)) { + dout(20) << __func__ << ": not scheduling pct update, PCT feature not" + << " supported" << dendl; + return; + } + + if (pct_callback.is_scheduled()) { + derr << __func__ + << ": pct_callback is already scheduled, this should be impossible" + << dendl; + return; + } + + int64_t pct_delay; + if (!parent->get_pool().opts.get( + pool_opts_t::PCT_UPDATE_DELAY, &pct_delay)) { + dout(20) << __func__ << ": not scheduling pct update, PCT_UPDATE_DELAY not" + << " set" << dendl; + return; + } + + dout(10) << __func__ << ": scheduling pct update after " + << pct_delay << " seconds" << dendl; + parent->get_pg_timer().schedule_after( + pct_callback, std::chrono::seconds(pct_delay)); +} + +void ReplicatedBackend::cancel_pct_update() +{ + if (pct_callback.is_scheduled()) { + dout(10) << __func__ << ": canceling pct update" << dendl; + parent->get_pg_timer().cancel(pct_callback); + } +} + void ReplicatedBackend::submit_transaction( const hobject_t &soid, const object_stat_sum_t &delta_stats, @@ -476,6 +557,8 @@ void ReplicatedBackend::submit_transaction( osd_reqid_t reqid, OpRequestRef orig_op) { + cancel_pct_update(); + parent->apply_stats( soid, delta_stats); @@ -572,6 +655,7 @@ void ReplicatedBackend::op_commit(const ceph::ref_t& op) op->on_commit = 0; in_progress_ops.erase(op->tid); } + maybe_kick_pct_update(); } void ReplicatedBackend::do_repop_reply(OpRequestRef op) @@ -628,6 +712,7 @@ void ReplicatedBackend::do_repop_reply(OpRequestRef op) in_progress_ops.erase(iter); } } + maybe_kick_pct_update(); } int ReplicatedBackend::be_deep_scrub( diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h index 2f3c1ea2509e..3dcae2060594 100644 --- a/src/osd/ReplicatedBackend.h +++ b/src/osd/ReplicatedBackend.h @@ -341,6 +341,40 @@ private: op(op), v(v) {} }; std::map> in_progress_ops; + + /// Invoked by pct_callback to update PCT after a pause in IO + void send_pct_update(); + + /// Handle MOSDPGPCT message + void do_pct(OpRequestRef op); + + /// Kick pct timer if repop_queue is empty + void maybe_kick_pct_update(); + + /// Kick pct timer if repop_queue is empty + void cancel_pct_update(); + + struct pct_callback_t final : public common::intrusive_timer::callback_t { + ReplicatedBackend *backend; + + pct_callback_t(ReplicatedBackend *backend) : backend(backend) {} + + void lock() override { + return backend->parent->pg_lock(); + } + void unlock() override { + return backend->parent->pg_unlock(); + } + void add_ref() override { + return backend->parent->pg_add_ref(); + } + void dec_ref() override { + return backend->parent->pg_dec_ref(); + } + void invoke() override { + return backend->send_pct_update(); + } + } pct_callback; public: friend class C_OSD_OnOpCommit; diff --git a/src/osd/pg_features.h b/src/osd/pg_features.h index 1205f8f3ba73..e601c84ee688 100644 --- a/src/osd/pg_features.h +++ b/src/osd/pg_features.h @@ -18,6 +18,9 @@ static constexpr pg_feature_vec_t PG_FEATURE_INCARNATION_1 = 0ull; #define PG_HAVE_FEATURE(x, name) \ (((x) & (PG_FEATUREMASK_##name)) == (PG_FEATUREMASK_##name)) +DEFINE_PG_FEATURE(0, 1, PCT) + static constexpr pg_feature_vec_t PG_FEATURE_NONE = 0ull; -static constexpr pg_feature_vec_t PG_FEATURE_CLASSIC_ALL = 0ull; static constexpr pg_feature_vec_t PG_FEATURE_CRIMSON_ALL = 0ull; +static constexpr pg_feature_vec_t PG_FEATURE_CLASSIC_ALL = + PG_FEATURE_PCT;