git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: scrub integration with crimson
author: Samuel Just <sjust@redhat.com>
Sat, 12 Aug 2023 00:38:40 +0000 (00:38 +0000)
committer: Samuel Just <sjust@redhat.com>
Mon, 11 Dec 2023 04:25:41 +0000 (20:25 -0800)
Signed-off-by: Samuel Just <sjust@redhat.com>
16 files changed:
src/crimson/admin/pg_commands.cc
src/crimson/admin/pg_commands.h
src/crimson/osd/CMakeLists.txt
src/crimson/osd/ops_executer.cc
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/osd_operation.h
src/crimson/osd/osd_operation_external_tracking.h
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/osd_operations/client_request.h
src/crimson/osd/osd_operations/scrub_events.cc [new file with mode: 0644]
src/crimson/osd/osd_operations/scrub_events.h [new file with mode: 0644]
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/scrub/pg_scrubber.cc [new file with mode: 0644]
src/crimson/osd/scrub/pg_scrubber.h [new file with mode: 0644]

index f2c84b254db2a4c80756d68a67b782b4e7f64a76..c71bd429e78a55b0fe3aed48b83a872681aaebaf 100644 (file)
 #include <seastar/core/future.hh>
 
 #include "crimson/admin/admin_socket.h"
+#include "crimson/common/log.h"
 #include "crimson/osd/osd.h"
 #include "crimson/osd/pg.h"
 
+SET_SUBSYS(osd);
 
 using crimson::osd::OSD;
 using crimson::osd::PG;
@@ -148,6 +150,43 @@ public:
   }
 };
 
+template <bool deep>
+class ScrubCommand : public PGCommand {
+public:
+  explicit ScrubCommand(crimson::osd::OSD& osd) :
+    PGCommand{
+      osd,
+      deep ? "deep_scrub" : "scrub",
+      "",
+      deep ? "deep scrub pg" : "scrub pg"}
+  {}
+
+  seastar::future<tell_result_t>
+  do_command(Ref<PG> pg,
+            const cmdmap_t& cmdmap,
+            std::string_view format,
+            ceph::bufferlist&&) const final
+  {
+    LOG_PREFIX(ScrubCommand::do_command);
+    DEBUGDPP("deep: {}", *pg, deep);
+    return PG::interruptor::with_interruption([pg] {
+      pg->scrubber.handle_scrub_requested(deep);
+      return PG::interruptor::now();
+    }, [FNAME, pg](std::exception_ptr ep) {
+      DEBUGDPP("interrupted with {}", *pg, ep);
+    }, pg).then([format] {
+      std::unique_ptr<Formatter> f{
+       Formatter::create(format, "json-pretty", "json-pretty")
+      };
+      f->open_object_section("scrub");
+      f->dump_bool("deep", deep);
+      f->dump_stream("stamp") << ceph_clock_now();
+      f->close_section();
+      return seastar::make_ready_future<tell_result_t>(std::move(f));
+    });
+  }
+};
+
 } // namespace crimson::admin::pg
 
 namespace crimson::admin {
@@ -164,4 +203,9 @@ make_asok_hook<crimson::admin::pg::QueryCommand>(crimson::osd::OSD& osd);
 template std::unique_ptr<AdminSocketHook>
 make_asok_hook<crimson::admin::pg::MarkUnfoundLostCommand>(crimson::osd::OSD& osd);
 
+template std::unique_ptr<AdminSocketHook>
+make_asok_hook<crimson::admin::pg::ScrubCommand<true>>(crimson::osd::OSD& osd);
+template std::unique_ptr<AdminSocketHook>
+make_asok_hook<crimson::admin::pg::ScrubCommand<false>>(crimson::osd::OSD& osd);
+
 } // namespace crimson::admin
index 873b3c923aaf2e45c1f228fe42b715dc93301652..eb7912e7aa42635c6c46309e574baa75932d302d 100644 (file)
@@ -6,5 +6,7 @@ namespace crimson::admin::pg {
 
 class QueryCommand;
 class MarkUnfoundLostCommand;
+template <bool deep>
+class ScrubCommand;
 
 }  // namespace crimson::admin::pg
