osd_operations/logmissing_request_reply.cc
osd_operations/background_recovery.cc
osd_operations/recovery_subrequest.cc
+ osd_operations/pgpct_request.cc
osd_operations/snaptrim_event.cc
osd_operations/scrub_events.cc
pg_recovery.cc
#include "messages/MOSDPGUpdateLogMissingReply.h"
#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDScrub2.h"
+#include "messages/MOSDPGPCT.h"
#include "messages/MPGStats.h"
#include "os/Transaction.h"
#include "crimson/osd/pg_meta.h"
#include "crimson/osd/osd_operations/client_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/osd_operations/pgpct_request.h"
#include "crimson/osd/osd_operations/pg_advance_map.h"
#include "crimson/osd/osd_operations/recovery_subrequest.h"
#include "crimson/osd/osd_operations/replicated_request.h"
case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
return handle_update_log_missing_reply(conn, boost::static_pointer_cast<
MOSDPGUpdateLogMissingReply>(m));
+ case MSG_OSD_PG_PCT:
+ return handle_pg_pct(conn, boost::static_pointer_cast<
+ MOSDPGPCT>(m));
default:
return std::nullopt;
}
std::move(m)).second;
}
+seastar::future<> OSD::handle_pg_pct(
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDPGPCT> m)
+{
+ return pg_shard_manager.start_pg_operation<PGPCTRequest>(
+ std::move(conn),
+ std::move(m)).second;
+}
+
seastar::future<> OSD::handle_rep_op(
crimson::net::ConnectionRef conn,
Ref<MOSDRepOp> m)
class MCommand;
class MOSDMap;
+class MOSDPGPCT;
class MOSDRepOpReply;
class MOSDRepOp;
class MOSDScrub2;
seastar::future<> handle_update_log_missing_reply(
crimson::net::ConnectionRef conn,
Ref<MOSDPGUpdateLogMissingReply> m);
+ seastar::future<> handle_pg_pct(
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDPGPCT> m);
std::vector<DaemonHealthMetric> get_health_metrics();
scrub_find_range,
scrub_reserve_range,
scrub_scan,
+ pgpct_request,
last_op
};
"scrub_find_range",
"scrub_reserve_range",
"scrub_scan",
+ "pgpct_request",
};
// prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:
#include "crimson/osd/osd_operations/background_recovery.h"
#include "crimson/osd/osd_operations/client_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/osd_operations/pgpct_request.h"
#include "crimson/osd/osd_operations/pg_advance_map.h"
#include "crimson/osd/osd_operations/recovery_subrequest.h"
#include "crimson/osd/osd_operations/replicated_request.h"
}
};
+template <>
+struct EventBackendRegistry<osd::PGPCTRequest> {
+ static std::tuple<> get_backends() {
+ return {/* no extenral backends */};
+ }
+};
+
+
template <>
struct EventBackendRegistry<osd::RecoverySubRequest> {
static std::tuple<> get_backends() {
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "logmissing_request.h"
+
+#include "common/Formatter.h"
+
+#include "crimson/osd/osd.h"
+#include "crimson/osd/osd_connection_priv.h"
+#include "crimson/osd/osd_operation_external_tracking.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/replicated_backend.h"
+
+SET_SUBSYS(osd);
+
+namespace crimson::osd {
+
+PGPCTRequest::PGPCTRequest(crimson::net::ConnectionRef&& conn,
+ Ref<MOSDPGPCT> &&req)
+ : l_conn{std::move(conn)},
+ req{std::move(req)}
+{}
+
+void PGPCTRequest::print(std::ostream& os) const
+{
+ os << "PGPCTRequest("
+ << " req=" << *req
+ << ")";
+}
+
+void PGPCTRequest::dump_detail(Formatter *f) const
+{
+ f->open_object_section("PGPCTRequest");
+ f->dump_stream("pgid") << req->get_spg();
+ f->dump_unsigned("map_epoch", req->get_map_epoch());
+ f->dump_unsigned("min_epoch", req->get_min_epoch());
+ f->dump_stream("pg_committed_to") << req->pg_committed_to;
+ f->close_section();
+}
+
+ConnectionPipeline &PGPCTRequest::get_connection_pipeline()
+{
+ return get_osd_priv(
+ &get_local_connection()
+ ).client_request_conn_pipeline;
+}
+
+PerShardPipeline &PGPCTRequest::get_pershard_pipeline(
+ ShardServices &shard_services)
+{
+ return shard_services.get_replicated_request_pipeline();
+}
+
+PGPCTRequest::interruptible_future<> PGPCTRequest::with_pg_interruptible(
+ PG &pg)
+{
+ LOG_PREFIX(PGPCTRequest::with_pg_interruptible);
+ DEBUGDPP("{}", pg, *this);
+ co_await this->template enter_stage<interruptor>(pg.repop_pipeline.process);
+
+ {
+ auto fut = this->template with_blocking_event<
+ PG_OSDMapGate::OSDMapBlocker::BlockingEvent
+ >([this, &pg](auto &&trigger) {
+ return pg.osdmap_gate.wait_for_map(
+ std::move(trigger), req->min_epoch);
+ });
+ co_await interruptor::make_interruptible(std::move(fut));
+ }
+
+ // This *must* be a replicated backend, ec doesn't have pct messages
+ static_cast<ReplicatedBackend&>(*(pg.backend)).do_pct(*req);
+}
+
+seastar::future<> PGPCTRequest::with_pg(
+ ShardServices &shard_services, Ref<PG> pgref)
+{
+ LOG_PREFIX(PGPCTRequest::with_pg);
+ DEBUGDPP("{}", *pgref, *this);
+
+ PG &pg = *pgref;
+ IRef ref = this;
+ return interruptor::with_interruption([this, &pg] {
+ return with_pg_interruptible(pg);
+ }, [](std::exception_ptr) {
+ return seastar::now();
+ }, pgref, pgref->get_osdmap_epoch()).finally(
+ [FNAME, this, ref=std::move(ref), pgref=std::move(pgref)]() mutable {
+ DEBUGDPP("exit", *pgref, *this);
+ return handle.complete(
+ ).then([ref=std::move(ref), pgref=std::move(pgref)] {});
+ });
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/pg_map.h"
+#include "crimson/common/type_helpers.h"
+#include "messages/MOSDPGPCT.h"
+
+namespace ceph {
+ class Formatter;
+}
+
+namespace crimson::osd {
+
+class ShardServices;
+
+class OSD;
+class PG;
+
+class PGPCTRequest final : public PhasedOperationT<PGPCTRequest> {
+public:
+ static constexpr OperationTypeCode type = OperationTypeCode::pgpct_request;
+ PGPCTRequest(crimson::net::ConnectionRef&&, Ref<MOSDPGPCT>&&);
+
+ void print(std::ostream &) const final;
+ void dump_detail(ceph::Formatter* f) const final;
+
+ static constexpr bool can_create() { return false; }
+ spg_t get_pgid() const {
+ return req->get_spg();
+ }
+ PipelineHandle &get_handle() { return handle; }
+ epoch_t get_epoch() const { return req->get_min_epoch(); }
+ epoch_t get_epoch_sent_at() const { return req->get_map_epoch(); }
+
+ ConnectionPipeline &get_connection_pipeline();
+
+ PerShardPipeline &get_pershard_pipeline(ShardServices &);
+
+ crimson::net::Connection &get_local_connection() {
+ assert(l_conn);
+ assert(!r_conn);
+ return *l_conn;
+ };
+
+ crimson::net::Connection &get_foreign_connection() {
+ assert(r_conn);
+ assert(!l_conn);
+ return *r_conn;
+ };
+
+ crimson::net::ConnectionFFRef prepare_remote_submission() {
+ assert(l_conn);
+ assert(!r_conn);
+ auto ret = seastar::make_foreign(std::move(l_conn));
+ l_conn.reset();
+ return ret;
+ }
+
+ void finish_remote_submission(crimson::net::ConnectionFFRef conn) {
+ assert(conn);
+ assert(!l_conn);
+ assert(!r_conn);
+ r_conn = make_local_shared_foreign(std::move(conn));
+ }
+
+ seastar::future<> with_pg(
+ ShardServices &shard_services, Ref<PG> pg);
+
+ interruptible_future<> with_pg_interruptible(
+ PG& pg);
+
+ std::tuple<
+ StartEvent,
+ ConnectionPipeline::AwaitActive::BlockingEvent,
+ ConnectionPipeline::AwaitMap::BlockingEvent,
+ ConnectionPipeline::GetPGMapping::BlockingEvent,
+ PerShardPipeline::CreateOrWaitPG::BlockingEvent,
+ PGRepopPipeline::Process::BlockingEvent,
+ PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
+ PGMap::PGCreationBlockingEvent,
+ OSD_OSDMapGate::OSDMapBlocker::BlockingEvent
+ > tracking_events;
+
+private:
+ crimson::net::ConnectionRef l_conn;
+ crimson::net::ConnectionXcoreRef r_conn;
+
+ // must be after `conn` to ensure the ConnectionPipeline is alive
+ PipelineHandle handle;
+ Ref<MOSDPGPCT> req;
+};
+
+}
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::osd::PGPCTRequest> : fmt::ostream_formatter {};
+#endif
#include "common/ostream_temp.h"
#include "include/interval_set.h"
#include "crimson/net/Fwd.h"
+#include "messages/MOSDPGPCT.h"
#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDOpReply.h"
#include "os/Transaction.h"
std::unique_ptr<PGRecovery> recovery_handler;
C_PG_FinishRecovery *recovery_finisher;
- PeeringState peering_state;
eversion_t projected_last_update;
public:
+ PeeringState peering_state;
+
// scrub state
friend class ScrubScan;
friend class RepRequest;
friend class LogMissingRequest;
friend class LogMissingRequestReply;
+ friend class PGPCTRequest;
friend struct PGFacade;
friend class InternalClientRequest;
friend class WatchTimeoutRequest;
: PGBackend{whoami.shard, coll, shard_services, dpp},
pgid{pgid},
whoami{whoami},
- pg(pg)
+ pg(pg),
+ pct_timer([this, &pg]() mutable {
+ Ref<crimson::osd::PG> pgref(&pg);
+ std::ignore = interruptor::with_interruption([this] {
+ return send_pct_update();
+ }, [](std::exception_ptr ep) {
+ // nothing to do, new interval
+ return seastar::now();
+ }, pgref, pgref->get_osdmap_epoch());
+ })
{}
ReplicatedBackend::ll_read_ierrorator::future<ceph::bufferlist>
auto osd_op_p = std::move(opp);
auto _new_clone = std::move(new_clone);
+ cancel_pct_update();
+
const ceph_tid_t tid = shard_services.get_tid();
auto pending_txn =
pending_trans.try_emplace(
if (!to_push_delete.empty()) {
pg.enqueue_delete_for_backfill(hoid, {}, to_push_delete);
}
+ maybe_kick_pct_update();
return seastar::now();
});
pending_txn.all_committed.set_exception(e_actingset_changed);
}
pending_trans.clear();
+ cancel_pct_update();
}
void ReplicatedBackend::got_rep_op_reply(const MOSDRepOpReply& reply)
return seastar::now();
}
}
+
+ReplicatedBackend::interruptible_future<> ReplicatedBackend::send_pct_update()
+{
+ LOG_PREFIX(ReplicatedBackend::send_pct_update);
+ DEBUGDPP("", dpp);
+ ceph_assert(
+ PG_HAVE_FEATURE(pg.peering_state.get_pg_acting_features(), PCT));
+ for (const auto &i: pg.peering_state.get_actingset()) {
+ if (i == pg.get_pg_whoami()) continue;
+
+ auto pct_update = crimson::make_message<MOSDPGPCT>(
+ spg_t(pg.get_pgid().pgid, i.shard),
+ pg.get_osdmap_epoch(), pg.get_same_interval_since(),
+ pg.peering_state.get_pg_committed_to()
+ );
+
+ co_await interruptor::make_interruptible(
+ shard_services.send_to_osd(
+ i.osd,
+ std::move(pct_update),
+ pg.get_osdmap_epoch()));
+ }
+}
+
+void ReplicatedBackend::maybe_kick_pct_update()
+{
+ LOG_PREFIX(ReplicatedBackend::maybe_kick_pct_update);
+ DEBUGDPP("", dpp);
+ if (!pending_trans.empty()) {
+ DEBUGDPP("pending_trans queue not empty", dpp);
+ return;
+ }
+
+ if (!PG_HAVE_FEATURE(
+ pg.peering_state.get_pg_acting_features(), PCT)) {
+ DEBUGDPP("no PCT feature", dpp);
+ return;
+ }
+
+ int64_t pct_delay;
+ if (!pg.peering_state.get_pgpool().info.opts.get(
+ pool_opts_t::PCT_UPDATE_DELAY, &pct_delay)) {
+ DEBUGDPP("pct update delay not set", dpp);
+ return;
+ }
+
+ DEBUGDPP("scheduling pct callback in {} seconds", dpp, pct_delay);
+ pct_timer.arm(std::chrono::seconds(pct_delay));
+}
+
+void ReplicatedBackend::cancel_pct_update()
+{
+ LOG_PREFIX(ReplicatedBackend::cancel_pct_update);
+ DEBUGDPP("", dpp);
+ pct_timer.cancel();
+}
+
+void ReplicatedBackend::do_pct(const MOSDPGPCT &m)
+{
+ LOG_PREFIX(ReplicatedBackend::do_pct);
+ DEBUGDPP("{}", dpp, m);
+ pg.peering_state.update_pct(m.pg_committed_to);
+}
#include <boost/intrusive_ptr.hpp>
#include <seastar/core/future.hh>
#include <seastar/core/weak_ptr.hh>
+#include "messages/MOSDPGPCT.h"
#include "include/buffer_fwd.h"
#include "osd/osd_types.h"
class ReplicatedBackend : public PGBackend
{
public:
+ using interruptor = ::crimson::interruptible::interruptor<
+ ::crimson::osd::IOInterruptCondition>;
+ template <typename T = void>
+ using interruptible_future =
+ ::crimson::interruptible::interruptible_future<
+ ::crimson::osd::IOInterruptCondition, T>;
ReplicatedBackend(pg_t pgid, pg_shard_t whoami,
crimson::osd::PG& pg,
CollectionRef coll,
seastar::future<> request_committed(
const osd_reqid_t& reqid, const eversion_t& at_version) final;
+
+ seastar::timer<seastar::lowres_clock> pct_timer;
+
+ /// Invoked by pct_timer to update PCT after a pause in IO
+ interruptible_future<> send_pct_update();
+
+ /// Kick pct timer if repop_queue is empty
+ void maybe_kick_pct_update();
+
+ /// Cancel pct timer if scheduled
+ void cancel_pct_update();
+
+public:
+ /// Handle MOSDPGPCT message
+ void do_pct(const MOSDPGPCT &m);
};