From: Samuel Just Date: Sat, 12 Aug 2023 00:38:40 +0000 (+0000) Subject: crimson/osd: scrub integration with crimson X-Git-Tag: v19.3.0~296^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d75ea50d5a6463b5ddf36bbcd472481cae5daf9c;p=ceph.git crimson/osd: scrub integration with crimson Signed-off-by: Samuel Just --- diff --git a/src/crimson/admin/pg_commands.cc b/src/crimson/admin/pg_commands.cc index f2c84b254db..c71bd429e78 100644 --- a/src/crimson/admin/pg_commands.cc +++ b/src/crimson/admin/pg_commands.cc @@ -11,9 +11,11 @@ #include #include "crimson/admin/admin_socket.h" +#include "crimson/common/log.h" #include "crimson/osd/osd.h" #include "crimson/osd/pg.h" +SET_SUBSYS(osd); using crimson::osd::OSD; using crimson::osd::PG; @@ -148,6 +150,43 @@ public: } }; +template +class ScrubCommand : public PGCommand { +public: + explicit ScrubCommand(crimson::osd::OSD& osd) : + PGCommand{ + osd, + deep ? "deep_scrub" : "scrub", + "", + deep ? "deep scrub pg" : "scrub pg"} + {} + + seastar::future + do_command(Ref pg, + const cmdmap_t& cmdmap, + std::string_view format, + ceph::bufferlist&&) const final + { + LOG_PREFIX(ScrubCommand::do_command); + DEBUGDPP("deep: {}", *pg, deep); + return PG::interruptor::with_interruption([pg] { + pg->scrubber.handle_scrub_requested(deep); + return PG::interruptor::now(); + }, [FNAME, pg](std::exception_ptr ep) { + DEBUGDPP("interrupted with {}", *pg, ep); + }, pg).then([format] { + std::unique_ptr f{ + Formatter::create(format, "json-pretty", "json-pretty") + }; + f->open_object_section("scrub"); + f->dump_bool("deep", deep); + f->dump_stream("stamp") << ceph_clock_now(); + f->close_section(); + return seastar::make_ready_future(std::move(f)); + }); + } +}; + } // namespace crimson::admin::pg namespace crimson::admin { @@ -164,4 +203,9 @@ make_asok_hook(crimson::osd::OSD& osd); template std::unique_ptr make_asok_hook(crimson::osd::OSD& osd); +template std::unique_ptr 
+make_asok_hook>(crimson::osd::OSD& osd); +template std::unique_ptr +make_asok_hook>(crimson::osd::OSD& osd); + } // namespace crimson::admin diff --git a/src/crimson/admin/pg_commands.h b/src/crimson/admin/pg_commands.h index 873b3c923aa..eb7912e7aa4 100644 --- a/src/crimson/admin/pg_commands.h +++ b/src/crimson/admin/pg_commands.h @@ -6,5 +6,7 @@ namespace crimson::admin::pg { class QueryCommand; class MarkUnfoundLostCommand; +template +class ScrubCommand; } // namespace crimson::admin::pg diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt index e3ab3cf4d73..65fb7201f76 100644 --- a/src/crimson/osd/CMakeLists.txt +++ b/src/crimson/osd/CMakeLists.txt @@ -28,6 +28,7 @@ add_executable(crimson-osd osd_operations/background_recovery.cc osd_operations/recovery_subrequest.cc osd_operations/snaptrim_event.cc + osd_operations/scrub_events.cc pg_recovery.cc recovery_backend.cc replicated_recovery_backend.cc @@ -35,6 +36,7 @@ add_executable(crimson-osd scheduler/mclock_scheduler.cc scrub/scrub_machine.cc scrub/scrub_validator.cc + scrub/pg_scrubber.cc osdmap_gate.cc pg_activation_blocker.cc pg_map.cc diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index 09b1a492576..7cdc7d9027b 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -1040,6 +1040,7 @@ std::pair OpsExecuter::prepare_clone( void OpsExecuter::apply_stats() { pg->get_peering_state().apply_op_stats(get_target(), delta_stats); + pg->scrubber.handle_op_stats(get_target(), delta_stats); pg->publish_stats_to_osd(); } diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index f3648c6df27..98f5f9d7ea7 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -54,6 +54,7 @@ #include "crimson/osd/osd_operations/pg_advance_map.h" #include "crimson/osd/osd_operations/recovery_subrequest.h" #include "crimson/osd/osd_operations/replicated_request.h" +#include "crimson/osd/osd_operations/scrub_events.h" #include 
"crimson/osd/osd_operation_external_tracking.h" #include "crimson/crush/CrushLocation.h" @@ -644,6 +645,8 @@ seastar::future<> OSD::start_asok_admin() // PG commands asok->register_command(make_asok_hook(*this)); asok->register_command(make_asok_hook(*this)); + asok->register_command(make_asok_hook>(*this)); + asok->register_command(make_asok_hook>(*this)); // ops commands asok->register_command( make_asok_hook( @@ -819,7 +822,13 @@ OSD::do_ms_dispatch( case MSG_OSD_REPOPREPLY: return handle_rep_op_reply(conn, boost::static_pointer_cast(m)); case MSG_OSD_SCRUB2: - return handle_scrub(conn, boost::static_pointer_cast(m)); + return handle_scrub_command( + conn, boost::static_pointer_cast(m)); + case MSG_OSD_REP_SCRUB: + case MSG_OSD_REP_SCRUBMAP: + return handle_scrub_message( + conn, + boost::static_pointer_cast(m)); case MSG_OSD_PG_UPDATE_LOG_MISSING: return handle_update_log_missing(conn, boost::static_pointer_cast< MOSDPGUpdateLogMissing>(m)); @@ -1220,7 +1229,7 @@ seastar::future<> OSD::handle_rep_op_reply( }); } -seastar::future<> OSD::handle_scrub( +seastar::future<> OSD::handle_scrub_command( crimson::net::ConnectionRef conn, Ref m) { @@ -1230,17 +1239,22 @@ seastar::future<> OSD::handle_scrub( } return seastar::parallel_for_each(std::move(m->scrub_pgs), [m, conn, this](spg_t pgid) { - pg_shard_t from_shard{static_cast(m->get_source().num()), - pgid.shard}; - PeeringState::RequestScrub scrub_request{m->deep, m->repair}; - return pg_shard_manager.start_pg_operation( - conn, - from_shard, - pgid, - PGPeeringEvent{m->epoch, m->epoch, scrub_request}).second; + return pg_shard_manager.start_pg_operation< + crimson::osd::ScrubRequested + >(m->deep, conn, m->epoch, pgid).second; }); } +seastar::future<> OSD::handle_scrub_message( + crimson::net::ConnectionRef conn, + Ref m) +{ + ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); + return pg_shard_manager.start_pg_operation< + crimson::osd::ScrubMessage + >(m, conn, m->get_min_epoch(), m->get_spg()).second; +} + 
seastar::future<> OSD::handle_mark_me_down( crimson::net::ConnectionRef conn, Ref m) diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index 134376ad947..db30ad0ec73 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -206,8 +206,10 @@ private: Ref m); seastar::future<> handle_recovery_subreq(crimson::net::ConnectionRef conn, Ref m); - seastar::future<> handle_scrub(crimson::net::ConnectionRef conn, - Ref m); + seastar::future<> handle_scrub_command(crimson::net::ConnectionRef conn, + Ref m); + seastar::future<> handle_scrub_message(crimson::net::ConnectionRef conn, + Ref m); seastar::future<> handle_mark_me_down(crimson::net::ConnectionRef conn, Ref m); diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h index 7174143fe01..b379d4515c1 100644 --- a/src/crimson/osd/osd_operation.h +++ b/src/crimson/osd/osd_operation.h @@ -54,6 +54,11 @@ enum class OperationTypeCode { logmissing_request_reply, snaptrim_event, snaptrimobj_subevent, + scrub_requested, + scrub_message, + scrub_find_range, + scrub_reserve_range, + scrub_scan, last_op }; @@ -71,6 +76,11 @@ static constexpr const char* const OP_NAMES[] = { "logmissing_request_reply", "snaptrim_event", "snaptrimobj_subevent", + "scrub_requested", + "scrub_message", + "scrub_find_range", + "scrub_reserve_range", + "scrub_scan", }; // prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry: diff --git a/src/crimson/osd/osd_operation_external_tracking.h b/src/crimson/osd/osd_operation_external_tracking.h index d5e2ed45328..5bc457c7809 100644 --- a/src/crimson/osd/osd_operation_external_tracking.h +++ b/src/crimson/osd/osd_operation_external_tracking.h @@ -14,6 +14,7 @@ #include "crimson/osd/osd_operations/snaptrim_event.h" #include "crimson/osd/pg_activation_blocker.h" #include "crimson/osd/pg_map.h" +#include "crimson/osd/scrub/pg_scrubber.h" namespace crimson::osd { @@ -30,6 +31,7 @@ struct LttngBackend PG_OSDMapGate::OSDMapBlocker::BlockingEvent::Backend, 
ClientRequest::PGPipeline::WaitForActive::BlockingEvent::Backend, PGActivationBlocker::BlockingEvent::Backend, + scrub::PGScrubber::BlockingEvent::Backend, ClientRequest::PGPipeline::RecoverMissing::BlockingEvent::Backend, ClientRequest::PGPipeline::GetOBC::BlockingEvent::Backend, ClientRequest::PGPipeline::Process::BlockingEvent::Backend, @@ -91,6 +93,11 @@ struct LttngBackend const PGActivationBlocker& blocker) override { } + void handle(scrub::PGScrubber::BlockingEvent& ev, + const Operation& op, + const scrub::PGScrubber& blocker) override { + } + void handle(ClientRequest::PGPipeline::RecoverMissing::BlockingEvent& ev, const Operation& op, const ClientRequest::PGPipeline::RecoverMissing& blocker) override { @@ -136,6 +143,7 @@ struct HistoricBackend PG_OSDMapGate::OSDMapBlocker::BlockingEvent::Backend, ClientRequest::PGPipeline::WaitForActive::BlockingEvent::Backend, PGActivationBlocker::BlockingEvent::Backend, + scrub::PGScrubber::BlockingEvent::Backend, ClientRequest::PGPipeline::RecoverMissing::BlockingEvent::Backend, ClientRequest::PGPipeline::GetOBC::BlockingEvent::Backend, ClientRequest::PGPipeline::Process::BlockingEvent::Backend, @@ -197,6 +205,11 @@ struct HistoricBackend const PGActivationBlocker& blocker) override { } + void handle(scrub::PGScrubber::BlockingEvent& ev, + const Operation& op, + const scrub::PGScrubber& blocker) override { + } + void handle(ClientRequest::PGPipeline::RecoverMissing::BlockingEvent& ev, const Operation& op, const ClientRequest::PGPipeline::RecoverMissing& blocker) override { diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 62eb82a281a..120b92ae094 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -250,21 +250,31 @@ ClientRequest::process_op( DEBUGDPP("{}.{}: entered get_obc stage, about to wait_scrub", *pg, *this, this_instance_id); op_info.set_from_op(&*m, *pg->get_osdmap()); - 
return pg->with_locked_obc( - m->get_hobj(), op_info, - [FNAME, this, pg, this_instance_id, &ihref]( - auto head, auto obc) mutable { - DEBUGDPP("{}.{}: got obc {}, entering process stage", - *pg, *this, this_instance_id, obc->obs); - return ihref.enter_stage( - client_pp(*pg).process, *this - ).then_interruptible( - [FNAME, this, pg, this_instance_id, obc, &ihref]() mutable { - DEBUGDPP("{}.{}: in process stage, calling do_process", - *pg, *this, this_instance_id); + return ihref.enter_blocker( + *this, + pg->scrubber, + &decltype(pg->scrubber)::wait_scrub, + m->get_hobj() + ).then_interruptible( + [FNAME, this, pg, this_instance_id, &ihref]() mutable { + DEBUGDPP("{}.{}: past scrub blocker, getting obc", + *pg, *this, this_instance_id); + return pg->with_locked_obc( + m->get_hobj(), op_info, + [FNAME, this, pg, this_instance_id, &ihref]( + auto head, auto obc) mutable { + DEBUGDPP("{}.{}: got obc {}, entering process stage", + *pg, *this, this_instance_id, obc->obs); + return ihref.enter_stage( + client_pp(*pg).process, *this + ).then_interruptible( + [FNAME, this, pg, this_instance_id, obc, &ihref]() mutable { + DEBUGDPP("{}.{}: in process stage, calling do_process", + *pg, *this, this_instance_id); return do_process(ihref, pg, obc, this_instance_id); }); - }); + }); + }); }); } }); @@ -354,7 +364,7 @@ ClientRequest::do_process( [FNAME, this, pg, this_instance_id, &ihref]( auto submitted, auto all_completed) mutable { return submitted.then_interruptible( - [FNAME, this, pg, this_instance_id, &ihref] { + [this, pg, &ihref] { return ihref.enter_stage(client_pp(*pg).wait_repop, *this); }).then_interruptible( [FNAME, this, pg, this_instance_id, @@ -371,12 +381,12 @@ ClientRequest::do_process( return conn->send(std::move(reply)); }); }, crimson::ct_error::eagain::handle( - [FNAME, this, pg, this_instance_id, &ihref]() mutable { + [this, pg, this_instance_id, &ihref]() mutable { return process_op(ihref, pg, this_instance_id); })); }); }, 
crimson::ct_error::eagain::handle( - [FNAME, this, pg, this_instance_id, &ihref]() mutable { + [this, pg, this_instance_id, &ihref]() mutable { return process_op(ihref, pg, this_instance_id); })); } diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index 6599e4dbc6a..ac4c46981c3 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -17,6 +17,7 @@ #include "crimson/osd/osd_operations/common/pg_pipeline.h" #include "crimson/osd/pg_activation_blocker.h" #include "crimson/osd/pg_map.h" +#include "crimson/osd/scrub/pg_scrubber.h" #include "crimson/common/type_helpers.h" #include "crimson/common/utility.h" #include "messages/MOSDOp.h" @@ -103,6 +104,7 @@ public: PGPipeline::WaitForActive::BlockingEvent, PGActivationBlocker::BlockingEvent, PGPipeline::RecoverMissing::BlockingEvent, + scrub::PGScrubber::BlockingEvent, PGPipeline::GetOBC::BlockingEvent, PGPipeline::Process::BlockingEvent, PGPipeline::WaitRepop::BlockingEvent, diff --git a/src/crimson/osd/osd_operations/scrub_events.cc b/src/crimson/osd/osd_operations/scrub_events.cc new file mode 100644 index 00000000000..4f54cf0b274 --- /dev/null +++ b/src/crimson/osd/osd_operations/scrub_events.cc @@ -0,0 +1,396 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "crimson/common/log.h" +#include "crimson/osd/pg.h" +#include "crimson/osd/osd_connection_priv.h" +#include "messages/MOSDRepScrubMap.h" +#include "scrub_events.h" + +SET_SUBSYS(osd); + +namespace crimson::osd { + +template +PGPeeringPipeline &RemoteScrubEventBaseT::get_peering_pipeline(PG &pg) +{ + return pg.peering_request_pg_pipeline; +} + +template +ConnectionPipeline &RemoteScrubEventBaseT::get_connection_pipeline() +{ + return get_osd_priv(conn.get()).peering_request_conn_pipeline; +} + +template +PerShardPipeline &RemoteScrubEventBaseT::get_pershard_pipeline( + 
ShardServices &shard_services) +{ + return shard_services.get_client_request_pipeline(); +} + +template +seastar::future<> RemoteScrubEventBaseT::with_pg( + ShardServices &shard_services, Ref pg) +{ + LOG_PREFIX(RemoteEventBaseT::with_pg); + return interruptor::with_interruption([FNAME, this, pg] { + DEBUGDPP("{} pg present", *pg, *that()); + return this->template enter_stage( + get_peering_pipeline(*pg).await_map + ).then_interruptible([this, pg] { + return this->template with_blocking_event< + PG_OSDMapGate::OSDMapBlocker::BlockingEvent + >([this, pg](auto &&trigger) { + return pg->osdmap_gate.wait_for_map( + std::move(trigger), get_epoch()); + }); + }).then_interruptible([this, pg](auto) { + return this->template enter_stage( + get_peering_pipeline(*pg).process); + }).then_interruptible([this, pg] { + return handle_event(*pg); + }); + }, [FNAME, pg, this](std::exception_ptr ep) { + DEBUGDPP("{} interrupted with {}", *pg, *that(), ep); + }, pg); +} + +ScrubRequested::ifut<> ScrubRequested::handle_event(PG &pg) +{ + pg.scrubber.handle_scrub_requested(deep); + return seastar::now(); +} + +ScrubMessage::ifut<> ScrubMessage::handle_event(PG &pg) +{ + pg.scrubber.handle_scrub_message(*m); + return seastar::now(); +} + +template class RemoteScrubEventBaseT; +template class RemoteScrubEventBaseT; + +template +ScrubAsyncOpT::ScrubAsyncOpT(Ref pg) : pg(pg) {} + +template +typename ScrubAsyncOpT::template ifut<> ScrubAsyncOpT::start() +{ + LOG_PREFIX(ScrubAsyncOpT::start); + DEBUGDPP("{} starting", *pg, *this); + return run(*pg); +} + +ScrubFindRange::ifut<> ScrubFindRange::run(PG &pg) +{ + LOG_PREFIX(ScrubFindRange::run); + using crimson::common::local_conf; + return interruptor::make_interruptible( + pg.shard_services.get_store().list_objects( + pg.get_collection_ref(), + ghobject_t(begin, ghobject_t::NO_GEN, pg.get_pgid().shard), + ghobject_t::get_max(), + local_conf().get_val("osd_scrub_chunk_max") + ) + ).then_interruptible([FNAME, this, &pg](auto ret) { + auto &[_, 
next] = ret; + + // We rely on seeing an entire set of snapshots in a single chunk + auto end = next.hobj.get_max_object_boundary(); + + DEBUGDPP("got next.hobj: {}, returning begin, end: {}, {}", + pg, next.hobj, begin, end); + pg.scrubber.machine.process_event( + scrub::ScrubContext::request_range_complete_t{begin, end}); + }); +} + +template class ScrubAsyncOpT; + +ScrubReserveRange::ifut<> ScrubReserveRange::run(PG &pg) +{ + LOG_PREFIX(ScrubReserveRange::run); + DEBUGDPP("", pg); + return pg.background_process_lock.lock( + ).then_interruptible([FNAME, this, &pg] { + DEBUGDPP("pg_background_io_mutex locked", pg); + auto &scrubber = pg.scrubber; + ceph_assert(!scrubber.blocked); + scrubber.blocked = scrub::blocked_range_t{begin, end}; + blocked_set = true; + auto& log = pg.peering_state.get_pg_log().get_log().log; + auto p = find_if( + log.crbegin(), log.crend(), + [this](const auto& e) -> bool { + return e.soid >= begin && e.soid < end; + }); + + if (p == log.crend()) { + return scrubber.machine.process_event( + scrub::ScrubContext::reserve_range_complete_t{eversion_t{}}); + } else { + return scrubber.machine.process_event( + scrub::ScrubContext::reserve_range_complete_t{p->version}); + } + }).finally([&pg, this] { + if (!blocked_set) { + pg.background_process_lock.unlock(); + } + }); +} + +template class ScrubAsyncOpT; + +ScrubScan::ifut<> ScrubScan::run(PG &pg) +{ + LOG_PREFIX(ScrubScan::start); + // legacy value, unused + ret.valid_through = pg.get_info().last_update; + + DEBUGDPP("begin: {}, end: {}", pg, begin, end); + return interruptor::make_interruptible( + pg.shard_services.get_store().list_objects( + pg.get_collection_ref(), + ghobject_t(begin, ghobject_t::NO_GEN, pg.get_pgid().shard), + ghobject_t(end, ghobject_t::NO_GEN, pg.get_pgid().shard), + std::numeric_limits::max()) + ).then_interruptible([FNAME, this, &pg](auto &&result) { + DEBUGDPP("listed {} objects", pg, std::get<0>(result).size()); + return seastar::do_with( + 
std::move(std::get<0>(result)), + [this, &pg](auto &objects) { + return interruptor::do_for_each( + objects, + [this, &pg](auto &obj) { + if (obj.is_pgmeta() || obj.hobj.is_temp()) { + return interruptor::now(); + } else { + return scan_object(pg, obj); + } + }); + }); + }).then_interruptible([FNAME, this, &pg] { + if (local) { + DEBUGDPP("complete, submitting local event", pg); + pg.scrubber.handle_event( + scrub::ScrubContext::scan_range_complete_t( + pg.get_pg_whoami(), + std::move(ret))); + return seastar::now(); + } else { + DEBUGDPP("complete, sending response to primary", pg); + auto m = crimson::make_message( + spg_t(pg.get_pgid().pgid, pg.get_primary().shard), + pg.get_osdmap_epoch(), + pg.get_pg_whoami()); + encode(ret, m->get_data()); + pg.scrubber.handle_event( + scrub::ScrubContext::generate_and_submit_chunk_result_complete_t{}); + return pg.shard_services.send_to_osd( + pg.get_primary().osd, + std::move(m), + pg.get_osdmap_epoch()); + } + }); +} + +ScrubScan::ifut<> ScrubScan::scan_object( + PG &pg, + const ghobject_t &obj) +{ + LOG_PREFIX(ScrubScan::scan_object); + DEBUGDPP("obj: {}", pg, obj); + auto &entry = ret.objects[obj.hobj]; + return interruptor::make_interruptible( + pg.shard_services.get_store().stat( + pg.get_collection_ref(), + obj) + ).then_interruptible([FNAME, &pg, &obj, &entry](struct stat obj_stat) { + DEBUGDPP("obj: {}, stat complete, size {}", pg, obj, obj_stat.st_size); + entry.size = obj_stat.st_size; + return pg.shard_services.get_store().get_attrs( + pg.get_collection_ref(), + obj); + }).safe_then_interruptible([FNAME, &pg, &obj, &entry](auto &&attrs) { + DEBUGDPP("obj: {}, got {} attrs", pg, obj, attrs.size()); + for (auto &i : attrs) { + i.second.rebuild(); + if (i.second.length() == 0) { + entry.attrs[i.first]; + } else { + entry.attrs.emplace(i.first, i.second.front()); + } + } + }).handle_error_interruptible( + ct_error::all_same_way([FNAME, &pg, &obj, &entry](auto e) { + DEBUGDPP("obj: {} stat error", pg, obj); + 
entry.stat_error = true; + }) + ).then_interruptible([FNAME, this, &pg, &obj] { + if (deep) { + DEBUGDPP("obj: {} doing deep scan", pg, obj); + return deep_scan_object(pg, obj); + } else { + return interruptor::now(); + } + }); + +} + +struct obj_scrub_progress_t { + // nullopt once complete + std::optional offset = 0; + ceph::buffer::hash data_hash{std::numeric_limits::max()}; + + bool header_done = false; + std::optional next_key; + bool keys_done = false; + ceph::buffer::hash omap_hash{std::numeric_limits::max()}; +}; +ScrubScan::ifut<> ScrubScan::deep_scan_object( + PG &pg, + const ghobject_t &obj) +{ + LOG_PREFIX(ScrubScan::deep_scan_object); + DEBUGDPP("obj: {}", pg, obj); + using crimson::common::local_conf; + auto &entry = ret.objects[obj.hobj]; + return interruptor::repeat( + [FNAME, this, progress = obj_scrub_progress_t(), + &obj, &entry, &pg]() mutable + -> interruptible_future { + if (progress.offset) { + DEBUGDPP("op: {}, obj: {}, progress: {} scanning data", + pg, *this, obj, progress); + const auto stride = local_conf().get_val( + "osd_deep_scrub_stride"); + return pg.shard_services.get_store().read( + pg.get_collection_ref(), + obj, + *(progress.offset), + stride + ).safe_then([stride, &progress, &entry](auto bl) { + progress.data_hash << bl; + if (bl.length() < stride) { + progress.offset = std::nullopt; + entry.digest = progress.data_hash.digest(); + entry.digest_present = true; + } else { + ceph_assert(stride == bl.length()); + *(progress.offset) += stride; + } + }).handle_error( + ct_error::all_same_way([&progress, &entry](auto e) { + entry.read_error = true; + progress.offset = std::nullopt; + }) + ).then([] { + return interruptor::make_interruptible( + seastar::make_ready_future( + seastar::stop_iteration::no)); + }); + } else if (!progress.header_done) { + DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap header", + pg, *this, obj, progress); + return pg.shard_services.get_store().omap_get_header( + pg.get_collection_ref(), + obj + 
).safe_then([&progress](auto bl) { + progress.omap_hash << bl; + }).handle_error( + ct_error::enodata::handle([] {}), + ct_error::all_same_way([&entry](auto e) { + entry.read_error = true; + }) + ).then([&progress] { + progress.header_done = true; + return interruptor::make_interruptible( + seastar::make_ready_future( + seastar::stop_iteration::no)); + }); + } else if (!progress.keys_done) { + DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap keys", + pg, *this, obj, progress); + return pg.shard_services.get_store().omap_get_values( + pg.get_collection_ref(), + obj, + progress.next_key + ).safe_then([FNAME, this, &obj, &progress, &entry, &pg](auto result) { + const auto &[done, omap] = result; + DEBUGDPP("op: {}, obj: {}, progress: {} got {} keys", + pg, *this, obj, progress, omap.size()); + for (const auto &p : omap) { + bufferlist bl; + encode(p.first, bl); + encode(p.second, bl); + progress.omap_hash << bl; + entry.object_omap_keys++; + entry.object_omap_bytes += p.second.length(); + } + if (done) { + DEBUGDPP("op: {}, obj: {}, progress: {} omap done", + pg, *this, obj, progress); + progress.keys_done = true; + entry.omap_digest = progress.omap_hash.digest(); + entry.omap_digest_present = true; + + if ((entry.object_omap_keys > + local_conf().get_val( + "osd_deep_scrub_large_omap_object_key_threshold")) || + (entry.object_omap_bytes > + local_conf().get_val( + "osd_deep_scrub_large_omap_object_value_sum_threshold"))) { + entry.large_omap_object_found = true; + entry.large_omap_object_key_count = entry.object_omap_keys; + ret.has_large_omap_object_errors = true; + } + } else { + ceph_assert(!omap.empty()); // omap_get_values invariant + DEBUGDPP("op: {}, obj: {}, progress: {} omap not done, next {}", + pg, *this, obj, progress, omap.crbegin()->first); + progress.next_key = omap.crbegin()->first; + } + }).handle_error( + ct_error::all_same_way([FNAME, this, &obj, &progress, &entry, &pg] + (auto e) { + DEBUGDPP("op: {}, obj: {}, progress: {} error reading omap 
{}", + pg, *this, obj, progress, e); + progress.keys_done = true; + entry.read_error = true; + }) + ).then([] { + return interruptor::make_interruptible( + seastar::make_ready_future( + seastar::stop_iteration::no)); + }); + } else { + DEBUGDPP("op: {}, obj: {}, progress: {} done", + pg, *this, obj, progress); + return interruptor::make_interruptible( + seastar::make_ready_future( + seastar::stop_iteration::yes)); + } + }); +} + +template class ScrubAsyncOpT; + +} + +template <> +struct fmt::formatter { + constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); } + template + auto format(const crimson::osd::obj_scrub_progress_t &progress, + FormatContext& ctx) + { + return fmt::format_to( + ctx.out(), + "obj_scrub_progress_t(offset: {}, " + "header_done: {}, next_key: {}, keys_done: {})", + progress.offset, progress.header_done, + progress.next_key, progress.keys_done); + } +}; diff --git a/src/crimson/osd/osd_operations/scrub_events.h b/src/crimson/osd/osd_operations/scrub_events.h new file mode 100644 index 00000000000..0793983d8c6 --- /dev/null +++ b/src/crimson/osd/osd_operations/scrub_events.h @@ -0,0 +1,290 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "common/Formatter.h" +#include "crimson/osd/osd_operation.h" +#include "crimson/osd/scrub/pg_scrubber.h" +#include "osd/osd_types.h" +#include "peering_event.h" + +namespace crimson::osd { + +class PG; + +template +class RemoteScrubEventBaseT : public PhasedOperationT { + T* that() { + return static_cast(this); + } + const T* that() const { + return static_cast(this); + } + + PipelineHandle handle; + + crimson::net::ConnectionRef conn; + epoch_t epoch; + spg_t pgid; + +protected: + using interruptor = InterruptibleOperation::interruptor; + + template + using ifut = InterruptibleOperation::interruptible_future; + + virtual ifut<> handle_event(PG &pg) = 0; +public: + RemoteScrubEventBaseT( + 
crimson::net::ConnectionRef conn, epoch_t epoch, spg_t pgid) + : conn(conn), epoch(epoch), pgid(pgid) {} + + PGPeeringPipeline &get_peering_pipeline(PG &pg); + ConnectionPipeline &get_connection_pipeline(); + PerShardPipeline &get_pershard_pipeline(ShardServices &); + + crimson::net::Connection &get_connection() { + assert(conn); + return *conn; + }; + + static constexpr bool can_create() { return false; } + + spg_t get_pgid() const { + return pgid; + } + + PipelineHandle &get_handle() { return handle; } + epoch_t get_epoch() const { return epoch; } + + seastar::future prepare_remote_submission() { + assert(conn); + return conn.get_foreign( + ).then([this](auto f_conn) { + conn.reset(); + return f_conn; + }); + } + void finish_remote_submission(crimson::net::ConnectionFRef _conn) { + assert(!conn); + conn = make_local_shared_foreign(std::move(_conn)); + } + + seastar::future<> with_pg( + ShardServices &shard_services, Ref pg); + + std::tuple< + class TrackableOperationT::StartEvent, + ConnectionPipeline::AwaitActive::BlockingEvent, + ConnectionPipeline::AwaitMap::BlockingEvent, + OSD_OSDMapGate::OSDMapBlocker::BlockingEvent, + ConnectionPipeline::GetPGMapping::BlockingEvent, + PerShardPipeline::CreateOrWaitPG::BlockingEvent, + PGMap::PGCreationBlockingEvent, + PGPeeringPipeline::AwaitMap::BlockingEvent, + PG_OSDMapGate::OSDMapBlocker::BlockingEvent, + PGPeeringPipeline::Process::BlockingEvent, + class TrackableOperationT::CompletionEvent + > tracking_events; + + virtual ~RemoteScrubEventBaseT() = default; +}; + +class ScrubRequested final : public RemoteScrubEventBaseT { + bool deep = false; +protected: + ifut<> handle_event(PG &pg) final; + +public: + static constexpr OperationTypeCode type = OperationTypeCode::scrub_requested; + + template + ScrubRequested(bool deep, Args&&... 
base_args) + : RemoteScrubEventBaseT(std::forward(base_args)...), + deep(deep) {} + + void print(std::ostream &out) const final { + out << "(deep=" << deep << ")"; + } + void dump_detail(ceph::Formatter *f) const final { + f->dump_bool("deep", deep); + } + +}; + +class ScrubMessage final : public RemoteScrubEventBaseT { + MessageRef m; +protected: + ifut<> handle_event(PG &pg) final; + +public: + static constexpr OperationTypeCode type = OperationTypeCode::scrub_message; + + template + ScrubMessage(MessageRef m, Args&&... base_args) + : RemoteScrubEventBaseT(std::forward(base_args)...), + m(m) { + ceph_assert(scrub::PGScrubber::is_scrub_message(*m)); + } + + void print(std::ostream &out) const final { + out << "(m=" << *m << ")"; + } + void dump_detail(ceph::Formatter *f) const final { + f->dump_stream("m") << *m; + } + +}; + +template +class ScrubAsyncOpT : public TrackableOperationT { + Ref pg; + +public: + using interruptor = InterruptibleOperation::interruptor; + template + using ifut = InterruptibleOperation::interruptible_future; + + ScrubAsyncOpT(Ref pg); + + ifut<> start(); + + virtual ~ScrubAsyncOpT() = default; + +protected: + virtual ifut<> run(PG &pg) = 0; +}; + +class ScrubFindRange : public ScrubAsyncOpT { + hobject_t begin; +public: + static constexpr OperationTypeCode type = OperationTypeCode::scrub_find_range; + + template + ScrubFindRange(const hobject_t &begin, Args&&... 
args) + : ScrubAsyncOpT(std::forward(args)...), begin(begin) {} + + void print(std::ostream &out) const final { + out << "(begin=" << begin << ")"; + } + void dump_detail(ceph::Formatter *f) const final { + f->dump_stream("begin") << begin; + } + + +protected: + ifut<> run(PG &pg) final; +}; + +class ScrubReserveRange : public ScrubAsyncOpT { + hobject_t begin; + hobject_t end; + + /// see run(), used to unlock background_io_mutex on interval change + bool blocked_set = false; +public: + static constexpr OperationTypeCode type = + OperationTypeCode::scrub_reserve_range; + + template + ScrubReserveRange(const hobject_t &begin, const hobject_t &end, Args&&... args) + : ScrubAsyncOpT(std::forward(args)...), begin(begin), end(end) {} + + void print(std::ostream &out) const final { + out << "(begin=" << begin << ", end=" << end << ")"; + } + void dump_detail(ceph::Formatter *f) const final { + f->dump_stream("begin") << begin; + f->dump_stream("end") << end; + } + + +protected: + ifut<> run(PG &pg) final; +}; + +class ScrubScan : public ScrubAsyncOpT { + /// deep or shallow scrub + const bool deep; + + /// true: send event locally, false: send result to primary + const bool local; + + /// object range to scan: [begin, end) + const hobject_t begin; + const hobject_t end; + + /// result, see local + ScrubMap ret; + + ifut<> scan_object(PG &pg, const ghobject_t &obj); + ifut<> deep_scan_object(PG &pg, const ghobject_t &obj); + +public: + static constexpr OperationTypeCode type = OperationTypeCode::scrub_scan; + + void print(std::ostream &out) const final { + out << "(deep=" << deep + << ", local=" << local + << ", begin=" << begin + << ", end=" << end + << ")"; + } + void dump_detail(ceph::Formatter *f) const final { + f->dump_bool("deep", deep); + f->dump_bool("local", local); + f->dump_stream("begin") << begin; + f->dump_stream("end") << end; + } + + ScrubScan( + Ref pg, bool deep, bool local, + const hobject_t &begin, const hobject_t &end) + : ScrubAsyncOpT(pg), 
deep(deep), local(local), begin(begin), end(end) {} + +protected: + ifut<> run(PG &pg) final; +}; + +} + +namespace crimson { + +template <> +struct EventBackendRegistry { + static std::tuple<> get_backends() { + return {}; + } +}; + +template <> +struct EventBackendRegistry { + static std::tuple<> get_backends() { + return {}; + } +}; + +} + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter + : fmt::ostream_formatter {}; + +template <> struct fmt::formatter + : fmt::ostream_formatter {}; + +template +struct fmt::formatter> + : fmt::ostream_formatter {}; + +template <> struct fmt::formatter + : fmt::ostream_formatter {}; + +template <> struct fmt::formatter + : fmt::ostream_formatter {}; + +template <> struct fmt::formatter + : fmt::ostream_formatter {}; + +#endif diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 5b7f1fd2578..6537eed2b7d 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -125,6 +125,7 @@ PG::PG( osdmap, this, this), + scrubber(*this), obc_registry{ local_conf()}, obc_loader{ @@ -144,6 +145,7 @@ PG::PG( pgid.shard), wait_for_active_blocker(this) { + scrubber.initiate(); peering_state.set_backend_predicates( new ReadablePredicate(pg_whoami), new RecoverablePredicate()); @@ -331,6 +333,12 @@ void PG::on_activate(interval_set snaps) projected_last_update = peering_state.get_info().last_update; } +void PG::on_replica_activate() +{ + logger().debug("{}: {}", *this, __func__); + scrubber.on_replica_activate(); +} + void PG::on_activate_complete() { wait_for_active_blocker.unblock(); @@ -458,7 +466,7 @@ PG::do_delete_work(ceph::os::Transaction &t, ghobject_t _next) Context *PG::on_clean() { - // Not needed yet (will be needed for IO unblocking) + scrubber.on_primary_active_clean(); return nullptr; } @@ -1347,6 +1355,9 @@ void PG::log_operation( if (!is_primary()) { // && !is_ec_pg() replica_clear_repop_obc(logv); } + if (!logv.empty()) { + scrubber.on_log_update(logv.rbegin()->version); + } 
peering_state.append_log(std::move(logv), trim_to, roll_forward_to, @@ -1527,6 +1538,7 @@ void PG::on_change(ceph::os::Transaction &t) { logger().debug("{} {}: dropping requests", *this, __func__); client_request_orderer.clear_and_cancel(*this); } + scrubber.on_interval_change(); } void PG::context_registry_on_change() { diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 7f734ecd5e7..80a181f24a2 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -39,6 +39,7 @@ #include "crimson/osd/pg_recovery_listener.h" #include "crimson/osd/recovery_backend.h" #include "crimson/osd/object_context_loader.h" +#include "crimson/osd/scrub/pg_scrubber.h" class MQuery; class OSDMap; @@ -160,8 +161,6 @@ public: bool need_write_epoch, ceph::os::Transaction &t) final; - void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) final; - uint64_t get_snap_trimq_size() const final { return std::size(snap_trimq); } @@ -318,6 +317,7 @@ public: } void on_change(ceph::os::Transaction &t) final; void on_activate(interval_set to_trim) final; + void on_replica_activate() final; void on_activate_complete() final; void on_new_interval() final { // Not needed yet @@ -627,6 +627,18 @@ private: eversion_t projected_last_update; public: + // scrub state + + friend class ScrubScan; + friend class ScrubFindRange; + friend class ScrubReserveRange; + friend class scrub::PGScrubber; + template friend class RemoteScrubEventBaseT; + + scrub::PGScrubber scrubber; + + void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) final; + ObjectContextRegistry obc_registry; ObjectContextLoader obc_loader; diff --git a/src/crimson/osd/scrub/pg_scrubber.cc b/src/crimson/osd/scrub/pg_scrubber.cc new file mode 100644 index 00000000000..3ac2c910db2 --- /dev/null +++ b/src/crimson/osd/scrub/pg_scrubber.cc @@ -0,0 +1,309 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#include + +#include 
"crimson/common/log.h" +#include "crimson/osd/pg.h" +#include "crimson/osd/osd_operations/scrub_events.h" +#include "messages/MOSDRepScrub.h" +#include "messages/MOSDRepScrubMap.h" +#include "pg_scrubber.h" + +SET_SUBSYS(osd); + +namespace crimson::osd::scrub { + +void PGScrubber::dump_detail(Formatter *f) const +{ + f->dump_stream("pgid") << pg.get_pgid(); +} + +PGScrubber::PGScrubber(PG &pg) : pg(pg), dpp(pg), machine(*this) {} + +void PGScrubber::on_primary_active_clean() +{ + LOG_PREFIX(PGScrubber::on_primary_active_clean); + DEBUGDPP("", pg); + handle_event(events::primary_activate_t{}); +} + +void PGScrubber::on_replica_activate() +{ + LOG_PREFIX(PGScrubber::on_replica_activate); + DEBUGDPP("", pg); + handle_event(events::replica_activate_t{}); +} + +void PGScrubber::on_interval_change() +{ + LOG_PREFIX(PGScrubber::on_interval_change); + DEBUGDPP("", pg); + /* Once reservations and scheduling are introduced, we'll need an + * IntervalChange event to drop remote resources (they'll be automatically + * released on the other side) */ + handle_event(events::reset_t{}); + waiting_for_update = std::nullopt; + ceph_assert(!blocked); +} + +void PGScrubber::on_log_update(eversion_t v) +{ + LOG_PREFIX(PGScrubber::on_log_update); + if (waiting_for_update && v >= *waiting_for_update) { + DEBUGDPP("waiting_for_update: {}, v: {}", pg, *waiting_for_update, v); + handle_event(await_update_complete_t{}); + waiting_for_update = std::nullopt; + } +} + +void PGScrubber::handle_scrub_requested(bool deep) +{ + LOG_PREFIX(PGScrubber::handle_scrub_requested); + DEBUGDPP("deep: {}", pg, deep); + handle_event(events::start_scrub_t{deep}); +} + +void PGScrubber::handle_scrub_message(Message &_m) +{ + LOG_PREFIX(PGScrubber::handle_scrub_message); + switch (_m.get_type()) { + case MSG_OSD_REP_SCRUB: { + MOSDRepScrub &m = *static_cast(&_m); + DEBUGDPP("MOSDRepScrub: {}", pg, m); + handle_event(events::replica_scan_t{ + m.start, m.end, m.scrub_from, m.deep + }); + break; + } + case 
MSG_OSD_REP_SCRUBMAP: { + MOSDRepScrubMap &m = *static_cast(&_m); + DEBUGDPP("MOSDRepScrubMap: {}", pg, m); + ScrubMap map; + auto iter = m.get_data().cbegin(); + ::decode(map, iter); + handle_event(scan_range_complete_t{ + m.from, std::move(map) + }); + break; + } + default: + DEBUGDPP("invalid message: {}", pg, _m); + ceph_assert(is_scrub_message(_m)); + } +} + +void PGScrubber::handle_op_stats( + const hobject_t &on_object, + object_stat_sum_t delta_stats) { + handle_event(events::op_stats_t{on_object, delta_stats}); +} + +PGScrubber::ifut<> PGScrubber::wait_scrub( + PGScrubber::BlockingEvent::TriggerI&& trigger, + const hobject_t &hoid) +{ + LOG_PREFIX(PGScrubber::wait_scrub); + if (blocked && (hoid >= blocked->begin) && (hoid < blocked->end)) { + DEBUGDPP("blocked: {}, hoid: {}", pg, *blocked, hoid); + return trigger.maybe_record_blocking( + blocked->p.get_shared_future(), + *this); + } else { + return seastar::now(); + } +} + +void PGScrubber::notify_scrub_start(bool deep) +{ + LOG_PREFIX(PGScrubber::notify_scrub_start); + DEBUGDPP("deep: {}", pg, deep); + pg.peering_state.state_set(PG_STATE_SCRUBBING); + if (deep) { + pg.peering_state.state_set(PG_STATE_DEEP_SCRUB); + } + pg.publish_stats_to_osd(); +} + +void PGScrubber::notify_scrub_end(bool deep) +{ + LOG_PREFIX(PGScrubber::notify_scrub_end); + DEBUGDPP("deep: {}", pg, deep); + pg.peering_state.state_clear(PG_STATE_SCRUBBING); + if (deep) { + pg.peering_state.state_clear(PG_STATE_DEEP_SCRUB); + } + pg.publish_stats_to_osd(); +} + +const std::set &PGScrubber::get_ids_to_scrub() const +{ + return pg.peering_state.get_actingset(); +} + +chunk_validation_policy_t PGScrubber::get_policy() const +{ + return chunk_validation_policy_t{ + pg.get_primary(), + std::nullopt /* stripe_info, populate when EC is implemented */, + crimson::common::local_conf().get_val( + "osd_max_object_size"), + crimson::common::local_conf().get_val( + "osd_hit_set_namespace"), + crimson::common::local_conf().get_val( + 
"osd_deep_scrub_large_omap_object_value_sum_threshold"), + crimson::common::local_conf().get_val( + "osd_deep_scrub_large_omap_object_key_threshold") + }; +} + +void PGScrubber::request_range(const hobject_t &start) +{ + LOG_PREFIX(PGScrubber::request_range); + DEBUGDPP("start: {}", pg, start); + std::ignore = pg.shard_services.start_operation_may_interrupt< + interruptor, ScrubFindRange + >(start, &pg); +} + +/* TODO: This isn't actually enough. Here, classic would + * hold the pg lock from the wait_scrub through to IO submission. + * ClientRequest, however, isn't in the processing ExclusivePhase + * bit yet, and so this check may miss ops between the wait_scrub + * check and adding the IO to the log. */ + +void PGScrubber::reserve_range(const hobject_t &start, const hobject_t &end) +{ + LOG_PREFIX(PGScrubber::reserve_range); + DEBUGDPP("start: {}, end: {}", pg, start, end); + std::ignore = pg.shard_services.start_operation_may_interrupt< + interruptor, ScrubReserveRange + >(start, end, &pg); +} + +void PGScrubber::release_range() +{ + LOG_PREFIX(PGScrubber::release_range); + ceph_assert(blocked); + DEBUGDPP("blocked: {}", pg, *blocked); + pg.background_process_lock.unlock(); + blocked->p.set_value(); + blocked = std::nullopt; +} + +void PGScrubber::scan_range( + pg_shard_t target, + eversion_t version, + bool deep, + const hobject_t &start, + const hobject_t &end) +{ + LOG_PREFIX(PGScrubber::scan_range); + DEBUGDPP("target: {}, version: {}, deep: {}, start: {}, end: {}", + pg, target, version, deep, start, end); + if (target == pg.get_pg_whoami()) { + std::ignore = pg.shard_services.start_operation_may_interrupt< + interruptor, ScrubScan + >(&pg, deep, true /* local */, start, end); + } else { + std::ignore = pg.shard_services.send_to_osd( + target.osd, + crimson::make_message( + spg_t(pg.get_pgid().pgid, target.shard), + version, + pg.get_osdmap_epoch(), + pg.get_osdmap_epoch(), + start, + end, + deep, + false /* allow preemption -- irrelevant for replicas TODO 
*/, + 64 /* priority, TODO */, + false /* high_priority TODO */), + pg.get_osdmap_epoch()); + } +} + +bool PGScrubber::await_update(const eversion_t &version) +{ + LOG_PREFIX(PGScrubber::await_update); + DEBUGDPP("version: {}", pg, version); + ceph_assert(!waiting_for_update); + auto& log = pg.peering_state.get_pg_log().get_log().log; + eversion_t current = log.empty() ? eversion_t() : log.rbegin()->version; + if (version <= current) { + return true; + } else { + waiting_for_update = version; + return false; + } +} + +void PGScrubber::generate_and_submit_chunk_result( + const hobject_t &begin, + const hobject_t &end, + bool deep) +{ + LOG_PREFIX(PGScrubber::generate_and_submit_chunk_result); + DEBUGDPP("begin: {}, end: {}, deep: {}", pg, begin, end, deep); + std::ignore = pg.shard_services.start_operation_may_interrupt< + interruptor, ScrubScan + >(&pg, deep, false /* local */, begin, end); +} + +#define LOG_SCRUB_ERROR(MSG, ...) { \ + auto errorstr = fmt::format(MSG, __VA_ARGS__); \ + ERRORDPP("{}", pg, errorstr); \ + pg.get_clog_error() << "pg " << pg.get_pgid() << ": " << errorstr; \ + } + +void PGScrubber::emit_chunk_result( + const request_range_result_t &range, + chunk_result_t &&result) +{ + LOG_PREFIX(PGScrubber::emit_chunk_result); + if (result.has_errors()) { + LOG_SCRUB_ERROR( + "Scrub errors found. range: {}, result: {}", + range, result); + } else { + DEBUGDPP("Chunk complete. 
range: {}", pg, range); + } +} + +void PGScrubber::emit_scrub_result( + bool deep, + object_stat_sum_t in_stats) +{ + LOG_PREFIX(PGScrubber::emit_scrub_result); + DEBUGDPP("", pg); + pg.peering_state.update_stats( + [this, FNAME, deep, &in_stats](auto &history, auto &pg_stats) { + foreach_scrub_maintained_stat( + [deep, &pg_stats, &in_stats]( + const auto &name, auto statptr, bool skip_for_shallow) { + if (deep && !skip_for_shallow) { + pg_stats.stats.sum.*statptr = in_stats.*statptr; + } + }); + foreach_scrub_checked_stat( + [this, FNAME, &pg_stats, &in_stats]( + const auto &name, auto statptr, const auto &invalid_predicate) { + if (!invalid_predicate(pg_stats) && + (in_stats.*statptr != pg_stats.stats.sum.*statptr)) { + LOG_SCRUB_ERROR( + "stat mismatch for {}: scrubbed value: {}, stored pg value: {}", + name, in_stats.*statptr, pg_stats.stats.sum.*statptr); + ++pg_stats.stats.sum.num_shallow_scrub_errors; + } + }); + history.last_scrub = pg.peering_state.get_info().last_update; + auto now = ceph_clock_now(); + history.last_scrub_stamp = now; + if (deep) { + history.last_deep_scrub_stamp = now; + } + return false; // notify_scrub_end will flush stats to osd + }); +} + +} diff --git a/src/crimson/osd/scrub/pg_scrubber.h b/src/crimson/osd/scrub/pg_scrubber.h new file mode 100644 index 00000000000..2d528e04d4b --- /dev/null +++ b/src/crimson/osd/scrub/pg_scrubber.h @@ -0,0 +1,152 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#pragma once + +#include "crimson/osd/pg_interval_interrupt_condition.h" +#include "scrub_machine.h" + +namespace crimson::osd { +class PG; +class ScrubScan; +class ScrubFindRange; +class ScrubReserveRange; +} + +namespace crimson::osd::scrub { + +struct blocked_range_t { + hobject_t begin; + hobject_t end; + seastar::shared_promise<> p; +}; + +class PGScrubber : public crimson::BlockerT, ScrubContext { + friend class ::crimson::osd::ScrubScan; + friend class 
::crimson::osd::ScrubFindRange; + friend class ::crimson::osd::ScrubReserveRange; + + using interruptor = ::crimson::interruptible::interruptor< + ::crimson::osd::IOInterruptCondition>; + template + using ifut = + ::crimson::interruptible::interruptible_future< + ::crimson::osd::IOInterruptCondition, T>; + + PG &pg; + + /// PG alias for logging in header functions + DoutPrefixProvider &dpp; + + ScrubMachine machine; + + std::optional blocked; + + std::optional waiting_for_update; + + template + void handle_event(E &&e) + { + LOG_PREFIX(PGScrubber::handle_event); + SUBDEBUGDPP(osd, "handle_event: {}", dpp, e); + machine.process_event(std::forward(e)); + } + +public: + static constexpr const char *type_name = "PGScrubber"; + using Blocker = PGScrubber; + void dump_detail(Formatter *f) const; + + static inline bool is_scrub_message(Message &m) { + switch (m.get_type()) { + case MSG_OSD_REP_SCRUB: + case MSG_OSD_REP_SCRUBMAP: + return true; + default: + return false; + } + return false; + } + + PGScrubber(PG &pg); + + /// setup scrub machine state + void initiate() { machine.initiate(); } + + /// notify machine on primary that PG is active+clean + void on_primary_active_clean(); + + /// notify machine on replica that PG is active + void on_replica_activate(); + + /// notify machine of interval change + void on_interval_change(); + + /// notify machine that PG has committed up to version v + void on_log_update(eversion_t v); + + /// handle scrub request + void handle_scrub_requested(bool deep); + + + /// handle other scrub message + void handle_scrub_message(Message &m); + + /// notify machine of a mutation of on_object resulting in delta_stats + void handle_op_stats( + const hobject_t &on_object, + object_stat_sum_t delta_stats); + + /// maybe block an op trying to mutate hoid until chunk is complete + ifut<> wait_scrub( + PGScrubber::BlockingEvent::TriggerI&& trigger, + const hobject_t &hoid); + +private: + DoutPrefixProvider &get_dpp() final { return dpp; } + + void 
notify_scrub_start(bool deep) final; + void notify_scrub_end(bool deep) final; + + const std::set &get_ids_to_scrub() const final; + + chunk_validation_policy_t get_policy() const final; + + void request_range(const hobject_t &start) final; + void reserve_range(const hobject_t &start, const hobject_t &end) final; + void release_range() final; + void scan_range( + pg_shard_t target, + eversion_t version, + bool deep, + const hobject_t &start, + const hobject_t &end) final; + bool await_update(const eversion_t &version) final; + void generate_and_submit_chunk_result( + const hobject_t &begin, + const hobject_t &end, + bool deep) final; + void emit_chunk_result( + const request_range_result_t &range, + chunk_result_t &&result) final; + void emit_scrub_result( + bool deep, + object_stat_sum_t scrub_stats) final; +}; + +}; + +template <> +struct fmt::formatter { + constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); } + + template + auto format(const auto &range, FormatContext& ctx) + { + return fmt::format_to( + ctx.out(), + "{}~{}", + range.begin, + range.end); + } +};