#include <seastar/core/future.hh>
#include "crimson/admin/admin_socket.h"
+#include "crimson/common/log.h"
#include "crimson/osd/osd.h"
#include "crimson/osd/pg.h"
+SET_SUBSYS(osd);
using crimson::osd::OSD;
using crimson::osd::PG;
}
};
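+
+// Asok hook for requesting a scrub of a single PG. The bool template
+// parameter selects between the "scrub" and "deep_scrub" commands; an
+// operator would reach this via something like `ceph tell <pgid> deep_scrub`
+// (hypothetical invocation -- the exact syntax depends on the asok plumbing).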
+template <bool deep>
+class ScrubCommand : public PGCommand {
+public:
+ explicit ScrubCommand(crimson::osd::OSD& osd) :
+ PGCommand{
+ osd,
+ deep ? "deep_scrub" : "scrub",
+ "",
+ deep ? "deep scrub pg" : "scrub pg"}
+ {}
+
+ seastar::future<tell_result_t>
+ do_command(Ref<PG> pg,
+ const cmdmap_t& cmdmap,
+ std::string_view format,
+ ceph::bufferlist&&) const final
+ {
+ LOG_PREFIX(ScrubCommand::do_command);
+ DEBUGDPP("deep: {}", *pg, deep);
+ return PG::interruptor::with_interruption([pg] {
+ pg->scrubber.handle_scrub_requested(deep);
+ return PG::interruptor::now();
+ }, [FNAME, pg](std::exception_ptr ep) {
+ DEBUGDPP("interrupted with {}", *pg, ep);
+ }, pg).then([format] {
+ std::unique_ptr<Formatter> f{
+ Formatter::create(format, "json-pretty", "json-pretty")
+ };
+ f->open_object_section("scrub");
+ f->dump_bool("deep", deep);
+ f->dump_stream("stamp") << ceph_clock_now();
+ f->close_section();
+ return seastar::make_ready_future<tell_result_t>(std::move(f));
+ });
+ }
+};
+
} // namespace crimson::admin::pg
namespace crimson::admin {
template std::unique_ptr<AdminSocketHook>
make_asok_hook<crimson::admin::pg::MarkUnfoundLostCommand>(crimson::osd::OSD& osd);
+template std::unique_ptr<AdminSocketHook>
+make_asok_hook<crimson::admin::pg::ScrubCommand<true>>(crimson::osd::OSD& osd);
+template std::unique_ptr<AdminSocketHook>
+make_asok_hook<crimson::admin::pg::ScrubCommand<false>>(crimson::osd::OSD& osd);
+
} // namespace crimson::admin
class QueryCommand;
class MarkUnfoundLostCommand;
+template <bool deep>
+class ScrubCommand;
} // namespace crimson::admin::pg
osd_operations/background_recovery.cc
osd_operations/recovery_subrequest.cc
osd_operations/snaptrim_event.cc
+ osd_operations/scrub_events.cc
pg_recovery.cc
recovery_backend.cc
replicated_recovery_backend.cc
scheduler/mclock_scheduler.cc
scrub/scrub_machine.cc
scrub/scrub_validator.cc
+ scrub/pg_scrubber.cc
osdmap_gate.cc
pg_activation_blocker.cc
pg_map.cc
void OpsExecuter::apply_stats()
{
pg->get_peering_state().apply_op_stats(get_target(), delta_stats);
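+ // Feed per-op stat deltas to the scrubber so an in-progress scrub can
+ // account for mutations that race with it.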
+ pg->scrubber.handle_op_stats(get_target(), delta_stats);
pg->publish_stats_to_osd();
}
#include "crimson/osd/osd_operations/pg_advance_map.h"
#include "crimson/osd/osd_operations/recovery_subrequest.h"
#include "crimson/osd/osd_operations/replicated_request.h"
+#include "crimson/osd/osd_operations/scrub_events.h"
#include "crimson/osd/osd_operation_external_tracking.h"
#include "crimson/crush/CrushLocation.h"
// PG commands
asok->register_command(make_asok_hook<pg::QueryCommand>(*this));
asok->register_command(make_asok_hook<pg::MarkUnfoundLostCommand>(*this));
+ asok->register_command(make_asok_hook<pg::ScrubCommand<true>>(*this));
+ asok->register_command(make_asok_hook<pg::ScrubCommand<false>>(*this));
// ops commands
asok->register_command(
make_asok_hook<DumpInFlightOpsHook>(
case MSG_OSD_REPOPREPLY:
return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
case MSG_OSD_SCRUB2:
- return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
+ return handle_scrub_command(
+ conn, boost::static_pointer_cast<MOSDScrub2>(m));
+ case MSG_OSD_REP_SCRUB:
+ case MSG_OSD_REP_SCRUBMAP:
+ return handle_scrub_message(
+ conn,
+ boost::static_pointer_cast<MOSDFastDispatchOp>(m));
case MSG_OSD_PG_UPDATE_LOG_MISSING:
return handle_update_log_missing(conn, boost::static_pointer_cast<
MOSDPGUpdateLogMissing>(m));
});
}
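+// MSG_OSD_SCRUB2 carries an operator-initiated scrub request and is fanned
+// out per PG as a ScrubRequested operation; MSG_OSD_REP_SCRUB and
+// MSG_OSD_REP_SCRUBMAP are the internal primary/replica scrub messages and
+// are routed to the scrubber as ScrubMessage operations.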
-seastar::future<> OSD::handle_scrub(
+seastar::future<> OSD::handle_scrub_command(
crimson::net::ConnectionRef conn,
Ref<MOSDScrub2> m)
{
}
return seastar::parallel_for_each(std::move(m->scrub_pgs),
[m, conn, this](spg_t pgid) {
- pg_shard_t from_shard{static_cast<int>(m->get_source().num()),
- pgid.shard};
- PeeringState::RequestScrub scrub_request{m->deep, m->repair};
- return pg_shard_manager.start_pg_operation<RemotePeeringEvent>(
- conn,
- from_shard,
- pgid,
- PGPeeringEvent{m->epoch, m->epoch, scrub_request}).second;
+ return pg_shard_manager.start_pg_operation<
+ crimson::osd::ScrubRequested
+ >(m->deep, conn, m->epoch, pgid).second;
});
}
+seastar::future<> OSD::handle_scrub_message(
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDFastDispatchOp> m)
+{
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+ return pg_shard_manager.start_pg_operation<
+ crimson::osd::ScrubMessage
+ >(m, conn, m->get_min_epoch(), m->get_spg()).second;
+}
+
seastar::future<> OSD::handle_mark_me_down(
crimson::net::ConnectionRef conn,
Ref<MOSDMarkMeDown> m)
Ref<MOSDPeeringOp> m);
seastar::future<> handle_recovery_subreq(crimson::net::ConnectionRef conn,
Ref<MOSDFastDispatchOp> m);
- seastar::future<> handle_scrub(crimson::net::ConnectionRef conn,
- Ref<MOSDScrub2> m);
+ seastar::future<> handle_scrub_command(crimson::net::ConnectionRef conn,
+ Ref<MOSDScrub2> m);
+ seastar::future<> handle_scrub_message(crimson::net::ConnectionRef conn,
+ Ref<MOSDFastDispatchOp> m);
seastar::future<> handle_mark_me_down(crimson::net::ConnectionRef conn,
Ref<MOSDMarkMeDown> m);
logmissing_request_reply,
snaptrim_event,
snaptrimobj_subevent,
+ scrub_requested,
+ scrub_message,
+ scrub_find_range,
+ scrub_reserve_range,
+ scrub_scan,
last_op
};
"logmissing_request_reply",
"snaptrim_event",
"snaptrimobj_subevent",
+ "scrub_requested",
+ "scrub_message",
+ "scrub_find_range",
+ "scrub_reserve_range",
+ "scrub_scan",
};
// prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:
#include "crimson/osd/osd_operations/snaptrim_event.h"
#include "crimson/osd/pg_activation_blocker.h"
#include "crimson/osd/pg_map.h"
+#include "crimson/osd/scrub/pg_scrubber.h"
namespace crimson::osd {
PG_OSDMapGate::OSDMapBlocker::BlockingEvent::Backend,
ClientRequest::PGPipeline::WaitForActive::BlockingEvent::Backend,
PGActivationBlocker::BlockingEvent::Backend,
+ scrub::PGScrubber::BlockingEvent::Backend,
ClientRequest::PGPipeline::RecoverMissing::BlockingEvent::Backend,
ClientRequest::PGPipeline::GetOBC::BlockingEvent::Backend,
ClientRequest::PGPipeline::Process::BlockingEvent::Backend,
const PGActivationBlocker& blocker) override {
}
+ void handle(scrub::PGScrubber::BlockingEvent& ev,
+ const Operation& op,
+ const scrub::PGScrubber& blocker) override {
+ }
+
void handle(ClientRequest::PGPipeline::RecoverMissing::BlockingEvent& ev,
const Operation& op,
const ClientRequest::PGPipeline::RecoverMissing& blocker) override {
PG_OSDMapGate::OSDMapBlocker::BlockingEvent::Backend,
ClientRequest::PGPipeline::WaitForActive::BlockingEvent::Backend,
PGActivationBlocker::BlockingEvent::Backend,
+ scrub::PGScrubber::BlockingEvent::Backend,
ClientRequest::PGPipeline::RecoverMissing::BlockingEvent::Backend,
ClientRequest::PGPipeline::GetOBC::BlockingEvent::Backend,
ClientRequest::PGPipeline::Process::BlockingEvent::Backend,
const PGActivationBlocker& blocker) override {
}
+ void handle(scrub::PGScrubber::BlockingEvent& ev,
+ const Operation& op,
+ const scrub::PGScrubber& blocker) override {
+ }
+
void handle(ClientRequest::PGPipeline::RecoverMissing::BlockingEvent& ev,
const Operation& op,
const ClientRequest::PGPipeline::RecoverMissing& blocker) override {
DEBUGDPP("{}.{}: entered get_obc stage, about to wait_scrub",
*pg, *this, this_instance_id);
op_info.set_from_op(&*m, *pg->get_osdmap());
- return pg->with_locked_obc(
- m->get_hobj(), op_info,
- [FNAME, this, pg, this_instance_id, &ihref](
- auto head, auto obc) mutable {
- DEBUGDPP("{}.{}: got obc {}, entering process stage",
- *pg, *this, this_instance_id, obc->obs);
- return ihref.enter_stage<interruptor>(
- client_pp(*pg).process, *this
- ).then_interruptible(
- [FNAME, this, pg, this_instance_id, obc, &ihref]() mutable {
- DEBUGDPP("{}.{}: in process stage, calling do_process",
- *pg, *this, this_instance_id);
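+ // An op targeting an object inside the range currently being scrubbed
+ // waits here until the scrubber releases the chunk; see
+ // PGScrubber::wait_scrub().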
+ return ihref.enter_blocker(
+ *this,
+ pg->scrubber,
+ &decltype(pg->scrubber)::wait_scrub,
+ m->get_hobj()
+ ).then_interruptible(
+ [FNAME, this, pg, this_instance_id, &ihref]() mutable {
+ DEBUGDPP("{}.{}: past scrub blocker, getting obc",
+ *pg, *this, this_instance_id);
+ return pg->with_locked_obc(
+ m->get_hobj(), op_info,
+ [FNAME, this, pg, this_instance_id, &ihref](
+ auto head, auto obc) mutable {
+ DEBUGDPP("{}.{}: got obc {}, entering process stage",
+ *pg, *this, this_instance_id, obc->obs);
+ return ihref.enter_stage<interruptor>(
+ client_pp(*pg).process, *this
+ ).then_interruptible(
+ [FNAME, this, pg, this_instance_id, obc, &ihref]() mutable {
+ DEBUGDPP("{}.{}: in process stage, calling do_process",
+ *pg, *this, this_instance_id);
return do_process(ihref, pg, obc, this_instance_id);
});
- });
+ });
+ });
});
}
});
[FNAME, this, pg, this_instance_id, &ihref](
auto submitted, auto all_completed) mutable {
return submitted.then_interruptible(
- [FNAME, this, pg, this_instance_id, &ihref] {
+ [this, pg, &ihref] {
return ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this);
}).then_interruptible(
[FNAME, this, pg, this_instance_id,
return conn->send(std::move(reply));
});
}, crimson::ct_error::eagain::handle(
- [FNAME, this, pg, this_instance_id, &ihref]() mutable {
+ [this, pg, this_instance_id, &ihref]() mutable {
return process_op(ihref, pg, this_instance_id);
}));
});
}, crimson::ct_error::eagain::handle(
- [FNAME, this, pg, this_instance_id, &ihref]() mutable {
+ [this, pg, this_instance_id, &ihref]() mutable {
return process_op(ihref, pg, this_instance_id);
}));
}
#include "crimson/osd/osd_operations/common/pg_pipeline.h"
#include "crimson/osd/pg_activation_blocker.h"
#include "crimson/osd/pg_map.h"
+#include "crimson/osd/scrub/pg_scrubber.h"
#include "crimson/common/type_helpers.h"
#include "crimson/common/utility.h"
#include "messages/MOSDOp.h"
PGPipeline::WaitForActive::BlockingEvent,
PGActivationBlocker::BlockingEvent,
PGPipeline::RecoverMissing::BlockingEvent,
+ scrub::PGScrubber::BlockingEvent,
PGPipeline::GetOBC::BlockingEvent,
PGPipeline::Process::BlockingEvent,
PGPipeline::WaitRepop::BlockingEvent,
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/common/log.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osd_connection_priv.h"
+#include "messages/MOSDRepScrubMap.h"
+#include "scrub_events.h"
+
+SET_SUBSYS(osd);
+
+namespace crimson::osd {
+
+template <class T>
+PGPeeringPipeline &RemoteScrubEventBaseT<T>::get_peering_pipeline(PG &pg)
+{
+ return pg.peering_request_pg_pipeline;
+}
+
+template <class T>
+ConnectionPipeline &RemoteScrubEventBaseT<T>::get_connection_pipeline()
+{
+ return get_osd_priv(conn.get()).peering_request_conn_pipeline;
+}
+
+template <class T>
+PerShardPipeline &RemoteScrubEventBaseT<T>::get_pershard_pipeline(
+ ShardServices &shard_services)
+{
+ return shard_services.get_client_request_pipeline();
+}
+
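+// Common entry path for remote scrub events: wait in the peering await_map
+// stage until the PG has seen this event's epoch, then enter the process
+// stage and dispatch to the concrete handle_event(). An interval change
+// interrupts the chain; the handler below just logs and drops the event.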
+template <class T>
+seastar::future<> RemoteScrubEventBaseT<T>::with_pg(
+ ShardServices &shard_services, Ref<PG> pg)
+{
+ LOG_PREFIX(RemoteScrubEventBaseT::with_pg);
+ return interruptor::with_interruption([FNAME, this, pg] {
+ DEBUGDPP("{} pg present", *pg, *that());
+ return this->template enter_stage<interruptor>(
+ get_peering_pipeline(*pg).await_map
+ ).then_interruptible([this, pg] {
+ return this->template with_blocking_event<
+ PG_OSDMapGate::OSDMapBlocker::BlockingEvent
+ >([this, pg](auto &&trigger) {
+ return pg->osdmap_gate.wait_for_map(
+ std::move(trigger), get_epoch());
+ });
+ }).then_interruptible([this, pg](auto) {
+ return this->template enter_stage<interruptor>(
+ get_peering_pipeline(*pg).process);
+ }).then_interruptible([this, pg] {
+ return handle_event(*pg);
+ });
+ }, [FNAME, pg, this](std::exception_ptr ep) {
+ DEBUGDPP("{} interrupted with {}", *pg, *that(), ep);
+ }, pg);
+}
+
+ScrubRequested::ifut<> ScrubRequested::handle_event(PG &pg)
+{
+ pg.scrubber.handle_scrub_requested(deep);
+ return seastar::now();
+}
+
+ScrubMessage::ifut<> ScrubMessage::handle_event(PG &pg)
+{
+ pg.scrubber.handle_scrub_message(*m);
+ return seastar::now();
+}
+
+template class RemoteScrubEventBaseT<ScrubRequested>;
+template class RemoteScrubEventBaseT<ScrubMessage>;
+
+template <typename T>
+ScrubAsyncOpT<T>::ScrubAsyncOpT(Ref<PG> pg) : pg(pg) {}
+
+template <typename T>
+typename ScrubAsyncOpT<T>::template ifut<> ScrubAsyncOpT<T>::start()
+{
+ LOG_PREFIX(ScrubAsyncOpT::start);
+ DEBUGDPP("{} starting", *pg, *this);
+ return run(*pg);
+}
+
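+// Computes the upper bound of the next chunk: list up to
+// osd_scrub_chunk_max objects starting at `begin` and round the returned
+// cursor up to a max object boundary so that a head object and all of its
+// clones land in the same chunk.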
+ScrubFindRange::ifut<> ScrubFindRange::run(PG &pg)
+{
+ LOG_PREFIX(ScrubFindRange::run);
+ using crimson::common::local_conf;
+ return interruptor::make_interruptible(
+ pg.shard_services.get_store().list_objects(
+ pg.get_collection_ref(),
+ ghobject_t(begin, ghobject_t::NO_GEN, pg.get_pgid().shard),
+ ghobject_t::get_max(),
+ local_conf().get_val<int64_t>("osd_scrub_chunk_max")
+ )
+ ).then_interruptible([FNAME, this, &pg](auto ret) {
+ auto &[_, next] = ret;
+
+ // We rely on seeing an entire set of snapshots in a single chunk
+ auto end = next.hobj.get_max_object_boundary();
+
+ DEBUGDPP("got next.hobj: {}, returning begin, end: {}, {}",
+ pg, next.hobj, begin, end);
+ pg.scrubber.machine.process_event(
+ scrub::ScrubContext::request_range_complete_t{begin, end});
+ });
+}
+
+template class ScrubAsyncOpT<ScrubFindRange>;
+
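+// Takes background_process_lock and marks [begin, end) as blocked so that
+// ops overlapping the chunk wait in wait_scrub(). The version of the newest
+// log entry touching the range (eversion_t{} if none) is handed back to the
+// state machine, which can use it to wait for replicas to catch up before
+// scanning.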
+ScrubReserveRange::ifut<> ScrubReserveRange::run(PG &pg)
+{
+ LOG_PREFIX(ScrubReserveRange::run);
+ DEBUGDPP("", pg);
+ return pg.background_process_lock.lock(
+ ).then_interruptible([FNAME, this, &pg] {
+ DEBUGDPP("background_process_lock locked", pg);
+ auto &scrubber = pg.scrubber;
+ ceph_assert(!scrubber.blocked);
+ scrubber.blocked = scrub::blocked_range_t{begin, end};
+ blocked_set = true;
+ auto& log = pg.peering_state.get_pg_log().get_log().log;
+ auto p = find_if(
+ log.crbegin(), log.crend(),
+ [this](const auto& e) -> bool {
+ return e.soid >= begin && e.soid < end;
+ });
+
+ if (p == log.crend()) {
+ return scrubber.machine.process_event(
+ scrub::ScrubContext::reserve_range_complete_t{eversion_t{}});
+ } else {
+ return scrubber.machine.process_event(
+ scrub::ScrubContext::reserve_range_complete_t{p->version});
+ }
+ }).finally([&pg, this] {
+ if (!blocked_set) {
+ pg.background_process_lock.unlock();
+ }
+ });
+}
+
+template class ScrubAsyncOpT<ScrubReserveRange>;
+
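+// Scans all objects in [begin, end): list the range, stat and getattr each
+// object (plus a deep scan if requested), then either feed the resulting
+// ScrubMap into the local state machine (primary) or encode it into an
+// MOSDRepScrubMap for the primary (replica), as selected by `local`.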
+ScrubScan::ifut<> ScrubScan::run(PG &pg)
+{
+ LOG_PREFIX(ScrubScan::run);
+ // legacy value, unused
+ ret.valid_through = pg.get_info().last_update;
+
+ DEBUGDPP("begin: {}, end: {}", pg, begin, end);
+ return interruptor::make_interruptible(
+ pg.shard_services.get_store().list_objects(
+ pg.get_collection_ref(),
+ ghobject_t(begin, ghobject_t::NO_GEN, pg.get_pgid().shard),
+ ghobject_t(end, ghobject_t::NO_GEN, pg.get_pgid().shard),
+ std::numeric_limits<uint64_t>::max())
+ ).then_interruptible([FNAME, this, &pg](auto &&result) {
+ DEBUGDPP("listed {} objects", pg, std::get<0>(result).size());
+ return seastar::do_with(
+ std::move(std::get<0>(result)),
+ [this, &pg](auto &objects) {
+ return interruptor::do_for_each(
+ objects,
+ [this, &pg](auto &obj) {
+ if (obj.is_pgmeta() || obj.hobj.is_temp()) {
+ return interruptor::now();
+ } else {
+ return scan_object(pg, obj);
+ }
+ });
+ });
+ }).then_interruptible([FNAME, this, &pg] {
+ if (local) {
+ DEBUGDPP("complete, submitting local event", pg);
+ pg.scrubber.handle_event(
+ scrub::ScrubContext::scan_range_complete_t(
+ pg.get_pg_whoami(),
+ std::move(ret)));
+ return seastar::now();
+ } else {
+ DEBUGDPP("complete, sending response to primary", pg);
+ auto m = crimson::make_message<MOSDRepScrubMap>(
+ spg_t(pg.get_pgid().pgid, pg.get_primary().shard),
+ pg.get_osdmap_epoch(),
+ pg.get_pg_whoami());
+ encode(ret, m->get_data());
+ pg.scrubber.handle_event(
+ scrub::ScrubContext::generate_and_submit_chunk_result_complete_t{});
+ return pg.shard_services.send_to_osd(
+ pg.get_primary().osd,
+ std::move(m),
+ pg.get_osdmap_epoch());
+ }
+ });
+}
+
+ScrubScan::ifut<> ScrubScan::scan_object(
+ PG &pg,
+ const ghobject_t &obj)
+{
+ LOG_PREFIX(ScrubScan::scan_object);
+ DEBUGDPP("obj: {}", pg, obj);
+ auto &entry = ret.objects[obj.hobj];
+ return interruptor::make_interruptible(
+ pg.shard_services.get_store().stat(
+ pg.get_collection_ref(),
+ obj)
+ ).then_interruptible([FNAME, &pg, &obj, &entry](struct stat obj_stat) {
+ DEBUGDPP("obj: {}, stat complete, size {}", pg, obj, obj_stat.st_size);
+ entry.size = obj_stat.st_size;
+ return pg.shard_services.get_store().get_attrs(
+ pg.get_collection_ref(),
+ obj);
+ }).safe_then_interruptible([FNAME, &pg, &obj, &entry](auto &&attrs) {
+ DEBUGDPP("obj: {}, got {} attrs", pg, obj, attrs.size());
+ for (auto &i : attrs) {
+ i.second.rebuild();
+ if (i.second.length() == 0) {
+ entry.attrs[i.first];
+ } else {
+ entry.attrs.emplace(i.first, i.second.front());
+ }
+ }
+ }).handle_error_interruptible(
+ ct_error::all_same_way([FNAME, &pg, &obj, &entry](auto e) {
+ DEBUGDPP("obj: {} stat error", pg, obj);
+ entry.stat_error = true;
+ })
+ ).then_interruptible([FNAME, this, &pg, &obj] {
+ if (deep) {
+ DEBUGDPP("obj: {} doing deep scan", pg, obj);
+ return deep_scan_object(pg, obj);
+ } else {
+ return interruptor::now();
+ }
+ });
+}
+
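+// Resumable cursor for deep-scanning a single object. Each iteration of
+// the interruptor::repeat loop below advances one step -- a stride of
+// object data, then the omap header, then a batch of omap keys -- so the
+// scan yields to the reactor between ObjectStore operations.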
+struct obj_scrub_progress_t {
+ // nullopt once complete
+ std::optional<uint64_t> offset = 0;
+ ceph::buffer::hash data_hash{std::numeric_limits<uint32_t>::max()};
+
+ bool header_done = false;
+ std::optional<std::string> next_key;
+ bool keys_done = false;
+ ceph::buffer::hash omap_hash{std::numeric_limits<uint32_t>::max()};
+};
+
+} // namespace crimson::osd
+
+// Declared before deep_scan_object, which formats obj_scrub_progress_t in
+// its debug output.
+template <>
+struct fmt::formatter<crimson::osd::obj_scrub_progress_t> {
+ constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
+ template <typename FormatContext>
+ auto format(const crimson::osd::obj_scrub_progress_t &progress,
+ FormatContext& ctx)
+ {
+ return fmt::format_to(
+ ctx.out(),
+ "obj_scrub_progress_t(offset: {}, "
+ "header_done: {}, next_key: {}, keys_done: {})",
+ progress.offset, progress.header_done,
+ progress.next_key, progress.keys_done);
+ }
+};
+
+namespace crimson::osd {
+ScrubScan::ifut<> ScrubScan::deep_scan_object(
+ PG &pg,
+ const ghobject_t &obj)
+{
+ LOG_PREFIX(ScrubScan::deep_scan_object);
+ DEBUGDPP("obj: {}", pg, obj);
+ using crimson::common::local_conf;
+ auto &entry = ret.objects[obj.hobj];
+ return interruptor::repeat(
+ [FNAME, this, progress = obj_scrub_progress_t(),
+ &obj, &entry, &pg]() mutable
+ -> interruptible_future<seastar::stop_iteration> {
+ if (progress.offset) {
+ DEBUGDPP("op: {}, obj: {}, progress: {} scanning data",
+ pg, *this, obj, progress);
+ const auto stride = local_conf().get_val<Option::size_t>(
+ "osd_deep_scrub_stride");
+ return pg.shard_services.get_store().read(
+ pg.get_collection_ref(),
+ obj,
+ *(progress.offset),
+ stride
+ ).safe_then([stride, &progress, &entry](auto bl) {
+ progress.data_hash << bl;
+ if (bl.length() < stride) {
+ progress.offset = std::nullopt;
+ entry.digest = progress.data_hash.digest();
+ entry.digest_present = true;
+ } else {
+ ceph_assert(stride == bl.length());
+ *(progress.offset) += stride;
+ }
+ }).handle_error(
+ ct_error::all_same_way([&progress, &entry](auto e) {
+ entry.read_error = true;
+ progress.offset = std::nullopt;
+ })
+ ).then([] {
+ return interruptor::make_interruptible(
+ seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no));
+ });
+ } else if (!progress.header_done) {
+ DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap header",
+ pg, *this, obj, progress);
+ return pg.shard_services.get_store().omap_get_header(
+ pg.get_collection_ref(),
+ obj
+ ).safe_then([&progress](auto bl) {
+ progress.omap_hash << bl;
+ }).handle_error(
+ ct_error::enodata::handle([] {}),
+ ct_error::all_same_way([&entry](auto e) {
+ entry.read_error = true;
+ })
+ ).then([&progress] {
+ progress.header_done = true;
+ return interruptor::make_interruptible(
+ seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no));
+ });
+ } else if (!progress.keys_done) {
+ DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap keys",
+ pg, *this, obj, progress);
+ return pg.shard_services.get_store().omap_get_values(
+ pg.get_collection_ref(),
+ obj,
+ progress.next_key
+ ).safe_then([FNAME, this, &obj, &progress, &entry, &pg](auto result) {
+ const auto &[done, omap] = result;
+ DEBUGDPP("op: {}, obj: {}, progress: {} got {} keys",
+ pg, *this, obj, progress, omap.size());
+ for (const auto &p : omap) {
+ bufferlist bl;
+ encode(p.first, bl);
+ encode(p.second, bl);
+ progress.omap_hash << bl;
+ entry.object_omap_keys++;
+ entry.object_omap_bytes += p.second.length();
+ }
+ if (done) {
+ DEBUGDPP("op: {}, obj: {}, progress: {} omap done",
+ pg, *this, obj, progress);
+ progress.keys_done = true;
+ entry.omap_digest = progress.omap_hash.digest();
+ entry.omap_digest_present = true;
+
+ if ((entry.object_omap_keys >
+ local_conf().get_val<uint64_t>(
+ "osd_deep_scrub_large_omap_object_key_threshold")) ||
+ (entry.object_omap_bytes >
+ local_conf().get_val<Option::size_t>(
+ "osd_deep_scrub_large_omap_object_value_sum_threshold"))) {
+ entry.large_omap_object_found = true;
+ entry.large_omap_object_key_count = entry.object_omap_keys;
+ ret.has_large_omap_object_errors = true;
+ }
+ } else {
+ ceph_assert(!omap.empty()); // omap_get_values invariant
+ DEBUGDPP("op: {}, obj: {}, progress: {} omap not done, next {}",
+ pg, *this, obj, progress, omap.crbegin()->first);
+ progress.next_key = omap.crbegin()->first;
+ }
+ }).handle_error(
+ ct_error::all_same_way([FNAME, this, &obj, &progress, &entry, &pg]
+ (auto e) {
+ DEBUGDPP("op: {}, obj: {}, progress: {} error reading omap {}",
+ pg, *this, obj, progress, e);
+ progress.keys_done = true;
+ entry.read_error = true;
+ })
+ ).then([] {
+ return interruptor::make_interruptible(
+ seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no));
+ });
+ } else {
+ DEBUGDPP("op: {}, obj: {}, progress: {} done",
+ pg, *this, obj, progress);
+ return interruptor::make_interruptible(
+ seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::yes));
+ }
+ });
+}
+
+template class ScrubAsyncOpT<ScrubScan>;
+
+} // namespace crimson::osd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "common/Formatter.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/scrub/pg_scrubber.h"
+#include "osd/osd_types.h"
+#include "peering_event.h"
+
+namespace crimson::osd {
+
+class PG;
+
+template <typename T>
+class RemoteScrubEventBaseT : public PhasedOperationT<T> {
+ T* that() {
+ return static_cast<T*>(this);
+ }
+ const T* that() const {
+ return static_cast<const T*>(this);
+ }
+
+ PipelineHandle handle;
+
+ crimson::net::ConnectionRef conn;
+ epoch_t epoch;
+ spg_t pgid;
+
+protected:
+ using interruptor = InterruptibleOperation::interruptor;
+
+ template <typename U=void>
+ using ifut = InterruptibleOperation::interruptible_future<U>;
+
+ virtual ifut<> handle_event(PG &pg) = 0;
+public:
+ RemoteScrubEventBaseT(
+ crimson::net::ConnectionRef conn, epoch_t epoch, spg_t pgid)
+ : conn(conn), epoch(epoch), pgid(pgid) {}
+
+ PGPeeringPipeline &get_peering_pipeline(PG &pg);
+ ConnectionPipeline &get_connection_pipeline();
+ PerShardPipeline &get_pershard_pipeline(ShardServices &);
+
+ crimson::net::Connection &get_connection() {
+ assert(conn);
+ return *conn;
+ }
+
+ static constexpr bool can_create() { return false; }
+
+ spg_t get_pgid() const {
+ return pgid;
+ }
+
+ PipelineHandle &get_handle() { return handle; }
+ epoch_t get_epoch() const { return epoch; }
+
+ seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
+ assert(conn);
+ return conn.get_foreign(
+ ).then([this](auto f_conn) {
+ conn.reset();
+ return f_conn;
+ });
+ }
+ void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
+ assert(!conn);
+ conn = make_local_shared_foreign(std::move(_conn));
+ }
+
+ seastar::future<> with_pg(
+ ShardServices &shard_services, Ref<PG> pg);
+
+ std::tuple<
+ class TrackableOperationT<T>::StartEvent,
+ ConnectionPipeline::AwaitActive::BlockingEvent,
+ ConnectionPipeline::AwaitMap::BlockingEvent,
+ OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
+ ConnectionPipeline::GetPGMapping::BlockingEvent,
+ PerShardPipeline::CreateOrWaitPG::BlockingEvent,
+ PGMap::PGCreationBlockingEvent,
+ PGPeeringPipeline::AwaitMap::BlockingEvent,
+ PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
+ PGPeeringPipeline::Process::BlockingEvent,
+ class TrackableOperationT<T>::CompletionEvent
+ > tracking_events;
+
+ virtual ~RemoteScrubEventBaseT() = default;
+};
+
+class ScrubRequested final : public RemoteScrubEventBaseT<ScrubRequested> {
+ bool deep = false;
+protected:
+ ifut<> handle_event(PG &pg) final;
+
+public:
+ static constexpr OperationTypeCode type = OperationTypeCode::scrub_requested;
+
+ template <typename... Args>
+ ScrubRequested(bool deep, Args&&... base_args)
+ : RemoteScrubEventBaseT<ScrubRequested>(std::forward<Args>(base_args)...),
+ deep(deep) {}
+
+ void print(std::ostream &out) const final {
+ out << "(deep=" << deep << ")";
+ }
+ void dump_detail(ceph::Formatter *f) const final {
+ f->dump_bool("deep", deep);
+ }
+
+};
+
+class ScrubMessage final : public RemoteScrubEventBaseT<ScrubMessage> {
+ MessageRef m;
+protected:
+ ifut<> handle_event(PG &pg) final;
+
+public:
+ static constexpr OperationTypeCode type = OperationTypeCode::scrub_message;
+
+ template <typename... Args>
+ ScrubMessage(MessageRef m, Args&&... base_args)
+ : RemoteScrubEventBaseT<ScrubMessage>(std::forward<Args>(base_args)...),
+ m(m) {
+ ceph_assert(scrub::PGScrubber::is_scrub_message(*m));
+ }
+
+ void print(std::ostream &out) const final {
+ out << "(m=" << *m << ")";
+ }
+ void dump_detail(ceph::Formatter *f) const final {
+ f->dump_stream("m") << *m;
+ }
+
+};
+
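+/// Base for the background operations (find range, reserve range, scan)
+/// that the scrub state machine starts against its own PG via
+/// ShardServices::start_operation_may_interrupt; derived classes
+/// implement run().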
+template <typename T>
+class ScrubAsyncOpT : public TrackableOperationT<T> {
+ Ref<PG> pg;
+
+public:
+ using interruptor = InterruptibleOperation::interruptor;
+ template <typename U=void>
+ using ifut = InterruptibleOperation::interruptible_future<U>;
+
+ ScrubAsyncOpT(Ref<PG> pg);
+
+ ifut<> start();
+
+ virtual ~ScrubAsyncOpT() = default;
+
+protected:
+ virtual ifut<> run(PG &pg) = 0;
+};
+
+class ScrubFindRange : public ScrubAsyncOpT<ScrubFindRange> {
+ hobject_t begin;
+public:
+ static constexpr OperationTypeCode type = OperationTypeCode::scrub_find_range;
+
+ template <typename... Args>
+ ScrubFindRange(const hobject_t &begin, Args&&... args)
+ : ScrubAsyncOpT(std::forward<Args>(args)...), begin(begin) {}
+
+ void print(std::ostream &out) const final {
+ out << "(begin=" << begin << ")";
+ }
+ void dump_detail(ceph::Formatter *f) const final {
+ f->dump_stream("begin") << begin;
+ }
+
+protected:
+ ifut<> run(PG &pg) final;
+};
+
+class ScrubReserveRange : public ScrubAsyncOpT<ScrubReserveRange> {
+ hobject_t begin;
+ hobject_t end;
+
+ /// see run(), used to unlock background_process_lock on interval change
+ bool blocked_set = false;
+public:
+ static constexpr OperationTypeCode type =
+ OperationTypeCode::scrub_reserve_range;
+
+ template <typename... Args>
+ ScrubReserveRange(const hobject_t &begin, const hobject_t &end, Args&&... args)
+ : ScrubAsyncOpT(std::forward<Args>(args)...), begin(begin), end(end) {}
+
+ void print(std::ostream &out) const final {
+ out << "(begin=" << begin << ", end=" << end << ")";
+ }
+ void dump_detail(ceph::Formatter *f) const final {
+ f->dump_stream("begin") << begin;
+ f->dump_stream("end") << end;
+ }
+
+protected:
+ ifut<> run(PG &pg) final;
+};
+
+class ScrubScan : public ScrubAsyncOpT<ScrubScan> {
+ /// deep or shallow scrub
+ const bool deep;
+
+ /// true: send event locally, false: send result to primary
+ const bool local;
+
+ /// object range to scan: [begin, end)
+ const hobject_t begin;
+ const hobject_t end;
+
+ /// resulting ScrubMap; either handled locally or sent to the primary, see `local`
+ ScrubMap ret;
+
+ ifut<> scan_object(PG &pg, const ghobject_t &obj);
+ ifut<> deep_scan_object(PG &pg, const ghobject_t &obj);
+
+public:
+ static constexpr OperationTypeCode type = OperationTypeCode::scrub_scan;
+
+ void print(std::ostream &out) const final {
+ out << "(deep=" << deep
+ << ", local=" << local
+ << ", begin=" << begin
+ << ", end=" << end
+ << ")";
+ }
+ void dump_detail(ceph::Formatter *f) const final {
+ f->dump_bool("deep", deep);
+ f->dump_bool("local", local);
+ f->dump_stream("begin") << begin;
+ f->dump_stream("end") << end;
+ }
+
+ ScrubScan(
+ Ref<PG> pg, bool deep, bool local,
+ const hobject_t &begin, const hobject_t &end)
+ : ScrubAsyncOpT(pg), deep(deep), local(local), begin(begin), end(end) {}
+
+protected:
+ ifut<> run(PG &pg) final;
+};
+
+}
+
+namespace crimson {
+
+template <>
+struct EventBackendRegistry<osd::ScrubRequested> {
+ static std::tuple<> get_backends() {
+ return {};
+ }
+};
+
+template <>
+struct EventBackendRegistry<osd::ScrubMessage> {
+ static std::tuple<> get_backends() {
+ return {};
+ }
+};
+
+}
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::osd::ScrubRequested>
+ : fmt::ostream_formatter {};
+
+template <> struct fmt::formatter<crimson::osd::ScrubMessage>
+ : fmt::ostream_formatter {};
+
+template <typename T>
+struct fmt::formatter<crimson::osd::ScrubAsyncOpT<T>>
+ : fmt::ostream_formatter {};
+
+template <> struct fmt::formatter<crimson::osd::ScrubFindRange>
+ : fmt::ostream_formatter {};
+
+template <> struct fmt::formatter<crimson::osd::ScrubReserveRange>
+ : fmt::ostream_formatter {};
+
+template <> struct fmt::formatter<crimson::osd::ScrubScan>
+ : fmt::ostream_formatter {};
+
+#endif
osdmap,
this,
this),
+ scrubber(*this),
obc_registry{
local_conf()},
obc_loader{
pgid.shard),
wait_for_active_blocker(this)
{
+ scrubber.initiate();
peering_state.set_backend_predicates(
new ReadablePredicate(pg_whoami),
new RecoverablePredicate());
projected_last_update = peering_state.get_info().last_update;
}
+void PG::on_replica_activate()
+{
+ logger().debug("{}: {}", *this, __func__);
+ scrubber.on_replica_activate();
+}
+
void PG::on_activate_complete()
{
wait_for_active_blocker.unblock();
Context *PG::on_clean()
{
- // Not needed yet (will be needed for IO unblocking)
+ scrubber.on_primary_active_clean();
return nullptr;
}
if (!is_primary()) { // && !is_ec_pg()
replica_clear_repop_obc(logv);
}
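+ // Let a scrub waiting in await_update() learn how far the log has
+ // advanced; see PGScrubber::on_log_update().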
+ if (!logv.empty()) {
+ scrubber.on_log_update(logv.rbegin()->version);
+ }
peering_state.append_log(std::move(logv),
trim_to,
roll_forward_to,
logger().debug("{} {}: dropping requests", *this, __func__);
client_request_orderer.clear_and_cancel(*this);
}
+ scrubber.on_interval_change();
}
void PG::context_registry_on_change() {
#include "crimson/osd/pg_recovery_listener.h"
#include "crimson/osd/recovery_backend.h"
#include "crimson/osd/object_context_loader.h"
+#include "crimson/osd/scrub/pg_scrubber.h"
class MQuery;
class OSDMap;
bool need_write_epoch,
ceph::os::Transaction &t) final;
- void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) final;
-
uint64_t get_snap_trimq_size() const final {
return std::size(snap_trimq);
}
}
void on_change(ceph::os::Transaction &t) final;
void on_activate(interval_set<snapid_t> to_trim) final;
+ void on_replica_activate() final;
void on_activate_complete() final;
void on_new_interval() final {
// Not needed yet
eversion_t projected_last_update;
public:
+ // scrub state
+
+ friend class ScrubScan;
+ friend class ScrubFindRange;
+ friend class ScrubReserveRange;
+ friend class scrub::PGScrubber;
+ template <typename T> friend class RemoteScrubEventBaseT;
+
+ scrub::PGScrubber scrubber;
+
+ void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) final;
+
ObjectContextRegistry obc_registry;
ObjectContextLoader obc_loader;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#include <fmt/ranges.h>
+
+#include "crimson/common/log.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osd_operations/scrub_events.h"
+#include "messages/MOSDRepScrub.h"
+#include "messages/MOSDRepScrubMap.h"
+#include "pg_scrubber.h"
+
+SET_SUBSYS(osd);
+
+namespace crimson::osd::scrub {
+
+void PGScrubber::dump_detail(Formatter *f) const
+{
+ f->dump_stream("pgid") << pg.get_pgid();
+}
+
+PGScrubber::PGScrubber(PG &pg) : pg(pg), dpp(pg), machine(*this) {}
+
+void PGScrubber::on_primary_active_clean()
+{
+ LOG_PREFIX(PGScrubber::on_primary_active_clean);
+ DEBUGDPP("", pg);
+ handle_event(events::primary_activate_t{});
+}
+
+void PGScrubber::on_replica_activate()
+{
+ LOG_PREFIX(PGScrubber::on_replica_activate);
+ DEBUGDPP("", pg);
+ handle_event(events::replica_activate_t{});
+}
+
+void PGScrubber::on_interval_change()
+{
+ LOG_PREFIX(PGScrubber::on_interval_change);
+ DEBUGDPP("", pg);
+ /* Once reservations and scheduling are introduced, we'll need an
+ * IntervalChange event to drop remote resources (they'll be automatically
+ * released on the other side) */
+ handle_event(events::reset_t{});
+ waiting_for_update = std::nullopt;
+ ceph_assert(!blocked);
+}
+
+void PGScrubber::on_log_update(eversion_t v)
+{
+ LOG_PREFIX(PGScrubber::on_log_update);
+ if (waiting_for_update && v >= *waiting_for_update) {
+ DEBUGDPP("waiting_for_update: {}, v: {}", pg, *waiting_for_update, v);
+ handle_event(await_update_complete_t{});
+ waiting_for_update = std::nullopt;
+ }
+}
+
+void PGScrubber::handle_scrub_requested(bool deep)
+{
+ LOG_PREFIX(PGScrubber::handle_scrub_requested);
+ DEBUGDPP("deep: {}", pg, deep);
+ handle_event(events::start_scrub_t{deep});
+}
+
+void PGScrubber::handle_scrub_message(Message &_m)
+{
+ LOG_PREFIX(PGScrubber::handle_scrub_message);
+ switch (_m.get_type()) {
+ case MSG_OSD_REP_SCRUB: {
+ MOSDRepScrub &m = *static_cast<MOSDRepScrub*>(&_m);
+ DEBUGDPP("MOSDRepScrub: {}", pg, m);
+ handle_event(events::replica_scan_t{
+ m.start, m.end, m.scrub_from, m.deep
+ });
+ break;
+ }
+ case MSG_OSD_REP_SCRUBMAP: {
+ MOSDRepScrubMap &m = *static_cast<MOSDRepScrubMap*>(&_m);
+ DEBUGDPP("MOSDRepScrubMap: {}", pg, m);
+ ScrubMap map;
+ auto iter = m.get_data().cbegin();
+ ::decode(map, iter);
+ handle_event(scan_range_complete_t{
+ m.from, std::move(map)
+ });
+ break;
+ }
+ default:
+ DEBUGDPP("invalid message: {}", pg, _m);
+ ceph_assert(is_scrub_message(_m));
+ }
+}
+
+void PGScrubber::handle_op_stats(
+ const hobject_t &on_object,
+ object_stat_sum_t delta_stats) {
+ handle_event(events::op_stats_t{on_object, delta_stats});
+}
+
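+// Called on the client-request path: an op targeting `hoid` waits on the
+// shared promise in `blocked` if the object falls within the currently
+// reserved chunk; release_range() resolves the promise.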
+PGScrubber::ifut<> PGScrubber::wait_scrub(
+ PGScrubber::BlockingEvent::TriggerI&& trigger,
+ const hobject_t &hoid)
+{
+ LOG_PREFIX(PGScrubber::wait_scrub);
+ if (blocked && (hoid >= blocked->begin) && (hoid < blocked->end)) {
+ DEBUGDPP("blocked: {}, hoid: {}", pg, *blocked, hoid);
+ return trigger.maybe_record_blocking(
+ blocked->p.get_shared_future(),
+ *this);
+ } else {
+ return seastar::now();
+ }
+}
+
+void PGScrubber::notify_scrub_start(bool deep)
+{
+ LOG_PREFIX(PGScrubber::notify_scrub_start);
+ DEBUGDPP("deep: {}", pg, deep);
+ pg.peering_state.state_set(PG_STATE_SCRUBBING);
+ if (deep) {
+ pg.peering_state.state_set(PG_STATE_DEEP_SCRUB);
+ }
+ pg.publish_stats_to_osd();
+}
+
+void PGScrubber::notify_scrub_end(bool deep)
+{
+ LOG_PREFIX(PGScrubber::notify_scrub_end);
+ DEBUGDPP("deep: {}", pg, deep);
+ pg.peering_state.state_clear(PG_STATE_SCRUBBING);
+ if (deep) {
+ pg.peering_state.state_clear(PG_STATE_DEEP_SCRUB);
+ }
+ pg.publish_stats_to_osd();
+}
+
+const std::set<pg_shard_t> &PGScrubber::get_ids_to_scrub() const
+{
+ return pg.peering_state.get_actingset();
+}
+
+chunk_validation_policy_t PGScrubber::get_policy() const
+{
+ return chunk_validation_policy_t{
+ pg.get_primary(),
+ std::nullopt /* stripe_info, populate when EC is implemented */,
+ crimson::common::local_conf().get_val<Option::size_t>(
+ "osd_max_object_size"),
+ crimson::common::local_conf().get_val<std::string>(
+ "osd_hit_set_namespace"),
+ crimson::common::local_conf().get_val<Option::size_t>(
+ "osd_deep_scrub_large_omap_object_value_sum_threshold"),
+ crimson::common::local_conf().get_val<uint64_t>(
+ "osd_deep_scrub_large_omap_object_key_threshold")
+ };
+}
+
+void PGScrubber::request_range(const hobject_t &start)
+{
+ LOG_PREFIX(PGScrubber::request_range);
+ DEBUGDPP("start: {}", pg, start);
+ std::ignore = pg.shard_services.start_operation_may_interrupt<
+ interruptor, ScrubFindRange
+ >(start, &pg);
+}
+
+/* TODO: This isn't actually enough. Here, classic would
+ * hold the pg lock from the wait_scrub through to IO submission.
+ * ClientRequest, however, isn't in the processing ExclusivePhase
+ * bit yet, and so this check may miss ops between the wait_scrub
+ * check and adding the IO to the log. */
+
+void PGScrubber::reserve_range(const hobject_t &start, const hobject_t &end)
+{
+ LOG_PREFIX(PGScrubber::reserve_range);
+ DEBUGDPP("start: {}, end: {}", pg, start, end);
+ std::ignore = pg.shard_services.start_operation_may_interrupt<
+ interruptor, ScrubReserveRange
+ >(start, end, &pg);
+}
+
+void PGScrubber::release_range()
+{
+ LOG_PREFIX(PGScrubber::release_range);
+ ceph_assert(blocked);
+ DEBUGDPP("blocked: {}", pg, *blocked);
+ pg.background_process_lock.unlock();
+ blocked->p.set_value();
+ blocked = std::nullopt;
+}
+
+void PGScrubber::scan_range(
+ pg_shard_t target,
+ eversion_t version,
+ bool deep,
+ const hobject_t &start,
+ const hobject_t &end)
+{
+ LOG_PREFIX(PGScrubber::scan_range);
+ DEBUGDPP("target: {}, version: {}, deep: {}, start: {}, end: {}",
+ pg, target, version, deep, start, end);
+ if (target == pg.get_pg_whoami()) {
+ std::ignore = pg.shard_services.start_operation_may_interrupt<
+ interruptor, ScrubScan
+ >(&pg, deep, true /* local */, start, end);
+ } else {
+ std::ignore = pg.shard_services.send_to_osd(
+ target.osd,
+ crimson::make_message<MOSDRepScrub>(
+ spg_t(pg.get_pgid().pgid, target.shard),
+ version,
+ pg.get_osdmap_epoch(),
+ pg.get_osdmap_epoch(),
+ start,
+ end,
+ deep,
+ false /* allow preemption -- irrelevant for replicas TODO */,
+ 64 /* priority, TODO */,
+ false /* high_priority TODO */),
+ pg.get_osdmap_epoch());
+ }
+}
+
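+// Returns true if the local log already contains `version`; otherwise
+// records it in waiting_for_update so that on_log_update() (invoked from
+// PG::append_log) re-triggers the state machine once the log catches up.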
+bool PGScrubber::await_update(const eversion_t &version)
+{
+ LOG_PREFIX(PGScrubber::await_update);
+ DEBUGDPP("version: {}", pg, version);
+ ceph_assert(!waiting_for_update);
+ auto& log = pg.peering_state.get_pg_log().get_log().log;
+ eversion_t current = log.empty() ? eversion_t() : log.rbegin()->version;
+ if (version <= current) {
+ return true;
+ } else {
+ waiting_for_update = version;
+ return false;
+ }
+}
+
+void PGScrubber::generate_and_submit_chunk_result(
+ const hobject_t &begin,
+ const hobject_t &end,
+ bool deep)
+{
+ LOG_PREFIX(PGScrubber::generate_and_submit_chunk_result);
+ DEBUGDPP("begin: {}, end: {}, deep: {}", pg, begin, end, deep);
+ std::ignore = pg.shard_services.start_operation_may_interrupt<
+ interruptor, ScrubScan
+ >(&pg, deep, false /* local */, begin, end);
+}
+
+#define LOG_SCRUB_ERROR(MSG, ...) { \
+ auto errorstr = fmt::format(MSG, __VA_ARGS__); \
+ ERRORDPP("{}", pg, errorstr); \
+ pg.get_clog_error() << "pg " << pg.get_pgid() << ": " << errorstr; \
+ }
+
+void PGScrubber::emit_chunk_result(
+ const request_range_result_t &range,
+ chunk_result_t &&result)
+{
+ LOG_PREFIX(PGScrubber::emit_chunk_result);
+ if (result.has_errors()) {
+ LOG_SCRUB_ERROR(
+ "Scrub errors found. range: {}, result: {}",
+ range, result);
+ } else {
+ DEBUGDPP("Chunk complete. range: {}", pg, range);
+ }
+}
+
+void PGScrubber::emit_scrub_result(
+ bool deep,
+ object_stat_sum_t in_stats)
+{
+ LOG_PREFIX(PGScrubber::emit_scrub_result);
+ DEBUGDPP("", pg);
+ pg.peering_state.update_stats(
+ [this, FNAME, deep, &in_stats](auto &history, auto &pg_stats) {
+ foreach_scrub_maintained_stat(
+ [deep, &pg_stats, &in_stats](
+ const auto &name, auto statptr, bool skip_for_shallow) {
+ if (deep || !skip_for_shallow) {
+ pg_stats.stats.sum.*statptr = in_stats.*statptr;
+ }
+ });
+ foreach_scrub_checked_stat(
+ [this, FNAME, &pg_stats, &in_stats](
+ const auto &name, auto statptr, const auto &invalid_predicate) {
+ if (!invalid_predicate(pg_stats) &&
+ (in_stats.*statptr != pg_stats.stats.sum.*statptr)) {
+ LOG_SCRUB_ERROR(
+ "stat mismatch for {}: scrubbed value: {}, stored pg value: {}",
+ name, in_stats.*statptr, pg_stats.stats.sum.*statptr);
+ ++pg_stats.stats.sum.num_shallow_scrub_errors;
+ }
+ });
+ history.last_scrub = pg.peering_state.get_info().last_update;
+ auto now = ceph_clock_now();
+ history.last_scrub_stamp = now;
+ if (deep) {
+ history.last_deep_scrub_stamp = now;
+ }
+ return false; // notify_scrub_end will flush stats to osd
+ });
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include "crimson/osd/pg_interval_interrupt_condition.h"
+#include "scrub_machine.h"
+
+namespace crimson::osd {
+class PG;
+class ScrubScan;
+class ScrubFindRange;
+class ScrubReserveRange;
+}
+
+namespace crimson::osd::scrub {
+
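+/// Chunk currently reserved for scrubbing: ops targeting [begin, end)
+/// wait on `p` until the scrubber completes the chunk (see
+/// PGScrubber::wait_scrub and release_range).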
+struct blocked_range_t {
+ hobject_t begin;
+ hobject_t end;
+ seastar::shared_promise<> p;
+};
+
+class PGScrubber : public crimson::BlockerT<PGScrubber>, ScrubContext {
+ friend class ::crimson::osd::ScrubScan;
+ friend class ::crimson::osd::ScrubFindRange;
+ friend class ::crimson::osd::ScrubReserveRange;
+
+ using interruptor = ::crimson::interruptible::interruptor<
+ ::crimson::osd::IOInterruptCondition>;
+ template <typename T = void>
+ using ifut =
+ ::crimson::interruptible::interruptible_future<
+ ::crimson::osd::IOInterruptCondition, T>;
+
+ PG &pg;
+
+ /// PG alias for logging in header functions
+ DoutPrefixProvider &dpp;
+
+ ScrubMachine machine;
+
+ std::optional<blocked_range_t> blocked;
+
+ std::optional<eversion_t> waiting_for_update;
+
+ template <typename E>
+ void handle_event(E &&e)
+ {
+ LOG_PREFIX(PGScrubber::handle_event);
+ SUBDEBUGDPP(osd, "handle_event: {}", dpp, e);
+ machine.process_event(std::forward<E>(e));
+ }
+
+public:
+ static constexpr const char *type_name = "PGScrubber";
+ using Blocker = PGScrubber;
+ void dump_detail(Formatter *f) const;
+
+ static inline bool is_scrub_message(Message &m) {
+ switch (m.get_type()) {
+ case MSG_OSD_REP_SCRUB:
+ case MSG_OSD_REP_SCRUBMAP:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ PGScrubber(PG &pg);
+
+ /// setup scrub machine state
+ void initiate() { machine.initiate(); }
+
+ /// notify machine on primary that PG is active+clean
+ void on_primary_active_clean();
+
+ /// notify machine on replica that PG is active
+ void on_replica_activate();
+
+ /// notify machine of interval change
+ void on_interval_change();
+
+ /// notify machine that PG has committed up to version v
+ void on_log_update(eversion_t v);
+
+ /// handle scrub request
+ void handle_scrub_requested(bool deep);
+
+ /// handle other scrub message
+ void handle_scrub_message(Message &m);
+
+ /// notify machine of a mutation of on_object resulting in delta_stats
+ void handle_op_stats(
+ const hobject_t &on_object,
+ object_stat_sum_t delta_stats);
+
+ /// maybe block an op trying to mutate hoid until chunk is complete
+ ifut<> wait_scrub(
+ PGScrubber::BlockingEvent::TriggerI&& trigger,
+ const hobject_t &hoid);
+
+private:
+ DoutPrefixProvider &get_dpp() final { return dpp; }
+
+ void notify_scrub_start(bool deep) final;
+ void notify_scrub_end(bool deep) final;
+
+ const std::set<pg_shard_t> &get_ids_to_scrub() const final;
+
+ chunk_validation_policy_t get_policy() const final;
+
+ void request_range(const hobject_t &start) final;
+ void reserve_range(const hobject_t &start, const hobject_t &end) final;
+ void release_range() final;
+ void scan_range(
+ pg_shard_t target,
+ eversion_t version,
+ bool deep,
+ const hobject_t &start,
+ const hobject_t &end) final;
+ bool await_update(const eversion_t &version) final;
+ void generate_and_submit_chunk_result(
+ const hobject_t &begin,
+ const hobject_t &end,
+ bool deep) final;
+ void emit_chunk_result(
+ const request_range_result_t &range,
+ chunk_result_t &&result) final;
+ void emit_scrub_result(
+ bool deep,
+ object_stat_sum_t scrub_stats) final;
+};
+
+} // namespace crimson::osd::scrub
+
+template <>
+struct fmt::formatter<crimson::osd::scrub::blocked_range_t> {
+ constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
+
+ template <typename FormatContext>
+ auto format(const auto &range, FormatContext& ctx)
+ {
+ return fmt::format_to(
+ ctx.out(),
+ "{}~{}",
+ range.begin,
+ range.end);
+ }
+};