From 5eade05950b75fd3ad1d944a918dc1d7199a1db1 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 1 May 2024 22:36:24 +0000 Subject: [PATCH] crimson: wire up crimson pct timer and message Signed-off-by: Samuel Just --- src/crimson/osd/CMakeLists.txt | 1 + src/crimson/osd/osd.cc | 14 +++ src/crimson/osd/osd.h | 4 + src/crimson/osd/osd_operation.h | 2 + .../osd/osd_operation_external_tracking.h | 9 ++ .../osd/osd_operations/pgpct_request.cc | 95 ++++++++++++++++ .../osd/osd_operations/pgpct_request.h | 103 ++++++++++++++++++ src/crimson/osd/pg.h | 5 +- src/crimson/osd/replicated_backend.cc | 78 ++++++++++++- src/crimson/osd/replicated_backend.h | 22 ++++ 10 files changed, 331 insertions(+), 2 deletions(-) create mode 100644 src/crimson/osd/osd_operations/pgpct_request.cc create mode 100644 src/crimson/osd/osd_operations/pgpct_request.h diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt index a3b6b47d4d4..5796500080e 100644 --- a/src/crimson/osd/CMakeLists.txt +++ b/src/crimson/osd/CMakeLists.txt @@ -28,6 +28,7 @@ add_executable(crimson-osd osd_operations/logmissing_request_reply.cc osd_operations/background_recovery.cc osd_operations/recovery_subrequest.cc + osd_operations/pgpct_request.cc osd_operations/snaptrim_event.cc osd_operations/scrub_events.cc pg_recovery.cc diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 27511df8582..96529b70798 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -28,6 +28,7 @@ #include "messages/MOSDPGUpdateLogMissingReply.h" #include "messages/MOSDRepOpReply.h" #include "messages/MOSDScrub2.h" +#include "messages/MOSDPGPCT.h" #include "messages/MPGStats.h" #include "os/Transaction.h" @@ -53,6 +54,7 @@ #include "crimson/osd/pg_meta.h" #include "crimson/osd/osd_operations/client_request.h" #include "crimson/osd/osd_operations/peering_event.h" +#include "crimson/osd/osd_operations/pgpct_request.h" #include "crimson/osd/osd_operations/pg_advance_map.h" #include 
"crimson/osd/osd_operations/recovery_subrequest.h" #include "crimson/osd/osd_operations/replicated_request.h" @@ -994,6 +996,9 @@ OSD::do_ms_dispatch( case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY: return handle_update_log_missing_reply(conn, boost::static_pointer_cast< MOSDPGUpdateLogMissingReply>(m)); + case MSG_OSD_PG_PCT: + return handle_pg_pct(conn, boost::static_pointer_cast< + MOSDPGPCT>(m)); default: return std::nullopt; } @@ -1413,6 +1418,15 @@ seastar::future<> OSD::handle_update_log_missing_reply( std::move(m)).second; } +seastar::future<> OSD::handle_pg_pct( + crimson::net::ConnectionRef conn, + Ref m) +{ + return pg_shard_manager.start_pg_operation( + std::move(conn), + std::move(m)).second; +} + seastar::future<> OSD::handle_rep_op( crimson::net::ConnectionRef conn, Ref m) diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index 5d36f7ad8ce..fc409dca9e5 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -34,6 +34,7 @@ class MCommand; class MOSDMap; +class MOSDPGPCT; class MOSDRepOpReply; class MOSDRepOp; class MOSDScrub2; @@ -233,6 +234,9 @@ private: seastar::future<> handle_update_log_missing_reply( crimson::net::ConnectionRef conn, Ref m); + seastar::future<> handle_pg_pct( + crimson::net::ConnectionRef conn, + Ref m); std::vector get_health_metrics(); diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h index 6e75f2f826f..c9f9923127f 100644 --- a/src/crimson/osd/osd_operation.h +++ b/src/crimson/osd/osd_operation.h @@ -103,6 +103,7 @@ enum class OperationTypeCode { scrub_find_range, scrub_reserve_range, scrub_scan, + pgpct_request, last_op }; @@ -126,6 +127,7 @@ static constexpr const char* const OP_NAMES[] = { "scrub_find_range", "scrub_reserve_range", "scrub_scan", + "pgpct_request", }; // prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry: diff --git a/src/crimson/osd/osd_operation_external_tracking.h b/src/crimson/osd/osd_operation_external_tracking.h index 
0f70e8407a2..a24b455e849 100644 --- a/src/crimson/osd/osd_operation_external_tracking.h +++ b/src/crimson/osd/osd_operation_external_tracking.h @@ -8,6 +8,7 @@ #include "crimson/osd/osd_operations/background_recovery.h" #include "crimson/osd/osd_operations/client_request.h" #include "crimson/osd/osd_operations/peering_event.h" +#include "crimson/osd/osd_operations/pgpct_request.h" #include "crimson/osd/osd_operations/pg_advance_map.h" #include "crimson/osd/osd_operations/recovery_subrequest.h" #include "crimson/osd/osd_operations/replicated_request.h" @@ -335,6 +336,14 @@ struct EventBackendRegistry { } }; +template <> +struct EventBackendRegistry { + static std::tuple<> get_backends() { + return {/* no external backends */}; + } +}; + + template <> struct EventBackendRegistry { static std::tuple<> get_backends() { diff --git a/src/crimson/osd/osd_operations/pgpct_request.cc b/src/crimson/osd/osd_operations/pgpct_request.cc new file mode 100644 index 00000000000..3a9e1444686 --- /dev/null +++ b/src/crimson/osd/osd_operations/pgpct_request.cc @@ -0,0 +1,95 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "logmissing_request.h" + +#include "common/Formatter.h" + +#include "crimson/osd/osd.h" +#include "crimson/osd/osd_connection_priv.h" +#include "crimson/osd/osd_operation_external_tracking.h" +#include "crimson/osd/pg.h" +#include "crimson/osd/replicated_backend.h" + +SET_SUBSYS(osd); + +namespace crimson::osd { + +PGPCTRequest::PGPCTRequest(crimson::net::ConnectionRef&& conn, + Ref &&req) + : l_conn{std::move(conn)}, + req{std::move(req)} +{} + +void PGPCTRequest::print(std::ostream& os) const +{ + os << "PGPCTRequest(" + << " req=" << *req + << ")"; +} + +void PGPCTRequest::dump_detail(Formatter *f) const +{ + f->open_object_section("PGPCTRequest"); + f->dump_stream("pgid") << req->get_spg(); + f->dump_unsigned("map_epoch", req->get_map_epoch()); + f->dump_unsigned("min_epoch",
req->get_min_epoch()); + f->dump_stream("pg_committed_to") << req->pg_committed_to; + f->close_section(); +} + +ConnectionPipeline &PGPCTRequest::get_connection_pipeline() +{ + return get_osd_priv( + &get_local_connection() + ).client_request_conn_pipeline; +} + +PerShardPipeline &PGPCTRequest::get_pershard_pipeline( + ShardServices &shard_services) +{ + return shard_services.get_replicated_request_pipeline(); +} + +PGPCTRequest::interruptible_future<> PGPCTRequest::with_pg_interruptible( + PG &pg) +{ + LOG_PREFIX(PGPCTRequest::with_pg_interruptible); + DEBUGDPP("{}", pg, *this); + co_await this->template enter_stage(pg.repop_pipeline.process); + + { + auto fut = this->template with_blocking_event< + PG_OSDMapGate::OSDMapBlocker::BlockingEvent + >([this, &pg](auto &&trigger) { + return pg.osdmap_gate.wait_for_map( + std::move(trigger), req->min_epoch); + }); + co_await interruptor::make_interruptible(std::move(fut)); + } + + // This *must* be a replicated backend, ec doesn't have pct messages + static_cast(*(pg.backend)).do_pct(*req); +} + +seastar::future<> PGPCTRequest::with_pg( + ShardServices &shard_services, Ref pgref) +{ + LOG_PREFIX(PGPCTRequest::with_pg); + DEBUGDPP("{}", *pgref, *this); + + PG &pg = *pgref; + IRef ref = this; + return interruptor::with_interruption([this, &pg] { + return with_pg_interruptible(pg); + }, [](std::exception_ptr) { + return seastar::now(); + }, pgref, pgref->get_osdmap_epoch()).finally( + [FNAME, this, ref=std::move(ref), pgref=std::move(pgref)]() mutable { + DEBUGDPP("exit", *pgref, *this); + return handle.complete( + ).then([ref=std::move(ref), pgref=std::move(pgref)] {}); + }); +} + +} diff --git a/src/crimson/osd/osd_operations/pgpct_request.h b/src/crimson/osd/osd_operations/pgpct_request.h new file mode 100644 index 00000000000..39c1c2e2d87 --- /dev/null +++ b/src/crimson/osd/osd_operations/pgpct_request.h @@ -0,0 +1,103 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab 
+ +#pragma once + +#include "crimson/net/Connection.h" +#include "crimson/osd/osdmap_gate.h" +#include "crimson/osd/osd_operation.h" +#include "crimson/osd/osd_operations/client_request.h" +#include "crimson/osd/pg_map.h" +#include "crimson/common/type_helpers.h" +#include "messages/MOSDPGPCT.h" + +namespace ceph { + class Formatter; +} + +namespace crimson::osd { + +class ShardServices; + +class OSD; +class PG; + +class PGPCTRequest final : public PhasedOperationT { +public: + static constexpr OperationTypeCode type = OperationTypeCode::pgpct_request; + PGPCTRequest(crimson::net::ConnectionRef&&, Ref&&); + + void print(std::ostream &) const final; + void dump_detail(ceph::Formatter* f) const final; + + static constexpr bool can_create() { return false; } + spg_t get_pgid() const { + return req->get_spg(); + } + PipelineHandle &get_handle() { return handle; } + epoch_t get_epoch() const { return req->get_min_epoch(); } + epoch_t get_epoch_sent_at() const { return req->get_map_epoch(); } + + ConnectionPipeline &get_connection_pipeline(); + + PerShardPipeline &get_pershard_pipeline(ShardServices &); + + crimson::net::Connection &get_local_connection() { + assert(l_conn); + assert(!r_conn); + return *l_conn; + }; + + crimson::net::Connection &get_foreign_connection() { + assert(r_conn); + assert(!l_conn); + return *r_conn; + }; + + crimson::net::ConnectionFFRef prepare_remote_submission() { + assert(l_conn); + assert(!r_conn); + auto ret = seastar::make_foreign(std::move(l_conn)); + l_conn.reset(); + return ret; + } + + void finish_remote_submission(crimson::net::ConnectionFFRef conn) { + assert(conn); + assert(!l_conn); + assert(!r_conn); + r_conn = make_local_shared_foreign(std::move(conn)); + } + + seastar::future<> with_pg( + ShardServices &shard_services, Ref pg); + + interruptible_future<> with_pg_interruptible( + PG& pg); + + std::tuple< + StartEvent, + ConnectionPipeline::AwaitActive::BlockingEvent, + ConnectionPipeline::AwaitMap::BlockingEvent, + 
ConnectionPipeline::GetPGMapping::BlockingEvent, + PerShardPipeline::CreateOrWaitPG::BlockingEvent, + PGRepopPipeline::Process::BlockingEvent, + PG_OSDMapGate::OSDMapBlocker::BlockingEvent, + PGMap::PGCreationBlockingEvent, + OSD_OSDMapGate::OSDMapBlocker::BlockingEvent + > tracking_events; + +private: + crimson::net::ConnectionRef l_conn; + crimson::net::ConnectionXcoreRef r_conn; + + // must be after `conn` to ensure the ConnectionPipeline is alive + PipelineHandle handle; + Ref req; +}; + +} + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter : fmt::ostream_formatter {}; +#endif diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 0a20f2c4d62..13691c0d5bd 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -13,6 +13,7 @@ #include "common/ostream_temp.h" #include "include/interval_set.h" #include "crimson/net/Fwd.h" +#include "messages/MOSDPGPCT.h" #include "messages/MOSDRepOpReply.h" #include "messages/MOSDOpReply.h" #include "os/Transaction.h" @@ -747,10 +748,11 @@ private: std::unique_ptr recovery_handler; C_PG_FinishRecovery *recovery_finisher; - PeeringState peering_state; eversion_t projected_last_update; public: + PeeringState peering_state; + // scrub state friend class ScrubScan; @@ -924,6 +926,7 @@ private: friend class RepRequest; friend class LogMissingRequest; friend class LogMissingRequestReply; + friend class PGPCTRequest; friend struct PGFacade; friend class InternalClientRequest; friend class WatchTimeoutRequest; diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc index 392cb9735ea..120c9da08e8 100644 --- a/src/crimson/osd/replicated_backend.cc +++ b/src/crimson/osd/replicated_backend.cc @@ -24,7 +24,16 @@ ReplicatedBackend::ReplicatedBackend(pg_t pgid, : PGBackend{whoami.shard, coll, shard_services, dpp}, pgid{pgid}, whoami{whoami}, - pg(pg) + pg(pg), + pct_timer([this, &pg]() mutable { + Ref pgref(&pg); + std::ignore = interruptor::with_interruption([this] { + return 
send_pct_update(); + }, [](std::exception_ptr ep) { + // nothing to do, new interval + return seastar::now(); + }, pgref, pgref->get_osdmap_epoch()); + }) {} ReplicatedBackend::ll_read_ierrorator::future @@ -97,6 +106,8 @@ ReplicatedBackend::submit_transaction( auto osd_op_p = std::move(opp); auto _new_clone = std::move(new_clone); + cancel_pct_update(); + const ceph_tid_t tid = shard_services.get_tid(); auto pending_txn = pending_trans.try_emplace( @@ -195,6 +206,7 @@ ReplicatedBackend::submit_transaction( if (!to_push_delete.empty()) { pg.enqueue_delete_for_backfill(hoid, {}, to_push_delete); } + maybe_kick_pct_update(); return seastar::now(); }); @@ -211,6 +223,7 @@ void ReplicatedBackend::on_actingset_changed(bool same_primary) pending_txn.all_committed.set_exception(e_actingset_changed); } pending_trans.clear(); + cancel_pct_update(); } void ReplicatedBackend::got_rep_op_reply(const MOSDRepOpReply& reply) @@ -279,3 +292,66 @@ ReplicatedBackend::request_committed(const osd_reqid_t& reqid, return seastar::now(); } } + +ReplicatedBackend::interruptible_future<> ReplicatedBackend::send_pct_update() +{ + LOG_PREFIX(ReplicatedBackend::send_pct_update); + DEBUGDPP("", dpp); + ceph_assert( + PG_HAVE_FEATURE(pg.peering_state.get_pg_acting_features(), PCT)); + for (const auto &i: pg.peering_state.get_actingset()) { + if (i == pg.get_pg_whoami()) continue; + + auto pct_update = crimson::make_message( + spg_t(pg.get_pgid().pgid, i.shard), + pg.get_osdmap_epoch(), pg.get_same_interval_since(), + pg.peering_state.get_pg_committed_to() + ); + + co_await interruptor::make_interruptible( + shard_services.send_to_osd( + i.osd, + std::move(pct_update), + pg.get_osdmap_epoch())); + } +} + +void ReplicatedBackend::maybe_kick_pct_update() +{ + LOG_PREFIX(ReplicatedBackend::maybe_kick_pct_update); + DEBUGDPP("", dpp); + if (!pending_trans.empty()) { + DEBUGDPP("pending_trans queue not empty", dpp); + return; + } + + if (!PG_HAVE_FEATURE( + pg.peering_state.get_pg_acting_features(), 
PCT)) { + DEBUGDPP("no PCT feature", dpp); + return; + } + + int64_t pct_delay; + if (!pg.peering_state.get_pgpool().info.opts.get( + pool_opts_t::PCT_UPDATE_DELAY, &pct_delay)) { + DEBUGDPP("pct update delay not set", dpp); + return; + } + + DEBUGDPP("scheduling pct callback in {} seconds", dpp, pct_delay); + pct_timer.arm(std::chrono::seconds(pct_delay)); +} + +void ReplicatedBackend::cancel_pct_update() +{ + LOG_PREFIX(ReplicatedBackend::cancel_pct_update); + DEBUGDPP("", dpp); + pct_timer.cancel(); +} + +void ReplicatedBackend::do_pct(const MOSDPGPCT &m) +{ + LOG_PREFIX(ReplicatedBackend::do_pct); + DEBUGDPP("{}", dpp, m); + pg.peering_state.update_pct(m.pg_committed_to); +} diff --git a/src/crimson/osd/replicated_backend.h b/src/crimson/osd/replicated_backend.h index 9f4132f7ae7..43030bd0609 100644 --- a/src/crimson/osd/replicated_backend.h +++ b/src/crimson/osd/replicated_backend.h @@ -6,6 +6,7 @@ #include #include #include +#include "messages/MOSDPGPCT.h" #include "include/buffer_fwd.h" #include "osd/osd_types.h" @@ -20,6 +21,12 @@ namespace crimson::osd { class ReplicatedBackend : public PGBackend { public: + using interruptor = ::crimson::interruptible::interruptor< + ::crimson::osd::IOInterruptCondition>; + template + using interruptible_future = + ::crimson::interruptible::interruptible_future< + ::crimson::osd::IOInterruptCondition, T>; ReplicatedBackend(pg_t pgid, pg_shard_t whoami, crimson::osd::PG& pg, CollectionRef coll, @@ -81,4 +88,19 @@ private: seastar::future<> request_committed( const osd_reqid_t& reqid, const eversion_t& at_version) final; + + seastar::timer pct_timer; + + /// Invoked by pct_timer to update PCT after a pause in IO + interruptible_future<> send_pct_update(); + + /// Kick pct timer if pending_trans is empty + void maybe_kick_pct_update(); + + /// Cancel pct timer if scheduled + void cancel_pct_update(); + +public: + /// Handle MOSDPGPCT message + void do_pct(const MOSDPGPCT &m); }; -- 2.39.5