git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson: wire up crimson pct timer and message
author Samuel Just <sjust@redhat.com>
Wed, 1 May 2024 22:36:24 +0000 (22:36 +0000)
committer Samuel Just <sjust@redhat.com>
Mon, 21 Apr 2025 16:31:02 +0000 (09:31 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/osd/CMakeLists.txt
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/osd_operation.h
src/crimson/osd/osd_operation_external_tracking.h
src/crimson/osd/osd_operations/pgpct_request.cc [new file with mode: 0644]
src/crimson/osd/osd_operations/pgpct_request.h [new file with mode: 0644]
src/crimson/osd/pg.h
src/crimson/osd/replicated_backend.cc
src/crimson/osd/replicated_backend.h

index a3b6b47d4d44a40a1d0f655e2eda0cadea8f7341..5796500080eaade411977a9cb3e2e683a9ff3a7a 100644 (file)
@@ -28,6 +28,7 @@ add_executable(crimson-osd
   osd_operations/logmissing_request_reply.cc
   osd_operations/background_recovery.cc
   osd_operations/recovery_subrequest.cc
+  osd_operations/pgpct_request.cc
   osd_operations/snaptrim_event.cc
   osd_operations/scrub_events.cc
   pg_recovery.cc
index 27511df85822ca3ccd52da3de232247f535fdcab..96529b707982808b0eb933a0753239c7ef1ea8ca 100644 (file)
@@ -28,6 +28,7 @@
 #include "messages/MOSDPGUpdateLogMissingReply.h"
 #include "messages/MOSDRepOpReply.h"
 #include "messages/MOSDScrub2.h"
+#include "messages/MOSDPGPCT.h"
 #include "messages/MPGStats.h"
 
 #include "os/Transaction.h"
@@ -53,6 +54,7 @@
 #include "crimson/osd/pg_meta.h"
 #include "crimson/osd/osd_operations/client_request.h"
 #include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/osd_operations/pgpct_request.h"
 #include "crimson/osd/osd_operations/pg_advance_map.h"
 #include "crimson/osd/osd_operations/recovery_subrequest.h"
 #include "crimson/osd/osd_operations/replicated_request.h"
@@ -994,6 +996,9 @@ OSD::do_ms_dispatch(
   case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
     return handle_update_log_missing_reply(conn, boost::static_pointer_cast<
       MOSDPGUpdateLogMissingReply>(m));
+  case MSG_OSD_PG_PCT:
+    return handle_pg_pct(conn, boost::static_pointer_cast<
+      MOSDPGPCT>(m));
   default:
     return std::nullopt;
   }
@@ -1413,6 +1418,15 @@ seastar::future<> OSD::handle_update_log_missing_reply(
     std::move(m)).second;
 }
 
+seastar::future<> OSD::handle_pg_pct(
+  crimson::net::ConnectionRef conn, Ref<MOSDPGPCT> m)
+{
+  // Start a PGPCTRequest for the message; only the future found in .second
+  // of the result is handed back to the dispatcher.
+  return pg_shard_manager.start_pg_operation<PGPCTRequest>(
+    std::move(conn), std::move(m)).second;
+}
+
 seastar::future<> OSD::handle_rep_op(
   crimson::net::ConnectionRef conn,
   Ref<MOSDRepOp> m)
index 5d36f7ad8cee16176cef9fd240cd83c1c2da245e..fc409dca9e57d6c72f0d4e9ab29905486f31da53 100644 (file)
@@ -34,6 +34,7 @@
 
 class MCommand;
 class MOSDMap;
+class MOSDPGPCT;
 class MOSDRepOpReply;
 class MOSDRepOp;
 class MOSDScrub2;
@@ -233,6 +234,9 @@ private:
   seastar::future<> handle_update_log_missing_reply(
     crimson::net::ConnectionRef conn,
     Ref<MOSDPGUpdateLogMissingReply> m);
+  seastar::future<> handle_pg_pct(
+    crimson::net::ConnectionRef conn,
+    Ref<MOSDPGPCT> m);
 
   std::vector<DaemonHealthMetric> get_health_metrics();
 
index 6e75f2f826f25cc3e08140866509ffad26cf400c..c9f9923127fb399bf7cd60b8d426f1950c182d24 100644 (file)
@@ -103,6 +103,7 @@ enum class OperationTypeCode {
   scrub_find_range,
   scrub_reserve_range,
   scrub_scan,
+  pgpct_request,
   last_op
 };
 
@@ -126,6 +127,7 @@ static constexpr const char* const OP_NAMES[] = {
   "scrub_find_range",
   "scrub_reserve_range",
   "scrub_scan",
+  "pgpct_request",
 };
 
 // prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:
index 0f70e8407a2b440dabdb8203864afddd094ea779..a24b455e849c63460e15566714e863737f3f3aa4 100644 (file)
@@ -8,6 +8,7 @@
 #include "crimson/osd/osd_operations/background_recovery.h"
 #include "crimson/osd/osd_operations/client_request.h"
 #include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/osd_operations/pgpct_request.h"
 #include "crimson/osd/osd_operations/pg_advance_map.h"
 #include "crimson/osd/osd_operations/recovery_subrequest.h"
 #include "crimson/osd/osd_operations/replicated_request.h"
@@ -335,6 +336,14 @@ struct EventBackendRegistry<osd::LogMissingRequestReply> {
   }
 };
 