index e3ab3cf4d735e19b53eb35ec0ca5c0ed27065a4a..65fb7201f7661562a007cf33208e1986650b0c9d 100644 (file)
@@ -28,6 +28,7 @@ add_executable(crimson-osd
   osd_operations/background_recovery.cc
   osd_operations/recovery_subrequest.cc
   osd_operations/snaptrim_event.cc
+  osd_operations/scrub_events.cc
   pg_recovery.cc
   recovery_backend.cc
   replicated_recovery_backend.cc
@@ -35,6 +36,7 @@ add_executable(crimson-osd
   scheduler/mclock_scheduler.cc
   scrub/scrub_machine.cc
   scrub/scrub_validator.cc
+  scrub/pg_scrubber.cc
   osdmap_gate.cc
   pg_activation_blocker.cc
   pg_map.cc
index 09b1a492576fe12d196d6eb12c5c7076fa943789..7cdc7d9027b00a4cb9dc02ab8e0581840da5f329 100644 (file)
@@ -1040,6 +1040,7 @@ std::pair<object_info_t, ObjectContextRef> OpsExecuter::prepare_clone(
 void OpsExecuter::apply_stats()
 {
   pg->get_peering_state().apply_op_stats(get_target(), delta_stats);
+  pg->scrubber.handle_op_stats(get_target(), delta_stats);
   pg->publish_stats_to_osd();
 }
 
index f3648c6df277065e585480d16a19ab64ea5391f0..98f5f9d7ea7960db54e99257c5fc4c1b52a11d3e 100644 (file)
@@ -54,6 +54,7 @@
 #include "crimson/osd/osd_operations/pg_advance_map.h"
 #include "crimson/osd/osd_operations/recovery_subrequest.h"
 #include "crimson/osd/osd_operations/replicated_request.h"
+#include "crimson/osd/osd_operations/scrub_events.h"
 #include "crimson/osd/osd_operation_external_tracking.h"
 #include "crimson/crush/CrushLocation.h"
 
@@ -644,6 +645,8 @@ seastar::future<> OSD::start_asok_admin()
     // PG commands
     asok->register_command(make_asok_hook<pg::QueryCommand>(*this));
     asok->register_command(make_asok_hook<pg::MarkUnfoundLostCommand>(*this));
+    asok->register_command(make_asok_hook<pg::ScrubCommand<true>>(*this));
+    asok->register_command(make_asok_hook<pg::ScrubCommand<false>>(*this));
     // ops commands
     asok->register_command(
       make_asok_hook<DumpInFlightOpsHook>(
@@ -819,7 +822,13 @@ OSD::do_ms_dispatch(
   case MSG_OSD_REPOPREPLY:
     return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
   case MSG_OSD_SCRUB2:
-    return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
+    return handle_scrub_command(
+      conn, boost::static_pointer_cast<MOSDScrub2>(m));
+  case MSG_OSD_REP_SCRUB:
+  case MSG_OSD_REP_SCRUBMAP:
+    return handle_scrub_message(
+      conn,
+      boost::static_pointer_cast<MOSDFastDispatchOp>(m));
   case MSG_OSD_PG_UPDATE_LOG_MISSING:
     return handle_update_log_missing(conn, boost::static_pointer_cast<
       MOSDPGUpdateLogMissing>(m));
@@ -1220,7 +1229,7 @@ seastar::future<> OSD::handle_rep_op_reply(
     });
 }
 
-seastar::future<> OSD::handle_scrub(
+seastar::future<> OSD::handle_scrub_command(
   crimson::net::ConnectionRef conn,
   Ref<MOSDScrub2> m)
 {
@@ -1230,17 +1239,22 @@ seastar::future<> OSD::handle_scrub(
   }
   return seastar::parallel_for_each(std::move(m->scrub_pgs),
     [m, conn, this](spg_t pgid) {
-    pg_shard_t from_shard{static_cast<int>(m->get_source().num()),
-                          pgid.shard};
-    PeeringState::RequestScrub scrub_request{m->deep, m->repair};
-    return pg_shard_manager.start_pg_operation<RemotePeeringEvent>(
-      conn,
-      from_shard,
-      pgid,
-      PGPeeringEvent{m->epoch, m->epoch, scrub_request}).second;
+    return pg_shard_manager.start_pg_operation<
+      crimson::osd::ScrubRequested
+      >(m->deep, conn, m->epoch, pgid).second;
   });
 }
 
+seastar::future<> OSD::handle_scrub_message(
+  crimson::net::ConnectionRef conn,
+  Ref<MOSDFastDispatchOp> m)
+{
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+  return pg_shard_manager.start_pg_operation<
+    crimson::osd::ScrubMessage
+    >(m, conn, m->get_min_epoch(), m->get_spg()).second;
+}
+
 seastar::future<> OSD::handle_mark_me_down(
   crimson::net::ConnectionRef conn,
   Ref<MOSDMarkMeDown> m)
index 134376ad947ee03a12b92e7bcdf916383c666a6a..db30ad0ec7373887804806913c75beadd0028347 100644 (file)
@@ -206,8 +206,10 @@ private:
                                       Ref<MOSDPeeringOp> m);
   seastar::future<> handle_recovery_subreq(crimson::net::ConnectionRef conn,
                                            Ref<MOSDFastDispatchOp> m);
-  seastar::future<> handle_scrub(crimson::net::ConnectionRef conn,
-                                 Ref<MOSDScrub2> m);
+  seastar::future<> handle_scrub_command(crimson::net::ConnectionRef conn,
+                                        Ref<MOSDScrub2> m);
+  seastar::future<> handle_scrub_message(crimson::net::ConnectionRef conn,
+                                        Ref<MOSDFastDispatchOp> m);
   seastar::future<> handle_mark_me_down(crimson::net::ConnectionRef conn,
                                         Ref<MOSDMarkMeDown> m);
 
index 7174143fe01e994cb3033208aba08a70470f2ffb..b379d4515c1d9a0f7bf1623c945d5d4b9c95deee 100644 (file)
@@ -54,6 +54,11 @@ enum class OperationTypeCode {
   logmissing_request_reply,
   snaptrim_event,
   snaptrimobj_subevent,
+  scrub_requested,
+  scrub_message,
+  scrub_find_range,
+  scrub_reserve_range,
+  scrub_scan,
   last_op
 };
 
@@ -71,6 +76,11 @@ static constexpr const char* const OP_NAMES[] = {
   "logmissing_request_reply",
   "snaptrim_event",
   "snaptrimobj_subevent",
+  "scrub_requested",
+  "scrub_message",
+  "scrub_find_range",
+  "scrub_reserve_range",
+  "scrub_scan",
 };
 
 // prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:
index d5e2ed45328445c627f25544d5ac0b162a19ef44..5bc457c7809616c763d81dba8fe2d06855dbde3c 100644 (file)
@@ -14,6 +14,7 @@
 #include "crimson/osd/osd_operations/snaptrim_event.h"
 #include "crimson/osd/pg_activation_blocker.h"
 #include "crimson/osd/pg_map.h"
+#include "crimson/osd/scrub/pg_scrubber.h"
 
 namespace crimson::osd {
 
@@ -30,6 +31,7 @@ struct LttngBackend
     PG_OSDMapGate::OSDMapBlocker::BlockingEvent::Backend,
     ClientRequest::PGPipeline::WaitForActive::BlockingEvent::Backend,
     PGActivationBlocker::BlockingEvent::Backend,
+    scrub::PGScrubber::BlockingEvent::Backend,
     ClientRequest::PGPipeline::RecoverMissing::BlockingEvent::Backend,
     ClientRequest::PGPipeline::GetOBC::BlockingEvent::Backend,
     ClientRequest::PGPipeline::Process::BlockingEvent::Backend,
@@ -91,6 +93,11 @@ struct LttngBackend
               const PGActivationBlocker& blocker) override {
   }
 
+  void handle(scrub::PGScrubber::BlockingEvent& ev,
+              const Operation& op,
+              const scrub::PGScrubber& blocker) override {
+  }
+
   void handle(ClientRequest::PGPipeline::RecoverMissing::BlockingEvent& ev,
               const Operation& op,
               const ClientRequest::PGPipeline::RecoverMissing& blocker) override {
@@ -136,6 +143,7 @@ struct HistoricBackend
     PG_OSDMapGate::OSDMapBlocker::BlockingEvent::Backend,
     ClientRequest::PGPipeline::WaitForActive::BlockingEvent::Backend,
     PGActivationBlocker::BlockingEvent::Backend,
+    scrub::PGScrubber::BlockingEvent::Backend,
     ClientRequest::PGPipeline::RecoverMissing::BlockingEvent::Backend,
     ClientRequest::PGPipeline::GetOBC::BlockingEvent::Backend,
     ClientRequest::PGPipeline::Process::BlockingEvent::Backend,
@@ -197,6 +205,11 @@ struct HistoricBackend
               const PGActivationBlocker& blocker) override {
   }
 
+  void handle(scrub::PGScrubber::BlockingEvent& ev,
+              const Operation& op,
+              const scrub::PGScrubber& blocker) override {
+  }
+
   void handle(ClientRequest::PGPipeline::RecoverMissing::BlockingEvent& ev,
               const Operation& op,
               const ClientRequest::PGPipeline::RecoverMissing& blocker) override {
index 62eb82a281a984b9c3b8272d4a2f3a6f20afda37..120b92ae0949bd466125635d68352cde7350a990 100644 (file)
@@ -250,21 +250,31 @@ ClientRequest::process_op(
          DEBUGDPP("{}.{}: entered get_obc stage, about to wait_scrub",
                   *pg, *this, this_instance_id);
           op_info.set_from_op(&*m, *pg->get_osdmap());
-         return pg->with_locked_obc(
-           m->get_hobj(), op_info,
-           [FNAME, this, pg, this_instance_id, &ihref](
-             auto head, auto obc) mutable {
-             DEBUGDPP("{}.{}: got obc {}, entering process stage",
-                      *pg, *this, this_instance_id, obc->obs);
-             return ihref.enter_stage<interruptor>(
-               client_pp(*pg).process, *this
-             ).then_interruptible(
-               [FNAME, this, pg, this_instance_id, obc, &ihref]() mutable {
-                 DEBUGDPP("{}.{}: in process stage, calling do_process",
-                          *pg, *this, this_instance_id);
+         return ihref.enter_blocker(
+           *this,
+           pg->scrubber,
+           &decltype(pg->scrubber)::wait_scrub,
+           m->get_hobj()
+         ).then_interruptible(
+           [FNAME, this, pg, this_instance_id, &ihref]() mutable {
+             DEBUGDPP("{}.{}: past scrub blocker, getting obc",
+                      *pg, *this, this_instance_id);
+           return pg->with_locked_obc(
+             m->get_hobj(), op_info,
+             [FNAME, this, pg, this_instance_id, &ihref](
+               auto head, auto obc) mutable {
+               DEBUGDPP("{}.{}: got obc {}, entering process stage",
+                        *pg, *this, this_instance_id, obc->obs);
+               return ihref.enter_stage<interruptor>(
+                 client_pp(*pg).process, *this
+               ).then_interruptible(
+                 [FNAME, this, pg, this_instance_id, obc, &ihref]() mutable {
+                   DEBUGDPP("{}.{}: in process stage, calling do_process",
+                            *pg, *this, this_instance_id);
                  return do_process(ihref, pg, obc, this_instance_id);
                });
-           });
+             });
+         });
         });
       }
     });
@@ -354,7 +364,7 @@ ClientRequest::do_process(
     [FNAME, this, pg, this_instance_id, &ihref](
       auto submitted, auto all_completed) mutable {
       return submitted.then_interruptible(
-       [FNAME, this, pg, this_instance_id, &ihref] {
+       [this, pg, &ihref] {
        return ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this);
       }).then_interruptible(
        [FNAME, this, pg, this_instance_id,
@@ -371,12 +381,12 @@ ClientRequest::do_process(
                  return conn->send(std::move(reply));
                });
            }, crimson::ct_error::eagain::handle(
-             [FNAME, this, pg, this_instance_id, &ihref]() mutable {
+             [this, pg, this_instance_id, &ihref]() mutable {
                return process_op(ihref, pg, this_instance_id);
            }));
        });
     }, crimson::ct_error::eagain::handle(
-      [FNAME, this, pg, this_instance_id, &ihref]() mutable {
+      [this, pg, this_instance_id, &ihref]() mutable {
        return process_op(ihref, pg, this_instance_id);
       }));
 }
index 6599e4dbc6ab95e48775622bd23ac175a5c13b47..ac4c46981c3c31f720e89c8668b2e8c7db127e92 100644 (file)
@@ -17,6 +17,7 @@
 #include "crimson/osd/osd_operations/common/pg_pipeline.h"
 #include "crimson/osd/pg_activation_blocker.h"
 #include "crimson/osd/pg_map.h"
+#include "crimson/osd/scrub/pg_scrubber.h"
 #include "crimson/common/type_helpers.h"
 #include "crimson/common/utility.h"
 #include "messages/MOSDOp.h"
@@ -103,6 +104,7 @@ public:
       PGPipeline::WaitForActive::BlockingEvent,
       PGActivationBlocker::BlockingEvent,
       PGPipeline::RecoverMissing::BlockingEvent,
+      scrub::PGScrubber::BlockingEvent,
       PGPipeline::GetOBC::BlockingEvent,
       PGPipeline::Process::BlockingEvent,
       PGPipeline::WaitRepop::BlockingEvent,
diff --git a/src/crimson/osd/osd_operations/scrub_events.cc b/src/crimson/osd/osd_operations/scrub_events.cc
new file mode 100644 (file)
index 0000000..4f54cf0
--- /dev/null
@@ -0,0 +1,396 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/common/log.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osd_connection_priv.h"
+#include "messages/MOSDRepScrubMap.h"
+#include "scrub_events.h"
+
+SET_SUBSYS(osd);
+
+namespace crimson::osd {
+
+template <class T>
+PGPeeringPipeline &RemoteScrubEventBaseT<T>::get_peering_pipeline(PG &pg)
+{
+  return pg.peering_request_pg_pipeline;
+}
+
+template <class T>
+ConnectionPipeline &RemoteScrubEventBaseT<T>::get_connection_pipeline()
+{
+  return get_osd_priv(conn.get()).peering_request_conn_pipeline;
+}
+
+template <class T>
+PerShardPipeline &RemoteScrubEventBaseT<T>::get_pershard_pipeline(
+  ShardServices &shard_services)
+{
+  return shard_services.get_client_request_pipeline();
+}
+
+template <class T>
+seastar::future<> RemoteScrubEventBaseT<T>::with_pg(
+  ShardServices &shard_services, Ref<PG> pg)
+{
+  LOG_PREFIX(RemoteEventBaseT::with_pg);
+  return interruptor::with_interruption([FNAME, this, pg] {
+    DEBUGDPP("{} pg present", *pg, *that());
+    return this->template enter_stage<interruptor>(
+      get_peering_pipeline(*pg).await_map
+    ).then_interruptible([this, pg] {
+      return this->template with_blocking_event<
+       PG_OSDMapGate::OSDMapBlocker::BlockingEvent
+       >([this, pg](auto &&trigger) {
+         return pg->osdmap_gate.wait_for_map(
+           std::move(trigger), get_epoch());
+       });
+    }).then_interruptible([this, pg](auto) {
+      return this->template enter_stage<interruptor>(
+       get_peering_pipeline(*pg).process);
+    }).then_interruptible([this, pg] {
+      return handle_event(*pg);
+    });
+  }, [FNAME, pg, this](std::exception_ptr ep) {
+    DEBUGDPP("{} interrupted with {}", *pg, *that(), ep);
+  }, pg);
+}
+
+ScrubRequested::ifut<> ScrubRequested::handle_event(PG &pg)
+{
+  pg.scrubber.handle_scrub_requested(deep);
+  return seastar::now();
+}
+
+ScrubMessage::ifut<> ScrubMessage::handle_event(PG &pg)
+{
+  pg.scrubber.handle_scrub_message(*m);
+  return seastar::now();
+}
+
+template class RemoteScrubEventBaseT<ScrubRequested>;
+template class RemoteScrubEventBaseT<ScrubMessage>;
+
+template <typename T>
+ScrubAsyncOpT<T>::ScrubAsyncOpT(Ref<PG> pg) : pg(pg) {}
+
+template <typename T>
+typename ScrubAsyncOpT<T>::template ifut<> ScrubAsyncOpT<T>::start()
+{
+  LOG_PREFIX(ScrubAsyncOpT::start);
+  DEBUGDPP("{} starting", *pg, *this);
+  return run(*pg);
+}
+
+ScrubFindRange::ifut<> ScrubFindRange::run(PG &pg)
+{
+  LOG_PREFIX(ScrubFindRange::run);
+  using crimson::common::local_conf;
+  return interruptor::make_interruptible(
+    pg.shard_services.get_store().list_objects(
+      pg.get_collection_ref(),
+      ghobject_t(begin, ghobject_t::NO_GEN, pg.get_pgid().shard),
+      ghobject_t::get_max(),
+      local_conf().get_val<int64_t>("osd_scrub_chunk_max")
+    )
+  ).then_interruptible([FNAME, this, &pg](auto ret) {
+    auto &[_, next] = ret;
+
+    // We rely on seeing an entire set of snapshots in a single chunk
+    auto end = next.hobj.get_max_object_boundary();
+
+    DEBUGDPP("got next.hobj: {}, returning begin, end: {}, {}",
+            pg, next.hobj, begin, end);
+    pg.scrubber.machine.process_event(
+      scrub::ScrubContext::request_range_complete_t{begin, end});
+  });
+}
+
+template class ScrubAsyncOpT<ScrubFindRange>;
+
+ScrubReserveRange::ifut<> ScrubReserveRange::run(PG &pg)
+{
+  LOG_PREFIX(ScrubReserveRange::run);
+  DEBUGDPP("", pg);
+  return pg.background_process_lock.lock(
+  ).then_interruptible([FNAME, this, &pg] {
+    DEBUGDPP("pg_background_io_mutex locked", pg);
+    auto &scrubber = pg.scrubber;
+    ceph_assert(!scrubber.blocked);
+    scrubber.blocked = scrub::blocked_range_t{begin, end};
+    blocked_set = true;
+    auto& log = pg.peering_state.get_pg_log().get_log().log;
+    auto p = find_if(
+      log.crbegin(), log.crend(),
+      [this](const auto& e) -> bool {
+       return e.soid >= begin && e.soid < end;
+      });
+
+    if (p == log.crend()) {
+      return scrubber.machine.process_event(
+       scrub::ScrubContext::reserve_range_complete_t{eversion_t{}});
+    } else {
+      return scrubber.machine.process_event(
+       scrub::ScrubContext::reserve_range_complete_t{p->version});
+    }
+  }).finally([&pg, this] {
+    if (!blocked_set) {
+      pg.background_process_lock.unlock();
+    }
+  });
+}
+
+template class ScrubAsyncOpT<ScrubReserveRange>;
+
+ScrubScan::ifut<> ScrubScan::run(PG &pg)
+{
+  LOG_PREFIX(ScrubScan::start);
+  // legacy value, unused
+  ret.valid_through = pg.get_info().last_update;
+
+  DEBUGDPP("begin: {}, end: {}", pg, begin, end);
+  return interruptor::make_interruptible(
+    pg.shard_services.get_store().list_objects(
+      pg.get_collection_ref(),
+      ghobject_t(begin, ghobject_t::NO_GEN, pg.get_pgid().shard),
+      ghobject_t(end, ghobject_t::NO_GEN, pg.get_pgid().shard),
+      std::numeric_limits<uint64_t>::max())
+  ).then_interruptible([FNAME, this, &pg](auto &&result) {
+    DEBUGDPP("listed {} objects", pg, std::get<0>(result).size());
+    return seastar::do_with(
+      std::move(std::get<0>(result)),
+      [this, &pg](auto &objects) {
+       return interruptor::do_for_each(
+         objects,
+         [this, &pg](auto &obj) {
+           if (obj.is_pgmeta() || obj.hobj.is_temp()) {
+             return interruptor::now();
+           } else {
+             return scan_object(pg, obj);
+           }
+         });
+      });
+  }).then_interruptible([FNAME, this, &pg] {
+    if (local) {
+      DEBUGDPP("complete, submitting local event", pg);
+      pg.scrubber.handle_event(
+       scrub::ScrubContext::scan_range_complete_t(
+         pg.get_pg_whoami(),
+         std::move(ret)));
+      return seastar::now();
+    } else {
+      DEBUGDPP("complete, sending response to primary", pg);
+      auto m = crimson::make_message<MOSDRepScrubMap>(
+       spg_t(pg.get_pgid().pgid, pg.get_primary().shard),
+       pg.get_osdmap_epoch(),
+       pg.get_pg_whoami());
+      encode(ret, m->get_data());
+      pg.scrubber.handle_event(
+       scrub::ScrubContext::generate_and_submit_chunk_result_complete_t{});
+      return pg.shard_services.send_to_osd(
+       pg.get_primary().osd,
+       std::move(m),
+       pg.get_osdmap_epoch());
+    }
+  });
+}
+
+ScrubScan::ifut<> ScrubScan::scan_object(
+  PG &pg,
+  const ghobject_t &obj)
+{
+  LOG_PREFIX(ScrubScan::scan_object);
+  DEBUGDPP("obj: {}", pg, obj);
+  auto &entry = ret.objects[obj.hobj];
+  return interruptor::make_interruptible(
+    pg.shard_services.get_store().stat(
+      pg.get_collection_ref(),
+      obj)
+  ).then_interruptible([FNAME, &pg, &obj, &entry](struct stat obj_stat) {
+    DEBUGDPP("obj: {}, stat complete, size {}", pg, obj, obj_stat.st_size);
+    entry.size = obj_stat.st_size;
+    return pg.shard_services.get_store().get_attrs(
+      pg.get_collection_ref(),
+      obj);
+  }).safe_then_interruptible([FNAME, &pg, &obj, &entry](auto &&attrs) {
+    DEBUGDPP("obj: {}, got {} attrs", pg, obj, attrs.size());
+    for (auto &i : attrs) {
+      i.second.rebuild();
+      if (i.second.length() == 0) {
+       entry.attrs[i.first];
+      } else {
+       entry.attrs.emplace(i.first, i.second.front());
+      }
+    }
+  }).handle_error_interruptible(
+    ct_error::all_same_way([FNAME, &pg, &obj, &entry](auto e) {
+      DEBUGDPP("obj: {} stat error", pg, obj);
+      entry.stat_error = true;
+    })
+  ).then_interruptible([FNAME, this, &pg, &obj] {
+    if (deep) {
+      DEBUGDPP("obj: {} doing deep scan", pg, obj);
+      return deep_scan_object(pg, obj);
+    } else {
+      return interruptor::now();
+    }
+  });
+
+}
+
+struct obj_scrub_progress_t {
+  // nullopt once complete
+  std::optional<uint64_t> offset = 0;
+  ceph::buffer::hash data_hash{std::numeric_limits<uint32_t>::max()};
+
+  bool header_done = false;
+  std::optional<std::string> next_key;
+  bool keys_done = false;
+  ceph::buffer::hash omap_hash{std::numeric_limits<uint32_t>::max()};
+};
+ScrubScan::ifut<> ScrubScan::deep_scan_object(
+  PG &pg,
+  const ghobject_t &obj)
+{
+  LOG_PREFIX(ScrubScan::deep_scan_object);
+  DEBUGDPP("obj: {}", pg, obj);
+  using crimson::common::local_conf;
+  auto &entry = ret.objects[obj.hobj];
+  return interruptor::repeat(
+    [FNAME, this, progress = obj_scrub_progress_t(),
+     &obj, &entry, &pg]() mutable
+    -> interruptible_future<seastar::stop_iteration> {
+      if (progress.offset) {
+       DEBUGDPP("op: {}, obj: {}, progress: {} scanning data",
+                pg, *this, obj, progress);
+       const auto stride = local_conf().get_val<Option::size_t>(
+         "osd_deep_scrub_stride");
+       return pg.shard_services.get_store().read(
+         pg.get_collection_ref(),
+         obj,
+         *(progress.offset),
+         stride
+       ).safe_then([stride, &progress, &entry](auto bl) {
+         progress.data_hash << bl;
+         if (bl.length() < stride) {
+           progress.offset = std::nullopt;
+           entry.digest = progress.data_hash.digest();
+           entry.digest_present = true;
+         } else {
+           ceph_assert(stride == bl.length());
+           *(progress.offset) += stride;
+         }
+       }).handle_error(
+         ct_error::all_same_way([&progress, &entry](auto e) {
+           entry.read_error = true;
+           progress.offset = std::nullopt;
+         })
+       ).then([] {
+         return interruptor::make_interruptible(
+           seastar::make_ready_future<seastar::stop_iteration>(
+             seastar::stop_iteration::no));
+       });
+      } else if (!progress.header_done) {
+       DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap header",
+                pg, *this, obj, progress);
+       return pg.shard_services.get_store().omap_get_header(
+         pg.get_collection_ref(),
+         obj
+       ).safe_then([&progress](auto bl) {
+         progress.omap_hash << bl;
+       }).handle_error(
+         ct_error::enodata::handle([] {}),
+         ct_error::all_same_way([&entry](auto e) {
+           entry.read_error = true;
+         })
+       ).then([&progress] {
+         progress.header_done = true;
+         return interruptor::make_interruptible(
+           seastar::make_ready_future<seastar::stop_iteration>(
+             seastar::stop_iteration::no));
+       });
+      } else if (!progress.keys_done) {
+       DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap keys",
+                pg, *this, obj, progress);
+       return pg.shard_services.get_store().omap_get_values(
+         pg.get_collection_ref(),
+         obj,
+         progress.next_key
+       ).safe_then([FNAME, this, &obj, &progress, &entry, &pg](auto result) {
+         const auto &[done, omap] = result;
+         DEBUGDPP("op: {}, obj: {}, progress: {} got {} keys",
+                  pg, *this, obj, progress, omap.size());
+         for (const auto &p : omap) {
+           bufferlist bl;
+           encode(p.first, bl);
+           encode(p.second, bl);
+           progress.omap_hash << bl;
+           entry.object_omap_keys++;
+           entry.object_omap_bytes += p.second.length();
+         }
+         if (done) {
+           DEBUGDPP("op: {}, obj: {}, progress: {} omap done",
+                    pg, *this, obj, progress);
+           progress.keys_done = true;
+           entry.omap_digest = progress.omap_hash.digest();
+           entry.omap_digest_present = true;
+
+           if ((entry.object_omap_keys >
+                local_conf().get_val<uint64_t>(
+                  "osd_deep_scrub_large_omap_object_key_threshold")) ||
+               (entry.object_omap_bytes >
+                local_conf().get_val<Option::size_t>(
+                  "osd_deep_scrub_large_omap_object_value_sum_threshold"))) {
+             entry.large_omap_object_found = true;
+             entry.large_omap_object_key_count = entry.object_omap_keys;
+             ret.has_large_omap_object_errors = true;
+           }
+         } else {
+           ceph_assert(!omap.empty()); // omap_get_values invariant
+           DEBUGDPP("op: {}, obj: {}, progress: {} omap not done, next {}",
+                    pg, *this, obj, progress, omap.crbegin()->first);
+           progress.next_key = omap.crbegin()->first;
+         }
+       }).handle_error(
+         ct_error::all_same_way([FNAME, this, &obj, &progress, &entry, &pg]
+                                (auto e) {
+           DEBUGDPP("op: {}, obj: {}, progress: {} error reading omap {}",
+                    pg, *this, obj, progress, e);
+           progress.keys_done = true;
+           entry.read_error = true;
+         })
+       ).then([] {
+         return interruptor::make_interruptible(
+           seastar::make_ready_future<seastar::stop_iteration>(
+             seastar::stop_iteration::no));
+       });
+      } else {
+       DEBUGDPP("op: {}, obj: {}, progress: {} done",
+                pg, *this, obj, progress);
+       return interruptor::make_interruptible(
+         seastar::make_ready_future<seastar::stop_iteration>(
+           seastar::stop_iteration::yes));
+      }
+    });
+}
+
+template class ScrubAsyncOpT<ScrubScan>;
+
+}
+
+template <>
+struct fmt::formatter<crimson::osd::obj_scrub_progress_t> {
+  constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
+  template <typename FormatContext>
+  auto format(const crimson::osd::obj_scrub_progress_t &progress,
+             FormatContext& ctx)
+  {
+    return fmt::format_to(
+      ctx.out(),
+      "obj_scrub_progress_t(offset: {}, "
+      "header_done: {}, next_key: {}, keys_done: {})",
+      progress.offset, progress.header_done,
+      progress.next_key, progress.keys_done);
+  }
+};
diff --git a/src/crimson/osd/osd_operations/scrub_events.h b/src/crimson/osd/osd_operations/scrub_events.h
new file mode 100644 (file)
index 0000000..0793983
--- /dev/null
@@ -0,0 +1,290 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "common/Formatter.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/scrub/pg_scrubber.h"
+#include "osd/osd_types.h"
+#include "peering_event.h"
+
+namespace crimson::osd {
+
+class PG;
+
+template <typename T>
+class RemoteScrubEventBaseT : public PhasedOperationT<T> {
+  T* that() {
+    return static_cast<T*>(this);
+  }
+  const T* that() const {
+    return static_cast<const T*>(this);
+  }
+
+  PipelineHandle handle;
+
+  crimson::net::ConnectionRef conn;
+  epoch_t epoch;
+  spg_t pgid;
+
+protected:
+  using interruptor = InterruptibleOperation::interruptor;
+
+  template <typename U=void>
+  using ifut = InterruptibleOperation::interruptible_future<U>;
+
+  virtual ifut<> handle_event(PG &pg) = 0;
+public:
+  RemoteScrubEventBaseT(
+    crimson::net::ConnectionRef conn, epoch_t epoch, spg_t pgid)
+    : conn(conn), epoch(epoch), pgid(pgid) {}
+
+  PGPeeringPipeline &get_peering_pipeline(PG &pg);
+  ConnectionPipeline &get_connection_pipeline();
+  PerShardPipeline &get_pershard_pipeline(ShardServices &);
+
+  crimson::net::Connection &get_connection() {
+    assert(conn);
+    return *conn;
+  };
+
+  static constexpr bool can_create() { return false; }
+
+  spg_t get_pgid() const {
+    return pgid;
+  }
+
+  PipelineHandle &get_handle() { return handle; }
+  epoch_t get_epoch() const { return epoch; }
+
+  seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
+    assert(conn);
+    return conn.get_foreign(
+    ).then([this](auto f_conn) {
+      conn.reset();
+      return f_conn;
+    });
+  }
+  void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
+    assert(!conn);
+    conn = make_local_shared_foreign(std::move(_conn));
+  }
+
+  seastar::future<> with_pg(
+    ShardServices &shard_services, Ref<PG> pg);
+
+  std::tuple<
+    class TrackableOperationT<T>::StartEvent,
+    ConnectionPipeline::AwaitActive::BlockingEvent,
+    ConnectionPipeline::AwaitMap::BlockingEvent,
+    OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
+    ConnectionPipeline::GetPGMapping::BlockingEvent,
+    PerShardPipeline::CreateOrWaitPG::BlockingEvent,
+    PGMap::PGCreationBlockingEvent,
+    PGPeeringPipeline::AwaitMap::BlockingEvent,
+    PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
+    PGPeeringPipeline::Process::BlockingEvent,
+    class TrackableOperationT<T>::CompletionEvent
+  > tracking_events;
+
+  virtual ~RemoteScrubEventBaseT() = default;
+};
+
+class ScrubRequested final : public RemoteScrubEventBaseT<ScrubRequested> {
+  bool deep = false;
+protected:
+  ifut<> handle_event(PG &pg) final;
+
+public:
+  static constexpr OperationTypeCode type = OperationTypeCode::scrub_requested;
+
+  template <typename... Args>
+  ScrubRequested(bool deep, Args&&... base_args)
+    : RemoteScrubEventBaseT<ScrubRequested>(std::forward<Args>(base_args)...),
+      deep(deep) {}
+
+  void print(std::ostream &out) const final {
+    out << "(deep=" << deep << ")";
+  }
+  void dump_detail(ceph::Formatter *f) const final {
+    f->dump_bool("deep", deep);
+  }
+
+};
+
+class ScrubMessage final : public RemoteScrubEventBaseT<ScrubMessage> {
+  MessageRef m;
+protected:
+  ifut<> handle_event(PG &pg) final;
+
+public:
+  static constexpr OperationTypeCode type = OperationTypeCode::scrub_message;
+
+  template <typename... Args>
+  ScrubMessage(MessageRef m, Args&&... base_args)
+    : RemoteScrubEventBaseT<ScrubMessage>(std::forward<Args>(base_args)...),
+      m(m) {
+    ceph_assert(scrub::PGScrubber::is_scrub_message(*m));
+  }
+
+  void print(std::ostream &out) const final {
+    out << "(m=" << *m << ")";
+  }
+  void dump_detail(ceph::Formatter *f) const final {
+    f->dump_stream("m") << *m;
+  }
+
+};
+
+template <typename T>
+class ScrubAsyncOpT : public TrackableOperationT<T> {
+  Ref<PG> pg;
+
+public:
+  using interruptor = InterruptibleOperation::interruptor;
+  template <typename U=void>
+  using ifut = InterruptibleOperation::interruptible_future<U>;
+
+  ScrubAsyncOpT(Ref<PG> pg);
+
+  ifut<> start();
+
+  virtual ~ScrubAsyncOpT() = default;
+
+protected:
+  virtual ifut<> run(PG &pg) = 0;
+};
+
+class ScrubFindRange : public ScrubAsyncOpT<ScrubFindRange> {
+  hobject_t begin;
+public:
+  static constexpr OperationTypeCode type = OperationTypeCode::scrub_find_range;
+
+  template <typename... Args>
+  ScrubFindRange(const hobject_t &begin, Args&&... args)
+    : ScrubAsyncOpT(std::forward<Args>(args)...), begin(begin) {}
+
+  void print(std::ostream &out) const final {
+    out << "(begin=" << begin << ")";
+  }
+  void dump_detail(ceph::Formatter *f) const final {
+    f->dump_stream("begin") << begin;
+  }
+
+
+protected:
+  ifut<> run(PG &pg) final;
+};
+
+class ScrubReserveRange : public ScrubAsyncOpT<ScrubReserveRange> {
+  hobject_t begin;
+  hobject_t end;
+
+  /// see run(), used to unlock background_io_mutex on interval change
+  bool blocked_set = false;
+public:
+  static constexpr OperationTypeCode type =
+    OperationTypeCode::scrub_reserve_range;
+
+  template <typename... Args>
+  ScrubReserveRange(const hobject_t &begin, const hobject_t &end, Args&&... args)
+    : ScrubAsyncOpT(std::forward<Args>(args)...), begin(begin), end(end) {}
+
+  void print(std::ostream &out) const final {
+    out << "(begin=" << begin << ", end=" << end << ")";
+  }
+  void dump_detail(ceph::Formatter *f) const final {
+    f->dump_stream("begin") << begin;
+    f->dump_stream("end") << end;
+  }
+
+
+protected:
+  ifut<> run(PG &pg) final;
+};
+
+class ScrubScan : public ScrubAsyncOpT<ScrubScan> {
+  /// deep or shallow scrub
+  const bool deep;
+
+  /// true: send event locally, false: send result to primary
+  const bool local;
+
+  /// object range to scan: [begin, end)
+  const hobject_t begin;
+  const hobject_t end;
+
+  /// result, see local
+  ScrubMap ret;
+
+  ifut<> scan_object(PG &pg, const ghobject_t &obj);
+  ifut<> deep_scan_object(PG &pg, const ghobject_t &obj);
+
+public:
+  static constexpr OperationTypeCode type = OperationTypeCode::scrub_scan;
+
+  void print(std::ostream &out) const final {
+    out << "(deep=" << deep
+       << ", local=" << local
+       << ", begin=" << begin
+       << ", end=" << end
+       << ")";
+  }
+  void dump_detail(ceph::Formatter *f) const final {
+    f->dump_bool("deep", deep);
+    f->dump_bool("local", local);
+    f->dump_stream("begin") << begin;
+    f->dump_stream("end") << end;
+  }
+
+  ScrubScan(
+    Ref<PG> pg, bool deep, bool local,
+    const hobject_t &begin, const hobject_t &end)
+    : ScrubAsyncOpT(pg), deep(deep), local(local), begin(begin), end(end) {}
+
+protected:
+  ifut<> run(PG &pg) final;
+};
+
+}
+
+namespace crimson {
+
+template <>
+struct EventBackendRegistry<osd::ScrubRequested> {
+  static std::tuple<> get_backends() {
+    return {};
+  }
+};
+
+template <>
+struct EventBackendRegistry<osd::ScrubMessage> {
+  static std::tuple<> get_backends() {
+    return {};
+  }
+};
+
+}
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::osd::ScrubRequested>
+  : fmt::ostream_formatter {};
+
+template <> struct fmt::formatter<crimson::osd::ScrubMessage>
+  : fmt::ostream_formatter {};
+
+template <typename T>
+struct fmt::formatter<crimson::osd::ScrubAsyncOpT<T>>
+  : fmt::ostream_formatter {};
+
+template <> struct fmt::formatter<crimson::osd::ScrubFindRange>
+  : fmt::ostream_formatter {};
+
+template <> struct fmt::formatter<crimson::osd::ScrubReserveRange>
+  : fmt::ostream_formatter {};
+
+template <> struct fmt::formatter<crimson::osd::ScrubScan>
+  : fmt::ostream_formatter {};
+
+#endif
index 5b7f1fd2578141035c1fb5f1bd22a71dc6a5d03c..6537eed2b7d534f240d3bff6c6a8cb9d7decd727 100644 (file)
@@ -125,6 +125,7 @@ PG::PG(
       osdmap,
       this,
       this),
