From adc8dfb3e5b087a113feef417b5e3c0d63c169f9 Mon Sep 17 00:00:00 2001 From: Ronen Friedman Date: Thu, 22 Apr 2021 10:56:12 +0300 Subject: [PATCH] osd/scrub: extracting scrub scheduling code from OSD.cc A separate object (ScrubQueue) now manages the collection of ScrubJob-s that are to be scrubbed. Notable changes: - ScrubQueue keeps track of the CPU load (instead of the OSD main) - ScrubJob-s are part of each PG's scrubber. They are registered with the ScrubQueue when eligible for scrubbing (i.e. - active Primary), and removed from it otherwise. - "a PG is trying to secure Replica resources" was always treated as an OSD-wide state that delays new scrubs from starting. Here - it is also implemented as such (instead of per-PG state that must be queried) Internally, the SQ maintains two groups of jobs: the regular ones, waiting to be scrubbed, and the "penalized" - those PGs that failed in securing their replicas' scrub resources (@dzafman changes, slightly modified to fit the new environment) Also included: minor fixes to the scrub state-machine and events priorities. 
Signed-off-by: Ronen Friedman Co-authored-by: David Zafman --- src/crimson/osd/pg.h | 8 + src/osd/CMakeLists.txt | 1 + src/osd/OSD.cc | 400 +++----------- src/osd/OSD.h | 101 +--- src/osd/PG.cc | 53 +- src/osd/PG.h | 9 +- src/osd/PeeringState.cc | 8 +- src/osd/PeeringState.h | 7 + src/osd/PrimaryLogPG.cc | 25 +- src/osd/scrubber/osd_scrub_sched.cc | 719 +++++++++++++++++++++++++ src/osd/scrubber/osd_scrub_sched.h | 346 ++++++++++++ src/osd/scrubber/pg_scrubber.cc | 202 ++++--- src/osd/scrubber/pg_scrubber.h | 28 +- src/osd/scrubber/scrub_machine.cc | 10 + src/osd/scrubber/scrub_machine.h | 1 + src/osd/scrubber/scrub_machine_lstnr.h | 9 + src/osd/scrubber_common.h | 29 +- src/test/osd/TestOSDScrub.cc | 2 +- 18 files changed, 1460 insertions(+), 498 deletions(-) create mode 100644 src/osd/scrubber/osd_scrub_sched.cc create mode 100644 src/osd/scrubber/osd_scrub_sched.h diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 39f645fc918..b78a44dd553 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -157,6 +157,14 @@ public: // Not needed yet -- mainly for scrub scheduling } + /// Notify PG that Primary/Replica status has changed (to update scrub registration) + void on_primary_status_change(bool was_primary, bool now_primary) final { + } + + /// Need to reschedule next scrub. 
Assuming no change in role + void reschedule_scrub() final { + } + void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) final; uint64_t get_snap_trimq_size() const final { diff --git a/src/osd/CMakeLists.txt b/src/osd/CMakeLists.txt index 82a5451804a..26a0dd519fc 100644 --- a/src/osd/CMakeLists.txt +++ b/src/osd/CMakeLists.txt @@ -22,6 +22,7 @@ set(osd_srcs PGBackend.cc OSDCap.cc scrubber/pg_scrubber.cc + scrubber/osd_scrub_sched.cc scrubber/PrimaryLogScrub.cc scrubber/scrub_machine.cc scrubber/ScrubStore.cc diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 62aa2f767a4..ab2f4a0ff5d 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -267,8 +267,7 @@ OSDService::OSDService(OSD *osd, ceph::async::io_context_pool& poolctx) : publish_lock{ceph::make_mutex("OSDService::publish_lock")}, pre_publish_lock{ceph::make_mutex("OSDService::pre_publish_lock")}, max_oldest_map(0), - scrubs_local(0), - scrubs_remote(0), + m_scrub_queue{cct, *this}, agent_valid_iterator(false), agent_ops(0), flush_mode_high_count(0), @@ -1267,79 +1266,6 @@ void OSDService::prune_pg_created() // -------------------------------------- // dispatch -bool OSDService::can_inc_scrubs() -{ - bool can_inc = false; - std::lock_guard l(sched_scrub_lock); - - if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) { - dout(20) << __func__ << " == true " << scrubs_local << " local + " << scrubs_remote - << " remote < max " << cct->_conf->osd_max_scrubs << dendl; - can_inc = true; - } else { - dout(20) << __func__ << " == false " << scrubs_local << " local + " << scrubs_remote - << " remote >= max " << cct->_conf->osd_max_scrubs << dendl; - } - - return can_inc; -} - -bool OSDService::inc_scrubs_local() -{ - bool result = false; - std::lock_guard l{sched_scrub_lock}; - if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) { - dout(20) << __func__ << " " << scrubs_local << " -> " << (scrubs_local+1) - << " (max " << cct->_conf->osd_max_scrubs << ", remote " << scrubs_remote << 
")" << dendl; - result = true; - ++scrubs_local; - } else { - dout(20) << __func__ << " " << scrubs_local << " local + " << scrubs_remote << " remote >= max " << cct->_conf->osd_max_scrubs << dendl; - } - return result; -} - -void OSDService::dec_scrubs_local() -{ - std::lock_guard l{sched_scrub_lock}; - dout(20) << __func__ << " " << scrubs_local << " -> " << (scrubs_local-1) - << " (max " << cct->_conf->osd_max_scrubs << ", remote " << scrubs_remote << ")" << dendl; - --scrubs_local; - ceph_assert(scrubs_local >= 0); -} - -bool OSDService::inc_scrubs_remote() -{ - bool result = false; - std::lock_guard l{sched_scrub_lock}; - if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) { - dout(20) << __func__ << " " << scrubs_remote << " -> " << (scrubs_remote+1) - << " (max " << cct->_conf->osd_max_scrubs << ", local " << scrubs_local << ")" << dendl; - result = true; - ++scrubs_remote; - } else { - dout(20) << __func__ << " " << scrubs_local << " local + " << scrubs_remote << " remote >= max " << cct->_conf->osd_max_scrubs << dendl; - } - return result; -} - -void OSDService::dec_scrubs_remote() -{ - std::lock_guard l{sched_scrub_lock}; - dout(20) << __func__ << " " << scrubs_remote << " -> " << (scrubs_remote-1) - << " (max " << cct->_conf->osd_max_scrubs << ", local " << scrubs_local << ")" << dendl; - --scrubs_remote; - ceph_assert(scrubs_remote >= 0); -} - -void OSDService::dump_scrub_reservations(Formatter *f) -{ - std::lock_guard l{sched_scrub_lock}; - f->dump_int("scrubs_local", scrubs_local); - f->dump_int("scrubs_remote", scrubs_remote); - f->dump_int("osd_max_scrubs", cct->_conf->osd_max_scrubs); -} - void OSDService::retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch, epoch_t *_bind_epoch) const { @@ -2793,7 +2719,7 @@ will start to track new ops received afterwards."; f->close_section(); } else if (prefix == "dump_scrub_reservations") { f->open_object_section("scrub_reservations"); - service.dump_scrub_reservations(f); + 
service.get_scrub_services().dump_scrub_reservations(f); f->close_section(); } else if (prefix == "get_latest_osdmap") { get_latest_osdmap(); @@ -2843,7 +2769,7 @@ will start to track new ops received afterwards."; } else if (prefix == "dump_objectstore_kv_stats") { store->get_db_statistics(f); } else if (prefix == "dump_scrubs") { - service.dumps_scrub(f); + service.get_scrub_services().dump_scrubs(f); } else if (prefix == "calc_objectstore_db_histogram") { store->generate_db_histogram(f); } else if (prefix == "flush_store_cache") { @@ -5004,8 +4930,6 @@ void OSD::load_pgs() store->set_collection_commit_queue(pg->coll, &(shards[shard_index]->context_queue)); } - pg->reg_next_scrub(); - dout(10) << __func__ << " loaded " << *pg << dendl; pg->unlock(); @@ -5915,22 +5839,10 @@ void OSD::heartbeat() ceph_assert(ceph_mutex_is_locked_by_me(heartbeat_lock)); dout(30) << "heartbeat" << dendl; - // get CPU load avg - double loadavgs[1]; - int hb_interval = cct->_conf->osd_heartbeat_interval; - int n_samples = 86400; - if (hb_interval > 1) { - n_samples /= hb_interval; - if (n_samples < 1) - n_samples = 1; - } - - if (getloadavg(loadavgs, 1) == 1) { - logger->set(l_osd_loadavg, 100 * loadavgs[0]); - daily_loadavg = (daily_loadavg * (n_samples - 1) + loadavgs[0]) / n_samples; - dout(30) << "heartbeat: daily_loadavg " << daily_loadavg << dendl; + auto load_for_logger = service.get_scrub_services().update_load_average(); + if (load_for_logger) { + logger->set(l_osd_loadavg, load_for_logger.value()); } - dout(30) << "heartbeat checking stats" << dendl; // refresh peer list and osd stats @@ -7576,264 +7488,104 @@ bool OSD::scrub_random_backoff() return false; } -OSDService::ScrubJob::ScrubJob(CephContext* cct, - const spg_t& pg, const utime_t& timestamp, - double pool_scrub_min_interval, - double pool_scrub_max_interval, bool must) - : cct(cct), - pgid(pg), - sched_time(timestamp), - deadline(timestamp) -{ - // if not explicitly requested, postpone the scrub with a random delay 
- if (!must) { - double scrub_min_interval = pool_scrub_min_interval > 0 ? - pool_scrub_min_interval : cct->_conf->osd_scrub_min_interval; - double scrub_max_interval = pool_scrub_max_interval > 0 ? - pool_scrub_max_interval : cct->_conf->osd_scrub_max_interval; - - sched_time += scrub_min_interval; - double r = rand() / (double)RAND_MAX; - sched_time += - scrub_min_interval * cct->_conf->osd_scrub_interval_randomize_ratio * r; - if (scrub_max_interval == 0) { - deadline = utime_t(); - } else { - deadline += scrub_max_interval; - } - } -} - -bool OSDService::ScrubJob::ScrubJob::operator<(const OSDService::ScrubJob& rhs) const { - if (sched_time < rhs.sched_time) - return true; - if (sched_time > rhs.sched_time) - return false; - return pgid < rhs.pgid; -} - -void OSDService::dumps_scrub(ceph::Formatter *f) +void OSD::sched_scrub() { - ceph_assert(f != nullptr); - std::lock_guard l(sched_scrub_lock); + auto& scrub_scheduler = service.get_scrub_services(); - f->open_array_section("scrubs"); - for (const auto &i: sched_scrub_pg) { - f->open_object_section("scrub"); - f->dump_stream("pgid") << i.pgid; - f->dump_stream("sched_time") << i.sched_time; - f->dump_stream("deadline") << i.deadline; - f->dump_bool("forced", i.sched_time == PgScrubber::scrub_must_stamp()); - f->close_section(); + // fail fast if no resources are available + if (!scrub_scheduler.can_inc_scrubs()) { + dout(20) << __func__ << ": OSD cannot inc scrubs" << dendl; + return; } - f->close_section(); -} -double OSD::scrub_sleep_time(bool must_scrub) -{ - if (must_scrub) { - return cct->_conf->osd_scrub_sleep; - } - utime_t now = ceph_clock_now(); - if (scrub_time_permit(now)) { - return cct->_conf->osd_scrub_sleep; + // if there is a PG that is just now trying to reserve scrub replica resources - + // we should wait and not initiate a new scrub + if (scrub_scheduler.is_reserving_now()) { + dout(20) << __func__ << ": scrub resources reservation in progress" << dendl; + return; } - double normal_sleep = 
cct->_conf->osd_scrub_sleep; - double extended_sleep = cct->_conf->osd_scrub_extended_sleep; - return std::max(extended_sleep, normal_sleep); -} -bool OSD::scrub_time_permit(utime_t now) -{ - struct tm bdt; - time_t tt = now.sec(); - localtime_r(&tt, &bdt); + Scrub::ScrubPreconds env_conditions; - bool day_permit = false; - if (cct->_conf->osd_scrub_begin_week_day < cct->_conf->osd_scrub_end_week_day) { - if (bdt.tm_wday >= cct->_conf->osd_scrub_begin_week_day && bdt.tm_wday < cct->_conf->osd_scrub_end_week_day) { - day_permit = true; - } - } else { - if (bdt.tm_wday >= cct->_conf->osd_scrub_begin_week_day || bdt.tm_wday < cct->_conf->osd_scrub_end_week_day) { - day_permit = true; + if (service.is_recovery_active() && !cct->_conf->osd_scrub_during_recovery) { + if (!cct->_conf->osd_repair_during_recovery) { + dout(15) << __func__ << ": not scheduling scrubs due to active recovery" + << dendl; + return; } + dout(10) << __func__ + << " will only schedule explicitly requested repair due to active recovery" + << dendl; + env_conditions.allow_requested_repair_only = true; } - if (!day_permit) { - dout(20) << __func__ << " should run between week day " << cct->_conf->osd_scrub_begin_week_day - << " - " << cct->_conf->osd_scrub_end_week_day - << " now " << bdt.tm_wday << " = no" << dendl; - return false; - } - - bool time_permit = false; - if (cct->_conf->osd_scrub_begin_hour < cct->_conf->osd_scrub_end_hour) { - if (bdt.tm_hour >= cct->_conf->osd_scrub_begin_hour && bdt.tm_hour < cct->_conf->osd_scrub_end_hour) { - time_permit = true; - } - } else { - if (bdt.tm_hour >= cct->_conf->osd_scrub_begin_hour || bdt.tm_hour < cct->_conf->osd_scrub_end_hour) { - time_permit = true; + if (g_conf()->subsys.should_gather()) { + dout(20) << __func__ << " sched_scrub starts" << dendl; + auto all_jobs = scrub_scheduler.list_registered_jobs(); + for (const auto& sj : all_jobs) { + dout(20) << "sched_scrub scrub-queue jobs: " << *sj << dendl; } } - if (time_permit) { - dout(20) << 
__func__ << " should run between " << cct->_conf->osd_scrub_begin_hour - << " - " << cct->_conf->osd_scrub_end_hour - << " now " << bdt.tm_hour << " = yes" << dendl; - } else { - dout(20) << __func__ << " should run between " << cct->_conf->osd_scrub_begin_hour - << " - " << cct->_conf->osd_scrub_end_hour - << " now " << bdt.tm_hour << " = no" << dendl; - } - return time_permit; + + auto was_started = scrub_scheduler.select_pg_and_scrub(env_conditions); + dout(20) << "sched_scrub done (" << ScrubQueue::attempt_res_text(was_started) + << ")" << dendl; } -bool OSD::scrub_load_below_threshold() +Scrub::schedule_result_t OSDService::initiate_a_scrub(spg_t pgid, + bool allow_requested_repair_only) { - double loadavgs[3]; - if (getloadavg(loadavgs, 3) != 3) { - dout(10) << __func__ << " couldn't read loadavgs\n" << dendl; - return false; - } + dout(20) << __func__ << " trying " << pgid << dendl; - // allow scrub if below configured threshold - long cpus = sysconf(_SC_NPROCESSORS_ONLN); - double loadavg_per_cpu = cpus > 0 ? loadavgs[0] / cpus : loadavgs[0]; - if (loadavg_per_cpu < cct->_conf->osd_scrub_load_threshold) { - dout(20) << __func__ << " loadavg per cpu " << loadavg_per_cpu - << " < max " << cct->_conf->osd_scrub_load_threshold - << " = yes" << dendl; - return true; - } + // we have a candidate to scrub. 
We need some PG information to know if scrubbing is + // allowed - // allow scrub if below daily avg and currently decreasing - if (loadavgs[0] < daily_loadavg && loadavgs[0] < loadavgs[2]) { - dout(20) << __func__ << " loadavg " << loadavgs[0] - << " < daily_loadavg " << daily_loadavg - << " and < 15m avg " << loadavgs[2] - << " = yes" << dendl; - return true; + PGRef pg = osd->lookup_lock_pg(pgid); + if (!pg) { + // the PG was dequeued in the short timespan between creating the candidates list + // (collect_ripe_jobs()) and here + dout(5) << __func__ << " pg " << pgid << " not found" << dendl; + return Scrub::schedule_result_t::no_such_pg; } - dout(20) << __func__ << " loadavg " << loadavgs[0] - << " >= max " << cct->_conf->osd_scrub_load_threshold - << " and ( >= daily_loadavg " << daily_loadavg - << " or >= 15m avg " << loadavgs[2] - << ") = no" << dendl; - return false; -} - -void OSD::sched_scrub() -{ - dout(20) << __func__ << " sched_scrub starts" << dendl; - - // if not permitted, fail fast - if (!service.can_inc_scrubs()) { - dout(20) << __func__ << ": OSD cannot inc scrubs" << dendl; - return; + // This has already started, so go on to the next scrub job + if (pg->is_scrub_active()) { + pg->unlock(); + dout(20) << __func__ << ": already in progress pgid " << pgid << dendl; + return Scrub::schedule_result_t::already_started; } - bool allow_requested_repair_only = false; - if (service.is_recovery_active() && !cct->_conf->osd_scrub_during_recovery) { - if (!cct->_conf->osd_repair_during_recovery) { - dout(15) << __func__ << ": not scheduling scrubs due to active recovery" << dendl; - return; - } - dout(10) << __func__ - << " will only schedule explicitly requested repair due to active recovery" - << dendl; - allow_requested_repair_only = true; + // Skip other kinds of scrubbing if only explicitly requested repairing is allowed + if (allow_requested_repair_only && !pg->m_planned_scrub.must_repair) { + pg->unlock(); + dout(10) << __func__ << " skip " << pgid + 
<< " because repairing is not explicitly requested on it" << dendl; + return Scrub::schedule_result_t::preconditions; } - utime_t now = ceph_clock_now(); - bool time_permit = scrub_time_permit(now); - bool load_is_low = scrub_load_below_threshold(); - dout(20) << "sched_scrub load_is_low=" << (int)load_is_low << dendl; - - OSDService::ScrubJob scrub_job; - if (service.first_scrub_stamp(&scrub_job)) { - do { - dout(30) << "sched_scrub examine " << scrub_job.pgid << " at " << scrub_job.sched_time << dendl; - - if (scrub_job.sched_time > now) { - // save ourselves some effort - dout(20) << "sched_scrub " << scrub_job.pgid << " scheduled at " << scrub_job.sched_time - << " > " << now << dendl; - break; - } - - if ((scrub_job.deadline.is_zero() || scrub_job.deadline >= now) && !(time_permit && load_is_low)) { - dout(15) << __func__ << " not scheduling scrub for " << scrub_job.pgid << " due to " - << (!time_permit ? "time not permit" : "high load") << dendl; - continue; - } - - PGRef pg = _lookup_lock_pg(scrub_job.pgid); - if (!pg) { - dout(20) << __func__ << " pg " << scrub_job.pgid << " not found" << dendl; - continue; - } - - // This has already started, so go on to the next scrub job - if (pg->is_scrub_active()) { - pg->unlock(); - dout(20) << __func__ << ": already in progress pgid " << scrub_job.pgid << dendl; - continue; - } - // Skip other kinds of scrubbing if only explicitly requested repairing is allowed - if (allow_requested_repair_only && !pg->m_planned_scrub.must_repair) { - pg->unlock(); - dout(10) << __func__ << " skip " << scrub_job.pgid - << " because repairing is not explicitly requested on it" - << dendl; - continue; - } - - // If it is reserving, let it resolve before going to the next scrub job - if (pg->m_scrubber->is_reserving()) { - pg->unlock(); - dout(10) << __func__ << ": reserve in progress pgid " << scrub_job.pgid << dendl; - break; - } - dout(15) << "sched_scrub scrubbing " << scrub_job.pgid << " at " << scrub_job.sched_time - << 
(pg->get_must_scrub() ? ", explicitly requested" : - (load_is_low ? ", load_is_low" : " deadline < now")) - << dendl; - if (pg->sched_scrub()) { - pg->unlock(); - dout(10) << __func__ << " scheduled a scrub!" << " (~" << scrub_job.pgid << "~)" << dendl; - break; - } - pg->unlock(); - } while (service.next_scrub_stamp(scrub_job, &scrub_job)); - } - dout(20) << "sched_scrub done" << dendl; + auto scrub_attempt = pg->sched_scrub(); + pg->unlock(); + return scrub_attempt; } void OSD::resched_all_scrubs() { dout(10) << __func__ << ": start" << dendl; - const vector pgs = [this] { - vector pgs; - OSDService::ScrubJob job; - if (service.first_scrub_stamp(&job)) { - do { - pgs.push_back(job.pgid); - } while (service.next_scrub_stamp(job, &job)); - } - return pgs; - }(); - for (auto& pgid : pgs) { - dout(20) << __func__ << ": examine " << pgid << dendl; - PGRef pg = _lookup_lock_pg(pgid); - if (!pg) - continue; - if (!pg->m_planned_scrub.must_scrub && !pg->m_planned_scrub.need_auto) { - dout(15) << __func__ << ": reschedule " << pgid << dendl; - pg->on_info_history_change(); - } - pg->unlock(); + auto all_jobs = service.get_scrub_services().list_registered_jobs(); + for (auto& e : all_jobs) { + + auto& job = *e; + dout(20) << __func__ << ": examine " << job.pgid << dendl; + + PGRef pg = _lookup_lock_pg(job.pgid); + if (!pg) + continue; + + if (!pg->m_planned_scrub.must_scrub && !pg->m_planned_scrub.need_auto) { + dout(15) << __func__ << ": reschedule " << job.pgid << dendl; + pg->reschedule_scrub(); + } + pg->unlock(); } dout(10) << __func__ << ": done" << dendl; } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 53e2433a033..ad85d224a09 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -53,6 +53,7 @@ #include "common/EventTrace.h" #include "osd/osd_perf_counters.h" #include "common/Finisher.h" +#include "scrubber/osd_scrub_sched.h" #define CEPH_OSD_PROTOCOL 10 /* cluster internal */ @@ -260,87 +261,39 @@ public: } entity_name_t get_cluster_msgr_name() const; -private: - // 
-- scrub scheduling -- - ceph::mutex sched_scrub_lock = ceph::make_mutex("OSDService::sched_scrub_lock"); - int scrubs_local; - int scrubs_remote; public: - struct ScrubJob { - CephContext* cct; - /// pg to be scrubbed - spg_t pgid; - /// a time scheduled for scrub. but the scrub could be delayed if system - /// load is too high or it fails to fall in the scrub hours - utime_t sched_time; - /// the hard upper bound of scrub time - utime_t deadline; - ScrubJob() : cct(nullptr) {} - explicit ScrubJob(CephContext* cct, const spg_t& pg, - const utime_t& timestamp, - double pool_scrub_min_interval = 0, - double pool_scrub_max_interval = 0, bool must = true); - /// order the jobs by sched_time - bool operator<(const ScrubJob& rhs) const; - }; - std::set sched_scrub_pg; - - /// @returns the scrub_reg_stamp used for unregistering the scrub job - utime_t reg_pg_scrub(spg_t pgid, - utime_t t, - double pool_scrub_min_interval, - double pool_scrub_max_interval, - bool must) { - ScrubJob scrub_job(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval, - must); - std::lock_guard l(OSDService::sched_scrub_lock); - sched_scrub_pg.insert(scrub_job); - return scrub_job.sched_time; - } - - void unreg_pg_scrub(spg_t pgid, utime_t t) { - std::lock_guard l(sched_scrub_lock); - size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t)); - ceph_assert(removed); - } - - bool first_scrub_stamp(ScrubJob *out) { - std::lock_guard l(sched_scrub_lock); - if (sched_scrub_pg.empty()) - return false; - std::set::iterator iter = sched_scrub_pg.begin(); - *out = *iter; - return true; - } - bool next_scrub_stamp(const ScrubJob& next, - ScrubJob *out) { - std::lock_guard l(sched_scrub_lock); - if (sched_scrub_pg.empty()) - return false; - std::set::const_iterator iter = sched_scrub_pg.upper_bound(next); - if (iter == sched_scrub_pg.cend()) - return false; - *out = *iter; - return true; - } - - void dumps_scrub(ceph::Formatter* f); - - bool can_inc_scrubs(); - bool inc_scrubs_local(); - 
void dec_scrubs_local(); - bool inc_scrubs_remote(); - void dec_scrubs_remote(); - void dump_scrub_reservations(ceph::Formatter *f); void reply_op_error(OpRequestRef op, int err); void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv, std::vector op_returns); void handle_misdirected_op(PG *pg, OpRequestRef op); + private: + /** + * The entity that maintains the set of PGs we may scrub (i.e. - those that we + * are their primary), and schedules their scrubbing. + */ + ScrubQueue m_scrub_queue; + + public: + ScrubQueue& get_scrub_services() { return m_scrub_queue; } -private: + /** + * A callback used by the ScrubQueue object to initiate a scrub on a specific PG. + * + * The request might fail for multiple reasons, as ScrubQueue cannot on its own + * check some of the PG-specific preconditions and those are checked here. See + * the Scrub::schedule_result_t definition. + * + * @param pgid to scrub + * @param allow_requested_repair_only + * @return a Scrub::schedule_result_t detailing either a success, or the failure reason. 
+ */ + Scrub::schedule_result_t initiate_a_scrub(spg_t pgid, bool allow_requested_repair_only); + + + private: // -- agent shared state -- ceph::mutex agent_lock = ceph::make_mutex("OSDService::agent_lock"); ceph::condition_variable agent_cond; @@ -1929,8 +1882,6 @@ protected: return service.get_tid(); } - double scrub_sleep_time(bool must_scrub); - // -- generic pg peering -- void dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap, ThreadPool::TPHandle *handle = NULL); @@ -1981,8 +1932,6 @@ protected: void sched_scrub(); void resched_all_scrubs(); bool scrub_random_backoff(); - bool scrub_load_below_threshold(); - bool scrub_time_permit(utime_t now); // -- status reporting -- MPGStats *collect_pg_stats(); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 0f992d65283..fa109e257cc 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1324,26 +1324,22 @@ unsigned int PG::scrub_requeue_priority(Scrub::scrub_prio_t with_priority, unsig * Unless failing to start scrubbing, the 'planned scrub' flag-set is 'frozen' into * PgScrubber's m_flags, then cleared. */ -bool PG::sched_scrub() +Scrub::schedule_result_t PG::sched_scrub() { dout(15) << __func__ << " pg(" << info.pgid << (is_active() ? ") " : ") ") << (is_clean() ? " " : " ") << dendl; ceph_assert(ceph_mutex_is_locked(_lock)); - if (m_scrubber && m_scrubber->is_scrub_active()) { - return false; - } - if (!is_primary() || !is_active() || !is_clean()) { - return false; + return Scrub::schedule_result_t::bad_pg_state; } if (scrub_queued) { // only applicable to the very first time a scrub event is queued // (until handled and posted to the scrub FSM) dout(10) << __func__ << ": already queued" << dendl; - return false; + return Scrub::schedule_result_t::already_started; } // analyse the combination of the requested scrub flags, the osd/pool configuration @@ -1355,14 +1351,14 @@ bool PG::sched_scrub() // (due to configuration or priority issues) // The reason was already reported by the callee. 
dout(10) << __func__ << ": failed to initiate a scrub" << dendl; - return false; + return Scrub::schedule_result_t::preconditions; } // try to reserve the local OSD resources. If failing: no harm. We will // be retried by the OSD later on. if (!m_scrubber->reserve_local()) { dout(10) << __func__ << ": failed to reserve locally" << dendl; - return false; + return Scrub::schedule_result_t::no_local_resources; } // can commit to the updated flags now, as nothing will stop the scrub @@ -1379,7 +1375,7 @@ bool PG::sched_scrub() scrub_queued = true; osd->queue_for_scrub(this, Scrub::scrub_prio_t::low_priority); - return true; + return Scrub::schedule_result_t::scrub_initiated; } double PG::next_deepscrub_interval() const @@ -1532,17 +1528,37 @@ std::optional PG::verify_scrub_mode() const return upd_flags; } -void PG::reg_next_scrub() +/* + * Note: on_info_history_change() is used in those two cases where we're not sure + * whether the role of the PG was changed, and if so - was this change relayed to the + * scrub-queue. + */ +void PG::on_info_history_change() +{ + dout(20) << __func__ << " for a " << (is_primary() ? "Primary" : "non-primary") <on_maybe_registration_change(m_planned_scrub); + } +} + +void PG::reschedule_scrub() { - m_scrubber->reg_next_scrub(m_planned_scrub); + dout(20) << __func__ << " for a " << (is_primary() ? 
"Primary" : "non-primary") <update_scrub_job(m_planned_scrub); + } } -void PG::on_info_history_change() +void PG::on_primary_status_change(bool was_primary, bool now_primary) { - dout(20) << __func__ << dendl; - if (m_scrubber) { - m_scrubber->unreg_next_scrub(); - m_scrubber->reg_next_scrub(m_planned_scrub); + // make sure we have a working scrubber when becoming a primary + ceph_assert(m_scrubber || !now_primary); + + if ((was_primary != now_primary) && m_scrubber) { + m_scrubber->on_primary_change(m_planned_scrub); } } @@ -1575,6 +1591,9 @@ void PG::on_new_interval() { scrub_queued = false; projected_last_update = eversion_t(); cancel_recovery(); + if (m_scrubber) { + m_scrubber->on_maybe_registration_change(m_planned_scrub); + } } epoch_t PG::oldest_stored_osdmap() { diff --git a/src/osd/PG.h b/src/osd/PG.h index 95cd29a4104..63bcb7c11d0 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -68,6 +68,7 @@ namespace Scrub { class ReplicaReservations; class LocalReservation; class ReservedByRemotePrimary; + enum class schedule_result_t; } #ifdef PG_DEBUG_REFS @@ -496,8 +497,6 @@ public: forward_scrub_event(&ScrubPgIF::send_chunk_busy, queued, "ChunkIsBusy"); } - void reg_next_scrub(); - void queue_want_pg_temp(const std::vector &wanted) override; void clear_want_pg_temp() override; @@ -512,6 +511,10 @@ public: void on_info_history_change() override; + void on_primary_status_change(bool was_primary, bool now_primary) override; + + void reschedule_scrub() override; + void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) override; uint64_t get_snap_trimq_size() const override { @@ -635,7 +638,7 @@ public: virtual void on_shutdown() = 0; bool get_must_scrub() const; - bool sched_scrub(); + Scrub::schedule_result_t sched_scrub(); unsigned int scrub_requeue_priority(Scrub::scrub_prio_t with_priority, unsigned int suggested_priority) const; /// the version that refers to flags_.priority diff --git a/src/osd/PeeringState.cc b/src/osd/PeeringState.cc index 
698b767de82..ce4d88a88c7 100644 --- a/src/osd/PeeringState.cc +++ b/src/osd/PeeringState.cc @@ -711,6 +711,8 @@ void PeeringState::start_peering_interval( // did primary change? if (was_old_primary != is_primary()) { state_clear(PG_STATE_CLEAN); + // queue/dequeue the scrubber + pl->on_primary_status_change(was_old_primary, is_primary()); } pl->on_role_change(); @@ -734,6 +736,10 @@ void PeeringState::start_peering_interval( } } + if (is_primary() && was_old_primary) { + pl->reschedule_scrub(); + } + if (acting.empty() && !up.empty() && up_primary == pg_whoami) { psdout(10) << " acting empty, but i am up[0], clearing pg_temp" << dendl; pl->queue_want_pg_temp(acting); @@ -3969,7 +3975,7 @@ void PeeringState::update_stats( if (f(info.history, info.stats)) { pl->publish_stats_to_osd(); } - pl->on_info_history_change(); + pl->reschedule_scrub(); if (t) { dirty_info = true; diff --git a/src/osd/PeeringState.h b/src/osd/PeeringState.h index 992af4c5768..2225fcd3f4a 100644 --- a/src/osd/PeeringState.h +++ b/src/osd/PeeringState.h @@ -264,6 +264,13 @@ public: /// Notify that info/history changed (generally to update scrub registration) virtual void on_info_history_change() = 0; + + /// Notify PG that Primary/Replica status has changed (to update scrub registration) + virtual void on_primary_status_change(bool was_primary, bool now_primary) = 0; + + /// Need to reschedule next scrub. Assuming no change in role + virtual void reschedule_scrub() = 0; + /// Notify that a scrub has been requested virtual void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) = 0; diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index 6899c5ea6b0..0e318c12c85 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -14,21 +14,28 @@ * Foundation. See file COPYING. 
* */ - #include "PrimaryLogPG.h" +#include + +#include +#include +#include + #include +#include #include "cls/cas/cls_cas_ops.h" +#include "common/CDC.h" #include "common/EventTrace.h" #include "common/ceph_crypto.h" -#include "common/CDC.h" #include "common/config.h" #include "common/errno.h" -#include "common/EventTrace.h" #include "common/perf_counters.h" #include "common/scrub_types.h" #include "include/compat.h" +#include "json_spirit/json_spirit_reader.h" +#include "json_spirit/json_spirit_value.h" #include "messages/MCommandReply.h" #include "messages/MOSDBackoff.h" #include "messages/MOSDOp.h" @@ -44,10 +51,15 @@ #include "mon/MonClient.h" #include "objclass/objclass.h" #include "osd/ClassHandler.h" -#include "osd/OpRequest.h" -#include "osd/Session.h" #include "osdc/Objecter.h" #include "scrubber/PrimaryLogScrub.h" +#include "scrubber/ScrubStore.h" +#include "scrubber/pg_scrubber.h" + +#include "OSD.h" +#include "OpRequest.h" +#include "PG.h" +#include "Session.h" // required includes order: #include "json_spirit/json_spirit_value.h" @@ -12855,8 +12867,7 @@ void PrimaryLogPG::on_shutdown() } m_scrubber->scrub_clear_state(); - - m_scrubber->unreg_next_scrub(); + m_scrubber->rm_from_osd_scrubbing(); vector tids; cancel_copy_ops(false, &tids); diff --git a/src/osd/scrubber/osd_scrub_sched.cc b/src/osd/scrubber/osd_scrub_sched.cc new file mode 100644 index 00000000000..6cba89094c6 --- /dev/null +++ b/src/osd/scrubber/osd_scrub_sched.cc @@ -0,0 +1,719 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#include "./osd_scrub_sched.h" + +#include "include/utime.h" +#include "osd/OSD.h" + +#include "pg_scrubber.h" + +using namespace ::std::literals; + +// ////////////////////////////////////////////////////////////////////////// // +// ScrubJob + +#define dout_context (cct) +#define dout_subsys ceph_subsys_osd +#undef dout_prefix +#define dout_prefix *_dout << "osd." 
<< whoami << " "
+
+/// Create the scheduling entry of PG 'pg'; 'node_id' is only used as the
+/// OSD number in this job's log prefix.
+ScrubQueue::ScrubJob::ScrubJob(CephContext* cct, const spg_t& pg, int node_id)
+    : RefCountedObject{cct}, pgid{pg}, whoami{node_id}, cct{cct}
+{}
+
+// debug usage only
+ostream& operator<<(ostream& out, const ScrubQueue::ScrubJob& sjob)
+{
+  out << sjob.pgid << ", " << sjob.schedule.scheduled_at
+      << " dead: " << sjob.schedule.deadline << " - " << sjob.registration_state()
+      << " / failure: " << sjob.resources_failure
+      << " / pen. t.o.: " << sjob.penalty_timeout
+      << " / queue state: " << ScrubQueue::qu_state_text(sjob.state);
+
+  return out;
+}
+
+/**
+ * Install a new (already adjusted) schedule for this job, and raise the
+ * 'updated' flag so that a penalized job will be pardoned on the next scan.
+ */
+void ScrubQueue::ScrubJob::update_schedule(
+  const ScrubQueue::scrub_schedule_t& adjusted)
+{
+  schedule = adjusted;
+  penalty_timeout = utime_t(0, 0);  // helps with debugging
+
+  // 'updated' is changed here while not holding jobs_lock. That's OK, as
+  // the (atomic) flag will only be cleared by select_pg_and_scrub() after
+  // scan_penalized() is called and the job was moved to the to_scrub queue.
+  updated = true;
+
+  dout(10) << " pg[" << pgid << "] adjusted: " << schedule.scheduled_at << " "
+           << registration_state() << dendl;
+}
+
+// ////////////////////////////////////////////////////////////////////////// //
+// ScrubQueue
+
+#undef dout_context
+#define dout_context (cct)
+#undef dout_prefix
+#define dout_prefix \
+  *_dout << "osd." << osd_service.whoami << " scrub-queue::" << __func__ << " "
+
+
+/// Seeds 'daily_loadavg' from the third getloadavg() sample, falling back
+/// to 1.0 if the load averages cannot be read.
+ScrubQueue::ScrubQueue(CephContext* cct, OSDService& osds)
+    : cct{cct}, osd_service{osds}
+{
+  // initialize the daily loadavg with current 15min loadavg
+  if (double loadavgs[3]; getloadavg(loadavgs, 3) == 3) {
+    daily_loadavg = loadavgs[2];
+  } else {
+    derr << "OSD::init() : couldn't read loadavgs\n" << dendl;
+    daily_loadavg = 1.0;
+  }
+}
+
+/**
+ * Fold the current load average into the running "daily" average.
+ *
+ * The running average is weighted as if it spanned one day of samples, one
+ * sample per heartbeat interval.
+ *
+ * @return 100 * the sampled load average, or nullopt if it could not be read
+ */
+std::optional<double> ScrubQueue::update_load_average()
+{
+  // a full day, expressed in seconds (i.e. in one-second heartbeats)
+  constexpr int seconds_per_day = 24 * 60 * 60;
+
+  int hb_interval = cct->_conf->osd_heartbeat_interval;
+  int n_samples = seconds_per_day;
+  if (hb_interval > 1) {
+    n_samples /= hb_interval;
+    if (n_samples < 1)
+      n_samples = 1;
+  }
+
+  // get CPU load avg
+  double loadavg;
+  if (getloadavg(&loadavg, 1) == 1) {
+    daily_loadavg = (daily_loadavg * (n_samples - 1) + loadavg) / n_samples;
+    dout(17) << "heartbeat: daily_loadavg " << daily_loadavg << dendl;
+    return 100 * loadavg;
+  }
+
+  return std::nullopt;
+}
+
+/*
+ * Modify the scrub job state:
+ * - if 'registered' (as expected): mark as 'unregistering'. The job will be
+ *   dequeued the next time sched_scrub() is called.
+ * - if already 'not_registered': shouldn't really happen, but not a problem.
+ *   The state will not be modified.
+ * - same for 'unregistering'.
+ *
+ * Note: not holding the jobs lock
+ */
+void ScrubQueue::remove_from_osd_queue(ScrubJobRef scrub_job)
+{
+  dout(15) << "removing pg[" << scrub_job->pgid << "] from OSD scrub queue"
+           << dendl;
+
+  // a single atomic step: only a 'registered' job becomes 'unregistering'.
+  // On failure 'expected_state' is overwritten with the state actually found.
+  qu_state_t expected_state{qu_state_t::registered};
+  auto ret = scrub_job->state.compare_exchange_strong(expected_state,
+                                                      qu_state_t::unregistering);
+
+  if (ret) {
+
+    dout(10) << "pg[" << scrub_job->pgid << "] sched-state changed from "
+             << qu_state_text(expected_state) << " to "
+             << qu_state_text(scrub_job->state) << dendl;
+
+  } else {
+
+    // job wasn't in state 'registered' coming in
+    dout(5) << "removing pg[" << scrub_job->pgid
+            << "] failed. State was: " << qu_state_text(expected_state) << dendl;
+  }
+}
+
+/**
+ * Make sure the job is queued for scrubbing, and refresh its schedule
+ * based on 'suggested'. What is done depends on the job's state at entry:
+ * - 'registered': only the schedule is updated (no lock taken);
+ * - 'not_registered': under jobs_lock, the job is added to 'to_scrub';
+ * - 'unregistering': under jobs_lock, the job is marked 'registered' again
+ *   (and re-added to 'to_scrub' if it was dequeued in the meantime).
+ */
+void ScrubQueue::register_with_osd(ScrubJobRef scrub_job,
+                                   const ScrubQueue::sched_params_t& suggested)
+{
+  qu_state_t state_at_entry = scrub_job->state.load();
+
+  dout(15) << "pg[" << scrub_job->pgid << "] was "
+           << qu_state_text(state_at_entry) << dendl;
+
+  switch (state_at_entry) {
+    case qu_state_t::registered:
+      // just updating the schedule?
+      update_job(scrub_job, suggested);
+      break;
+
+    case qu_state_t::not_registered:
+      // insertion under lock
+      {
+        std::unique_lock lck{jobs_lock};
+
+        if (state_at_entry != scrub_job->state) {
+          // the state was modified between the initial load and the locking.
+          // Release the lock before starting over, as the retry might need
+          // to acquire it again.
+          lck.unlock();
+          dout(5) << " scrub job state changed" << dendl;
+          // retry
+          register_with_osd(scrub_job, suggested);
+          break;
+        }
+
+        update_job(scrub_job, suggested);
+        to_scrub.push_back(scrub_job);
+        scrub_job->in_queues = true;
+        scrub_job->state = qu_state_t::registered;
+      }
+
+      break;
+
+    case qu_state_t::unregistering:
+      // restore to the to_sched queue
+      {
+        // must be under lock, as the job might be removed from the queue
+        // at any minute
+        std::lock_guard lck{jobs_lock};
+
+        update_job(scrub_job, suggested);
+        if (scrub_job->state == qu_state_t::not_registered) {
+          dout(5) << " scrub job state changed to 'not registered'" << dendl;
+          to_scrub.push_back(scrub_job);
+        }
+        scrub_job->in_queues = true;
+        scrub_job->state = qu_state_t::registered;
+      }
+      break;
+  }
+
+  dout(10) << "pg(" << scrub_job->pgid << ") sched-state changed from "
+           << qu_state_text(state_at_entry) << " to "
+           << qu_state_text(scrub_job->state)
+           << " at: " << scrub_job->schedule.scheduled_at << dendl;
+}
+
+// look mommy - no locks!
+void ScrubQueue::update_job(ScrubJobRef scrub_job,
+                            const ScrubQueue::sched_params_t& suggested)
+{
+  // adjust the suggested scrub time according to OSD-wide status
+  auto adjusted = adjust_target_time(suggested);
+  scrub_job->update_schedule(adjusted);
+}
+
+// used under jobs_lock
+void ScrubQueue::move_failed_pgs(utime_t now_is)
+{
+  int punished_cnt{0};  // for log/debug only
+
+  for (auto job = to_scrub.begin(); job != to_scrub.end();) {
+    if ((*job)->resources_failure) {
+      // hold our own ref, as the entry is about to be erased from 'to_scrub'
+      auto sjob = *job;
+
+      // last time it was scheduled for a scrub, this PG failed in securing
+      // remote resources. Move it to the secondary scrub queue.
+
+      dout(15) << "moving " << sjob->pgid
+               << " state: " << ScrubQueue::qu_state_text(sjob->state) << dendl;
+
+      // determine the penalty time, after which the job should be reinstated
+      utime_t after = now_is;
+      after += cct->_conf->osd_scrub_sleep * 2 + utime_t{300'000ms};
+
+      // note: currently - not taking 'deadline' into account when determining
+      // 'penalty_timeout'.
+      sjob->penalty_timeout = after;
+      sjob->resources_failure = false;
+      sjob->updated = false;  // as otherwise will be pardoned immediately
+
+      // place in the penalty list, and remove from the to-scrub group
+      penalized.push_back(sjob);
+      job = to_scrub.erase(job);
+      punished_cnt++;
+    } else {
+      ++job;
+    }
+  }
+
+  if (punished_cnt) {
+    dout(15) << "# of jobs penalized: " << punished_cnt << dendl;
+  }
+}
+
+// clang-format off
+/*
+ * Implementation note:
+ * Clang (10 & 11) produces here efficient table-based code, comparable to using
+ * a direct index into an array of strings.
+ * Gcc (11, trunk) is almost as efficient.
+ */
+std::string_view ScrubQueue::attempt_res_text(Scrub::schedule_result_t v)
+{
+  switch (v) {
+    case Scrub::schedule_result_t::scrub_initiated: return "scrubbing"sv;
+    case Scrub::schedule_result_t::none_ready: return "no ready job"sv;
+    case Scrub::schedule_result_t::no_local_resources: return "local resources shortage"sv;
+    case Scrub::schedule_result_t::already_started: return "denied as already started"sv;
+    case Scrub::schedule_result_t::no_such_pg: return "pg not found"sv;
+    case Scrub::schedule_result_t::bad_pg_state: return "prevented by pg state"sv;
+    case Scrub::schedule_result_t::preconditions: return "preconditions not met"sv;
+  }
+  // g++ (unlike CLANG), requires an extra 'return' here
+  return "(unknown)"sv;
+}
+
+std::string_view ScrubQueue::qu_state_text(qu_state_t st)
+{
+  switch (st) {
+    case qu_state_t::not_registered: return "not registered w/ OSD"sv;
+    case qu_state_t::registered: return "registered"sv;
+    case qu_state_t::unregistering: return "unregistering"sv;
+  }
+  // g++ (unlike CLANG), requires an extra 'return' here
+  return "(unknown)"sv;
+}
+// clang-format on
+
+/**
+ *  a note regarding 'to_scrub_copy':
+ *  'to_scrub_copy' is a sorted set of all the ripe jobs from 'to_scrub'.
+ *  As we usually expect to refer to only the first job in this set, we could
+ *  consider an alternative implementation:
+ *  - have collect_ripe_jobs() return the copied set without sorting it;
+ *  - loop, performing:
+ *    - use std::min_element() to find a candidate;
+ *    - try that one. If not suitable, discard from 'to_scrub_copy'
+ */
+Scrub::schedule_result_t ScrubQueue::select_pg_and_scrub(
+  Scrub::ScrubPreconds& preconds)
+{
+  utime_t now_is = ceph_clock_now();
+
+  // these preconditions depend on the time and on the system load only, and
+  // are computed before taking jobs_lock
+  preconds.time_permit = scrub_time_permit(now_is);
+  preconds.load_is_low = scrub_load_below_threshold();
+  preconds.only_deadlined = !preconds.time_permit || !preconds.load_is_low;
+
+  // create a list of candidates (copying, as otherwise creating a deadlock):
+  // - possibly restore penalized
+  // - (if we didn't handle directly) remove invalid jobs
+  // - create a copy of the to_scrub (possibly up to first not-ripe)
+  // - same for the penalized (although that usually be a waste)
+  // unlock, then try the lists
+
+  std::unique_lock lck{jobs_lock};
+
+  // the queues may only be inspected (even just for their size) while
+  // jobs_lock is held
+  dout(10) << " reg./pen. sizes: " << to_scrub.size() << " / " << penalized.size()
+           << dendl;
+
+  // pardon all penalized jobs that have deadlined (or were updated)
+  scan_penalized(restore_penalized, now_is);
+  restore_penalized = false;
+
+  // remove the 'updated' flag from all entries
+  std::for_each(to_scrub.begin(), to_scrub.end(),
+                [](const auto& jobref) -> void { jobref->updated = false; });
+
+  // add failed scrub attempts to the penalized list
+  move_failed_pgs(now_is);
+
+  //  collect all valid & ripe jobs from the two lists. Note that we must copy,
+  //  as when we use the lists we will not be holding jobs_lock (see
+  //  explanation above)
+
+  auto to_scrub_copy = collect_ripe_jobs(to_scrub, now_is);
+  auto penalized_copy = collect_ripe_jobs(penalized, now_is);
+  lck.unlock();
+
+  // try the regular queue first
+  auto res = select_from_group(to_scrub_copy, preconds, now_is);
+
+  // in the sole scenario in which we've gone over all ripe jobs without success
+  // - we will try the penalized
+  if (res == Scrub::schedule_result_t::none_ready && !penalized_copy.empty()) {
+    res = select_from_group(penalized_copy, preconds, now_is);
+    dout(10) << "tried the penalized. Res: " << ScrubQueue::attempt_res_text(res)
+             << dendl;
+    restore_penalized = true;
+  }
+
+  dout(15) << dendl;
+  return res;
+}
+
+// must be called under lock
+void ScrubQueue::rm_unregistered_jobs(ScrubQContainer& group)
+{
+  // first pass: complete the pending un-registrations, and make sure
+  // 'in_queues' is cleared for all jobs that are about to be removed
+  std::for_each(group.begin(), group.end(), [](auto& job) {
+    if (job->state == qu_state_t::unregistering) {
+      job->in_queues = false;
+      job->state = qu_state_t::not_registered;
+    } else if (job->state == qu_state_t::not_registered) {
+      job->in_queues = false;
+    }
+  });
+
+  // second pass: drop all 'not_registered' jobs from the container
+  group.erase(std::remove_if(group.begin(), group.end(), invalid_state),
+              group.end());
+}
+
+namespace {
+/// orders scrub jobs by their scheduled time (earliest first)
+struct cmp_sched_time_t {
+  bool operator()(const ScrubQueue::ScrubJobRef& lhs,
+                  const ScrubQueue::ScrubJobRef& rhs) const
+  {
+    return lhs->schedule.scheduled_at < rhs->schedule.scheduled_at;
+  }
+};
+}  // namespace
+
+// called under lock
+ScrubQueue::ScrubQContainer ScrubQueue::collect_ripe_jobs(ScrubQContainer& group,
+                                                          utime_t time_now)
+{
+  rm_unregistered_jobs(group);
+
+  // copy ripe jobs
+  ScrubQueue::ScrubQContainer ripes;
+  ripes.reserve(group.size());
+
+  std::copy_if(group.begin(), group.end(), std::back_inserter(ripes),
+               [time_now](const auto& jobref) -> bool {
+                 return jobref->schedule.scheduled_at <= time_now;
+               });
+  std::sort(ripes.begin(), ripes.end(), cmp_sched_time_t{});
+
+  // NOTE(review): should_gather() appears here without any subsystem / level
+  // arguments - confirm against the original file.
+  if (g_conf()->subsys.should_gather()) {
+    for (const auto& jobref : group) {
+      if (jobref->schedule.scheduled_at > time_now) {
+        dout(20) << " not ripe: " << jobref->pgid << " @ "
+                 << jobref->schedule.scheduled_at << dendl;
+      }
+    }
+  }
+
+  return ripes;
+}
+
+// not holding jobs_lock. 'group' is a copy of the actual list.
+/**
+ * Walk 'group' in order, and ask the OSD to start a scrub on the first
+ * job that is accepted.
+ *
+ * @return 'scrub_initiated' on success; 'no_local_resources' if the OSD
+ *   could not secure local resources (the scan stops there); 'none_ready'
+ *   if no job in 'group' was scrubbed.
+ */
+Scrub::schedule_result_t ScrubQueue::select_from_group(
+  ScrubQContainer& group, const Scrub::ScrubPreconds& preconds, utime_t now_is)
+{
+  dout(15) << "jobs #: " << group.size() << dendl;
+
+  for (auto& candidate : group) {
+
+    // we expect the first job in the list to be a good candidate (if any)
+
+    dout(20) << "try initiating scrub for " << candidate->pgid << dendl;
+
+    // when restricted to deadlined jobs only: skip those with no deadline,
+    // and those whose deadline is yet to come
+    if (preconds.only_deadlined && (candidate->schedule.deadline.is_zero() ||
+                                    candidate->schedule.deadline >= now_is)) {
+      dout(15) << " not scheduling scrub for " << candidate->pgid << " due to "
+               << (preconds.time_permit ? "high load" : "time not permitting")
+               << dendl;
+      continue;
+    }
+
+    // we have a candidate to scrub. We turn to the OSD to verify that the PG
+    // configuration allows the specified type of scrub, and to initiate the
+    // scrub.
+    switch (osd_service.initiate_a_scrub(candidate->pgid,
+                                         preconds.allow_requested_repair_only)) {
+
+      case Scrub::schedule_result_t::scrub_initiated:
+        // the happy path. We are done
+        dout(20) << " initiated for " << candidate->pgid << dendl;
+        return Scrub::schedule_result_t::scrub_initiated;
+
+      case Scrub::schedule_result_t::already_started:
+      case Scrub::schedule_result_t::preconditions:
+      case Scrub::schedule_result_t::bad_pg_state:
+        // continue with the next job
+        dout(20) << "failed (state/cond/started) " << candidate->pgid << dendl;
+        break;
+
+      case Scrub::schedule_result_t::no_such_pg:
+        // The pg is no longer there
+        dout(20) << "failed (no pg) " << candidate->pgid << dendl;
+        break;
+
+      case Scrub::schedule_result_t::no_local_resources:
+        // failure to secure local resources. No point in trying the other
+        // PGs at this time. Note that this is not the same as replica resources
+        // failure!
+        dout(20) << "failed (local) " << candidate->pgid << dendl;
+        return Scrub::schedule_result_t::no_local_resources;
+
+      case Scrub::schedule_result_t::none_ready:
+        // can't happen. Just for the compiler.
+        dout(5) << "failed !!! " << candidate->pgid << dendl;
+        return Scrub::schedule_result_t::none_ready;
+    }
+  }
+
+  dout(20) << " returning 'none ready' " << dendl;
+  return Scrub::schedule_result_t::none_ready;
+}
+
+/**
+ * Translate the suggested scheduling parameters into a (scheduled time,
+ * deadline) pair. Both start at the proposed time. Unless the scrub is
+ * mandatory, the scheduled time is pushed forward by the minimal interval
+ * plus a random fraction of it, and the deadline by the maximal interval
+ * (or zeroed, if there is no positive maximal interval).
+ */
+ScrubQueue::scrub_schedule_t ScrubQueue::adjust_target_time(
+  const sched_params_t& times) const
+{
+  ScrubQueue::scrub_schedule_t sched_n_dead{times.proposed_time,
+                                            times.proposed_time};
+
+  // NOTE(review): should_gather() appears here without any subsystem / level
+  // arguments - confirm against the original file.
+  if (g_conf()->subsys.should_gather()) {
+    dout(20) << "min t: " << times.min_interval
+             << " osd: " << cct->_conf->osd_scrub_min_interval
+             << " max t: " << times.max_interval
+             << " osd: " << cct->_conf->osd_scrub_max_interval << dendl;
+
+    dout(20) << "at " << sched_n_dead.scheduled_at << " ratio "
+             << cct->_conf->osd_scrub_interval_randomize_ratio << dendl;
+  }
+
+  if (times.is_must == ScrubQueue::must_scrub_t::not_mandatory) {
+
+    // if not explicitly requested, postpone the scrub with a random delay
+
+    // a positive value in 'times' takes precedence over the OSD configuration
+    double scrub_min_interval = times.min_interval > 0
+                                  ? times.min_interval
+                                  : cct->_conf->osd_scrub_min_interval;
+    double scrub_max_interval = times.max_interval > 0
+                                  ? times.max_interval
+                                  : cct->_conf->osd_scrub_max_interval;
+
+    sched_n_dead.scheduled_at += scrub_min_interval;
+    // 'r' is a pseudo-random factor in the [0, 1] range
+    double r = rand() / (double)RAND_MAX;
+    sched_n_dead.scheduled_at +=
+      scrub_min_interval * cct->_conf->osd_scrub_interval_randomize_ratio * r;
+
+    if (scrub_max_interval <= 0) {
+      // a zeroed deadline stands for "no deadline"
+      sched_n_dead.deadline = utime_t{};
+    } else {
+      sched_n_dead.deadline += scrub_max_interval;
+    }
+  }
+
+  dout(17) << "at (final) " << sched_n_dead.scheduled_at << " - "
+           << sched_n_dead.deadline << dendl;
+  return sched_n_dead;
+}
+
+/**
+ * The delay to be used for pacing the scrub: 'osd_scrub_sleep' for
+ * 'must_scrub' scrubs or when the current time permits scrubbing;
+ * otherwise - the larger of 'osd_scrub_sleep' and
+ * 'osd_scrub_extended_sleep'.
+ */
+double ScrubQueue::scrub_sleep_time(bool must_scrub) const
+{
+  double regular_sleep_period = cct->_conf->osd_scrub_sleep;
+
+  if (must_scrub || scrub_time_permit(ceph_clock_now())) {
+    return regular_sleep_period;
+  }
+
+  // relevant if scrubbing started during allowed time, but continued into
+  // forbidden hours
+  double extended_sleep = cct->_conf->osd_scrub_extended_sleep;
+  dout(20) << "w/ extended sleep (" << extended_sleep << ")" << dendl;
+  return std::max(extended_sleep, regular_sleep_period);
+}
+
+/**
+ * Is the system load low enough to allow scrubbing?
+ *
+ * @return true if the first load-average sample, per online CPU, is below
+ *   'osd_scrub_load_threshold', or if that sample is below both
+ *   'daily_loadavg' and the third sample. False otherwise, and also when
+ *   the load averages cannot be read.
+ */
+bool ScrubQueue::scrub_load_below_threshold() const
+{
+  double loadavgs[3];
+  if (getloadavg(loadavgs, 3) != 3) {
+    dout(10) << __func__ << " couldn't read loadavgs\n" << dendl;
+    return false;
+  }
+
+  // allow scrub if below configured threshold
+  long cpus = sysconf(_SC_NPROCESSORS_ONLN);
+  // use the raw load value if the number of CPUs is not available
+  double loadavg_per_cpu = cpus > 0 ? loadavgs[0] / cpus : loadavgs[0];
+  if (loadavg_per_cpu < cct->_conf->osd_scrub_load_threshold) {
+    dout(20) << "loadavg per cpu " << loadavg_per_cpu << " < max "
+             << cct->_conf->osd_scrub_load_threshold << " = yes" << dendl;
+    return true;
+  }
+
+  // allow scrub if below daily avg and currently decreasing
+  if (loadavgs[0] < daily_loadavg && loadavgs[0] < loadavgs[2]) {
+    dout(20) << "loadavg " << loadavgs[0] << " < daily_loadavg " << daily_loadavg
+             << " and < 15m avg " << loadavgs[2] << " = yes" << dendl;
+    return true;
+  }
+
+  dout(20) << "loadavg " << loadavgs[0] << " >= max "
+           << cct->_conf->osd_scrub_load_threshold << " and ( >= daily_loadavg "
+           << daily_loadavg << " or >= 15m avg " << loadavgs[2] << ") = no"
+           << dendl;
+  return false;
+}
+
+
+// note: called with jobs_lock held
+/**
+ * Move penalized jobs back into 'to_scrub': all of them if 'forgive_all'
+ * is set; otherwise only those that were updated or whose penalty has
+ * timed out.
+ */
+void ScrubQueue::scan_penalized(bool forgive_all, utime_t time_now)
+{
+  dout(20) << time_now << (forgive_all ? " all " : " - ") << penalized.size()
+           << dendl;
+
+  // clear dead entries (deleted PGs, or those PGs we are no longer their
+  // primary)
+  rm_unregistered_jobs(penalized);
+
+  if (forgive_all) {
+
+    std::copy(penalized.begin(), penalized.end(), std::back_inserter(to_scrub));
+    penalized.clear();
+
+  } else {
+
+    // group the jobs to be pardoned at the front of 'penalized'
+    auto forgiven_last = std::partition(
+      penalized.begin(), penalized.end(), [time_now](const auto& e) {
+        return (*e).updated || ((*e).penalty_timeout <= time_now);
+      });
+
+    std::copy(penalized.begin(), forgiven_last, std::back_inserter(to_scrub));
+    penalized.erase(penalized.begin(), forgiven_last);
+    dout(20) << "penalized after screening: " << penalized.size() << dendl;
+  }
+}
+
+// checks for half-closed ranges. Modify the (p state{qu_state_t::not_registered};
+
+  /**
+   * the old 'is_registered'. Set whenever the job is registered with the OSD,
+   * i.e. is in either the 'to_scrub' or the 'penalized' vectors.
+ */ + std::atomic_bool in_queues{false}; + + /// last scrub attempt failed to secure replica resources + bool resources_failure{false}; + + /** + * 'updated' is a temporary flag, used to create a barrier after + * 'sched_time' and 'deadline' (or any other job entry) were modified by + * different task. + * 'updated' also signals the need to move a job back from the penalized + * queue to the regular one. + */ + std::atomic_bool updated{false}; + + utime_t penalty_timeout{0, 0}; + + CephContext* cct; + + ScrubJob(CephContext* cct, const spg_t& pg, int node_id); + + utime_t get_sched_time() const { return schedule.scheduled_at; } + + /** + * relatively low-cost(*) access to the scrub job's state, to be used in + * logging. + * (*) not a low-cost access on x64 architecture + */ + std::string_view state_desc() const + { + return ScrubQueue::qu_state_text(state.load(std::memory_order_relaxed)); + } + + void update_schedule(const ScrubQueue::scrub_schedule_t& adjusted); + + void dump(ceph::Formatter* f) const; + + /* + * as the atomic 'in_queues' appears in many log prints, accessing it for + * display-only should be made less expensive (on ARM. On x86 the _relaxed + * produces the same code as '_cs') + */ + std::string_view registration_state() const + { + return in_queues.load(std::memory_order_relaxed) ? " in-queue" + : " not-queued"; + } + + friend std::ostream& operator<<(std::ostream& out, const ScrubJob& pg); + }; + + friend class TestOSDScrub; + + using ScrubJobRef = ceph::ref_t; + using ScrubQContainer = std::vector; + + static std::string_view qu_state_text(qu_state_t st); + + /** + * called periodically by the OSD to select the first scrub-eligible PG + * and scrub it. + * + * Selection is affected by: + * - time of day: scheduled scrubbing might be configured to only happen + * during certain hours; + * - same for days of the week, and for the system load; + * + * @param preconds: what types of scrub are allowed, given system status & + * config. 
Some of the preconditions are calculated here. + * @return Scrub::attempt_t::scrubbing if a scrub session was successfully + * initiated. Otherwise - the failure cause. + * + * locking: locks jobs_lock + */ + Scrub::schedule_result_t select_pg_and_scrub(Scrub::ScrubPreconds& preconds); + + /** + * Translate attempt_ values into readable text + */ + static std::string_view attempt_res_text(Scrub::schedule_result_t v); + + /** + * remove the pg from set of PGs to be scanned for scrubbing. + * To be used if we are no longer the PG's primary, or if the PG is removed. + */ + void remove_from_osd_queue(ScrubJobRef sjob); + + /** + * @return the list (not std::set!) of all scrub jobs registered + * (apart from PGs in the process of being removed) + */ + ScrubQContainer list_registered_jobs() const; + + /** + * Add the scrub job to the list of jobs (i.e. list of PGs) to be periodically + * scrubbed by the OSD. + * The registration is active as long as the PG exists and the OSD is its + * primary. + * + * See update_job() for the handling of the 'suggested' parameter. + * + * locking: might lock jobs_lock + */ + void register_with_osd(ScrubJobRef sjob, const sched_params_t& suggested); + + /** + * modify a scrub-job's schduled time and deadline + * + * There are 3 argument combinations to consider: + * - 'must' is asserted, and the suggested time is 'scrub_must_stamp': + * the registration will be with "beginning of time" target, making the + * scrub-job eligible to immediate scrub (given that external conditions + * do not prevent scrubbing) + * + * - 'must' is asserted, and the suggested time is 'now': + * This happens if our stats are unknown. The results are similar to the + * previous scenario. + * + * - not a 'must': we take the suggested time as a basis, and add to it some + * configuration / random delays. 
+ * + * ('must' is sched_params_t.is_must) + * + * locking: not using the jobs_lock + */ + void update_job(ScrubJobRef sjob, const sched_params_t& suggested); + + public: + void dump_scrubs(ceph::Formatter* f) const; + + /** + * No new scrub session will start while a scrub was initiated on a PG, + * and that PG is trying to acquire replica resources. + */ + void set_reserving_now() { a_pg_is_reserving = true; } + void clear_reserving_now() { a_pg_is_reserving = false; } + bool is_reserving_now() const { return a_pg_is_reserving; } + + bool can_inc_scrubs() const; + bool inc_scrubs_local(); + void dec_scrubs_local(); + bool inc_scrubs_remote(); + void dec_scrubs_remote(); + void dump_scrub_reservations(ceph::Formatter* f) const; + + /** + * Pacing the scrub operation by inserting delays (mostly between chunks) + * + * Special handling for regular scrubs that continued into "no scrub" times. + * Scrubbing will continue, but the delays will be controlled by a separate + * (read - with higher value) configuration element + * (osd_scrub_extended_sleep). + */ + double scrub_sleep_time( + bool must_scrub) const; /// \todo (future) return milliseconds + + /** + * called every heartbeat to update the "daily" load average + * + * @returns a load value for the logger + */ + [[nodiscard]] std::optional update_load_average(); + + private: + CephContext* cct; + OSDService& osd_service; + + /** + * jobs_lock protects the job containers and the relevant scrub-jobs state + * variables. Specifically, the following are guaranteed: + * - 'in_queues' is asserted only if the job is in one of the queues; + * - a job will only be in state 'registered' if in one of the queues; + * - no job will be in the two queues simulatenously + * + * Note that PG locks should not be acquired while holding jobs_lock. + */ + mutable ceph::mutex jobs_lock = ceph::make_mutex("ScrubQueue::jobs_lock"); + + ScrubQContainer to_scrub; ///< scrub jobs (i.e. 
PGs) to scrub + ScrubQContainer penalized; ///< those that failed to reserve remote resources + bool restore_penalized{false}; + + double daily_loadavg{0.0}; + + static inline constexpr auto registered_job = [](const auto& jobref) -> bool { + return jobref->state == qu_state_t::registered; + }; + + static inline constexpr auto invalid_state = [](const auto& jobref) -> bool { + return jobref->state == qu_state_t::not_registered; + }; + + /** + * Are there scrub jobs that should be reinstated? + */ + void scan_penalized(bool forgive_all, utime_t time_now); + + /** + * clear dead entries (unregistered, or belonging to removed PGs) from a + * queue. Job state is changed to match new status. + */ + void rm_unregistered_jobs(ScrubQContainer& group); + + /** + * the set of all scrub jobs in 'group' which are ready to be scrubbed + * (ready = their scheduled time has passed). + * The scrub jobs in the new collection are sorted according to + * their scheduled time. + * + * Note that the returned container holds independent refs to the + * scrub jobs. + */ + ScrubQContainer collect_ripe_jobs(ScrubQContainer& group, utime_t time_now); + + + /// scrub resources management lock (guarding scrubs_local & scrubs_remote) + mutable ceph::mutex resource_lock = + ceph::make_mutex("ScrubQueue::resource_lock"); + + // the counters used to manage scrub activity parallelism: + int scrubs_local{0}; + int scrubs_remote{0}; + + std::atomic_bool a_pg_is_reserving{false}; + + [[nodiscard]] bool scrub_load_below_threshold() const; + [[nodiscard]] bool scrub_time_permit(utime_t now) const; + + /** + * If the scrub job was not explicitly requested, we postpone it by some + * random length of time. + * And if delaying the scrub - we calculate, based on pool parameters, a + * deadline we should scrub before. 
+ * + * @return a pair of values: the determined scrub time, and the deadline + */ + scrub_schedule_t adjust_target_time( + const sched_params_t& recomputed_params) const; + + /** + * Look for scrub jobs that have their 'resources_failure' set. These jobs + * have failed to acquire remote resources last time we've initiated a scrub + * session on them. They are now moved from the 'to_scrub' queue to the + * 'penalized' set. + * + * locking: called with job_lock held + */ + void move_failed_pgs(utime_t now_is); + + Scrub::schedule_result_t select_from_group(ScrubQContainer& group, + const Scrub::ScrubPreconds& preconds, + utime_t now_is); +}; diff --git a/src/osd/scrubber/pg_scrubber.cc b/src/osd/scrubber/pg_scrubber.cc index a9405ad82af..dce25081a3a 100644 --- a/src/osd/scrubber/pg_scrubber.cc +++ b/src/osd/scrubber/pg_scrubber.cc @@ -443,62 +443,118 @@ unsigned int PgScrubber::scrub_requeue_priority(Scrub::scrub_prio_t with_priorit // ///////////////////////////////////////////////////////////////////// // // scrub-op registration handling +void PgScrubber::unregister_from_osd() +{ + if (m_scrub_job) { + dout(15) << __func__ << " prev. state: " << registration_state() << dendl; + m_osds->get_scrub_services().remove_from_osd_queue(m_scrub_job); + } +} + bool PgScrubber::is_scrub_registered() const { - return !m_scrub_reg_stamp.is_zero(); + return m_scrub_job && m_scrub_job->in_queues; } -void PgScrubber::reg_next_scrub(const requested_scrub_t& request_flags) +std::string_view PgScrubber::registration_state() const { - if (!is_primary()) { - // normal. No warning is required. - return; + if (m_scrub_job) { + return m_scrub_job->registration_state(); } + return "(no sched job)"sv; +} - dout(10) << __func__ << " planned: must? " << request_flags.must_scrub << " need-auto? 
" - << request_flags.need_auto << " stamp: " << m_pg->info.history.last_scrub_stamp - << dendl; +void PgScrubber::rm_from_osd_scrubbing() +{ + // make sure the OSD won't try to scrub this one just now + unregister_from_osd(); +} - ceph_assert(!is_scrub_registered()); +void PgScrubber::on_primary_change(const requested_scrub_t& request_flags) +{ + dout(10) << __func__ << (is_primary() ? " Primary " : " Replica ") + << " flags: " << request_flags << dendl; - utime_t reg_stamp; - bool must = false; + if (!m_scrub_job) { + return; + } - if (request_flags.must_scrub || request_flags.need_auto) { - // Set the smallest time that isn't utime_t() - reg_stamp = PgScrubber::scrub_must_stamp(); - must = true; - } else if (m_pg->info.stats.stats_invalid && - m_pg->cct->_conf->osd_scrub_invalid_stats) { - reg_stamp = ceph_clock_now(); - must = true; + dout(15) << __func__ << " scrub-job state: " << m_scrub_job->state_desc() << dendl; + + if (is_primary()) { + auto suggested = determine_scrub_time(request_flags); + m_osds->get_scrub_services().register_with_osd(m_scrub_job, suggested); } else { - reg_stamp = m_pg->info.history.last_scrub_stamp; + m_osds->get_scrub_services().remove_from_osd_queue(m_scrub_job); } - dout(15) << __func__ << " pg(" << m_pg_id << ") must: " << must - << " required:" << m_flags.required << " flags: " << request_flags - << " stamp: " << reg_stamp << dendl; + dout(15) << __func__ << " done " << registration_state() << dendl; +} - const double scrub_min_interval = - m_pg->pool.info.opts.value_or(pool_opts_t::SCRUB_MIN_INTERVAL, 0.0); - const double scrub_max_interval = - m_pg->pool.info.opts.value_or(pool_opts_t::SCRUB_MAX_INTERVAL, 0.0); +void PgScrubber::on_maybe_registration_change(const requested_scrub_t& request_flags) +{ + dout(10) << __func__ << (is_primary() ? 
" Primary " : " Replica/other ") + << registration_state() << " flags: " << request_flags << dendl; - // note the sched_time, so we can locate this scrub, and remove it later - m_scrub_reg_stamp = m_osds->reg_pg_scrub(m_pg->info.pgid, reg_stamp, scrub_min_interval, - scrub_max_interval, must); - dout(15) << __func__ << " pg(" << m_pg_id << ") register next scrub, scrub time " - << m_scrub_reg_stamp << ", must = " << (int)must << dendl; + on_primary_change(request_flags); + dout(15) << __func__ << " done " << registration_state() << dendl; } -void PgScrubber::unreg_next_scrub() +void PgScrubber::update_scrub_job(const requested_scrub_t& request_flags) { - if (is_scrub_registered()) { - dout(15) << __func__ << " existing-" << m_scrub_reg_stamp << dendl; - m_osds->unreg_pg_scrub(m_pg->info.pgid, m_scrub_reg_stamp); - m_scrub_reg_stamp = utime_t{}; + dout(10) << __func__ << " flags: " << request_flags << dendl; + + { + // verify that the 'in_q' status matches our "Primariority" + if (m_scrub_job && is_primary() && !m_scrub_job->in_queues) { + dout(1) << __func__ << " !!! primary but not scheduled! 
" << dendl; + } + } + + if (is_primary() && m_scrub_job) { + auto suggested = determine_scrub_time(request_flags); + m_osds->get_scrub_services().update_job(m_scrub_job, suggested); } + + dout(15) << __func__ << " done " << registration_state() << dendl; +} + +ScrubQueue::sched_params_t +PgScrubber::determine_scrub_time(const requested_scrub_t& request_flags) +{ + ScrubQueue::sched_params_t res; + + if (!is_primary()) { + return res; // with ok_to_scrub set to 'false' + } + + if (request_flags.must_scrub || request_flags.need_auto) { + + // Set the smallest time that isn't utime_t() + res.proposed_time = PgScrubber::scrub_must_stamp(); + res.is_must = ScrubQueue::must_scrub_t::mandatory; + // we do not need the interval data in this case + + } else if (m_pg->info.stats.stats_invalid && + m_pg->cct->_conf->osd_scrub_invalid_stats) { + res.proposed_time = ceph_clock_now(); + res.is_must = ScrubQueue::must_scrub_t::mandatory; + + } else { + res.proposed_time = m_pg->info.history.last_scrub_stamp; + res.min_interval = + m_pg->get_pool().info.opts.value_or(pool_opts_t::SCRUB_MIN_INTERVAL, 0.0); + res.max_interval = + m_pg->get_pool().info.opts.value_or(pool_opts_t::SCRUB_MAX_INTERVAL, 0.0); + } + + dout(15) << __func__ << " suggested: " << res.proposed_time << " hist: " + << m_pg->info.history.last_scrub_stamp << " v:" << m_pg->info.stats.stats_invalid + << " / " << m_pg->cct->_conf->osd_scrub_invalid_stats << " must:" + << (res.is_must==ScrubQueue::must_scrub_t::mandatory ? "y" : "n" ) + << " pool min: " << res.min_interval + << dendl; + return res; } void PgScrubber::scrub_requested(scrub_level_t scrub_level, @@ -507,11 +563,10 @@ void PgScrubber::scrub_requested(scrub_level_t scrub_level, { dout(10) << __func__ << (scrub_level == scrub_level_t::deep ? " deep " : " shallow ") << (scrub_type == scrub_type_t::do_repair ? 
" repair-scrub " : " not-repair ") - << " prev stamp: " << m_scrub_reg_stamp << " " << is_scrub_registered() + << " prev stamp: " << m_scrub_job->get_sched_time() + << " registered? " << registration_state() << dendl; - unreg_next_scrub(); - req_flags.must_scrub = true; req_flags.must_deep_scrub = (scrub_level == scrub_level_t::deep) || (scrub_type == scrub_type_t::do_repair); @@ -522,17 +577,16 @@ void PgScrubber::scrub_requested(scrub_level_t scrub_level, dout(20) << __func__ << " pg(" << m_pg_id << ") planned:" << req_flags << dendl; - reg_next_scrub(req_flags); + update_scrub_job(req_flags); } -void PgScrubber::request_rescrubbing(requested_scrub_t& req_flags) + +void PgScrubber::request_rescrubbing(requested_scrub_t& request_flags) { - dout(10) << __func__ << " existing-" << m_scrub_reg_stamp << ". was registered? " - << is_scrub_registered() << dendl; + dout(10) << __func__ << " flags: " << request_flags << dendl; - unreg_next_scrub(); - req_flags.need_auto = true; - reg_next_scrub(req_flags); + request_flags.need_auto = true; + update_scrub_job(request_flags); } bool PgScrubber::reserve_local() @@ -606,11 +660,11 @@ bool PgScrubber::select_range() * left end of the range if we are a tier because they may legitimately * not exist (see _scrub). 
*/ - int min_idx = std::max( - 3, m_pg->get_cct()->_conf->osd_scrub_chunk_min / preemption_data.chunk_divisor()); + int min_idx = static_cast(std::max( + 3, m_pg->get_cct()->_conf->osd_scrub_chunk_min / (int)preemption_data.chunk_divisor())); - int max_idx = std::max(min_idx, m_pg->get_cct()->_conf->osd_scrub_chunk_max / - preemption_data.chunk_divisor()); + int max_idx = static_cast(std::max(min_idx, m_pg->get_cct()->_conf->osd_scrub_chunk_max / + (int)preemption_data.chunk_divisor())); dout(10) << __func__ << " Min: " << min_idx << " Max: " << max_idx << " Div: " << preemption_data.chunk_divisor() << dendl; @@ -738,7 +792,8 @@ void PgScrubber::add_delayed_scheduling() milliseconds sleep_time{0ms}; if (m_needs_sleep) { - double scrub_sleep = 1000.0 * m_osds->osd->scrub_sleep_time(m_flags.required); + double scrub_sleep = + 1000.0 * m_osds->get_scrub_services().scrub_sleep_time(m_flags.required); sleep_time = milliseconds{long(scrub_sleep)}; } dout(15) << __func__ << " sleep: " << sleep_time.count() << "ms. needed? " @@ -756,8 +811,8 @@ void PgScrubber::add_delayed_scheduling() // the 'delayer' for crimson is different. Will be factored out. 
spg_t pgid = m_pg->get_pgid(); - auto callbk = new LambdaContext([osds = m_osds, pgid, - scrbr = this]([[maybe_unused]] int r) mutable { + auto callbk = new LambdaContext([osds = m_osds, pgid, scrbr = this]( + [[maybe_unused]] int r) mutable { PGRef pg = osds->osd->lookup_lock_pg(pgid); if (!pg) { lgeneric_subdout(g_ceph_context, osd, 10) @@ -1623,6 +1678,17 @@ void PgScrubber::unreserve_replicas() m_reservations.reset(); } +void PgScrubber::set_reserving_now() +{ + m_osds->get_scrub_services().set_reserving_now(); +} + +void PgScrubber::clear_reserving_now() +{ + m_osds->get_scrub_services().clear_reserving_now(); +} + + [[nodiscard]] bool PgScrubber::scrub_process_inconsistent() { dout(10) << __func__ << ": checking authoritative (mode=" @@ -1877,7 +1943,7 @@ void PgScrubber::dump(ceph::Formatter* f) const f->dump_bool("auto_repair", m_flags.auto_repair); f->dump_bool("check_repair", m_flags.check_repair); f->dump_bool("deep_scrub_on_error", m_flags.deep_scrub_on_error); - f->dump_stream("scrub_reg_stamp") << m_scrub_reg_stamp; // utime_t + f->dump_stream("scrub_reg_stamp") << m_scrub_job->get_sched_time(); // utime_t f->dump_unsigned("priority", m_flags.priority); f->dump_int("shallow_errors", m_shallow_errors); f->dump_int("deep_errors", m_deep_errors); @@ -1919,7 +1985,14 @@ void PgScrubber::handle_query_state(ceph::Formatter* f) f->close_section(); } -PgScrubber::~PgScrubber() = default; +PgScrubber::~PgScrubber() +{ + if (m_scrub_job) { + // make sure the OSD won't try to scrub this one just now + rm_from_osd_scrubbing(); + m_scrub_job.reset(); + } +} PgScrubber::PgScrubber(PG* pg) : m_pg{pg} @@ -1930,12 +2003,15 @@ PgScrubber::PgScrubber(PG* pg) { m_fsm = std::make_unique(m_pg, this); m_fsm->initiate(); + + m_scrub_job = ceph::make_ref(m_osds->cct, m_pg->pg_id, + m_osds->get_nodeid()); } void PgScrubber::reserve_replicas() { dout(10) << __func__ << dendl; - m_reservations.emplace(m_pg, m_pg_whoami); + m_reservations.emplace(m_pg, m_pg_whoami, m_scrub_job); 
} void PgScrubber::cleanup_on_finish() @@ -2129,12 +2205,13 @@ void ReplicaReservations::release_replica(pg_shard_t peer, epoch_t epoch) m_osds->send_message_osd_cluster(peer.osd, m, epoch); } -ReplicaReservations::ReplicaReservations(PG* pg, pg_shard_t whoami) +ReplicaReservations::ReplicaReservations(PG* pg, pg_shard_t whoami, ScrubQueue::ScrubJobRef scrubjob) : m_pg{pg} , m_acting_set{pg->get_actingset()} , m_osds{m_pg->get_pg_osd(ScrubberPasskey())} , m_pending{static_cast(m_acting_set.size()) - 1} , m_pg_info{m_pg->get_pg_info(ScrubberPasskey())} + , m_scrub_job{scrubjob} { epoch_t epoch = m_pg->get_osdmap_epoch(); @@ -2164,6 +2241,7 @@ void ReplicaReservations::send_all_done() void ReplicaReservations::send_reject() { + m_scrub_job->resources_failure = true; m_osds->queue_for_scrub_denied(m_pg, scrub_prio_t::low_priority); } @@ -2274,7 +2352,7 @@ LocalReservation::LocalReservation(PG* pg, OSDService* osds) : m_pg{pg} // holding the "whole PG" for dout() sake , m_osds{osds} { - if (!m_osds->inc_scrubs_local()) { + if (!m_osds->get_scrub_services().inc_scrubs_local()) { dout(10) << __func__ << ": failed to reserve locally " << dendl; // the failure is signalled by not having m_holding_local_reservation set return; @@ -2288,7 +2366,7 @@ LocalReservation::~LocalReservation() { if (m_holding_local_reservation) { m_holding_local_reservation = false; - m_osds->dec_scrubs_local(); + m_osds->get_scrub_services().dec_scrubs_local(); } } @@ -2298,7 +2376,7 @@ LocalReservation::~LocalReservation() ReservedByRemotePrimary::ReservedByRemotePrimary(PG* pg, OSDService* osds, epoch_t epoch) : m_pg{pg}, m_osds{osds}, m_reserved_at{epoch} { - if (!m_osds->inc_scrubs_remote()) { + if (!m_osds->get_scrub_services().inc_scrubs_remote()) { dout(10) << __func__ << ": failed to reserve at Primary request" << dendl; // the failure is signalled by not having m_reserved_by_remote_primary set return; @@ -2317,7 +2395,7 @@ ReservedByRemotePrimary::~ReservedByRemotePrimary() { if 
(m_reserved_by_remote_primary) { m_reserved_by_remote_primary = false; - m_osds->dec_scrubs_remote(); + m_osds->get_scrub_services().dec_scrubs_remote(); } } diff --git a/src/osd/scrubber/pg_scrubber.h b/src/osd/scrubber/pg_scrubber.h index c08279efb61..9077bfcf3bc 100644 --- a/src/osd/scrubber/pg_scrubber.h +++ b/src/osd/scrubber/pg_scrubber.h @@ -16,6 +16,7 @@ #include "ScrubStore.h" #include "scrub_machine_lstnr.h" #include "osd/scrubber_common.h" +#include "osd_scrub_sched.h" class Callback; @@ -46,6 +47,7 @@ class ReplicaReservations { bool m_had_rejections{false}; int m_pending{-1}; const pg_info_t& m_pg_info; + ScrubQueue::ScrubJobRef m_scrub_job; ///< a ref to this PG's scrub job void release_replica(pg_shard_t peer, epoch_t epoch); @@ -63,7 +65,7 @@ class ReplicaReservations { */ void discard_all(); - ReplicaReservations(PG* pg, pg_shard_t whoami); + ReplicaReservations(PG* pg, pg_shard_t whoami, ScrubQueue::ScrubJobRef scrubjob); ~ReplicaReservations(); @@ -131,7 +133,7 @@ class MapsCollectionStatus { /// @returns true if indeed waiting for this one. 
Otherwise: an error string auto mark_arriving_map(pg_shard_t from) -> std::tuple; - std::vector get_awaited() const { return m_maps_awaited_for; } + [[nodiscard]] std::vector get_awaited() const { return m_maps_awaited_for; } void reset(); @@ -262,9 +264,13 @@ class PgScrubber : public ScrubPgIF, public ScrubMachineListener { // managing scrub op registration - void reg_next_scrub(const requested_scrub_t& request_flags) final; + void update_scrub_job(const requested_scrub_t& request_flags) final; - void unreg_next_scrub() final; + void rm_from_osd_scrubbing() final; + + void on_primary_change(const requested_scrub_t& request_flags) final; + + void on_maybe_registration_change(const requested_scrub_t& request_flags) final; void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type, @@ -312,7 +318,7 @@ class PgScrubber : public ScrubPgIF, public ScrubMachineListener { /** * add to scrub statistics, but only if the soid is below the scrub start */ - virtual void stats_of_handled_objects(const object_stat_sum_t& delta_stats, + void stats_of_handled_objects(const object_stat_sum_t& delta_stats, const hobject_t& soid) override { ceph_assert(false); @@ -404,6 +410,9 @@ class PgScrubber : public ScrubPgIF, public ScrubMachineListener { void reserve_replicas() final; + void set_reserving_now() final; + void clear_reserving_now() final; + [[nodiscard]] bool was_epoch_changed() const final; void mark_local_map_ready() final; @@ -419,9 +428,13 @@ class PgScrubber : public ScrubPgIF, public ScrubMachineListener { [[nodiscard]] bool is_scrub_registered() const; + /// the 'is-in-scheduling-queue' status, using relaxed-semantics access to the status + std::string_view registration_state() const; + virtual void _scrub_clear_state() {} utime_t m_scrub_reg_stamp; ///< stamp we registered for + ScrubQueue::ScrubJobRef m_scrub_job; ///< the scrub-job used by the OSD to schedule us ostream& show(ostream& out) const override; @@ -646,6 +659,11 @@ private: */ void 
request_rescrubbing(requested_scrub_t& req_flags); + ScrubQueue::sched_params_t + determine_scrub_time(const requested_scrub_t& request_flags); + + void unregister_from_osd(); + /* * Select a range of objects to scrub. * diff --git a/src/osd/scrubber/scrub_machine.cc b/src/osd/scrubber/scrub_machine.cc index 41e3cd1f162..4f9ed5e7f83 100644 --- a/src/osd/scrubber/scrub_machine.cc +++ b/src/osd/scrubber/scrub_machine.cc @@ -90,9 +90,19 @@ ReservingReplicas::ReservingReplicas(my_context ctx) : my_base(ctx) { dout(10) << "-- state -->> ReservingReplicas" << dendl; DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + + // prevent the OSD from starting another scrub while we are trying to secure + // replicas resources + scrbr->set_reserving_now(); scrbr->reserve_replicas(); } +ReservingReplicas::~ReservingReplicas() +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + scrbr->clear_reserving_now(); +} + sc::result ReservingReplicas::react(const ReservationFailure&) { DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases diff --git a/src/osd/scrubber/scrub_machine.h b/src/osd/scrubber/scrub_machine.h index 7f187005609..f75c5acdc2e 100644 --- a/src/osd/scrubber/scrub_machine.h +++ b/src/osd/scrubber/scrub_machine.h @@ -164,6 +164,7 @@ struct NotActive : sc::state { struct ReservingReplicas : sc::state { explicit ReservingReplicas(my_context ctx); + ~ReservingReplicas(); using reactions = mpl::list, // all replicas granted our resources request sc::transition, diff --git a/src/osd/scrubber/scrub_machine_lstnr.h b/src/osd/scrubber/scrub_machine_lstnr.h index 25bd080fbca..28745d469d9 100644 --- a/src/osd/scrubber/scrub_machine_lstnr.h +++ b/src/osd/scrubber/scrub_machine_lstnr.h @@ -148,6 +148,15 @@ struct ScrubMachineListener { virtual void unreserve_replicas() = 0; + /** + * No new scrub session will start while a scrub was initiated on a PG, + * and that PG is trying to acquire replica resources.
+ * set_reserving_now()/clear_reserving_now() let the OSD scrub-queue know + * we are busy reserving. + */ + virtual void set_reserving_now() = 0; + virtual void clear_reserving_now() = 0; + /** * the FSM interface into the "are we waiting for maps, either our own or from * replicas" state. diff --git a/src/osd/scrubber_common.h b/src/osd/scrubber_common.h index 668ae282fd4..2f4376a8b7b 100644 --- a/src/osd/scrubber_common.h +++ b/src/osd/scrubber_common.h @@ -21,6 +21,14 @@ enum class scrub_prio_t : bool { low_priority = false, high_priority = true }; /// see ScrubPGgIF::m_current_token using act_token_t = uint32_t; +/// "environment" preconditions affecting which PGs are eligible for scrubbing +struct ScrubPreconds { + bool allow_requested_repair_only{false}; + bool load_is_low{true}; + bool time_permit{true}; + bool only_deadlined{false}; +}; + } // namespace Scrub @@ -266,6 +274,23 @@ struct ScrubPgIF { */ virtual bool reserve_local() = 0; + /** + * Register/de-register with the OSD scrub queue + * + * Following our status as Primary or replica. + */ + virtual void on_primary_change(const requested_scrub_t& request_flags) = 0; + + /** + * Recalculate the required scrub time. + * + * This function assumes that the queue registration status is up-to-date, + * i.e. the OSD "knows our name" iff we are the Primary.
+ */ + virtual void update_scrub_job(const requested_scrub_t& request_flags) = 0; + + virtual void on_maybe_registration_change(const requested_scrub_t& request_flags) = 0; + // on the replica: virtual void handle_scrub_reserve_request(OpRequestRef op) = 0; virtual void handle_scrub_reserve_release(OpRequestRef op) = 0; @@ -274,8 +299,8 @@ struct ScrubPgIF { virtual void handle_scrub_reserve_grant(OpRequestRef op, pg_shard_t from) = 0; virtual void handle_scrub_reserve_reject(OpRequestRef op, pg_shard_t from) = 0; - virtual void reg_next_scrub(const requested_scrub_t& request_flags) = 0; - virtual void unreg_next_scrub() = 0; + virtual void rm_from_osd_scrubbing() = 0; + virtual void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type, requested_scrub_t& req_flags) = 0; diff --git a/src/test/osd/TestOSDScrub.cc b/src/test/osd/TestOSDScrub.cc index 7b81f84738a..4c6d4cceedf 100644 --- a/src/test/osd/TestOSDScrub.cc +++ b/src/test/osd/TestOSDScrub.cc @@ -52,7 +52,7 @@ public: } bool scrub_time_permit(utime_t now) { - return OSD::scrub_time_permit(now); + return service.get_scrub_services().scrub_time_permit(now); } }; -- 2.47.3