+template <>
+struct EventBackendRegistry<osd::PGPCTRequest> {
+  static std::tuple<> get_backends() {
+    return {/* no external backends */};
+  }
+};
+
+
 template <>
 struct EventBackendRegistry<osd::RecoverySubRequest> {
   static std::tuple<> get_backends() {
diff --git a/src/crimson/osd/osd_operations/pgpct_request.cc b/src/crimson/osd/osd_operations/pgpct_request.cc
new file mode 100644 (file)
index 0000000..3a9e144
--- /dev/null
@@ -0,0 +1,95 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "logmissing_request.h"
+
+#include "common/Formatter.h"
+
+#include "crimson/osd/osd.h"
+#include "crimson/osd/osd_connection_priv.h"
+#include "crimson/osd/osd_operation_external_tracking.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/replicated_backend.h"
+
+SET_SUBSYS(osd);
+
+namespace crimson::osd {
+
+PGPCTRequest::PGPCTRequest(crimson::net::ConnectionRef&& conn,
+                           Ref<MOSDPGPCT> &&msg)
+  : l_conn{std::move(conn)},  // later served by get_local_connection()
+    req{std::move(msg)}
+{}
+
+void PGPCTRequest::print(std::ostream& os) const
+{
+  // The wrapped message is rendered through its own stream operator.
+  os << "PGPCTRequest(" << " req=" << *req;
+  os << ")";
+}
+
+void PGPCTRequest::dump_detail(Formatter *f) const
+{ // Emits one "PGPCTRequest" section; every field is read from req.
+  f->open_object_section("PGPCTRequest");
+  f->dump_stream("pgid") << req->get_spg();
+  f->dump_unsigned("map_epoch", req->get_map_epoch());
+  f->dump_unsigned("min_epoch", req->get_min_epoch());
+  f->dump_stream("pg_committed_to") << req->pg_committed_to;
+  f->close_section();
+}
+
+ConnectionPipeline &PGPCTRequest::get_connection_pipeline()
+{
+  // The pipeline is looked up via get_osd_priv() on the local connection.
+  auto &&conn_priv = get_osd_priv(&get_local_connection());
+  return conn_priv.client_request_conn_pipeline;
+}
+
+PerShardPipeline &PGPCTRequest::get_pershard_pipeline(
+    ShardServices &shard_services)
+{ // Returns the pipeline shard_services exposes for replicated requests.
+  return shard_services.get_replicated_request_pipeline();
+}
+
+PGPCTRequest::interruptible_future<> PGPCTRequest::with_pg_interruptible(
+  PG &pg)
+{
+  // Enter the repop stage, wait for min_epoch, then apply the sender's pct.
+  LOG_PREFIX(PGPCTRequest::with_pg_interruptible);
+  DEBUGDPP("{}", pg, *this);
+  co_await this->template enter_stage<interruptor>(pg.repop_pipeline.process);
+  {
+    auto fut = this->template with_blocking_event<
+      PG_OSDMapGate::OSDMapBlocker::BlockingEvent
+      >([this, &pg](auto &&trigger) {
+       return pg.osdmap_gate.wait_for_map(
+         std::move(trigger), req->get_min_epoch());
+      });
+    co_await interruptor::make_interruptible(std::move(fut));
+  }
+
+  // This *must* be a replicated backend, ec doesn't have pct messages
+  static_cast<ReplicatedBackend&>(*(pg.backend)).do_pct(*req);
+}
+
+seastar::future<> PGPCTRequest::with_pg(
+  ShardServices &shard_services, Ref<PG> pgref)
+{ // Runs with_pg_interruptible() under interruption, then completes handle.
+  LOG_PREFIX(PGPCTRequest::with_pg);
+  DEBUGDPP("{}", *pgref, *this);
+
+  PG &pg = *pgref;
+  IRef ref = this; // extra ref to this op, held by the continuations below
+  return interruptor::with_interruption([this, &pg] {
+    return with_pg_interruptible(pg);
+  }, [](std::exception_ptr) { // error handler: drop it, resolve normally
+    return seastar::now();
+  }, pgref, pgref->get_osdmap_epoch()).finally(
+    [FNAME, this, ref=std::move(ref), pgref=std::move(pgref)]() mutable {
+      DEBUGDPP("exit", *pgref, *this);
+      return handle.complete(
+      ).then([ref=std::move(ref), pgref=std::move(pgref)] {});
+    });
+}
+
+}
diff --git a/src/crimson/osd/osd_operations/pgpct_request.h b/src/crimson/osd/osd_operations/pgpct_request.h
new file mode 100644 (file)
index 0000000..39c1c2e
--- /dev/null
@@ -0,0 +1,103 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/pg_map.h"
+#include "crimson/common/type_helpers.h"
+#include "messages/MOSDPGPCT.h"
+
+namespace ceph {
+  class Formatter;
+}
+
+namespace crimson::osd {
+
+class ShardServices;
+
+class OSD;
+class PG;
+/// Operation created for each received MOSDPGPCT; applies it to the target PG.
+class PGPCTRequest final : public PhasedOperationT<PGPCTRequest> {
+public:
+  static constexpr OperationTypeCode type = OperationTypeCode::pgpct_request;
+  PGPCTRequest(crimson::net::ConnectionRef&&, Ref<MOSDPGPCT>&&);
+
+  void print(std::ostream &) const final;
+  void dump_detail(ceph::Formatter* f) const final;
+
+  static constexpr bool can_create() { return false; }
+  spg_t get_pgid() const {
+    return req->get_spg();
+  }
+  PipelineHandle &get_handle() { return handle; }
+  epoch_t get_epoch() const { return req->get_min_epoch(); }
+  epoch_t get_epoch_sent_at() const { return req->get_map_epoch(); }
+
+  ConnectionPipeline &get_connection_pipeline();
+
+  PerShardPipeline &get_pershard_pipeline(ShardServices &);
+
+  crimson::net::Connection &get_local_connection() {
+    assert(l_conn);
+    assert(!r_conn);
+    return *l_conn;
+  };
+
+  crimson::net::Connection &get_foreign_connection() {
+    assert(r_conn);
+    assert(!l_conn);
+    return *r_conn;
+  };
+
+  crimson::net::ConnectionFFRef prepare_remote_submission() {
+    assert(l_conn);
+    assert(!r_conn);
+    auto ret = seastar::make_foreign(std::move(l_conn));
+    l_conn.reset(); // finish_remote_submission() asserts l_conn is empty
+    return ret;
+  }
+
+  void finish_remote_submission(crimson::net::ConnectionFFRef conn) {
+    assert(conn);
+    assert(!l_conn);
+    assert(!r_conn);
+    r_conn = make_local_shared_foreign(std::move(conn));
+  }
+
+  seastar::future<> with_pg(
+    ShardServices &shard_services, Ref<PG> pg);
+
+  interruptible_future<> with_pg_interruptible(
+    PG& pg);
+
+  std::tuple<
+    StartEvent,
+    ConnectionPipeline::AwaitActive::BlockingEvent,
+    ConnectionPipeline::AwaitMap::BlockingEvent,
+    ConnectionPipeline::GetPGMapping::BlockingEvent,
+    PerShardPipeline::CreateOrWaitPG::BlockingEvent,
+    PGRepopPipeline::Process::BlockingEvent,
+    PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
+    PGMap::PGCreationBlockingEvent,
+    OSD_OSDMapGate::OSDMapBlocker::BlockingEvent
+  > tracking_events;
+
+private:
+  crimson::net::ConnectionRef l_conn;
+  crimson::net::ConnectionXcoreRef r_conn;
+
+  // must be after l_conn/r_conn to ensure the ConnectionPipeline is alive
+  PipelineHandle handle;
+  Ref<MOSDPGPCT> req;
+};
+
+}
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::osd::PGPCTRequest> : fmt::ostream_formatter {};
+#endif // FMT_VERSION >= 90000
index 0a20f2c4d6241cd31d266d384749bfa1c0fbe863..13691c0d5bd5e2830b3f584980503f5207a8c40c 100644 (file)
@@ -13,6 +13,7 @@
 #include "common/ostream_temp.h"
 #include "include/interval_set.h"
 #include "crimson/net/Fwd.h"
+#include "messages/MOSDPGPCT.h"
 #include "messages/MOSDRepOpReply.h"
 #include "messages/MOSDOpReply.h"
 #include "os/Transaction.h"
@@ -747,10 +748,11 @@ private:
   std::unique_ptr<PGRecovery> recovery_handler;
   C_PG_FinishRecovery *recovery_finisher;
 
-  PeeringState peering_state;
   eversion_t projected_last_update;
 
 public:
+  PeeringState peering_state;
+
   // scrub state
 
   friend class ScrubScan;
@@ -924,6 +926,7 @@ private:
   friend class RepRequest;
   friend class LogMissingRequest;
   friend class LogMissingRequestReply;
+  friend class PGPCTRequest;
   friend struct PGFacade;
   friend class InternalClientRequest;
   friend class WatchTimeoutRequest;
index 392cb9735ea4d9c2a22611341515f0111c36b2a4..120c9da08e83ac3d5180b73fbb76a94a1cf18878 100644 (file)
@@ -24,7 +24,16 @@ ReplicatedBackend::ReplicatedBackend(pg_t pgid,
   : PGBackend{whoami.shard, coll, shard_services, dpp},
     pgid{pgid},
     whoami{whoami},
-    pg(pg)
+    pg(pg),
+    pct_timer([this, &pg]() mutable {
+      Ref<crimson::osd::PG> pgref(&pg);
+      std::ignore = interruptor::with_interruption([this] {
+       return send_pct_update();
+      }, [](std::exception_ptr ep) {
+       // nothing to do, new interval
+       return seastar::now();
+      }, pgref, pgref->get_osdmap_epoch());
+    })
 {}
 
 ReplicatedBackend::ll_read_ierrorator::future<ceph::bufferlist>
@@ -97,6 +106,8 @@ ReplicatedBackend::submit_transaction(
   auto osd_op_p = std::move(opp);
   auto _new_clone = std::move(new_clone);
 
+  cancel_pct_update();
+
   const ceph_tid_t tid = shard_services.get_tid();
   auto pending_txn =
     pending_trans.try_emplace(
@@ -195,6 +206,7 @@ ReplicatedBackend::submit_transaction(
     if (!to_push_delete.empty()) {
       pg.enqueue_delete_for_backfill(hoid, {}, to_push_delete);
     }
+    maybe_kick_pct_update();
     return seastar::now();
   });
 
@@ -211,6 +223,7 @@ void ReplicatedBackend::on_actingset_changed(bool same_primary)
     pending_txn.all_committed.set_exception(e_actingset_changed);
   }
   pending_trans.clear();
+  cancel_pct_update();
 }
 
 void ReplicatedBackend::got_rep_op_reply(const MOSDRepOpReply& reply)
@@ -279,3 +292,66 @@ ReplicatedBackend::request_committed(const osd_reqid_t& reqid,
     return seastar::now();
   }
 }
+
+ReplicatedBackend::interruptible_future<> ReplicatedBackend::send_pct_update()
+{
+  LOG_PREFIX(ReplicatedBackend::send_pct_update);
+  DEBUGDPP("", dpp);
+  ceph_assert(
+    PG_HAVE_FEATURE(pg.peering_state.get_pg_acting_features(), PCT));
+  for (const auto &i: pg.peering_state.get_actingset()) {
+    if (i == pg.get_pg_whoami()) continue;
+
+    auto pct_update = crimson::make_message<MOSDPGPCT>(
+      spg_t(pg.get_pgid().pgid, i.shard),
+      pg.get_osdmap_epoch(), pg.get_same_interval_since(),
+      pg.peering_state.get_pg_committed_to()
+    );
+
+    co_await interruptor::make_interruptible(
+      shard_services.send_to_osd(
+       i.osd,
+       std::move(pct_update),
+       pg.get_osdmap_epoch()));
+  }
+}
+
+void ReplicatedBackend::maybe_kick_pct_update()
+{
+  LOG_PREFIX(ReplicatedBackend::maybe_kick_pct_update);
+  DEBUGDPP("", dpp);
+  if (!pending_trans.empty()) {
+    DEBUGDPP("pending_trans queue not empty", dpp);
+    return;
+  }
+
+  if (!PG_HAVE_FEATURE(
+       pg.peering_state.get_pg_acting_features(), PCT)) {
+    DEBUGDPP("no PCT feature", dpp);
+    return;
+  }
+
+  int64_t pct_delay;
+  if (!pg.peering_state.get_pgpool().info.opts.get(
+        pool_opts_t::PCT_UPDATE_DELAY, &pct_delay)) {
+    DEBUGDPP("pct update delay not set", dpp);
+    return;
+  }
+
+  DEBUGDPP("scheduling pct callback in {} seconds", dpp, pct_delay);
+  pct_timer.arm(std::chrono::seconds(pct_delay));
+}
+
+void ReplicatedBackend::cancel_pct_update()
+{ // Log, then cancel any pending pct_timer callback.
+  LOG_PREFIX(ReplicatedBackend::cancel_pct_update);
+  DEBUGDPP("", dpp);
+  pct_timer.cancel();
+}
+
+void ReplicatedBackend::do_pct(const MOSDPGPCT &m)
+{ // Log m, then pass its pg_committed_to to the PG's peering state.
+  LOG_PREFIX(ReplicatedBackend::do_pct);
+  DEBUGDPP("{}", dpp, m);
+  pg.peering_state.update_pct(m.pg_committed_to);
+}
index 9f4132f7ae70c96d1925cc8fce14fcf2c202e677..43030bd0609d06e73d192f732ff0deee1367acf5 100644 (file)
@@ -6,6 +6,7 @@
 #include <boost/intrusive_ptr.hpp>
 #include <seastar/core/future.hh>
 #include <seastar/core/weak_ptr.hh>
+#include "messages/MOSDPGPCT.h"
 #include "include/buffer_fwd.h"
 #include "osd/osd_types.h"
 
@@ -20,6 +21,12 @@ namespace crimson::osd {
 class ReplicatedBackend : public PGBackend
 {
 public:
+  using interruptor = ::crimson::interruptible::interruptor<
+    ::crimson::osd::IOInterruptCondition>;
+  template <typename T = void>
+  using interruptible_future =
+    ::crimson::interruptible::interruptible_future<
+      ::crimson::osd::IOInterruptCondition, T>;
   ReplicatedBackend(pg_t pgid, pg_shard_t whoami,
                    crimson::osd::PG& pg,
                    CollectionRef coll,
@@ -81,4 +88,19 @@ private:
 
   seastar::future<> request_committed(
     const osd_reqid_t& reqid, const eversion_t& at_version) final;
+
+  seastar::timer<seastar::lowres_clock> pct_timer;
+
+  /// Invoked by pct_timer to update PCT after a pause in IO
+  interruptible_future<> send_pct_update();
+
+  /// Arm pct_timer if no transactions are pending and pct updates are enabled
+  void maybe_kick_pct_update();
+
+  /// Cancel pct timer if scheduled
+  void cancel_pct_update();
+
+public:
+  /// Handle MOSDPGPCT message
+  void do_pct(const MOSDPGPCT &m);
 };