+    scrubber(*this),
     obc_registry{
       local_conf()},
     obc_loader{
@@ -144,6 +145,7 @@ PG::PG(
       pgid.shard),
     wait_for_active_blocker(this)
 {
+  scrubber.initiate();
   peering_state.set_backend_predicates(
     new ReadablePredicate(pg_whoami),
     new RecoverablePredicate());
@@ -331,6 +333,12 @@ void PG::on_activate(interval_set<snapid_t> snaps)
   projected_last_update = peering_state.get_info().last_update;
 }
 
+void PG::on_replica_activate()
+{
+  logger().debug("{}: {}", *this, __func__);
+  scrubber.on_replica_activate();
+}
+
 void PG::on_activate_complete()
 {
   wait_for_active_blocker.unblock();
@@ -458,7 +466,7 @@ PG::do_delete_work(ceph::os::Transaction &t, ghobject_t _next)
 
 Context *PG::on_clean()
 {
-  // Not needed yet (will be needed for IO unblocking)
+  scrubber.on_primary_active_clean();
   return nullptr;
 }
 
@@ -1347,6 +1355,9 @@ void PG::log_operation(
   if (!is_primary()) { // && !is_ec_pg()
     replica_clear_repop_obc(logv);
   }
+  if (!logv.empty()) {
+    scrubber.on_log_update(logv.rbegin()->version);
+  }
   peering_state.append_log(std::move(logv),
                            trim_to,
                            roll_forward_to,
@@ -1527,6 +1538,7 @@ void PG::on_change(ceph::os::Transaction &t) {
     logger().debug("{} {}: dropping requests", *this, __func__);
     client_request_orderer.clear_and_cancel(*this);
   }
+  scrubber.on_interval_change();
 }
 
 void PG::context_registry_on_change() {
index 7f734ecd5e778b4e5646e93dbd4e5e629b9d5bd5..80a181f24a23aba92b439160fb14fa581d2b721b 100644 (file)
@@ -39,6 +39,7 @@
 #include "crimson/osd/pg_recovery_listener.h"
 #include "crimson/osd/recovery_backend.h"
 #include "crimson/osd/object_context_loader.h"
+#include "crimson/osd/scrub/pg_scrubber.h"
 
 class MQuery;
 class OSDMap;
@@ -160,8 +161,6 @@ public:
     bool need_write_epoch,
     ceph::os::Transaction &t) final;
 
-  void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) final;
-
   uint64_t get_snap_trimq_size() const final {
     return std::size(snap_trimq);
   }
@@ -318,6 +317,7 @@ public:
   }
   void on_change(ceph::os::Transaction &t) final;
   void on_activate(interval_set<snapid_t> to_trim) final;
+  void on_replica_activate() final;
   void on_activate_complete() final;
   void on_new_interval() final {
     // Not needed yet
@@ -627,6 +627,18 @@ private:
   eversion_t projected_last_update;
 
 public:
+  // scrub state
+
+  friend class ScrubScan;
+  friend class ScrubFindRange;
+  friend class ScrubReserveRange;
+  friend class scrub::PGScrubber;
+  template <typename T> friend class RemoteScrubEventBaseT;
+
+  scrub::PGScrubber scrubber;
+
+  void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) final;
+
   ObjectContextRegistry obc_registry;
   ObjectContextLoader obc_loader;
 
diff --git a/src/crimson/osd/scrub/pg_scrubber.cc b/src/crimson/osd/scrub/pg_scrubber.cc
new file mode 100644 (file)
index 0000000..3ac2c91
--- /dev/null
@@ -0,0 +1,309 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#include <fmt/ranges.h>
+
+#include "crimson/common/log.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osd_operations/scrub_events.h"
+#include "messages/MOSDRepScrub.h"
+#include "messages/MOSDRepScrubMap.h"
+#include "pg_scrubber.h"
+
+SET_SUBSYS(osd);
+
+namespace crimson::osd::scrub {
+
+void PGScrubber::dump_detail(Formatter *f) const
+{
+  f->dump_stream("pgid") << pg.get_pgid();
+}
+
+PGScrubber::PGScrubber(PG &pg) : pg(pg), dpp(pg), machine(*this) {}
+
+void PGScrubber::on_primary_active_clean()
+{
+  LOG_PREFIX(PGScrubber::on_primary_active_clean);
+  DEBUGDPP("", pg);
+  handle_event(events::primary_activate_t{});
+}
+
+void PGScrubber::on_replica_activate()
+{
+  LOG_PREFIX(PGScrubber::on_replica_activate);
+  DEBUGDPP("", pg);
+  handle_event(events::replica_activate_t{});
+}
+
+void PGScrubber::on_interval_change()
+{
+  LOG_PREFIX(PGScrubber::on_interval_change);
+  DEBUGDPP("", pg);
+  /* Once reservations and scheduling are introduced, we'll need an
+   * IntervalChange event to drop remote resources (they'll be automatically
+   * released on the other side) */
+  handle_event(events::reset_t{});
+  waiting_for_update = std::nullopt;
+  ceph_assert(!blocked);
+}
+
+void PGScrubber::on_log_update(eversion_t v)
+{
+  LOG_PREFIX(PGScrubber::on_log_update);
+  if (waiting_for_update && v >= *waiting_for_update) {
+    DEBUGDPP("waiting_for_update: {}, v: {}", pg, *waiting_for_update, v);
+    handle_event(await_update_complete_t{});
+    waiting_for_update = std::nullopt;
+  }
+}
+
+void PGScrubber::handle_scrub_requested(bool deep)
+{
+  LOG_PREFIX(PGScrubber::handle_scrub_requested);
+  DEBUGDPP("deep: {}", pg, deep);
+  handle_event(events::start_scrub_t{deep});
+}
+
+void PGScrubber::handle_scrub_message(Message &_m)
+{
+  LOG_PREFIX(PGScrubber::handle_scrub_message);
+  switch (_m.get_type()) {
+  case MSG_OSD_REP_SCRUB: {
+    MOSDRepScrub &m = *static_cast<MOSDRepScrub*>(&_m);
+    DEBUGDPP("MOSDRepScrub: {}", pg, m);
+    handle_event(events::replica_scan_t{
+       m.start, m.end, m.scrub_from, m.deep
+      });
+    break;
+  }
+  case MSG_OSD_REP_SCRUBMAP: {
+    MOSDRepScrubMap &m = *static_cast<MOSDRepScrubMap*>(&_m);
+    DEBUGDPP("MOSDRepScrubMap: {}", pg, m);
+    ScrubMap map;
+    auto iter = m.get_data().cbegin();
+    ::decode(map, iter);
+    handle_event(scan_range_complete_t{
+       m.from, std::move(map)
+      });
+    break;
+  }
+  default:
+    DEBUGDPP("invalid message: {}", pg, _m);
+    ceph_assert(is_scrub_message(_m));
+  }
+}
+
+void PGScrubber::handle_op_stats(
+  const hobject_t &on_object,
+  object_stat_sum_t delta_stats) {
+  handle_event(events::op_stats_t{on_object, delta_stats});
+}
+
+PGScrubber::ifut<> PGScrubber::wait_scrub(
+  PGScrubber::BlockingEvent::TriggerI&& trigger,
+  const hobject_t &hoid)
+{
+  LOG_PREFIX(PGScrubber::wait_scrub);
+  if (blocked && (hoid >= blocked->begin) && (hoid < blocked->end)) {
+    DEBUGDPP("blocked: {}, hoid: {}", pg, *blocked, hoid);
+    return trigger.maybe_record_blocking(
+      blocked->p.get_shared_future(),
+      *this);
+  } else {
+    return seastar::now();
+  }
+}
+
+void PGScrubber::notify_scrub_start(bool deep)
+{
+  LOG_PREFIX(PGScrubber::notify_scrub_start);
+  DEBUGDPP("deep: {}", pg, deep);
+  pg.peering_state.state_set(PG_STATE_SCRUBBING);
+  if (deep) {
+    pg.peering_state.state_set(PG_STATE_DEEP_SCRUB);
+  }
+  pg.publish_stats_to_osd();
+}
+
+void PGScrubber::notify_scrub_end(bool deep)
+{
+  LOG_PREFIX(PGScrubber::notify_scrub_end);
+  DEBUGDPP("deep: {}", pg, deep);
+  pg.peering_state.state_clear(PG_STATE_SCRUBBING);
+  if (deep) {
+    pg.peering_state.state_clear(PG_STATE_DEEP_SCRUB);
+  }
+  pg.publish_stats_to_osd();
+}
+
+const std::set<pg_shard_t> &PGScrubber::get_ids_to_scrub() const
+{
+  return pg.peering_state.get_actingset();
+}
+
+chunk_validation_policy_t PGScrubber::get_policy() const
+{
+  return chunk_validation_policy_t{
+    pg.get_primary(),
+    std::nullopt /* stripe_info, populate when EC is implemented */,
+    crimson::common::local_conf().get_val<Option::size_t>(
+      "osd_max_object_size"),
+    crimson::common::local_conf().get_val<std::string>(
+      "osd_hit_set_namespace"),
+    crimson::common::local_conf().get_val<Option::size_t>(
+      "osd_deep_scrub_large_omap_object_value_sum_threshold"),
+    crimson::common::local_conf().get_val<uint64_t>(
+      "osd_deep_scrub_large_omap_object_key_threshold")
+  };
+}
+
+void PGScrubber::request_range(const hobject_t &start)
+{
+  LOG_PREFIX(PGScrubber::request_range);
+  DEBUGDPP("start: {}", pg, start);
+  std::ignore = pg.shard_services.start_operation_may_interrupt<
+    interruptor, ScrubFindRange
+    >(start, &pg);
+}
+
+/* TODO: This isn't actually enough.  Here, classic would
+ * hold the pg lock from the wait_scrub through to IO submission.
+ * ClientRequest, however, isn't in the processing ExclusivePhase
+ * bit yet, and so this check may miss ops between the wait_scrub
+ * check and adding the IO to the log. */
+
+void PGScrubber::reserve_range(const hobject_t &start, const hobject_t &end)
+{
+  LOG_PREFIX(PGScrubber::reserve_range);
+  DEBUGDPP("start: {}, end: {}", pg, start, end);
+  std::ignore = pg.shard_services.start_operation_may_interrupt<
+    interruptor, ScrubReserveRange
+    >(start, end, &pg);
+}
+
+void PGScrubber::release_range()
+{
+  LOG_PREFIX(PGScrubber::release_range);
+  ceph_assert(blocked);
+  DEBUGDPP("blocked: {}", pg, *blocked);
+  pg.background_process_lock.unlock();
+  blocked->p.set_value();
+  blocked = std::nullopt;
+}
+
+void PGScrubber::scan_range(
+  pg_shard_t target,
+  eversion_t version,
+  bool deep,
+  const hobject_t &start,
+  const hobject_t &end)
+{
+  LOG_PREFIX(PGScrubber::scan_range);
+  DEBUGDPP("target: {}, version: {}, deep: {}, start: {}, end: {}",
+          pg, target, version, deep, start, end);
+  if (target == pg.get_pg_whoami()) {
+    std::ignore = pg.shard_services.start_operation_may_interrupt<
+      interruptor, ScrubScan
+      >(&pg, deep, true /* local */, start, end);
+  } else {
+    std::ignore = pg.shard_services.send_to_osd(
+      target.osd,
+      crimson::make_message<MOSDRepScrub>(
+       spg_t(pg.get_pgid().pgid, target.shard),
+       version,
+       pg.get_osdmap_epoch(),
+       pg.get_osdmap_epoch(),
+       start,
+       end,
+       deep,
+       false /* allow preemption -- irrelevant for replicas TODO */,
+       64 /* priority, TODO */,
+       false /* high_priority TODO */),
+      pg.get_osdmap_epoch());
+  }
+}
+
+bool PGScrubber::await_update(const eversion_t &version)
+{
+  LOG_PREFIX(PGScrubber::await_update);
+  DEBUGDPP("version: {}", pg, version);
+  ceph_assert(!waiting_for_update);
+  auto& log = pg.peering_state.get_pg_log().get_log().log;
+  eversion_t current = log.empty() ? eversion_t() : log.rbegin()->version;
+  if (version <= current) {
+    return true;
+  } else {
+    waiting_for_update = version;
+    return false;
+  }
+}
+
+void PGScrubber::generate_and_submit_chunk_result(
+  const hobject_t &begin,
+  const hobject_t &end,
+  bool deep)
+{
+  LOG_PREFIX(PGScrubber::generate_and_submit_chunk_result);
+  DEBUGDPP("begin: {}, end: {}, deep: {}", pg, begin, end, deep);
+  std::ignore = pg.shard_services.start_operation_may_interrupt<
+    interruptor, ScrubScan
+    >(&pg, deep, false /* local */, begin, end);
+}
+
+#define LOG_SCRUB_ERROR(MSG, ...) {                                    \
+    auto errorstr = fmt::format(MSG, __VA_ARGS__);                     \
+    ERRORDPP("{}", pg, errorstr);                                      \
+    pg.get_clog_error() << "pg " << pg.get_pgid() << ": " << errorstr; \
+  }
+
+void PGScrubber::emit_chunk_result(
+  const request_range_result_t &range,
+  chunk_result_t &&result)
+{
+  LOG_PREFIX(PGScrubber::emit_chunk_result);
+  if (result.has_errors()) {
+    LOG_SCRUB_ERROR(
+      "Scrub errors found. range: {}, result: {}",
+      range, result);
+  } else {
+    DEBUGDPP("Chunk complete. range: {}", pg, range);
+  }
+}
+
+void PGScrubber::emit_scrub_result(
+  bool deep,
+  object_stat_sum_t in_stats)
+{
+  LOG_PREFIX(PGScrubber::emit_scrub_result);
+  DEBUGDPP("", pg);
+  pg.peering_state.update_stats(
+    [this, FNAME, deep, &in_stats](auto &history, auto &pg_stats) {
+      foreach_scrub_maintained_stat(
+       [deep, &pg_stats, &in_stats](
+         const auto &name, auto statptr, bool skip_for_shallow) {
+         if (deep && !skip_for_shallow) {
+           pg_stats.stats.sum.*statptr = in_stats.*statptr;
+         }
+       });
+      foreach_scrub_checked_stat(
+       [this, FNAME, &pg_stats, &in_stats](
+         const auto &name, auto statptr, const auto &invalid_predicate) {
+         if (!invalid_predicate(pg_stats) &&
+             (in_stats.*statptr != pg_stats.stats.sum.*statptr)) {
+           LOG_SCRUB_ERROR(
+             "stat mismatch for {}: scrubbed value: {}, stored pg value: {}",
+             name, in_stats.*statptr, pg_stats.stats.sum.*statptr);
+           ++pg_stats.stats.sum.num_shallow_scrub_errors;
+         }
+       });
+      history.last_scrub = pg.peering_state.get_info().last_update;
+      auto now = ceph_clock_now();
+      history.last_scrub_stamp = now;
+      if (deep) {
+       history.last_deep_scrub_stamp = now;
+      }
+      return false; // notify_scrub_end will flush stats to osd
+    });
+}
+
+}
diff --git a/src/crimson/osd/scrub/pg_scrubber.h b/src/crimson/osd/scrub/pg_scrubber.h
new file mode 100644 (file)
index 0000000..2d528e0
--- /dev/null
@@ -0,0 +1,152 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include "crimson/osd/pg_interval_interrupt_condition.h"
+#include "scrub_machine.h"
+
+namespace crimson::osd {
+class PG;
+class ScrubScan;
+class ScrubFindRange;
+class ScrubReserveRange;
+}
+
+namespace crimson::osd::scrub {
+
+struct blocked_range_t {
+  hobject_t begin;
+  hobject_t end;
+  seastar::shared_promise<> p;
+};
+
+class PGScrubber : public crimson::BlockerT<PGScrubber>, ScrubContext {
+  friend class ::crimson::osd::ScrubScan;
+  friend class ::crimson::osd::ScrubFindRange;
+  friend class ::crimson::osd::ScrubReserveRange;
+
+  using interruptor = ::crimson::interruptible::interruptor<
+    ::crimson::osd::IOInterruptCondition>;
+  template <typename T = void>
+  using ifut =
+    ::crimson::interruptible::interruptible_future<
+      ::crimson::osd::IOInterruptCondition, T>;
+
+  PG &pg;
+
+  /// PG alias for logging in header functions
+  DoutPrefixProvider &dpp;
+
+  ScrubMachine machine;
+
+  std::optional<blocked_range_t> blocked;
+
+  std::optional<eversion_t> waiting_for_update;
+
+  template <typename E>
+  void handle_event(E &&e)
+  {
+    LOG_PREFIX(PGScrubber::handle_event);
+    SUBDEBUGDPP(osd, "handle_event: {}", dpp, e);
+    machine.process_event(std::forward<E>(e));
+  }
+
+public:
+  static constexpr const char *type_name = "PGScrubber";
+  using Blocker = PGScrubber;
+  void dump_detail(Formatter *f) const;
+
+  static inline bool is_scrub_message(Message &m) {
+    switch (m.get_type()) {
+    case MSG_OSD_REP_SCRUB:
+    case MSG_OSD_REP_SCRUBMAP:
+      return true;
+    default:
+      return false;
+    }
+    return false;
+  }
+
+  PGScrubber(PG &pg);
+
+  /// setup scrub machine state
+  void initiate() { machine.initiate(); }
+
+  /// notify machine on primary that PG is active+clean
+  void on_primary_active_clean();
+
+  /// notify machine on replica that PG is active
+  void on_replica_activate();
+
+  /// notify machine of interval change
+  void on_interval_change();
+
+  /// notify machine that PG has committed up to version v
+  void on_log_update(eversion_t v);
+
+  /// handle scrub request
+  void handle_scrub_requested(bool deep);
+
+
+  /// handle other scrub message
+  void handle_scrub_message(Message &m);
+
+  /// notify machine of a mutation of on_object resulting in delta_stats
+  void handle_op_stats(
+    const hobject_t &on_object,
+    object_stat_sum_t delta_stats);
+
+  /// maybe block an op trying to mutate hoid until chunk is complete
+  ifut<> wait_scrub(
+    PGScrubber::BlockingEvent::TriggerI&& trigger,
+    const hobject_t &hoid);
+
+private:
+  DoutPrefixProvider &get_dpp() final { return dpp; }
+
+  void notify_scrub_start(bool deep) final;
+  void notify_scrub_end(bool deep) final;
+
+  const std::set<pg_shard_t> &get_ids_to_scrub() const final;
+
+  chunk_validation_policy_t get_policy() const final;
+
+  void request_range(const hobject_t &start) final;
+  void reserve_range(const hobject_t &start, const hobject_t &end) final;
+  void release_range() final;
+  void scan_range(
+    pg_shard_t target,
+    eversion_t version,
+    bool deep,
+    const hobject_t &start,
+    const hobject_t &end) final;
+  bool await_update(const eversion_t &version) final;
+  void generate_and_submit_chunk_result(
+    const hobject_t &begin,
+    const hobject_t &end,
+    bool deep) final;
+  void emit_chunk_result(
+    const request_range_result_t &range,
+    chunk_result_t &&result) final;
+  void emit_scrub_result(
+    bool deep,
+    object_stat_sum_t scrub_stats) final;
+};
+
+};
+
+template <>
+struct fmt::formatter<crimson::osd::scrub::blocked_range_t> {
+  constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
+
+  template <typename FormatContext>
+  auto format(const auto &range, FormatContext& ctx)
+  {
+    return fmt::format_to(
+      ctx.out(),
+      "{}~{}",
+      range.begin,
+      range.end);
+  }
+};