// Not needed yet -- mainly for scrub scheduling
}
+ /// Notify PG that Primary/Replica status has changed (to update scrub registration)
+ void on_primary_status_change(bool was_primary, bool now_primary) final {
+ }
+
+ /// Need to reschedule next scrub. Assuming no change in role
+ void reschedule_scrub() final {
+ }
+
void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) final;
uint64_t get_snap_trimq_size() const final {
PGBackend.cc
OSDCap.cc
scrubber/pg_scrubber.cc
+ scrubber/osd_scrub_sched.cc
scrubber/PrimaryLogScrub.cc
scrubber/scrub_machine.cc
scrubber/ScrubStore.cc
publish_lock{ceph::make_mutex("OSDService::publish_lock")},
pre_publish_lock{ceph::make_mutex("OSDService::pre_publish_lock")},
max_oldest_map(0),
- scrubs_local(0),
- scrubs_remote(0),
+ m_scrub_queue{cct, *this},
agent_valid_iterator(false),
agent_ops(0),
flush_mode_high_count(0),
// --------------------------------------
// dispatch
-bool OSDService::can_inc_scrubs()
-{
- bool can_inc = false;
- std::lock_guard l(sched_scrub_lock);
-
- if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) {
- dout(20) << __func__ << " == true " << scrubs_local << " local + " << scrubs_remote
- << " remote < max " << cct->_conf->osd_max_scrubs << dendl;
- can_inc = true;
- } else {
- dout(20) << __func__ << " == false " << scrubs_local << " local + " << scrubs_remote
- << " remote >= max " << cct->_conf->osd_max_scrubs << dendl;
- }
-
- return can_inc;
-}
-
-bool OSDService::inc_scrubs_local()
-{
- bool result = false;
- std::lock_guard l{sched_scrub_lock};
- if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) {
- dout(20) << __func__ << " " << scrubs_local << " -> " << (scrubs_local+1)
- << " (max " << cct->_conf->osd_max_scrubs << ", remote " << scrubs_remote << ")" << dendl;
- result = true;
- ++scrubs_local;
- } else {
- dout(20) << __func__ << " " << scrubs_local << " local + " << scrubs_remote << " remote >= max " << cct->_conf->osd_max_scrubs << dendl;
- }
- return result;
-}
-
-void OSDService::dec_scrubs_local()
-{
- std::lock_guard l{sched_scrub_lock};
- dout(20) << __func__ << " " << scrubs_local << " -> " << (scrubs_local-1)
- << " (max " << cct->_conf->osd_max_scrubs << ", remote " << scrubs_remote << ")" << dendl;
- --scrubs_local;
- ceph_assert(scrubs_local >= 0);
-}
-
-bool OSDService::inc_scrubs_remote()
-{
- bool result = false;
- std::lock_guard l{sched_scrub_lock};
- if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) {
- dout(20) << __func__ << " " << scrubs_remote << " -> " << (scrubs_remote+1)
- << " (max " << cct->_conf->osd_max_scrubs << ", local " << scrubs_local << ")" << dendl;
- result = true;
- ++scrubs_remote;
- } else {
- dout(20) << __func__ << " " << scrubs_local << " local + " << scrubs_remote << " remote >= max " << cct->_conf->osd_max_scrubs << dendl;
- }
- return result;
-}
-
-void OSDService::dec_scrubs_remote()
-{
- std::lock_guard l{sched_scrub_lock};
- dout(20) << __func__ << " " << scrubs_remote << " -> " << (scrubs_remote-1)
- << " (max " << cct->_conf->osd_max_scrubs << ", local " << scrubs_local << ")" << dendl;
- --scrubs_remote;
- ceph_assert(scrubs_remote >= 0);
-}
-
-void OSDService::dump_scrub_reservations(Formatter *f)
-{
- std::lock_guard l{sched_scrub_lock};
- f->dump_int("scrubs_local", scrubs_local);
- f->dump_int("scrubs_remote", scrubs_remote);
- f->dump_int("osd_max_scrubs", cct->_conf->osd_max_scrubs);
-}
-
void OSDService::retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
epoch_t *_bind_epoch) const
{
f->close_section();
} else if (prefix == "dump_scrub_reservations") {
f->open_object_section("scrub_reservations");
- service.dump_scrub_reservations(f);
+ service.get_scrub_services().dump_scrub_reservations(f);
f->close_section();
} else if (prefix == "get_latest_osdmap") {
get_latest_osdmap();
} else if (prefix == "dump_objectstore_kv_stats") {
store->get_db_statistics(f);
} else if (prefix == "dump_scrubs") {
- service.dumps_scrub(f);
+ service.get_scrub_services().dump_scrubs(f);
} else if (prefix == "calc_objectstore_db_histogram") {
store->generate_db_histogram(f);
} else if (prefix == "flush_store_cache") {
store->set_collection_commit_queue(pg->coll, &(shards[shard_index]->context_queue));
}
- pg->reg_next_scrub();
-
dout(10) << __func__ << " loaded " << *pg << dendl;
pg->unlock();
ceph_assert(ceph_mutex_is_locked_by_me(heartbeat_lock));
dout(30) << "heartbeat" << dendl;
- // get CPU load avg
- double loadavgs[1];
- int hb_interval = cct->_conf->osd_heartbeat_interval;
- int n_samples = 86400;
- if (hb_interval > 1) {
- n_samples /= hb_interval;
- if (n_samples < 1)
- n_samples = 1;
- }
-
- if (getloadavg(loadavgs, 1) == 1) {
- logger->set(l_osd_loadavg, 100 * loadavgs[0]);
- daily_loadavg = (daily_loadavg * (n_samples - 1) + loadavgs[0]) / n_samples;
- dout(30) << "heartbeat: daily_loadavg " << daily_loadavg << dendl;
+ auto load_for_logger = service.get_scrub_services().update_load_average();
+ if (load_for_logger) {
+ logger->set(l_osd_loadavg, load_for_logger.value());
}
-
dout(30) << "heartbeat checking stats" << dendl;
// refresh peer list and osd stats
return false;
}
-OSDService::ScrubJob::ScrubJob(CephContext* cct,
- const spg_t& pg, const utime_t& timestamp,
- double pool_scrub_min_interval,
- double pool_scrub_max_interval, bool must)
- : cct(cct),
- pgid(pg),
- sched_time(timestamp),
- deadline(timestamp)
-{
- // if not explicitly requested, postpone the scrub with a random delay
- if (!must) {
- double scrub_min_interval = pool_scrub_min_interval > 0 ?
- pool_scrub_min_interval : cct->_conf->osd_scrub_min_interval;
- double scrub_max_interval = pool_scrub_max_interval > 0 ?
- pool_scrub_max_interval : cct->_conf->osd_scrub_max_interval;
-
- sched_time += scrub_min_interval;
- double r = rand() / (double)RAND_MAX;
- sched_time +=
- scrub_min_interval * cct->_conf->osd_scrub_interval_randomize_ratio * r;
- if (scrub_max_interval == 0) {
- deadline = utime_t();
- } else {
- deadline += scrub_max_interval;
- }
- }
-}
-
-bool OSDService::ScrubJob::ScrubJob::operator<(const OSDService::ScrubJob& rhs) const {
- if (sched_time < rhs.sched_time)
- return true;
- if (sched_time > rhs.sched_time)
- return false;
- return pgid < rhs.pgid;
-}
-
-void OSDService::dumps_scrub(ceph::Formatter *f)
+void OSD::sched_scrub()
{
- ceph_assert(f != nullptr);
- std::lock_guard l(sched_scrub_lock);
+ auto& scrub_scheduler = service.get_scrub_services();
- f->open_array_section("scrubs");
- for (const auto &i: sched_scrub_pg) {
- f->open_object_section("scrub");
- f->dump_stream("pgid") << i.pgid;
- f->dump_stream("sched_time") << i.sched_time;
- f->dump_stream("deadline") << i.deadline;
- f->dump_bool("forced", i.sched_time == PgScrubber::scrub_must_stamp());
- f->close_section();
+ // fail fast if no resources are available
+ if (!scrub_scheduler.can_inc_scrubs()) {
+ dout(20) << __func__ << ": OSD cannot inc scrubs" << dendl;
+ return;
}
- f->close_section();
-}
-double OSD::scrub_sleep_time(bool must_scrub)
-{
- if (must_scrub) {
- return cct->_conf->osd_scrub_sleep;
- }
- utime_t now = ceph_clock_now();
- if (scrub_time_permit(now)) {
- return cct->_conf->osd_scrub_sleep;
+ // if there is a PG that is just now trying to reserve scrub replica resources -
+ // we should wait and not initiate a new scrub
+ if (scrub_scheduler.is_reserving_now()) {
+ dout(20) << __func__ << ": scrub resources reservation in progress" << dendl;
+ return;
}
- double normal_sleep = cct->_conf->osd_scrub_sleep;
- double extended_sleep = cct->_conf->osd_scrub_extended_sleep;
- return std::max(extended_sleep, normal_sleep);
-}
-bool OSD::scrub_time_permit(utime_t now)
-{
- struct tm bdt;
- time_t tt = now.sec();
- localtime_r(&tt, &bdt);
+ Scrub::ScrubPreconds env_conditions;
- bool day_permit = false;
- if (cct->_conf->osd_scrub_begin_week_day < cct->_conf->osd_scrub_end_week_day) {
- if (bdt.tm_wday >= cct->_conf->osd_scrub_begin_week_day && bdt.tm_wday < cct->_conf->osd_scrub_end_week_day) {
- day_permit = true;
- }
- } else {
- if (bdt.tm_wday >= cct->_conf->osd_scrub_begin_week_day || bdt.tm_wday < cct->_conf->osd_scrub_end_week_day) {
- day_permit = true;
+ if (service.is_recovery_active() && !cct->_conf->osd_scrub_during_recovery) {
+ if (!cct->_conf->osd_repair_during_recovery) {
+ dout(15) << __func__ << ": not scheduling scrubs due to active recovery"
+ << dendl;
+ return;
}
+ dout(10) << __func__
+ << " will only schedule explicitly requested repair due to active recovery"
+ << dendl;
+ env_conditions.allow_requested_repair_only = true;
}
- if (!day_permit) {
- dout(20) << __func__ << " should run between week day " << cct->_conf->osd_scrub_begin_week_day
- << " - " << cct->_conf->osd_scrub_end_week_day
- << " now " << bdt.tm_wday << " = no" << dendl;
- return false;
- }
-
- bool time_permit = false;
- if (cct->_conf->osd_scrub_begin_hour < cct->_conf->osd_scrub_end_hour) {
- if (bdt.tm_hour >= cct->_conf->osd_scrub_begin_hour && bdt.tm_hour < cct->_conf->osd_scrub_end_hour) {
- time_permit = true;
- }
- } else {
- if (bdt.tm_hour >= cct->_conf->osd_scrub_begin_hour || bdt.tm_hour < cct->_conf->osd_scrub_end_hour) {
- time_permit = true;
+ if (g_conf()->subsys.should_gather<ceph_subsys_osd, 20>()) {
+ dout(20) << __func__ << " sched_scrub starts" << dendl;
+ auto all_jobs = scrub_scheduler.list_registered_jobs();
+ for (const auto& sj : all_jobs) {
+ dout(20) << "sched_scrub scrub-queue jobs: " << *sj << dendl;
}
}
- if (time_permit) {
- dout(20) << __func__ << " should run between " << cct->_conf->osd_scrub_begin_hour
- << " - " << cct->_conf->osd_scrub_end_hour
- << " now " << bdt.tm_hour << " = yes" << dendl;
- } else {
- dout(20) << __func__ << " should run between " << cct->_conf->osd_scrub_begin_hour
- << " - " << cct->_conf->osd_scrub_end_hour
- << " now " << bdt.tm_hour << " = no" << dendl;
- }
- return time_permit;
+
+ auto was_started = scrub_scheduler.select_pg_and_scrub(env_conditions);
+ dout(20) << "sched_scrub done (" << ScrubQueue::attempt_res_text(was_started)
+ << ")" << dendl;
}
-bool OSD::scrub_load_below_threshold()
+Scrub::schedule_result_t OSDService::initiate_a_scrub(spg_t pgid,
+ bool allow_requested_repair_only)
{
- double loadavgs[3];
- if (getloadavg(loadavgs, 3) != 3) {
- dout(10) << __func__ << " couldn't read loadavgs\n" << dendl;
- return false;
- }
+ dout(20) << __func__ << " trying " << pgid << dendl;
- // allow scrub if below configured threshold
- long cpus = sysconf(_SC_NPROCESSORS_ONLN);
- double loadavg_per_cpu = cpus > 0 ? loadavgs[0] / cpus : loadavgs[0];
- if (loadavg_per_cpu < cct->_conf->osd_scrub_load_threshold) {
- dout(20) << __func__ << " loadavg per cpu " << loadavg_per_cpu
- << " < max " << cct->_conf->osd_scrub_load_threshold
- << " = yes" << dendl;
- return true;
- }
+ // we have a candidate to scrub. We need some PG information to know if scrubbing is
+ // allowed
- // allow scrub if below daily avg and currently decreasing
- if (loadavgs[0] < daily_loadavg && loadavgs[0] < loadavgs[2]) {
- dout(20) << __func__ << " loadavg " << loadavgs[0]
- << " < daily_loadavg " << daily_loadavg
- << " and < 15m avg " << loadavgs[2]
- << " = yes" << dendl;
- return true;
+ PGRef pg = osd->lookup_lock_pg(pgid);
+ if (!pg) {
+ // the PG was dequeued in the short timespan between creating the candidates list
+ // (collect_ripe_jobs()) and here
+ dout(5) << __func__ << " pg " << pgid << " not found" << dendl;
+ return Scrub::schedule_result_t::no_such_pg;
}
- dout(20) << __func__ << " loadavg " << loadavgs[0]
- << " >= max " << cct->_conf->osd_scrub_load_threshold
- << " and ( >= daily_loadavg " << daily_loadavg
- << " or >= 15m avg " << loadavgs[2]
- << ") = no" << dendl;
- return false;
-}
-
-void OSD::sched_scrub()
-{
- dout(20) << __func__ << " sched_scrub starts" << dendl;
-
- // if not permitted, fail fast
- if (!service.can_inc_scrubs()) {
- dout(20) << __func__ << ": OSD cannot inc scrubs" << dendl;
- return;
+ // This has already started, so go on to the next scrub job
+ if (pg->is_scrub_active()) {
+ pg->unlock();
+ dout(20) << __func__ << ": already in progress pgid " << pgid << dendl;
+ return Scrub::schedule_result_t::already_started;
}
- bool allow_requested_repair_only = false;
- if (service.is_recovery_active() && !cct->_conf->osd_scrub_during_recovery) {
- if (!cct->_conf->osd_repair_during_recovery) {
- dout(15) << __func__ << ": not scheduling scrubs due to active recovery" << dendl;
- return;
- }
- dout(10) << __func__
- << " will only schedule explicitly requested repair due to active recovery"
- << dendl;
- allow_requested_repair_only = true;
+ // Skip other kinds of scrubbing if only explicitly requested repairing is allowed
+ if (allow_requested_repair_only && !pg->m_planned_scrub.must_repair) {
+ pg->unlock();
+ dout(10) << __func__ << " skip " << pgid
+ << " because repairing is not explicitly requested on it" << dendl;
+ return Scrub::schedule_result_t::preconditions;
}
- utime_t now = ceph_clock_now();
- bool time_permit = scrub_time_permit(now);
- bool load_is_low = scrub_load_below_threshold();
- dout(20) << "sched_scrub load_is_low=" << (int)load_is_low << dendl;
-
- OSDService::ScrubJob scrub_job;
- if (service.first_scrub_stamp(&scrub_job)) {
- do {
- dout(30) << "sched_scrub examine " << scrub_job.pgid << " at " << scrub_job.sched_time << dendl;
-
- if (scrub_job.sched_time > now) {
- // save ourselves some effort
- dout(20) << "sched_scrub " << scrub_job.pgid << " scheduled at " << scrub_job.sched_time
- << " > " << now << dendl;
- break;
- }
-
- if ((scrub_job.deadline.is_zero() || scrub_job.deadline >= now) && !(time_permit && load_is_low)) {
- dout(15) << __func__ << " not scheduling scrub for " << scrub_job.pgid << " due to "
- << (!time_permit ? "time not permit" : "high load") << dendl;
- continue;
- }
-
- PGRef pg = _lookup_lock_pg(scrub_job.pgid);
- if (!pg) {
- dout(20) << __func__ << " pg " << scrub_job.pgid << " not found" << dendl;
- continue;
- }
-
- // This has already started, so go on to the next scrub job
- if (pg->is_scrub_active()) {
- pg->unlock();
- dout(20) << __func__ << ": already in progress pgid " << scrub_job.pgid << dendl;
- continue;
- }
- // Skip other kinds of scrubbing if only explicitly requested repairing is allowed
- if (allow_requested_repair_only && !pg->m_planned_scrub.must_repair) {
- pg->unlock();
- dout(10) << __func__ << " skip " << scrub_job.pgid
- << " because repairing is not explicitly requested on it"
- << dendl;
- continue;
- }
-
- // If it is reserving, let it resolve before going to the next scrub job
- if (pg->m_scrubber->is_reserving()) {
- pg->unlock();
- dout(10) << __func__ << ": reserve in progress pgid " << scrub_job.pgid << dendl;
- break;
- }
- dout(15) << "sched_scrub scrubbing " << scrub_job.pgid << " at " << scrub_job.sched_time
- << (pg->get_must_scrub() ? ", explicitly requested" :
- (load_is_low ? ", load_is_low" : " deadline < now"))
- << dendl;
- if (pg->sched_scrub()) {
- pg->unlock();
- dout(10) << __func__ << " scheduled a scrub!" << " (~" << scrub_job.pgid << "~)" << dendl;
- break;
- }
- pg->unlock();
- } while (service.next_scrub_stamp(scrub_job, &scrub_job));
- }
- dout(20) << "sched_scrub done" << dendl;
+ auto scrub_attempt = pg->sched_scrub();
+ pg->unlock();
+ return scrub_attempt;
}
void OSD::resched_all_scrubs()
{
dout(10) << __func__ << ": start" << dendl;
- const vector<spg_t> pgs = [this] {
- vector<spg_t> pgs;
- OSDService::ScrubJob job;
- if (service.first_scrub_stamp(&job)) {
- do {
- pgs.push_back(job.pgid);
- } while (service.next_scrub_stamp(job, &job));
- }
- return pgs;
- }();
- for (auto& pgid : pgs) {
- dout(20) << __func__ << ": examine " << pgid << dendl;
- PGRef pg = _lookup_lock_pg(pgid);
- if (!pg)
- continue;
- if (!pg->m_planned_scrub.must_scrub && !pg->m_planned_scrub.need_auto) {
- dout(15) << __func__ << ": reschedule " << pgid << dendl;
- pg->on_info_history_change();
- }
- pg->unlock();
+ auto all_jobs = service.get_scrub_services().list_registered_jobs();
+ for (auto& e : all_jobs) {
+
+ auto& job = *e;
+ dout(20) << __func__ << ": examine " << job.pgid << dendl;
+
+ PGRef pg = _lookup_lock_pg(job.pgid);
+ if (!pg)
+ continue;
+
+ if (!pg->m_planned_scrub.must_scrub && !pg->m_planned_scrub.need_auto) {
+ dout(15) << __func__ << ": reschedule " << job.pgid << dendl;
+ pg->reschedule_scrub();
+ }
+ pg->unlock();
}
dout(10) << __func__ << ": done" << dendl;
}
#include "common/EventTrace.h"
#include "osd/osd_perf_counters.h"
#include "common/Finisher.h"
+#include "scrubber/osd_scrub_sched.h"
#define CEPH_OSD_PROTOCOL 10 /* cluster internal */
}
entity_name_t get_cluster_msgr_name() const;
-private:
- // -- scrub scheduling --
- ceph::mutex sched_scrub_lock = ceph::make_mutex("OSDService::sched_scrub_lock");
- int scrubs_local;
- int scrubs_remote;
public:
- struct ScrubJob {
- CephContext* cct;
- /// pg to be scrubbed
- spg_t pgid;
- /// a time scheduled for scrub. but the scrub could be delayed if system
- /// load is too high or it fails to fall in the scrub hours
- utime_t sched_time;
- /// the hard upper bound of scrub time
- utime_t deadline;
- ScrubJob() : cct(nullptr) {}
- explicit ScrubJob(CephContext* cct, const spg_t& pg,
- const utime_t& timestamp,
- double pool_scrub_min_interval = 0,
- double pool_scrub_max_interval = 0, bool must = true);
- /// order the jobs by sched_time
- bool operator<(const ScrubJob& rhs) const;
- };
- std::set<ScrubJob> sched_scrub_pg;
-
- /// @returns the scrub_reg_stamp used for unregistering the scrub job
- utime_t reg_pg_scrub(spg_t pgid,
- utime_t t,
- double pool_scrub_min_interval,
- double pool_scrub_max_interval,
- bool must) {
- ScrubJob scrub_job(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval,
- must);
- std::lock_guard l(OSDService::sched_scrub_lock);
- sched_scrub_pg.insert(scrub_job);
- return scrub_job.sched_time;
- }
-
- void unreg_pg_scrub(spg_t pgid, utime_t t) {
- std::lock_guard l(sched_scrub_lock);
- size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t));
- ceph_assert(removed);
- }
-
- bool first_scrub_stamp(ScrubJob *out) {
- std::lock_guard l(sched_scrub_lock);
- if (sched_scrub_pg.empty())
- return false;
- std::set<ScrubJob>::iterator iter = sched_scrub_pg.begin();
- *out = *iter;
- return true;
- }
- bool next_scrub_stamp(const ScrubJob& next,
- ScrubJob *out) {
- std::lock_guard l(sched_scrub_lock);
- if (sched_scrub_pg.empty())
- return false;
- std::set<ScrubJob>::const_iterator iter = sched_scrub_pg.upper_bound(next);
- if (iter == sched_scrub_pg.cend())
- return false;
- *out = *iter;
- return true;
- }
-
- void dumps_scrub(ceph::Formatter* f);
-
- bool can_inc_scrubs();
- bool inc_scrubs_local();
- void dec_scrubs_local();
- bool inc_scrubs_remote();
- void dec_scrubs_remote();
- void dump_scrub_reservations(ceph::Formatter *f);
void reply_op_error(OpRequestRef op, int err);
void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv,
std::vector<pg_log_op_return_item_t> op_returns);
void handle_misdirected_op(PG *pg, OpRequestRef op);
+ private:
+ /**
+ * The entity that maintains the set of PGs we may scrub (i.e. - those that we
+ * are their primary), and schedules their scrubbing.
+ */
+ ScrubQueue m_scrub_queue;
+
+ public:
+ ScrubQueue& get_scrub_services() { return m_scrub_queue; }
-private:
+ /**
+ * A callback used by the ScrubQueue object to initiate a scrub on a specific PG.
+ *
+ * The request might fail for multiple reasons, as ScrubQueue cannot by its own
+ * check some of the PG-specific preconditions and those are checked here. See
+ * attempt_t definition.
+ *
+ * @param pgid to scrub
+ * @param allow_requested_repair_only
+ * @return a Scrub::attempt_t detailing either a success, or the failure reason.
+ */
+ Scrub::schedule_result_t initiate_a_scrub(spg_t pgid, bool allow_requested_repair_only);
+
+
+ private:
// -- agent shared state --
ceph::mutex agent_lock = ceph::make_mutex("OSDService::agent_lock");
ceph::condition_variable agent_cond;
return service.get_tid();
}
- double scrub_sleep_time(bool must_scrub);
-
// -- generic pg peering --
void dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
ThreadPool::TPHandle *handle = NULL);
void sched_scrub();
void resched_all_scrubs();
bool scrub_random_backoff();
- bool scrub_load_below_threshold();
- bool scrub_time_permit(utime_t now);
// -- status reporting --
MPGStats *collect_pg_stats();
* Unless failing to start scrubbing, the 'planned scrub' flag-set is 'frozen' into
* PgScrubber's m_flags, then cleared.
*/
-bool PG::sched_scrub()
+Scrub::schedule_result_t PG::sched_scrub()
{
dout(15) << __func__ << " pg(" << info.pgid
<< (is_active() ? ") <active>" : ") <not-active>")
<< (is_clean() ? " <clean>" : " <not-clean>") << dendl;
ceph_assert(ceph_mutex_is_locked(_lock));
- if (m_scrubber && m_scrubber->is_scrub_active()) {
- return false;
- }
-
if (!is_primary() || !is_active() || !is_clean()) {
- return false;
+ return Scrub::schedule_result_t::bad_pg_state;
}
if (scrub_queued) {
// only applicable to the very first time a scrub event is queued
// (until handled and posted to the scrub FSM)
dout(10) << __func__ << ": already queued" << dendl;
- return false;
+ return Scrub::schedule_result_t::already_started;
}
// analyse the combination of the requested scrub flags, the osd/pool configuration
// (due to configuration or priority issues)
// The reason was already reported by the callee.
dout(10) << __func__ << ": failed to initiate a scrub" << dendl;
- return false;
+ return Scrub::schedule_result_t::preconditions;
}
// try to reserve the local OSD resources. If failing: no harm. We will
// be retried by the OSD later on.
if (!m_scrubber->reserve_local()) {
dout(10) << __func__ << ": failed to reserve locally" << dendl;
- return false;
+ return Scrub::schedule_result_t::no_local_resources;
}
// can commit to the updated flags now, as nothing will stop the scrub
scrub_queued = true;
osd->queue_for_scrub(this, Scrub::scrub_prio_t::low_priority);
- return true;
+ return Scrub::schedule_result_t::scrub_initiated;
}
double PG::next_deepscrub_interval() const
return upd_flags;
}
-void PG::reg_next_scrub()
+/*
+ * Note: on_info_history_change() is used in those two cases where we're not sure
+ * whether the role of the PG was changed, and if so - was this change relayed to the
+ * scrub-queue.
+ */
+void PG::on_info_history_change()
+{
+ dout(20) << __func__ << " for a " << (is_primary() ? "Primary" : "non-primary") <<dendl;
+
+ if (m_scrubber) {
+ m_scrubber->on_maybe_registration_change(m_planned_scrub);
+ }
+}
+
+void PG::reschedule_scrub()
{
- m_scrubber->reg_next_scrub(m_planned_scrub);
+ dout(20) << __func__ << " for a " << (is_primary() ? "Primary" : "non-primary") <<dendl;
+
+ // we are assuming no change in primary status
+ if (is_primary() && m_scrubber) {
+ m_scrubber->update_scrub_job(m_planned_scrub);
+ }
}
-void PG::on_info_history_change()
+void PG::on_primary_status_change(bool was_primary, bool now_primary)
{
- dout(20) << __func__ << dendl;
- if (m_scrubber) {
- m_scrubber->unreg_next_scrub();
- m_scrubber->reg_next_scrub(m_planned_scrub);
+ // make sure we have a working scrubber when becoming a primary
+ ceph_assert(m_scrubber || !now_primary);
+
+ if ((was_primary != now_primary) && m_scrubber) {
+ m_scrubber->on_primary_change(m_planned_scrub);
}
}
scrub_queued = false;
projected_last_update = eversion_t();
cancel_recovery();
+ if (m_scrubber) {
+ m_scrubber->on_maybe_registration_change(m_planned_scrub);
+ }
}
epoch_t PG::oldest_stored_osdmap() {
class ReplicaReservations;
class LocalReservation;
class ReservedByRemotePrimary;
+ enum class schedule_result_t;
}
#ifdef PG_DEBUG_REFS
forward_scrub_event(&ScrubPgIF::send_chunk_busy, queued, "ChunkIsBusy");
}
- void reg_next_scrub();
-
void queue_want_pg_temp(const std::vector<int> &wanted) override;
void clear_want_pg_temp() override;
void on_info_history_change() override;
+ void on_primary_status_change(bool was_primary, bool now_primary) override;
+
+ void reschedule_scrub() override;
+
void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) override;
uint64_t get_snap_trimq_size() const override {
virtual void on_shutdown() = 0;
bool get_must_scrub() const;
- bool sched_scrub();
+ Scrub::schedule_result_t sched_scrub();
unsigned int scrub_requeue_priority(Scrub::scrub_prio_t with_priority, unsigned int suggested_priority) const;
/// the version that refers to flags_.priority
// did primary change?
if (was_old_primary != is_primary()) {
state_clear(PG_STATE_CLEAN);
+ // queue/dequeue the scrubber
+ pl->on_primary_status_change(was_old_primary, is_primary());
}
pl->on_role_change();
}
}
+ if (is_primary() && was_old_primary) {
+ pl->reschedule_scrub();
+ }
+
if (acting.empty() && !up.empty() && up_primary == pg_whoami) {
psdout(10) << " acting empty, but i am up[0], clearing pg_temp" << dendl;
pl->queue_want_pg_temp(acting);
if (f(info.history, info.stats)) {
pl->publish_stats_to_osd();
}
- pl->on_info_history_change();
+ pl->reschedule_scrub();
if (t) {
dirty_info = true;
/// Notify that info/history changed (generally to update scrub registration)
virtual void on_info_history_change() = 0;
+
+ /// Notify PG that Primary/Replica status has changed (to update scrub registration)
+ virtual void on_primary_status_change(bool was_primary, bool now_primary) = 0;
+
+ /// Need to reschedule next scrub. Assuming no change in role
+ virtual void reschedule_scrub() = 0;
+
/// Notify that a scrub has been requested
virtual void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) = 0;
* Foundation. See file COPYING.
*
*/
-
#include "PrimaryLogPG.h"
+#include <errno.h>
+
+#include <charconv>
+#include <sstream>
+#include <utility>
+
#include <boost/intrusive_ptr.hpp>
+#include <boost/tuple/tuple.hpp>
#include "cls/cas/cls_cas_ops.h"
+#include "common/CDC.h"
#include "common/EventTrace.h"
#include "common/ceph_crypto.h"
-#include "common/CDC.h"
#include "common/config.h"
#include "common/errno.h"
-#include "common/EventTrace.h"
#include "common/perf_counters.h"
#include "common/scrub_types.h"
#include "include/compat.h"
+#include "json_spirit/json_spirit_reader.h"
+#include "json_spirit/json_spirit_value.h"
#include "messages/MCommandReply.h"
#include "messages/MOSDBackoff.h"
#include "messages/MOSDOp.h"
#include "mon/MonClient.h"
#include "objclass/objclass.h"
#include "osd/ClassHandler.h"
-#include "osd/OpRequest.h"
-#include "osd/Session.h"
#include "osdc/Objecter.h"
#include "scrubber/PrimaryLogScrub.h"
+#include "scrubber/ScrubStore.h"
+#include "scrubber/pg_scrubber.h"
+
+#include "OSD.h"
+#include "OpRequest.h"
+#include "PG.h"
+#include "Session.h"
// required includes order:
#include "json_spirit/json_spirit_value.h"
}
m_scrubber->scrub_clear_state();
-
- m_scrubber->unreg_next_scrub();
+ m_scrubber->rm_from_osd_scrubbing();
vector<ceph_tid_t> tids;
cancel_copy_ops(false, &tids);
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#include "./osd_scrub_sched.h"
+
+#include "include/utime.h"
+#include "osd/OSD.h"
+
+#include "pg_scrubber.h"
+
+using namespace ::std::literals;
+
+// ////////////////////////////////////////////////////////////////////////// //
+// ScrubJob
+
+#define dout_context (cct)
+#define dout_subsys ceph_subsys_osd
+#undef dout_prefix
+#define dout_prefix *_dout << "osd." << whoami << " "
+
+ScrubQueue::ScrubJob::ScrubJob(CephContext* cct, const spg_t& pg, int node_id)
+ : RefCountedObject{cct}, pgid{pg}, whoami{node_id}, cct{cct}
+{}
+
+// debug usage only
+ostream& operator<<(ostream& out, const ScrubQueue::ScrubJob& sjob)
+{
+ out << sjob.pgid << ", " << sjob.schedule.scheduled_at
+ << " dead: " << sjob.schedule.deadline << " - " << sjob.registration_state()
+ << " / failure: " << sjob.resources_failure
+ << " / pen. t.o.: " << sjob.penalty_timeout
+ << " / queue state: " << ScrubQueue::qu_state_text(sjob.state);
+
+ return out;
+}
+
+void ScrubQueue::ScrubJob::update_schedule(
+ const ScrubQueue::scrub_schedule_t& adjusted)
+{
+ schedule = adjusted;
+ penalty_timeout = utime_t(0, 0); // helps with debugging
+
+ // 'updated' is changed here while not holding jobs_lock. That's OK, as
+ // the (atomic) flag will only be cleared by select_pg_and_scrub() after
+ // scan_penalized() is called and the job was moved to the to_scrub queue.
+ updated = true;
+
+ dout(10) << " pg[" << pgid << "] adjusted: " << schedule.scheduled_at << " "
+ << registration_state() << dendl;
+}
+
+// ////////////////////////////////////////////////////////////////////////// //
+// ScrubQueue
+
+#undef dout_context
+#define dout_context (cct)
+#undef dout_prefix
+#define dout_prefix \
+ *_dout << "osd." << osd_service.whoami << " scrub-queue::" << __func__ << " "
+
+
+ScrubQueue::ScrubQueue(CephContext* cct, OSDService& osds)
+ : cct{cct}, osd_service{osds}
+{
+ // initialize the daily loadavg with current 15min loadavg
+ if (double loadavgs[3]; getloadavg(loadavgs, 3) == 3) {
+ daily_loadavg = loadavgs[2];
+ } else {
+ derr << "OSD::init() : couldn't read loadavgs\n" << dendl;
+ daily_loadavg = 1.0;
+ }
+}
+
+std::optional<double> ScrubQueue::update_load_average()
+{
+ int hb_interval = cct->_conf->osd_heartbeat_interval;
+ int n_samples = 60 * 24 * 24;
+ if (hb_interval > 1) {
+ n_samples /= hb_interval;
+ if (n_samples < 1)
+ n_samples = 1;
+ }
+
+ // get CPU load avg
+ double loadavg;
+ if (getloadavg(&loadavg, 1) == 1) {
+ daily_loadavg = (daily_loadavg * (n_samples - 1) + loadavg) / n_samples;
+ dout(17) << "heartbeat: daily_loadavg " << daily_loadavg << dendl;
+ return 100 * loadavg;
+ }
+
+ return std::nullopt;
+}
+
+/*
+ * Modify the scrub job state:
+ * - if 'registered' (as expected): mark as 'unregistering'. The job will be
+ * dequeued the next time sched_scrub() is called.
+ * - if already 'not_registered': shouldn't really happen, but not a problem.
+ * The state will not be modified.
+ * - same for 'unregistering'.
+ *
+ * Note: not holding the jobs lock
+ */
+void ScrubQueue::remove_from_osd_queue(ScrubJobRef scrub_job)
+{
+ dout(15) << "removing pg[" << scrub_job->pgid << "] from OSD scrub queue"
+ << dendl;
+
+ qu_state_t expected_state{qu_state_t::registered};
+ auto ret = scrub_job->state.compare_exchange_strong(expected_state,
+ qu_state_t::unregistering);
+
+ if (ret) {
+
+ dout(10) << "pg[" << scrub_job->pgid << "] sched-state changed from "
+ << qu_state_text(expected_state) << " to "
+ << qu_state_text(scrub_job->state) << dendl;
+
+ } else {
+
+ // job wasn't in state 'registered' coming in
+ dout(5) << "removing pg[" << scrub_job->pgid
+ << "] failed. State was: " << qu_state_text(expected_state) << dendl;
+ }
+}
+
+void ScrubQueue::register_with_osd(ScrubJobRef scrub_job,
+ const ScrubQueue::sched_params_t& suggested)
+{
+ qu_state_t state_at_entry = scrub_job->state.load();
+
+ dout(15) << "pg[" << scrub_job->pgid << "] was "
+ << qu_state_text(state_at_entry) << dendl;
+
+ switch (state_at_entry) {
+ case qu_state_t::registered:
+ // just updating the schedule?
+ update_job(scrub_job, suggested);
+ break;
+
+ case qu_state_t::not_registered:
+ // insertion under lock
+ {
+ std::unique_lock lck{jobs_lock};
+
+ if (state_at_entry != scrub_job->state) {
+ lck.unlock();
+ dout(5) << " scrub job state changed" << dendl;
+ // retry
+ register_with_osd(scrub_job, suggested);
+ break;
+ }
+
+ update_job(scrub_job, suggested);
+ to_scrub.push_back(scrub_job);
+ scrub_job->in_queues = true;
+ scrub_job->state = qu_state_t::registered;
+ }
+
+ break;
+
+ case qu_state_t::unregistering:
+ // restore to the to_sched queue
+ {
+ // must be under lock, as the job might be removed from the queue
+ // at any minute
+ std::lock_guard lck{jobs_lock};
+
+ update_job(scrub_job, suggested);
+ if (scrub_job->state == qu_state_t::not_registered) {
+ dout(5) << " scrub job state changed to 'not registered'" << dendl;
+ to_scrub.push_back(scrub_job);
+ }
+ scrub_job->in_queues = true;
+ scrub_job->state = qu_state_t::registered;
+ }
+ break;
+ }
+
+ dout(10) << "pg(" << scrub_job->pgid << ") sched-state changed from "
+ << qu_state_text(state_at_entry) << " to "
+ << qu_state_text(scrub_job->state)
+ << " at: " << scrub_job->schedule.scheduled_at << dendl;
+}
+
+// look mommy - no locks!
+void ScrubQueue::update_job(ScrubJobRef scrub_job,
+ const ScrubQueue::sched_params_t& suggested)
+{
+ // adjust the suggested scrub time according to OSD-wide status
+ auto adjusted = adjust_target_time(suggested);
+ scrub_job->update_schedule(adjusted);
+}
+
+// used under jobs_lock
+void ScrubQueue::move_failed_pgs(utime_t now_is)
+{
+ int punished_cnt{0}; // for log/debug only
+
+ for (auto job = to_scrub.begin(); job != to_scrub.end();) {
+ if ((*job)->resources_failure) {
+ auto sjob = *job;
+
+ // last time it was scheduled for a scrub, this PG failed in securing
+ // remote resources. Move it to the secondary scrub queue.
+
+ dout(15) << "moving " << sjob->pgid
+ << " state: " << ScrubQueue::qu_state_text(sjob->state) << dendl;
+
+ // determine the penalty time, after which the job should be reinstated
+ utime_t after = now_is;
+ after += cct->_conf->osd_scrub_sleep * 2 + utime_t{300'000ms};
+
+ // note: currently - not taking 'deadline' into account when determining
+ // 'penalty_timeout'.
+ sjob->penalty_timeout = after;
+ sjob->resources_failure = false;
+ sjob->updated = false; // as otherwise will be pardoned immediately
+
+ // place in the penalty list, and remove from the to-scrub group
+ penalized.push_back(sjob);
+ job = to_scrub.erase(job);
+ punished_cnt++;
+ } else {
+ job++;
+ }
+ }
+
+ if (punished_cnt) {
+ dout(15) << "# of jobs penalized: " << punished_cnt << dendl;
+ }
+}
+
+// clang-format off
+/*
+ * Implementation note:
+ * Clang (10 & 11) produces here efficient table-based code, comparable to using
+ * a direct index into an array of strings.
+ * Gcc (11, trunk) is almost as efficient.
+ */
+std::string_view ScrubQueue::attempt_res_text(Scrub::schedule_result_t v)
+{
+ switch (v) {
+ case Scrub::schedule_result_t::scrub_initiated: return "scrubbing"sv;
+ case Scrub::schedule_result_t::none_ready: return "no ready job"sv;
+ case Scrub::schedule_result_t::no_local_resources: return "local resources shortage"sv;
+ case Scrub::schedule_result_t::already_started: return "denied as already started"sv;
+ case Scrub::schedule_result_t::no_such_pg: return "pg not found"sv;
+ case Scrub::schedule_result_t::bad_pg_state: return "prevented by pg state"sv;
+ case Scrub::schedule_result_t::preconditions: return "preconditions not met"sv;
+ }
+ // g++ (unlike CLANG), requires an extra 'return' here
+ return "(unknown)"sv;
+}
+
+std::string_view ScrubQueue::qu_state_text(qu_state_t st)
+{
+ switch (st) {
+ case qu_state_t::not_registered: return "not registered w/ OSD"sv;
+ case qu_state_t::registered: return "registered"sv;
+ case qu_state_t::unregistering: return "unregistering"sv;
+ }
+ // g++ (unlike CLANG), requires an extra 'return' here
+ return "(unknown)"sv;
+}
+// clang-format on
+
+/**
+ * a note regarding 'to_scrub_copy':
+ * 'to_scrub_copy' is a sorted set of all the ripe jobs from to_copy.
+ * As we usually expect to refer to only the first job in this set, we could
+ * consider an alternative implementation:
+ * - have collect_ripe_jobs() return the copied set without sorting it;
+ * - loop, performing:
+ * - use std::min_element() to find a candidate;
+ * - try that one. If not suitable, discard from 'to_scrub_copy'
+ */
+Scrub::schedule_result_t ScrubQueue::select_pg_and_scrub(
+ Scrub::ScrubPreconds& preconds)
+{
+ dout(10) << " reg./pen. sizes: " << to_scrub.size() << " / " << penalized.size()
+ << dendl;
+
+ utime_t now_is = ceph_clock_now();
+
+ preconds.time_permit = scrub_time_permit(now_is);
+ preconds.load_is_low = scrub_load_below_threshold();
+ preconds.only_deadlined = !preconds.time_permit || !preconds.load_is_low;
+
+ // create a list of candidates (copying, as otherwise creating a deadlock):
+ // - possibly restore penalized
+ // - (if we didn't handle directly) remove invalid jobs
+ // - create a copy of the to_scrub (possibly up to first not-ripe)
+ // - same for the penalized (although that usually be a waste)
+ // unlock, then try the lists
+
+ std::unique_lock lck{jobs_lock};
+
+ // pardon all penalized jobs that have deadlined (or were updated)
+ scan_penalized(restore_penalized, now_is);
+ restore_penalized = false;
+
+ // remove the 'updated' flag from all entries
+ std::for_each(to_scrub.begin(), to_scrub.end(),
+ [](const auto& jobref) -> void { jobref->updated = false; });
+
+ // add failed scrub attempts to the penalized list
+ move_failed_pgs(now_is);
+
+ // collect all valid & ripe jobs from the two lists. Note that we must copy,
+ // as when we use the lists we will not be holding jobs_lock (see
+ // explanation above)
+
+ auto to_scrub_copy = collect_ripe_jobs(to_scrub, now_is);
+ auto penalized_copy = collect_ripe_jobs(penalized, now_is);
+ lck.unlock();
+
+ // try the regular queue first
+ auto res = select_from_group(to_scrub_copy, preconds, now_is);
+
+ // in the sole scenario in which we've gone over all ripe jobs without success
+ // - we will try the penalized
+ if (res == Scrub::schedule_result_t::none_ready && !penalized_copy.empty()) {
+ res = select_from_group(penalized_copy, preconds, now_is);
+ dout(10) << "tried the penalized. Res: " << ScrubQueue::attempt_res_text(res)
+ << dendl;
+ restore_penalized = true;
+ }
+
+ dout(15) << dendl;
+ return res;
+}
+
+// must be called under lock
+void ScrubQueue::rm_unregistered_jobs(ScrubQContainer& group)
+{
+ std::for_each(group.begin(), group.end(), [](auto& job) {
+ if (job->state == qu_state_t::unregistering) {
+ job->in_queues = false;
+ job->state = qu_state_t::not_registered;
+ } else if (job->state == qu_state_t::not_registered) {
+ job->in_queues = false;
+ }
+ });
+
+ group.erase(std::remove_if(group.begin(), group.end(), invalid_state),
+ group.end());
+}
+
+namespace {
+struct cmp_sched_time_t {
+ bool operator()(const ScrubQueue::ScrubJobRef& lhs,
+ const ScrubQueue::ScrubJobRef& rhs) const
+ {
+ return lhs->schedule.scheduled_at < rhs->schedule.scheduled_at;
+ }
+};
+} // namespace
+
+// called under lock
+ScrubQueue::ScrubQContainer ScrubQueue::collect_ripe_jobs(ScrubQContainer& group,
+ utime_t time_now)
+{
+ rm_unregistered_jobs(group);
+
+ // copy ripe jobs
+ ScrubQueue::ScrubQContainer ripes;
+ ripes.reserve(group.size());
+
+ std::copy_if(group.begin(), group.end(), std::back_inserter(ripes),
+ [time_now](const auto& jobref) -> bool {
+ return jobref->schedule.scheduled_at <= time_now;
+ });
+ std::sort(ripes.begin(), ripes.end(), cmp_sched_time_t{});
+
+ if (g_conf()->subsys.should_gather<ceph_subsys_osd, 20>()) {
+ for (const auto& jobref : group) {
+ if (jobref->schedule.scheduled_at > time_now) {
+ dout(20) << " not ripe: " << jobref->pgid << " @ "
+ << jobref->schedule.scheduled_at << dendl;
+ }
+ }
+ }
+
+ return ripes;
+}
+
+// not holding jobs_lock. 'group' is a copy of the actual list.
+Scrub::schedule_result_t ScrubQueue::select_from_group(
+ ScrubQContainer& group, const Scrub::ScrubPreconds& preconds, utime_t now_is)
+{
+ dout(15) << "jobs #: " << group.size() << dendl;
+
+ for (auto& candidate : group) {
+
+ // we expect the first job in the list to be a good candidate (if any)
+
+ dout(20) << "try initiating scrub for " << candidate->pgid << dendl;
+
+ if (preconds.only_deadlined && (candidate->schedule.deadline.is_zero() ||
+ candidate->schedule.deadline >= now_is)) {
+ dout(15) << " not scheduling scrub for " << candidate->pgid << " due to "
+ << (preconds.time_permit ? "high load" : "time not permitting")
+ << dendl;
+ continue;
+ }
+
+ // we have a candidate to scrub. We turn to the OSD to verify that the PG
+ // configuration allows the specified type of scrub, and to initiate the
+ // scrub.
+ switch (osd_service.initiate_a_scrub(candidate->pgid,
+ preconds.allow_requested_repair_only)) {
+
+ case Scrub::schedule_result_t::scrub_initiated:
+ // the happy path. We are done
+ dout(20) << " initiated for " << candidate->pgid << dendl;
+ return Scrub::schedule_result_t::scrub_initiated;
+
+ case Scrub::schedule_result_t::already_started:
+ case Scrub::schedule_result_t::preconditions:
+ case Scrub::schedule_result_t::bad_pg_state:
+ // continue with the next job
+ dout(20) << "failed (state/cond/started) " << candidate->pgid << dendl;
+ break;
+
+ case Scrub::schedule_result_t::no_such_pg:
+ // The pg is no longer there
+ dout(20) << "failed (no pg) " << candidate->pgid << dendl;
+ break;
+
+ case Scrub::schedule_result_t::no_local_resources:
+ // failure to secure local resources. No point in trying the other
+ // PGs at this time. Note that this is not the same as replica resources
+ // failure!
+ dout(20) << "failed (local) " << candidate->pgid << dendl;
+ return Scrub::schedule_result_t::no_local_resources;
+
+ case Scrub::schedule_result_t::none_ready:
+ // can't happen. Just for the compiler.
+ dout(5) << "failed !!! " << candidate->pgid << dendl;
+ return Scrub::schedule_result_t::none_ready;
+ }
+ }
+
+ dout(20) << " returning 'none ready' " << dendl;
+ return Scrub::schedule_result_t::none_ready;
+}
+
+ScrubQueue::scrub_schedule_t ScrubQueue::adjust_target_time(
+ const sched_params_t& times) const
+{
+ ScrubQueue::scrub_schedule_t sched_n_dead{times.proposed_time,
+ times.proposed_time};
+
+ if (g_conf()->subsys.should_gather<ceph_subsys_osd, 20>()) {
+ dout(20) << "min t: " << times.min_interval
+ << " osd: " << cct->_conf->osd_scrub_min_interval
+ << " max t: " << times.max_interval
+ << " osd: " << cct->_conf->osd_scrub_max_interval << dendl;
+
+ dout(20) << "at " << sched_n_dead.scheduled_at << " ratio "
+ << cct->_conf->osd_scrub_interval_randomize_ratio << dendl;
+ }
+
+ if (times.is_must == ScrubQueue::must_scrub_t::not_mandatory) {
+
+ // if not explicitly requested, postpone the scrub with a random delay
+ double scrub_min_interval = times.min_interval > 0
+ ? times.min_interval
+ : cct->_conf->osd_scrub_min_interval;
+ double scrub_max_interval = times.max_interval > 0
+ ? times.max_interval
+ : cct->_conf->osd_scrub_max_interval;
+
+ sched_n_dead.scheduled_at += scrub_min_interval;
+ double r = rand() / (double)RAND_MAX;
+ sched_n_dead.scheduled_at +=
+ scrub_min_interval * cct->_conf->osd_scrub_interval_randomize_ratio * r;
+
+ if (scrub_max_interval <= 0) {
+ sched_n_dead.deadline = utime_t{};
+ } else {
+ sched_n_dead.deadline += scrub_max_interval;
+ }
+ }
+
+ dout(17) << "at (final) " << sched_n_dead.scheduled_at << " - "
+ << sched_n_dead.deadline << dendl;
+ return sched_n_dead;
+}
+
+double ScrubQueue::scrub_sleep_time(bool must_scrub) const
+{
+ double regular_sleep_period = cct->_conf->osd_scrub_sleep;
+
+ if (must_scrub || scrub_time_permit(ceph_clock_now())) {
+ return regular_sleep_period;
+ }
+
+ // relevant if scrubbing started during allowed time, but continued into
+ // forbidden hours
+ double extended_sleep = cct->_conf->osd_scrub_extended_sleep;
+ dout(20) << "w/ extended sleep (" << extended_sleep << ")" << dendl;
+ return std::max(extended_sleep, regular_sleep_period);
+}
+
+bool ScrubQueue::scrub_load_below_threshold() const
+{
+ double loadavgs[3];
+ if (getloadavg(loadavgs, 3) != 3) {
+ dout(10) << __func__ << " couldn't read loadavgs\n" << dendl;
+ return false;
+ }
+
+ // allow scrub if below configured threshold
+ long cpus = sysconf(_SC_NPROCESSORS_ONLN);
+ double loadavg_per_cpu = cpus > 0 ? loadavgs[0] / cpus : loadavgs[0];
+ if (loadavg_per_cpu < cct->_conf->osd_scrub_load_threshold) {
+ dout(20) << "loadavg per cpu " << loadavg_per_cpu << " < max "
+ << cct->_conf->osd_scrub_load_threshold << " = yes" << dendl;
+ return true;
+ }
+
+ // allow scrub if below daily avg and currently decreasing
+ if (loadavgs[0] < daily_loadavg && loadavgs[0] < loadavgs[2]) {
+ dout(20) << "loadavg " << loadavgs[0] << " < daily_loadavg " << daily_loadavg
+ << " and < 15m avg " << loadavgs[2] << " = yes" << dendl;
+ return true;
+ }
+
+ dout(20) << "loadavg " << loadavgs[0] << " >= max "
+ << cct->_conf->osd_scrub_load_threshold << " and ( >= daily_loadavg "
+ << daily_loadavg << " or >= 15m avg " << loadavgs[2] << ") = no"
+ << dendl;
+ return false;
+}
+
+
+// note: called with jobs_lock held
+void ScrubQueue::scan_penalized(bool forgive_all, utime_t time_now)
+{
+ dout(20) << time_now << (forgive_all ? " all " : " - ") << penalized.size()
+ << dendl;
+
+ // clear dead entries (deleted PGs, or those PGs we are no longer their
+ // primary)
+ rm_unregistered_jobs(penalized);
+
+ if (forgive_all) {
+
+ std::copy(penalized.begin(), penalized.end(), std::back_inserter(to_scrub));
+ penalized.clear();
+
+ } else {
+
+ auto forgiven_last = std::partition(
+ penalized.begin(), penalized.end(), [time_now](const auto& e) {
+ return (*e).updated || ((*e).penalty_timeout <= time_now);
+ });
+
+ std::copy(penalized.begin(), forgiven_last, std::back_inserter(to_scrub));
+ penalized.erase(penalized.begin(), forgiven_last);
+ dout(20) << "penalized after screening: " << penalized.size() << dendl;
+ }
+}
+
+// checks for half-closed ranges. Modify the (p<till)to '<=' to check for
+// closed.
+static inline bool isbetween_modulo(int64_t from, int64_t till, int p)
+{
+ // the 1st condition is because we have defined from==till as "always true"
+ return (till == from) || ((till >= from) ^ (p >= from) ^ (p < till));
+}
+
+bool ScrubQueue::scrub_time_permit(utime_t now) const
+{
+ tm bdt;
+ time_t tt = now.sec();
+ localtime_r(&tt, &bdt);
+
+ bool day_permit =
+ isbetween_modulo(cct->_conf->osd_scrub_begin_week_day,
+ cct->_conf->osd_scrub_end_week_day, bdt.tm_wday);
+ if (!day_permit) {
+ dout(20) << "should run between week day "
+ << cct->_conf->osd_scrub_begin_week_day << " - "
+ << cct->_conf->osd_scrub_end_week_day << " now " << bdt.tm_wday
+ << " - no" << dendl;
+ return false;
+ }
+
+ bool time_permit =
+ isbetween_modulo(cct->_conf->osd_scrub_begin_hour,
+ cct->_conf->osd_scrub_end_hour, bdt.tm_hour);
+ dout(20) << "should run between " << cct->_conf->osd_scrub_begin_hour << " - "
+ << cct->_conf->osd_scrub_end_hour << " now (" << bdt.tm_hour
+ << ") = " << (time_permit ? "yes" : "no") << dendl;
+ return time_permit;
+}
+
+void ScrubQueue::ScrubJob::dump(ceph::Formatter* f) const
+{
+ f->open_object_section("scrub");
+ f->dump_stream("pgid") << pgid;
+ f->dump_stream("sched_time") << schedule.scheduled_at;
+ f->dump_stream("deadline") << schedule.deadline;
+ f->dump_bool("forced", schedule.scheduled_at == PgScrubber::scrub_must_stamp());
+ f->close_section();
+}
+
+void ScrubQueue::dump_scrubs(ceph::Formatter* f) const
+{
+ ceph_assert(f != nullptr);
+ std::lock_guard lck(jobs_lock);
+
+ f->open_array_section("scrubs");
+
+ std::for_each(to_scrub.cbegin(), to_scrub.cend(),
+ [&f](const ScrubJobRef& j) { j->dump(f); });
+
+ std::for_each(penalized.cbegin(), penalized.cend(),
+ [&f](const ScrubJobRef& j) { j->dump(f); });
+
+ f->close_section();
+}
+
+ScrubQueue::ScrubQContainer ScrubQueue::list_registered_jobs() const
+{
+ ScrubQueue::ScrubQContainer all_jobs;
+ all_jobs.reserve(to_scrub.size() + penalized.size());
+ dout(20) << " size: " << all_jobs.capacity() << dendl;
+
+ std::lock_guard lck{jobs_lock};
+
+ std::copy_if(to_scrub.begin(), to_scrub.end(), std::back_inserter(all_jobs),
+ registered_job);
+ std::copy_if(penalized.begin(), penalized.end(), std::back_inserter(all_jobs),
+ registered_job);
+
+ return all_jobs;
+}
+
+// ////////////////////////////////////////////////////////////////////////// //
+// ScrubJob - scrub resource management
+
+bool ScrubQueue::can_inc_scrubs() const
+{
+ // consider removing the lock here. Caller already handles delayed
+ // inc_scrubs_local() failures
+ std::lock_guard lck{resource_lock};
+
+ if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) {
+ return true;
+ }
+
+ dout(20) << " == false. " << scrubs_local << " local + " << scrubs_remote
+ << " remote >= max " << cct->_conf->osd_max_scrubs << dendl;
+ return false;
+}
+
+bool ScrubQueue::inc_scrubs_local()
+{
+ std::lock_guard lck{resource_lock};
+
+ if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) {
+ ++scrubs_local;
+ return true;
+ }
+
+ dout(20) << ": " << scrubs_local << " local + " << scrubs_remote
+ << " remote >= max " << cct->_conf->osd_max_scrubs << dendl;
+ return false;
+}
+
+void ScrubQueue::dec_scrubs_local()
+{
+ std::lock_guard lck{resource_lock};
+ dout(20) << ": " << scrubs_local << " -> " << (scrubs_local - 1) << " (max "
+ << cct->_conf->osd_max_scrubs << ", remote " << scrubs_remote << ")"
+ << dendl;
+
+ --scrubs_local;
+ ceph_assert(scrubs_local >= 0);
+}
+
+bool ScrubQueue::inc_scrubs_remote()
+{
+ std::lock_guard lck{resource_lock};
+
+ if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) {
+ dout(20) << ": " << scrubs_remote << " -> " << (scrubs_remote + 1) << " (max "
+ << cct->_conf->osd_max_scrubs << ", local " << scrubs_local << ")"
+ << dendl;
+ ++scrubs_remote;
+ return true;
+ }
+
+ dout(20) << ": " << scrubs_local << " local + " << scrubs_remote
+ << " remote >= max " << cct->_conf->osd_max_scrubs << dendl;
+ return false;
+}
+
+void ScrubQueue::dec_scrubs_remote()
+{
+ std::lock_guard lck{resource_lock};
+ dout(20) << ": " << scrubs_remote << " -> " << (scrubs_remote - 1) << " (max "
+ << cct->_conf->osd_max_scrubs << ", local " << scrubs_local << ")"
+ << dendl;
+ --scrubs_remote;
+ ceph_assert(scrubs_remote >= 0);
+}
+
+void ScrubQueue::dump_scrub_reservations(ceph::Formatter* f) const
+{
+ std::lock_guard lck{resource_lock};
+ f->dump_int("scrubs_local", scrubs_local);
+ f->dump_int("scrubs_remote", scrubs_remote);
+ f->dump_int("osd_max_scrubs", cct->_conf->osd_max_scrubs);
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <atomic>
+#include <chrono>
+#include <memory>
+#include <optional>
+#include <vector>
+
+#include "common/RefCountedObj.h"
+#include "common/ceph_atomic.h"
+#include "osd/osd_types.h"
+#include "osd/scrubber_common.h"
+
+#include "utime.h"
+
+class PG;
+
+namespace Scrub {
+
+using namespace ::std::literals;
+
+// possible outcome when trying to select a PG and scrub it
+enum class schedule_result_t {
+ scrub_initiated, // successfully started a scrub
+ none_ready, // no pg to scrub
+ no_local_resources, // failure to secure local OSD scrub resource
+ already_started, // failed, as already started scrubbing this pg
+ no_such_pg, // can't find this pg
+ bad_pg_state, // pg state (clean, active, etc.)
+ preconditions // time, configuration, etc.
+};
+
+} // namespace Scrub
+
+/**
+ * the queue of PGs waiting to be scrubbed.
+ * Main operations are scheduling/unscheduling a PG to be scrubbed at a certain
+ * time.
+ *
+ * A "penalty" queue maintains those PGs that have failed to reserve the
+ * resources of their replicas. The PGs in this list will be reinstated into the
+ * scrub queue when all eligible PGs were already handled, or after a timeout
+ * (or if their deadline has passed [[disabled at this time]]).
+ */
+class ScrubQueue {
+ public:
+ enum class must_scrub_t { not_mandatory, mandatory };
+
+ enum class qu_state_t {
+ not_registered, // not a primary, thus not considered for scrubbing by this
+ // OSD (also the temporary state when just created)
+ registered, // in either of the two queues ('to_scrub' or 'penalized')
+ unregistering // in the process of being unregistered. Will be finalized
+ // under lock
+ };
+
+ ScrubQueue(CephContext* cct, OSDService& osds);
+
+ struct scrub_schedule_t {
+ utime_t scheduled_at{};
+ utime_t deadline{0, 0};
+ };
+
+ struct sched_params_t {
+ utime_t proposed_time{};
+ double min_interval{0.0};
+ double max_interval{0.0};
+ must_scrub_t is_must{ScrubQueue::must_scrub_t::not_mandatory};
+ };
+
+ struct ScrubJob final : public RefCountedObject {
+
+ /**
+ * a time scheduled for scrub, and a deadline: The scrub could be delayed if
+ * system load is too high (but not if after the deadline),or if trying to
+ * scrub out of scrub hours.
+ */
+ scrub_schedule_t schedule;
+
+ /// pg to be scrubbed
+ const spg_t pgid;
+
+ /// the OSD id (for the log)
+ const int whoami;
+
+ ceph::atomic<qu_state_t> state{qu_state_t::not_registered};
+
+ /**
+ * the old 'is_registered'. Set whenever the job is registered with the OSD,
+ * i.e. is in either the 'to_scrub' or the 'penalized' vectors.
+ */
+ std::atomic_bool in_queues{false};
+
+ /// last scrub attempt failed to secure replica resources
+ bool resources_failure{false};
+
+ /**
+ * 'updated' is a temporary flag, used to create a barrier after
+ * 'sched_time' and 'deadline' (or any other job entry) were modified by
+ * different task.
+ * 'updated' also signals the need to move a job back from the penalized
+ * queue to the regular one.
+ */
+ std::atomic_bool updated{false};
+
+ utime_t penalty_timeout{0, 0};
+
+ CephContext* cct;
+
+ ScrubJob(CephContext* cct, const spg_t& pg, int node_id);
+
+ utime_t get_sched_time() const { return schedule.scheduled_at; }
+
+ /**
+ * relatively low-cost(*) access to the scrub job's state, to be used in
+ * logging.
+ * (*) not a low-cost access on x64 architecture
+ */
+ std::string_view state_desc() const
+ {
+ return ScrubQueue::qu_state_text(state.load(std::memory_order_relaxed));
+ }
+
+ void update_schedule(const ScrubQueue::scrub_schedule_t& adjusted);
+
+ void dump(ceph::Formatter* f) const;
+
+ /*
+ * as the atomic 'in_queues' appears in many log prints, accessing it for
+ * display-only should be made less expensive (on ARM. On x86 the _relaxed
+ * produces the same code as '_cs')
+ */
+ std::string_view registration_state() const
+ {
+ return in_queues.load(std::memory_order_relaxed) ? " in-queue"
+ : " not-queued";
+ }
+
+ friend std::ostream& operator<<(std::ostream& out, const ScrubJob& pg);
+ };
+
+ friend class TestOSDScrub;
+
+ using ScrubJobRef = ceph::ref_t<ScrubJob>;
+ using ScrubQContainer = std::vector<ScrubJobRef>;
+
+ static std::string_view qu_state_text(qu_state_t st);
+
+ /**
+ * called periodically by the OSD to select the first scrub-eligible PG
+ * and scrub it.
+ *
+ * Selection is affected by:
+ * - time of day: scheduled scrubbing might be configured to only happen
+ * during certain hours;
+ * - same for days of the week, and for the system load;
+ *
+ * @param preconds: what types of scrub are allowed, given system status &
+ * config. Some of the preconditions are calculated here.
+ * @return Scrub::attempt_t::scrubbing if a scrub session was successfully
+ * initiated. Otherwise - the failure cause.
+ *
+ * locking: locks jobs_lock
+ */
+ Scrub::schedule_result_t select_pg_and_scrub(Scrub::ScrubPreconds& preconds);
+
+ /**
+ * Translate attempt_ values into readable text
+ */
+ static std::string_view attempt_res_text(Scrub::schedule_result_t v);
+
+ /**
+ * remove the pg from set of PGs to be scanned for scrubbing.
+ * To be used if we are no longer the PG's primary, or if the PG is removed.
+ */
+ void remove_from_osd_queue(ScrubJobRef sjob);
+
+ /**
+ * @return the list (not std::set!) of all scrub jobs registered
+ * (apart from PGs in the process of being removed)
+ */
+ ScrubQContainer list_registered_jobs() const;
+
+ /**
+ * Add the scrub job to the list of jobs (i.e. list of PGs) to be periodically
+ * scrubbed by the OSD.
+ * The registration is active as long as the PG exists and the OSD is its
+ * primary.
+ *
+ * See update_job() for the handling of the 'suggested' parameter.
+ *
+ * locking: might lock jobs_lock
+ */
+ void register_with_osd(ScrubJobRef sjob, const sched_params_t& suggested);
+
+ /**
+ * modify a scrub-job's schduled time and deadline
+ *
+ * There are 3 argument combinations to consider:
+ * - 'must' is asserted, and the suggested time is 'scrub_must_stamp':
+ * the registration will be with "beginning of time" target, making the
+ * scrub-job eligible to immediate scrub (given that external conditions
+ * do not prevent scrubbing)
+ *
+ * - 'must' is asserted, and the suggested time is 'now':
+ * This happens if our stats are unknown. The results are similar to the
+ * previous scenario.
+ *
+ * - not a 'must': we take the suggested time as a basis, and add to it some
+ * configuration / random delays.
+ *
+ * ('must' is sched_params_t.is_must)
+ *
+ * locking: not using the jobs_lock
+ */
+ void update_job(ScrubJobRef sjob, const sched_params_t& suggested);
+
+ public:
+ void dump_scrubs(ceph::Formatter* f) const;
+
+ /**
+ * No new scrub session will start while a scrub was initiated on a PG,
+ * and that PG is trying to acquire replica resources.
+ */
+ void set_reserving_now() { a_pg_is_reserving = true; }
+ void clear_reserving_now() { a_pg_is_reserving = false; }
+ bool is_reserving_now() const { return a_pg_is_reserving; }
+
+ bool can_inc_scrubs() const;
+ bool inc_scrubs_local();
+ void dec_scrubs_local();
+ bool inc_scrubs_remote();
+ void dec_scrubs_remote();
+ void dump_scrub_reservations(ceph::Formatter* f) const;
+
+ /**
+ * Pacing the scrub operation by inserting delays (mostly between chunks)
+ *
+ * Special handling for regular scrubs that continued into "no scrub" times.
+ * Scrubbing will continue, but the delays will be controlled by a separate
+ * (read - with higher value) configuration element
+ * (osd_scrub_extended_sleep).
+ */
+ double scrub_sleep_time(
+ bool must_scrub) const; /// \todo (future) return milliseconds
+
+ /**
+ * called every heartbeat to update the "daily" load average
+ *
+ * @returns a load value for the logger
+ */
+ [[nodiscard]] std::optional<double> update_load_average();
+
+ private:
+ CephContext* cct;
+ OSDService& osd_service;
+
+ /**
+ * jobs_lock protects the job containers and the relevant scrub-jobs state
+ * variables. Specifically, the following are guaranteed:
+ * - 'in_queues' is asserted only if the job is in one of the queues;
+ * - a job will only be in state 'registered' if in one of the queues;
+ * - no job will be in the two queues simulatenously
+ *
+ * Note that PG locks should not be acquired while holding jobs_lock.
+ */
+ mutable ceph::mutex jobs_lock = ceph::make_mutex("ScrubQueue::jobs_lock");
+
+ ScrubQContainer to_scrub; ///< scrub jobs (i.e. PGs) to scrub
+ ScrubQContainer penalized; ///< those that failed to reserve remote resources
+ bool restore_penalized{false};
+
+ double daily_loadavg{0.0};
+
+ static inline constexpr auto registered_job = [](const auto& jobref) -> bool {
+ return jobref->state == qu_state_t::registered;
+ };
+
+ static inline constexpr auto invalid_state = [](const auto& jobref) -> bool {
+ return jobref->state == qu_state_t::not_registered;
+ };
+
+ /**
+ * Are there scrub jobs that should be reinstated?
+ */
+ void scan_penalized(bool forgive_all, utime_t time_now);
+
+ /**
+ * clear dead entries (unregistered, or belonging to removed PGs) from a
+ * queue. Job state is changed to match new status.
+ */
+ void rm_unregistered_jobs(ScrubQContainer& group);
+
+ /**
+ * the set of all scrub jobs in 'group' which are ready to be scrubbed
+ * (ready = their scheduled time has passed).
+ * The scrub jobs in the new collection are sorted according to
+ * their scheduled time.
+ *
+ * Note that the returned container holds independent refs to the
+ * scrub jobs.
+ */
+ ScrubQContainer collect_ripe_jobs(ScrubQContainer& group, utime_t time_now);
+
+
+ /// scrub resources management lock (guarding scrubs_local & scrubs_remote)
+ mutable ceph::mutex resource_lock =
+ ceph::make_mutex("ScrubQueue::resource_lock");
+
+ // the counters used to manage scrub activity parallelism:
+ int scrubs_local{0};
+ int scrubs_remote{0};
+
+ std::atomic_bool a_pg_is_reserving{false};
+
+ [[nodiscard]] bool scrub_load_below_threshold() const;
+ [[nodiscard]] bool scrub_time_permit(utime_t now) const;
+
+ /**
+ * If the scrub job was not explicitly requested, we postpone it by some
+ * random length of time.
+ * And if delaying the scrub - we calculate, based on pool parameters, a
+ * deadline we should scrub before.
+ *
+ * @return a pair of values: the determined scrub time, and the deadline
+ */
+ scrub_schedule_t adjust_target_time(
+ const sched_params_t& recomputed_params) const;
+
+ /**
+ * Look for scrub jobs that have their 'resources_failure' set. These jobs
+ * have failed to acquire remote resources last time we've initiated a scrub
+ * session on them. They are now moved from the 'to_scrub' queue to the
+ * 'penalized' set.
+ *
+ * locking: called with job_lock held
+ */
+ void move_failed_pgs(utime_t now_is);
+
+ Scrub::schedule_result_t select_from_group(ScrubQContainer& group,
+ const Scrub::ScrubPreconds& preconds,
+ utime_t now_is);
+};
// ///////////////////////////////////////////////////////////////////// //
// scrub-op registration handling
+void PgScrubber::unregister_from_osd()
+{
+ if (m_scrub_job) {
+ dout(15) << __func__ << " prev. state: " << registration_state() << dendl;
+ m_osds->get_scrub_services().remove_from_osd_queue(m_scrub_job);
+ }
+}
+
bool PgScrubber::is_scrub_registered() const
{
- return !m_scrub_reg_stamp.is_zero();
+ return m_scrub_job && m_scrub_job->in_queues;
}
-void PgScrubber::reg_next_scrub(const requested_scrub_t& request_flags)
+std::string_view PgScrubber::registration_state() const
{
- if (!is_primary()) {
- // normal. No warning is required.
- return;
+ if (m_scrub_job) {
+ return m_scrub_job->registration_state();
}
+ return "(no sched job)"sv;
+}
- dout(10) << __func__ << " planned: must? " << request_flags.must_scrub << " need-auto? "
- << request_flags.need_auto << " stamp: " << m_pg->info.history.last_scrub_stamp
- << dendl;
+void PgScrubber::rm_from_osd_scrubbing()
+{
+ // make sure the OSD won't try to scrub this one just now
+ unregister_from_osd();
+}
- ceph_assert(!is_scrub_registered());
+void PgScrubber::on_primary_change(const requested_scrub_t& request_flags)
+{
+ dout(10) << __func__ << (is_primary() ? " Primary " : " Replica ")
+ << " flags: " << request_flags << dendl;
- utime_t reg_stamp;
- bool must = false;
+ if (!m_scrub_job) {
+ return;
+ }
- if (request_flags.must_scrub || request_flags.need_auto) {
- // Set the smallest time that isn't utime_t()
- reg_stamp = PgScrubber::scrub_must_stamp();
- must = true;
- } else if (m_pg->info.stats.stats_invalid &&
- m_pg->cct->_conf->osd_scrub_invalid_stats) {
- reg_stamp = ceph_clock_now();
- must = true;
+ dout(15) << __func__ << " scrub-job state: " << m_scrub_job->state_desc() << dendl;
+
+ if (is_primary()) {
+ auto suggested = determine_scrub_time(request_flags);
+ m_osds->get_scrub_services().register_with_osd(m_scrub_job, suggested);
} else {
- reg_stamp = m_pg->info.history.last_scrub_stamp;
+ m_osds->get_scrub_services().remove_from_osd_queue(m_scrub_job);
}
- dout(15) << __func__ << " pg(" << m_pg_id << ") must: " << must
- << " required:" << m_flags.required << " flags: " << request_flags
- << " stamp: " << reg_stamp << dendl;
+ dout(15) << __func__ << " done " << registration_state() << dendl;
+}
- const double scrub_min_interval =
- m_pg->pool.info.opts.value_or(pool_opts_t::SCRUB_MIN_INTERVAL, 0.0);
- const double scrub_max_interval =
- m_pg->pool.info.opts.value_or(pool_opts_t::SCRUB_MAX_INTERVAL, 0.0);
+void PgScrubber::on_maybe_registration_change(const requested_scrub_t& request_flags)
+{
+ dout(10) << __func__ << (is_primary() ? " Primary " : " Replica/other ")
+ << registration_state() << " flags: " << request_flags << dendl;
- // note the sched_time, so we can locate this scrub, and remove it later
- m_scrub_reg_stamp = m_osds->reg_pg_scrub(m_pg->info.pgid, reg_stamp, scrub_min_interval,
- scrub_max_interval, must);
- dout(15) << __func__ << " pg(" << m_pg_id << ") register next scrub, scrub time "
- << m_scrub_reg_stamp << ", must = " << (int)must << dendl;
+ on_primary_change(request_flags);
+ dout(15) << __func__ << " done " << registration_state() << dendl;
}
-void PgScrubber::unreg_next_scrub()
+void PgScrubber::update_scrub_job(const requested_scrub_t& request_flags)
{
- if (is_scrub_registered()) {
- dout(15) << __func__ << " existing-" << m_scrub_reg_stamp << dendl;
- m_osds->unreg_pg_scrub(m_pg->info.pgid, m_scrub_reg_stamp);
- m_scrub_reg_stamp = utime_t{};
+ dout(10) << __func__ << " flags: " << request_flags << dendl;
+
+ {
+ // verify that the 'in_q' status matches our "Primariority"
+ if (m_scrub_job && is_primary() && !m_scrub_job->in_queues) {
+ dout(1) << __func__ << " !!! primary but not scheduled! " << dendl;
+ }
+ }
+
+ if (is_primary() && m_scrub_job) {
+ auto suggested = determine_scrub_time(request_flags);
+ m_osds->get_scrub_services().update_job(m_scrub_job, suggested);
}
+
+ dout(15) << __func__ << " done " << registration_state() << dendl;
+}
+
+ScrubQueue::sched_params_t
+PgScrubber::determine_scrub_time(const requested_scrub_t& request_flags)
+{
+ ScrubQueue::sched_params_t res;
+
+ if (!is_primary()) {
+ return res; // with ok_to_scrub set to 'false'
+ }
+
+ if (request_flags.must_scrub || request_flags.need_auto) {
+
+ // Set the smallest time that isn't utime_t()
+ res.proposed_time = PgScrubber::scrub_must_stamp();
+ res.is_must = ScrubQueue::must_scrub_t::mandatory;
+ // we do not need the interval data in this case
+
+ } else if (m_pg->info.stats.stats_invalid &&
+ m_pg->cct->_conf->osd_scrub_invalid_stats) {
+ res.proposed_time = ceph_clock_now();
+ res.is_must = ScrubQueue::must_scrub_t::mandatory;
+
+ } else {
+ res.proposed_time = m_pg->info.history.last_scrub_stamp;
+ res.min_interval =
+ m_pg->get_pool().info.opts.value_or(pool_opts_t::SCRUB_MIN_INTERVAL, 0.0);
+ res.max_interval =
+ m_pg->get_pool().info.opts.value_or(pool_opts_t::SCRUB_MAX_INTERVAL, 0.0);
+ }
+
+ dout(15) << __func__ << " suggested: " << res.proposed_time << " hist: "
+ << m_pg->info.history.last_scrub_stamp << " v:" << m_pg->info.stats.stats_invalid
+ << " / " << m_pg->cct->_conf->osd_scrub_invalid_stats << " must:"
+ << (res.is_must==ScrubQueue::must_scrub_t::mandatory ? "y" : "n" )
+ << " pool min: " << res.min_interval
+ << dendl;
+ return res;
}
void PgScrubber::scrub_requested(scrub_level_t scrub_level,
{
dout(10) << __func__ << (scrub_level == scrub_level_t::deep ? " deep " : " shallow ")
<< (scrub_type == scrub_type_t::do_repair ? " repair-scrub " : " not-repair ")
- << " prev stamp: " << m_scrub_reg_stamp << " " << is_scrub_registered()
+ << " prev stamp: " << m_scrub_job->get_sched_time()
+ << " registered? " << registration_state()
<< dendl;
- unreg_next_scrub();
-
req_flags.must_scrub = true;
req_flags.must_deep_scrub =
(scrub_level == scrub_level_t::deep) || (scrub_type == scrub_type_t::do_repair);
dout(20) << __func__ << " pg(" << m_pg_id << ") planned:" << req_flags << dendl;
- reg_next_scrub(req_flags);
+ update_scrub_job(req_flags);
}
-void PgScrubber::request_rescrubbing(requested_scrub_t& req_flags)
+
+void PgScrubber::request_rescrubbing(requested_scrub_t& request_flags)
{
- dout(10) << __func__ << " existing-" << m_scrub_reg_stamp << ". was registered? "
- << is_scrub_registered() << dendl;
+ dout(10) << __func__ << " flags: " << request_flags << dendl;
- unreg_next_scrub();
- req_flags.need_auto = true;
- reg_next_scrub(req_flags);
+ request_flags.need_auto = true;
+ update_scrub_job(request_flags);
}
bool PgScrubber::reserve_local()
* left end of the range if we are a tier because they may legitimately
* not exist (see _scrub).
*/
- int min_idx = std::max<int64_t>(
- 3, m_pg->get_cct()->_conf->osd_scrub_chunk_min / preemption_data.chunk_divisor());
+ int min_idx = static_cast<int>(std::max<int64_t>(
+ 3, m_pg->get_cct()->_conf->osd_scrub_chunk_min / (int)preemption_data.chunk_divisor()));
- int max_idx = std::max<int64_t>(min_idx, m_pg->get_cct()->_conf->osd_scrub_chunk_max /
- preemption_data.chunk_divisor());
+ int max_idx = static_cast<int>(std::max<int64_t>(min_idx, m_pg->get_cct()->_conf->osd_scrub_chunk_max /
+ (int)preemption_data.chunk_divisor()));
dout(10) << __func__ << " Min: " << min_idx << " Max: " << max_idx
<< " Div: " << preemption_data.chunk_divisor() << dendl;
milliseconds sleep_time{0ms};
if (m_needs_sleep) {
- double scrub_sleep = 1000.0 * m_osds->osd->scrub_sleep_time(m_flags.required);
+ double scrub_sleep =
+ 1000.0 * m_osds->get_scrub_services().scrub_sleep_time(m_flags.required);
sleep_time = milliseconds{long(scrub_sleep)};
}
dout(15) << __func__ << " sleep: " << sleep_time.count() << "ms. needed? "
// the 'delayer' for crimson is different. Will be factored out.
spg_t pgid = m_pg->get_pgid();
- auto callbk = new LambdaContext([osds = m_osds, pgid,
- scrbr = this]([[maybe_unused]] int r) mutable {
+ auto callbk = new LambdaContext([osds = m_osds, pgid, scrbr = this](
+ [[maybe_unused]] int r) mutable {
PGRef pg = osds->osd->lookup_lock_pg(pgid);
if (!pg) {
lgeneric_subdout(g_ceph_context, osd, 10)
m_reservations.reset();
}
+void PgScrubber::set_reserving_now()
+{
+ m_osds->get_scrub_services().set_reserving_now();
+}
+
+void PgScrubber::clear_reserving_now()
+{
+ m_osds->get_scrub_services().clear_reserving_now();
+}
+
+
[[nodiscard]] bool PgScrubber::scrub_process_inconsistent()
{
dout(10) << __func__ << ": checking authoritative (mode="
f->dump_bool("auto_repair", m_flags.auto_repair);
f->dump_bool("check_repair", m_flags.check_repair);
f->dump_bool("deep_scrub_on_error", m_flags.deep_scrub_on_error);
- f->dump_stream("scrub_reg_stamp") << m_scrub_reg_stamp; // utime_t
+ f->dump_stream("scrub_reg_stamp") << m_scrub_job->get_sched_time(); // utime_t
f->dump_unsigned("priority", m_flags.priority);
f->dump_int("shallow_errors", m_shallow_errors);
f->dump_int("deep_errors", m_deep_errors);
f->close_section();
}
-PgScrubber::~PgScrubber() = default;
+PgScrubber::~PgScrubber()
+{
+ if (m_scrub_job) {
+ // make sure the OSD won't try to scrub this one just now
+ rm_from_osd_scrubbing();
+ m_scrub_job.reset();
+ }
+}
PgScrubber::PgScrubber(PG* pg)
: m_pg{pg}
{
m_fsm = std::make_unique<ScrubMachine>(m_pg, this);
m_fsm->initiate();
+
+ m_scrub_job = ceph::make_ref<ScrubQueue::ScrubJob>(m_osds->cct, m_pg->pg_id,
+ m_osds->get_nodeid());
}
void PgScrubber::reserve_replicas()
{
dout(10) << __func__ << dendl;
- m_reservations.emplace(m_pg, m_pg_whoami);
+ m_reservations.emplace(m_pg, m_pg_whoami, m_scrub_job);
}
void PgScrubber::cleanup_on_finish()
m_osds->send_message_osd_cluster(peer.osd, m, epoch);
}
-ReplicaReservations::ReplicaReservations(PG* pg, pg_shard_t whoami)
+ReplicaReservations::ReplicaReservations(PG* pg, pg_shard_t whoami, ScrubQueue::ScrubJobRef scrubjob)
: m_pg{pg}
, m_acting_set{pg->get_actingset()}
, m_osds{m_pg->get_pg_osd(ScrubberPasskey())}
, m_pending{static_cast<int>(m_acting_set.size()) - 1}
, m_pg_info{m_pg->get_pg_info(ScrubberPasskey())}
+ , m_scrub_job{scrubjob}
{
epoch_t epoch = m_pg->get_osdmap_epoch();
void ReplicaReservations::send_reject()
{
+ m_scrub_job->resources_failure = true;
m_osds->queue_for_scrub_denied(m_pg, scrub_prio_t::low_priority);
}
: m_pg{pg} // holding the "whole PG" for dout() sake
, m_osds{osds}
{
- if (!m_osds->inc_scrubs_local()) {
+ if (!m_osds->get_scrub_services().inc_scrubs_local()) {
dout(10) << __func__ << ": failed to reserve locally " << dendl;
// the failure is signalled by not having m_holding_local_reservation set
return;
{
if (m_holding_local_reservation) {
m_holding_local_reservation = false;
- m_osds->dec_scrubs_local();
+ m_osds->get_scrub_services().dec_scrubs_local();
}
}
ReservedByRemotePrimary::ReservedByRemotePrimary(PG* pg, OSDService* osds, epoch_t epoch)
: m_pg{pg}, m_osds{osds}, m_reserved_at{epoch}
{
- if (!m_osds->inc_scrubs_remote()) {
+ if (!m_osds->get_scrub_services().inc_scrubs_remote()) {
dout(10) << __func__ << ": failed to reserve at Primary request" << dendl;
// the failure is signalled by not having m_reserved_by_remote_primary set
return;
{
if (m_reserved_by_remote_primary) {
m_reserved_by_remote_primary = false;
- m_osds->dec_scrubs_remote();
+ m_osds->get_scrub_services().dec_scrubs_remote();
}
}
#include "ScrubStore.h"
#include "scrub_machine_lstnr.h"
#include "osd/scrubber_common.h"
+#include "osd_scrub_sched.h"
class Callback;
bool m_had_rejections{false};
int m_pending{-1};
const pg_info_t& m_pg_info;
+ ScrubQueue::ScrubJobRef m_scrub_job; ///< a ref to this PG's scrub job
void release_replica(pg_shard_t peer, epoch_t epoch);
*/
void discard_all();
- ReplicaReservations(PG* pg, pg_shard_t whoami);
+ ReplicaReservations(PG* pg, pg_shard_t whoami, ScrubQueue::ScrubJobRef scrubjob);
~ReplicaReservations();
/// @returns true if indeed waiting for this one. Otherwise: an error string
auto mark_arriving_map(pg_shard_t from) -> std::tuple<bool, std::string_view>;
- std::vector<pg_shard_t> get_awaited() const { return m_maps_awaited_for; }
+ [[nodiscard]] std::vector<pg_shard_t> get_awaited() const { return m_maps_awaited_for; }
void reset();
// managing scrub op registration
- void reg_next_scrub(const requested_scrub_t& request_flags) final;
+ void update_scrub_job(const requested_scrub_t& request_flags) final;
- void unreg_next_scrub() final;
+ void rm_from_osd_scrubbing() final;
+
+ void on_primary_change(const requested_scrub_t& request_flags) final;
+
+ void on_maybe_registration_change(const requested_scrub_t& request_flags) final;
void scrub_requested(scrub_level_t scrub_level,
scrub_type_t scrub_type,
/**
* add to scrub statistics, but only if the soid is below the scrub start
*/
- virtual void stats_of_handled_objects(const object_stat_sum_t& delta_stats,
+ void stats_of_handled_objects(const object_stat_sum_t& delta_stats,
const hobject_t& soid) override
{
ceph_assert(false);
void reserve_replicas() final;
+ void set_reserving_now() final;
+ void clear_reserving_now() final;
+
[[nodiscard]] bool was_epoch_changed() const final;
void mark_local_map_ready() final;
[[nodiscard]] bool is_scrub_registered() const;
+ /// the 'is-in-scheduling-queue' status, using relaxed-semantics access to the status
+ std::string_view registration_state() const;
+
virtual void _scrub_clear_state() {}
utime_t m_scrub_reg_stamp; ///< stamp we registered for
+ ScrubQueue::ScrubJobRef m_scrub_job; ///< the scrub-job used by the OSD to schedule us
ostream& show(ostream& out) const override;
*/
void request_rescrubbing(requested_scrub_t& req_flags);
+ ScrubQueue::sched_params_t
+ determine_scrub_time(const requested_scrub_t& request_flags);
+
+ void unregister_from_osd();
+
/*
* Select a range of objects to scrub.
*
{
dout(10) << "-- state -->> ReservingReplicas" << dendl;
DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
+
+ // prevent the OSD from starting another scrub while we are trying to secure
+ // replicas resources
+ scrbr->set_reserving_now();
scrbr->reserve_replicas();
}
+ReservingReplicas::~ReservingReplicas()
+{
+ DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
+ scrbr->clear_reserving_now();
+}
+
sc::result ReservingReplicas::react(const ReservationFailure&)
{
DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
struct ReservingReplicas : sc::state<ReservingReplicas, ScrubMachine> {
explicit ReservingReplicas(my_context ctx);
+ ~ReservingReplicas();
using reactions = mpl::list<sc::custom_reaction<FullReset>,
// all replicas granted our resources request
sc::transition<RemotesReserved, ActiveScrubbing>,
virtual void unreserve_replicas() = 0;
+ /**
+ * No new scrub session will start while a scrub was initiate on a PG,
+ * and that PG is trying to acquire replica resources.
+ * set_reserving_now()/clear_reserving_now() let's the OSD scrub-queue know
+ * we are busy reserving.
+ */
+ virtual void set_reserving_now() = 0;
+ virtual void clear_reserving_now() = 0;
+
/**
* the FSM interface into the "are we waiting for maps, either our own or from
* replicas" state.
/// see ScrubPGgIF::m_current_token
using act_token_t = uint32_t;
+/// "environment" preconditions affecting which PGs are eligible for scrubbing
+struct ScrubPreconds {
+ bool allow_requested_repair_only{false};
+ bool load_is_low{true};
+ bool time_permit{true};
+ bool only_deadlined{false};
+};
+
} // namespace Scrub
*/
virtual bool reserve_local() = 0;
+ /**
+ * Register/de-register with the OSD scrub queue
+ *
+ * Following our status as Primary or replica.
+ */
+ virtual void on_primary_change(const requested_scrub_t& request_flags) = 0;
+
+ /**
+ * Recalculate the required scrub time.
+ *
+ * This function assumes that the queue registration status is up-to-date,
+ * i.e. the OSD "knows our name" if-f we are the Primary.
+ */
+ virtual void update_scrub_job(const requested_scrub_t& request_flags) = 0;
+
+ virtual void on_maybe_registration_change(const requested_scrub_t& request_flags) = 0;
+
// on the replica:
virtual void handle_scrub_reserve_request(OpRequestRef op) = 0;
virtual void handle_scrub_reserve_release(OpRequestRef op) = 0;
virtual void handle_scrub_reserve_grant(OpRequestRef op, pg_shard_t from) = 0;
virtual void handle_scrub_reserve_reject(OpRequestRef op, pg_shard_t from) = 0;
- virtual void reg_next_scrub(const requested_scrub_t& request_flags) = 0;
- virtual void unreg_next_scrub() = 0;
+ virtual void rm_from_osd_scrubbing() = 0;
+
virtual void scrub_requested(scrub_level_t scrub_level,
scrub_type_t scrub_type,
requested_scrub_t& req_flags) = 0;
}
bool scrub_time_permit(utime_t now) {
- return OSD::scrub_time_permit(now);
+ return service.get_scrub_services().scrub_time_permit(now);
}
};