... and not a shared pointer to it.
The PG's entry in the scrub queue and the scrub-job object that
is part of the scrubber are now separate entities that can be
modified independently, each under its own locking mechanism.
The interaction between OsdScrub (the OSD's scrub scheduler
object) and the scrub queue during scrub initiation is simplified:
instead of fetching from the queue a list of scrub targets that are
possibly ready to be scrubbed, OsdScrub now pops a single
candidate - the top of the queue.
Signed-off-by: Ronen Friedman <rfriedma@redhat.com>
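
A minimal sketch of the resulting ownership split, drawn from the
declarations further down in this patch: the scrubber owns its ScrubJob,
while the OSD's queue holds independent copies, each side guarded by its
own lock.

```cpp
// PgScrubber-owned (pg_scrubber.h):
std::optional<Scrub::ScrubJob> m_scrub_job;

// queue-owned copies (scrub_job.h / the queue header), guarded by the
// queue's 'jobs_lock':
Scrub::ScrubQContainer to_scrub;  // std::vector<std::unique_ptr<ScrubJob>>
```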
return false;
}
+void OsdScrub::debug_log_all_jobs() const
+{
+ m_queue.for_each_job([this](const Scrub::ScrubJob& sj) {
+ dout(20) << fmt::format("\tscrub-queue jobs: {}", sj) << dendl;
+ }, 20);
+}
+
void OsdScrub::initiate_scrub(bool is_recovery_active)
{
if (g_conf()->subsys.should_gather<ceph_subsys_osd, 20>() &&
!env_restrictions.high_priority_only) {
- dout(20) << "scrub scheduling (@tick) starts" << dendl;
- auto all_jobs = m_queue.list_registered_jobs();
- for (const auto& sj : all_jobs) {
- dout(20) << fmt::format("\tscrub-queue jobs: {}", *sj) << dendl;
- }
+ debug_log_all_jobs();
}
- // at this phase of the refactoring: minimal changes to the
- // queue interface used here: we ask for a list of
- // eligible targets (based on the known restrictions).
- // We try all elements of this list until a (possibly temporary) success.
- auto candidates = m_queue.ready_to_scrub(env_restrictions, scrub_time);
- if (candidates.empty()) {
+ auto candidate = m_queue.pop_ready_pg(env_restrictions, scrub_time);
+ if (!candidate) {
dout(20) << "no PGs are ready for scrubbing" << dendl;
return;
}
- for (const auto& candidate : candidates) {
- dout(20) << fmt::format("initiating scrub on pg[{}]", candidate) << dendl;
-
- // we have a candidate to scrub. But we may fail when trying to initiate that
- // scrub. For some failures - we can continue with the next candidate. For
- // others - we should stop trying to scrub at this tick.
- auto res = initiate_a_scrub(candidate, env_restrictions);
+ auto res = initiate_a_scrub(candidate->pgid, env_restrictions);
- if (res == schedule_result_t::target_specific_failure) {
- // continue with the next job.
- // \todo: consider separate handling of "no such PG", as - later on -
- // we should be removing both related targets.
- continue;
- } else if (res == schedule_result_t::osd_wide_failure) {
- // no point in trying the other candidates at this time
+ switch (res) {
+ case schedule_result_t::target_specific_failure:
+ case schedule_result_t::osd_wide_failure:
+ // no scrub initiated at this tick.
+ // The scrubber owning the job will requeue the target, if needed.
break;
- } else {
- // the happy path. We are done
- dout(20) << fmt::format("scrub initiated for pg[{}]", candidate.pgid)
+
+ case schedule_result_t::scrub_initiated:
+ dout(20) << fmt::format("scrub initiated for pg[{}]", candidate->pgid)
<< dendl;
break;
- }
}
}
return locked_pg->pg()->start_scrubbing(restrictions);
}
+
void OsdScrub::on_config_change()
{
- auto to_notify = m_queue.list_registered_jobs();
+ auto to_notify = m_queue.get_pgs([](const Scrub::ScrubJob& sj) -> bool {
+ ceph_assert(sj.registered);
+ return true;
+ });
for (const auto& p : to_notify) {
- dout(30) << fmt::format("rescheduling pg[{}] scrubs", *p) << dendl;
- auto locked_pg = m_osd_svc.get_locked_pg(p->pgid);
+ dout(30) << fmt::format("rescheduling pg[{}] scrubs", p) << dendl;
+ auto locked_pg = m_osd_svc.get_locked_pg(p);
if (!locked_pg)
continue;
}
}
-
// ////////////////////////////////////////////////////////////////////////// //
// CPU load tracking and related
// forwarders to the queue
void OsdScrub::update_job(
- Scrub::ScrubJobRef sjob,
+ Scrub::ScrubJob& sjob,
const Scrub::sched_params_t& suggested,
bool reset_notbefore)
{
}
void OsdScrub::delay_on_failure(
- Scrub::ScrubJobRef sjob,
+ Scrub::ScrubJob& sjob,
std::chrono::seconds delay,
Scrub::delay_cause_t delay_cause,
utime_t now_is)
void OsdScrub::register_with_osd(
- Scrub::ScrubJobRef sjob,
+ Scrub::ScrubJob& sjob,
const Scrub::sched_params_t& suggested)
{
m_queue.register_with_osd(sjob, suggested);
}
-void OsdScrub::remove_from_osd_queue(Scrub::ScrubJobRef sjob)
+void OsdScrub::remove_from_osd_queue(spg_t pgid)
{
- m_queue.remove_from_osd_queue(sjob);
+ m_queue.remove_from_osd_queue(pgid);
}
std::unique_ptr<Scrub::LocalResourceWrapper> OsdScrub::inc_scrubs_local(
* locking: not using the jobs_lock
*/
void update_job(
- Scrub::ScrubJobRef sjob,
+ Scrub::ScrubJob& sjob,
const Scrub::sched_params_t& suggested,
bool reset_notbefore);
* locking: might lock jobs_lock
*/
void register_with_osd(
- Scrub::ScrubJobRef sjob,
+ Scrub::ScrubJob& sjob,
const Scrub::sched_params_t& suggested);
/**
* remove the pg from set of PGs to be scanned for scrubbing.
* To be used if we are no longer the PG's primary, or if the PG is removed.
*/
- void remove_from_osd_queue(Scrub::ScrubJobRef sjob);
+ void remove_from_osd_queue(spg_t pgid);
/**
* \returns std::chrono::milliseconds indicating how long to wait between
* would not be retried before 'delay' seconds have passed.
*/
void delay_on_failure(
- Scrub::ScrubJobRef sjob,
+ Scrub::ScrubJob& sjob,
std::chrono::seconds delay,
Scrub::delay_cause_t delay_cause,
utime_t now_is);
const std::string m_log_prefix{};
+ /// list all scrub queue entries
+ void debug_log_all_jobs() const;
+
/// number of PGs stuck while scrubbing, waiting for objects
int get_blocked_pgs_count() const;
using namespace ::std::chrono;
using namespace ::std::chrono_literals;
using namespace ::std::literals;
-using qu_state_t = Scrub::qu_state_t;
+
using must_scrub_t = Scrub::must_scrub_t;
using ScrubQContainer = Scrub::ScrubQContainer;
using sched_params_t = Scrub::sched_params_t;
}
/*
- * Modify the scrub job state:
- * - if 'registered' (as expected): mark as 'unregistering'. The job will be
- * dequeued the next time sched_scrub() is called.
- * - if already 'not_registered': shouldn't really happen, but not a problem.
- * The state will not be modified.
- * - same for 'unregistering'.
- *
- * Note: not holding the jobs lock
+ * Remove the scrub job from the OSD scrub queue.
+ * Caller should mark the Scrubber-owned job as 'not_registered'.
*/
-void ScrubQueue::remove_from_osd_queue(Scrub::ScrubJobRef scrub_job)
+void ScrubQueue::remove_from_osd_queue(spg_t pgid)
{
- dout(15) << "removing pg[" << scrub_job->pgid << "] from OSD scrub queue"
+ dout(10) << fmt::format(
+ "removing pg[{}] from OSD scrub queue", pgid)
<< dendl;
- qu_state_t expected_state{qu_state_t::registered};
- auto ret =
- scrub_job->state.compare_exchange_strong(expected_state,
- qu_state_t::unregistering);
-
- if (ret) {
-
- dout(10) << "pg[" << scrub_job->pgid << "] sched-state changed from "
- << ScrubJob::qu_state_text(expected_state) << " to "
- << ScrubJob::qu_state_text(scrub_job->state) << dendl;
+ std::unique_lock lck{jobs_lock};
+ std::erase_if(to_scrub, [pgid](const auto& job) {
+ return job->pgid == pgid;
+ });
+}
- } else {
- // job wasn't in state 'registered' coming in
- dout(5) << "removing pg[" << scrub_job->pgid
- << "] failed. State was: " << ScrubJob::qu_state_text(expected_state)
- << dendl;
- }
+void ScrubQueue::enqueue_target(const Scrub::ScrubJob& sjob)
+{
+ std::unique_lock lck{jobs_lock};
+ // note: the costly copying is needed only at this stage of the refactoring
+ to_scrub.push_back(std::make_unique<ScrubJob>(sjob));
}
+// note: a transitional implementation, kept only for this commit
void ScrubQueue::register_with_osd(
- Scrub::ScrubJobRef scrub_job,
- const sched_params_t& suggested)
+ Scrub::ScrubJob& scrub_job,
+ const sched_params_t& suggested)
{
- qu_state_t state_at_entry = scrub_job->state.load();
- dout(20) << fmt::format(
- "pg[{}] state at entry: <{:.14}>", scrub_job->pgid,
- state_at_entry)
- << dendl;
-
- switch (state_at_entry) {
- case qu_state_t::registered:
- // just updating the schedule?
- update_job(scrub_job, suggested, false /* keep n.b. delay */);
- break;
-
- case qu_state_t::not_registered:
- // insertion under lock
- {
- std::unique_lock lck{jobs_lock};
-
- if (state_at_entry != scrub_job->state) {
- lck.unlock();
- dout(5) << " scrub job state changed. Retrying." << dendl;
- // retry
- register_with_osd(scrub_job, suggested);
- break;
- }
-
- update_job(scrub_job, suggested, true /* resets not_before */);
- to_scrub.push_back(scrub_job);
- scrub_job->in_queues = true;
- scrub_job->state = qu_state_t::registered;
- }
- break;
-
- case qu_state_t::unregistering:
- // restore to the to_sched queue
- {
- // must be under lock, as the job might be removed from the queue
- // at any minute
- std::lock_guard lck{jobs_lock};
-
- update_job(scrub_job, suggested, true /* resets not_before */);
- if (scrub_job->state == qu_state_t::not_registered) {
- dout(5) << " scrub job state changed to 'not registered'" << dendl;
- to_scrub.push_back(scrub_job);
- }
- scrub_job->in_queues = true;
- scrub_job->state = qu_state_t::registered;
- }
- break;
- }
-
- dout(10) << fmt::format(
- "pg[{}] sched-state changed from <{:.14}> to <{:.14}> (@{:s})",
- scrub_job->pgid, state_at_entry, scrub_job->state.load(),
- scrub_job->schedule.not_before)
- << dendl;
+ update_job(scrub_job, suggested, true /* resets not_before */);
+ enqueue_target(scrub_job);
+ scrub_job.target_queued = true;
+
+ dout(10)
+ << fmt::format(
+ "pg[{}] sched-state: {} (@{:s})",
+ scrub_job.pgid, scrub_job.state_desc(), scrub_job.get_sched_time())
+ << dendl;
}
-void ScrubQueue::update_job(Scrub::ScrubJobRef scrub_job,
+void ScrubQueue::update_job(Scrub::ScrubJob& scrub_job,
const sched_params_t& suggested,
bool reset_nb)
{
// adjust the suggested scrub time according to OSD-wide status
auto adjusted = adjust_target_time(suggested);
- scrub_job->high_priority = suggested.is_must == must_scrub_t::mandatory;
- scrub_job->update_schedule(adjusted, reset_nb);
+ scrub_job.high_priority = suggested.is_must == must_scrub_t::mandatory;
+ scrub_job.update_schedule(adjusted, reset_nb);
}
void ScrubQueue::delay_on_failure(
- Scrub::ScrubJobRef sjob,
+ Scrub::ScrubJob& sjob,
std::chrono::seconds delay,
Scrub::delay_cause_t delay_cause,
utime_t now_is)
{
dout(10) << fmt::format(
"pg[{}] delay_on_failure: delay:{} now:{:s}",
- sjob->pgid, delay, now_is)
+ sjob.pgid, delay, now_is)
<< dendl;
- sjob->delay_on_failure(delay, delay_cause, now_is);
+ sjob.delay_on_failure(delay, delay_cause, now_is);
}
-std::vector<ScrubTargetId> ScrubQueue::ready_to_scrub(
- OSDRestrictions restrictions, // note: 4B in size! (copy)
- utime_t scrub_tick)
+std::unique_ptr<ScrubJob> ScrubQueue::pop_ready_pg(
+ OSDRestrictions restrictions, // note: 4B in size! (thus - copy)
+ utime_t time_now)
{
- dout(10) << fmt::format(
- " @{:s}: registered: {} ({})", scrub_tick,
- to_scrub.size(), restrictions)
- << dendl;
-
- // create a list of candidates (copying, as otherwise creating a deadlock):
- // - (if we didn't handle directly) remove invalid jobs
- // - create a copy of the to_scrub (possibly up to first not-ripe)
- // unlock, then try the lists
std::unique_lock lck{jobs_lock};
- // remove the 'updated' flag from all entries
- std::for_each(
- to_scrub.begin(), to_scrub.end(),
- [](const auto& jobref) -> void { jobref->updated = false; });
-
- // collect all valid & ripe jobs. Note that we must copy,
- // as when we use the lists we will not be holding jobs_lock (see
- // explanation above)
-
- // and in this step 1 of the refactoring (Aug 2023): the set returned must be
- // transformed into a vector of targets (which, in this phase, are
- // the PG id-s).
- auto to_scrub_copy = collect_ripe_jobs(to_scrub, restrictions, scrub_tick);
- lck.unlock();
-
- std::vector<ScrubTargetId> all_ready;
- std::transform(
- to_scrub_copy.cbegin(), to_scrub_copy.cend(),
- std::back_inserter(all_ready),
- [](const auto& jobref) -> ScrubTargetId { return jobref->pgid; });
- return all_ready;
-}
-
+ const auto eligible_filtr = [time_now, rst = restrictions](
+ const std::unique_ptr<ScrubJob>& jb) -> bool {
+ // look for jobs that have their n.b. in the past, and are not
+ // blocked by restrictions
+ return jb->get_sched_time() <= time_now &&
+ (jb->high_priority ||
+ (!rst.high_priority_only &&
+ (!rst.only_deadlined || (!jb->schedule.deadline.is_zero() &&
+ jb->schedule.deadline <= time_now))));
+ };
-// must be called under lock
-void ScrubQueue::rm_unregistered_jobs(ScrubQContainer& group)
-{
- std::for_each(group.begin(), group.end(), [](auto& job) {
- if (job->state == qu_state_t::unregistering) {
- job->in_queues = false;
- job->state = qu_state_t::not_registered;
- } else if (job->state == qu_state_t::not_registered) {
- job->in_queues = false;
- }
- });
+ auto not_ripes = rng::partition(to_scrub, eligible_filtr);
+ if (not_ripes.begin() == to_scrub.begin()) {
+ return nullptr;
+ }
+ auto top = rng::min_element(
+ to_scrub.begin(), not_ripes.begin(), rng::less(),
+ [](const std::unique_ptr<ScrubJob>& jb) -> utime_t {
+ return jb->get_sched_time();
+ });
+
+ if (top == not_ripes.begin()) {
+ return nullptr;
+ }
- group.erase(std::remove_if(group.begin(), group.end(), invalid_state),
- group.end());
+ auto top_job = std::move(*top);
+ to_scrub.erase(top);
+ return top_job;
}
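
The selection above partitions the queue into ripe and not-ripe entries,
then picks the minimum of the ripe prefix. A toy standalone sketch of the
same pattern (not Ceph code: plain ints stand in for the scheduled times,
and std::ranges is spelled out in place of the rng alias):

```cpp
#include <algorithm>
#include <functional>
#include <memory>
#include <vector>

// pick (and remove) the eligible entry with the smallest 'time',
// mirroring pop_ready_pg() above
std::unique_ptr<int> pop_smallest_ready(
    std::vector<std::unique_ptr<int>>& q, int now)
{
  auto eligible = [now](const std::unique_ptr<int>& p) { return *p <= now; };
  // eligible entries end up at the front; 'rest' spans the not-ripe ones
  auto rest = std::ranges::partition(q, eligible);
  if (rest.begin() == q.begin()) {
    return nullptr;  // nothing is ripe
  }
  auto top = std::ranges::min_element(
      q.begin(), rest.begin(), std::ranges::less{},
      [](const std::unique_ptr<int>& p) { return *p; });
  auto ret = std::move(*top);
  q.erase(top);
  return ret;
}
```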
+
namespace {
struct cmp_time_n_priority_t {
- bool operator()(const Scrub::ScrubJobRef& lhs, const Scrub::ScrubJobRef& rhs)
+ bool operator()(const Scrub::ScrubJob& lhs, const Scrub::ScrubJob& rhs)
const
{
- return lhs->is_high_priority() > rhs->is_high_priority() ||
- (lhs->is_high_priority() == rhs->is_high_priority() &&
- lhs->schedule.scheduled_at < rhs->schedule.scheduled_at);
+ return lhs.is_high_priority() > rhs.is_high_priority() ||
+ (lhs.is_high_priority() == rhs.is_high_priority() &&
+ lhs.schedule.scheduled_at < rhs.schedule.scheduled_at);
}
};
} // namespace
-// called under lock
-ScrubQContainer ScrubQueue::collect_ripe_jobs(
- ScrubQContainer& group,
- OSDRestrictions restrictions,
- utime_t time_now)
-{
- auto filtr = [time_now, rst = restrictions](const auto& jobref) -> bool {
- return jobref->schedule.not_before <= time_now &&
- (!rst.high_priority_only || jobref->high_priority) &&
- (!rst.only_deadlined || (!jobref->schedule.deadline.is_zero() &&
- jobref->schedule.deadline <= time_now));
- };
- rm_unregistered_jobs(group);
- // copy ripe jobs (unless prohibited by 'restrictions')
- ScrubQContainer ripes;
- ripes.reserve(group.size());
-
- std::copy_if(group.begin(), group.end(), std::back_inserter(ripes), filtr);
- std::sort(ripes.begin(), ripes.end(), cmp_time_n_priority_t{});
-
- if (g_conf()->subsys.should_gather<ceph_subsys_osd, 20>()) {
- for (const auto& jobref : group) {
- if (!filtr(jobref)) {
- dout(20) << fmt::format(
- " not eligible: {} @ {:s} ({:s},{:s})", jobref->pgid,
- jobref->schedule.not_before,
- jobref->schedule.scheduled_at, jobref->last_issue)
- << dendl;
- }
- }
- }
+/**
+ * the set of all PGs named by the entries in the queue (but only those
+ * entries that satisfy the predicate)
+ */
+std::set<spg_t> ScrubQueue::get_pgs(const ScrubQueue::EntryPred& cond) const
+{
+ std::lock_guard lck{jobs_lock};
+ std::set<spg_t> pgs_w_matching_entries;
+ rng::transform(
+ to_scrub | std::views::filter(
+ [&cond](const auto& job) -> bool { return (cond)(*job); }),
+ std::inserter(pgs_w_matching_entries, pgs_w_matching_entries.end()),
+ [](const auto& job) { return job->pgid; });
+ return pgs_w_matching_entries;
+}
- return ripes;
+void ScrubQueue::for_each_job(
+ std::function<void(const Scrub::ScrubJob&)> fn,
+ int max_jobs) const
+{
+ std::lock_guard lck(jobs_lock);
+ std::ranges::for_each(
+ to_scrub | std::views::take(max_jobs),
+ [fn](const auto& job) { fn(*job); });
}
void ScrubQueue::dump_scrubs(ceph::Formatter* f) const
{
ceph_assert(f != nullptr);
- std::lock_guard lck(jobs_lock);
-
f->open_array_section("scrubs");
- std::for_each(
- to_scrub.cbegin(), to_scrub.cend(),
- [&f](const Scrub::ScrubJobRef& j) { j->dump(f); });
+ for_each_job(
+ [&f](const Scrub::ScrubJob& j) { j.dump(f); },
+ std::numeric_limits<int>::max());
f->close_section();
}
-ScrubQContainer ScrubQueue::list_registered_jobs() const
-{
- ScrubQContainer all_jobs;
- all_jobs.reserve(to_scrub.size());
- dout(20) << " size: " << all_jobs.capacity() << dendl;
-
- std::lock_guard lck{jobs_lock};
- std::copy_if(to_scrub.begin(),
- to_scrub.end(),
- std::back_inserter(all_jobs),
- registered_job);
- return all_jobs;
-}
// ////////////////////////////////////////////////////////////////////////// //
// ScrubQueue - maintaining the 'blocked on a locked object' count
*/
// clang-format on
+#include <algorithm>
#include <optional>
+
#include "common/AsyncReserver.h"
#include "utime.h"
#include "osd/scrubber/scrub_job.h"
friend class ScrubSchedTestWrapper; ///< unit-tests structure
using sched_params_t = Scrub::sched_params_t;
- /**
- * returns the list of all scrub targets that are ready to be scrubbed.
- * Note that the following changes are expected in the near future (as part
- * of the scheduling refactoring):
- * - only one target will be requested by the OsdScrub (the OSD's sub-object
- * that initiates scrubs);
- * - that target would name a PG X scrub type;
- *
- * @param restrictions: what types of scrub are allowed, given system status
- * & config. Some of the preconditions are calculated here.
- */
- std::vector<ScrubTargetId> ready_to_scrub(
- Scrub::OSDRestrictions restrictions, // 4B! copy
- utime_t scrub_tick);
/**
* remove the pg from set of PGs to be scanned for scrubbing.
* To be used if we are no longer the PG's primary, or if the PG is removed.
*/
- void remove_from_osd_queue(Scrub::ScrubJobRef sjob);
+ void remove_from_osd_queue(spg_t pgid);
+
+ /// A predicate over the entries in the queue
+ using EntryPred = std::function<bool(const Scrub::ScrubJob&)>;
/**
- * @return the list (not std::set!) of all scrub jobs registered
- * (apart from PGs in the process of being removed)
+ * the set of all PGs named by the entries in the queue (but only those
+ * entries that satisfy the predicate)
*/
- Scrub::ScrubQContainer list_registered_jobs() const;
+ std::set<spg_t> get_pgs(const EntryPred&) const;
/**
* Add the scrub job to the list of jobs (i.e. list of PGs) to be periodically
*
* locking: might lock jobs_lock
*/
- void register_with_osd(Scrub::ScrubJobRef sjob, const sched_params_t& suggested);
+ void register_with_osd(Scrub::ScrubJob& sjob, const sched_params_t& suggested);
+
+ /**
+ * Push a copy of the scrub job into the OSD's queue of PGs to be
+ * periodically scrubbed.
+ */
+ void enqueue_target(const Scrub::ScrubJob& sjob);
/**
* modify a scrub-job's scheduled time and deadline
* 'reset_notbefore' is used to reset the 'not_before' time to the updated
* 'scheduled_at' time. This is used whenever the scrub-job schedule is
* updated not as a result of a scrub attempt failure.
- *
- * locking: not using the jobs_lock
*/
void update_job(
- Scrub::ScrubJobRef sjob,
+ Scrub::ScrubJob& sjob,
const sched_params_t& suggested,
bool reset_notbefore);
void delay_on_failure(
- Scrub::ScrubJobRef sjob,
+ Scrub::ScrubJob& sjob,
std::chrono::seconds delay,
Scrub::delay_cause_t delay_cause,
utime_t now_is);
public:
void dump_scrubs(ceph::Formatter* f) const;
+ void for_each_job(
+ std::function<void(const Scrub::ScrubJob&)> fn,
+ int max_jobs) const;
+
/// counting the number of PGs stuck while scrubbing, waiting for objects
void mark_pg_scrub_blocked(spg_t blocked_pg);
void clear_pg_scrub_blocked(spg_t blocked_pg);
int get_blocked_pgs_count() const;
+ /**
+ * find the 'nearest' scrub-job (later on - scrub target) that is ready
+ * to be scrubbed (taking 'restrictions' into account).
+ * The selected entry is dequeued from the queue and returned.
+ * A nullptr is returned if no eligible entry is found.
+ */
+ std::unique_ptr<Scrub::ScrubJob> pop_ready_pg(
+ Scrub::OSDRestrictions restrictions, // note: 4B in size! (copy)
+ utime_t time_now);
+
private:
CephContext* cct;
Scrub::ScrubSchedListener& osd_service;
#endif
/**
- * jobs_lock protects the job containers and the relevant scrub-jobs state
- * variables. Specifically, the following are guaranteed:
- * - 'in_queues' is asserted only if the job is in one of the queues;
- * - a job will only be in state 'registered' if in one of the queues;
- * - no job will be in the two queues simultaneously;
+ * jobs_lock protects the job containers.
*
* Note that PG locks should not be acquired while holding jobs_lock.
*/
Scrub::ScrubQContainer to_scrub; ///< scrub jobs (i.e. PGs) to scrub
- static inline constexpr auto registered_job = [](const auto& jobref) -> bool {
- return jobref->state == Scrub::qu_state_t::registered;
- };
-
- static inline constexpr auto invalid_state = [](const auto& jobref) -> bool {
- return jobref->state == Scrub::qu_state_t::not_registered;
- };
-
- /**
- * clear dead entries (unregistered, or belonging to removed PGs) from a
- * queue. Job state is changed to match new status.
- */
- void rm_unregistered_jobs(Scrub::ScrubQContainer& group);
-
- /**
- * the set of all scrub jobs in 'group' which are ready to be scrubbed
- * (ready = their scheduled time has passed).
- * The scrub jobs in the new collection are sorted according to
- * their scheduled time.
- *
- * Note that the returned container holds independent refs to the
- * scrub jobs.
- * Note also that OSDRestrictions is 1L size, thus copied.
- */
- Scrub::ScrubQContainer collect_ripe_jobs(
- Scrub::ScrubQContainer& group,
- Scrub::OSDRestrictions restrictions,
- utime_t time_now);
-
/**
* The scrubbing of PGs might be delayed if the scrubbed chunk of objects is
* locked by some other operation. A bug might cause this to be an infinite
bool PgScrubber::is_scrub_registered() const
{
- return m_scrub_job && m_scrub_job->in_queues;
+ return m_scrub_job && m_scrub_job->is_registered();
}
std::string_view PgScrubber::registration_state() const
{
if (m_scrub_job) {
- return m_scrub_job->registration_state();
+ return m_scrub_job->state_desc();
}
return "(no sched job)"sv;
}
void PgScrubber::rm_from_osd_scrubbing()
{
- if (m_scrub_job && m_scrub_job->is_state_registered()) {
+ if (m_scrub_job && m_scrub_job->is_registered()) {
dout(15) << fmt::format(
"{}: prev. state: {}", __func__, registration_state())
<< dendl;
- m_osds->get_scrub_services().remove_from_osd_queue(m_scrub_job);
+ m_osds->get_scrub_services().remove_from_osd_queue(m_pg_id);
+ m_scrub_job->registered = false;
}
}
ceph_assert(is_primary());
ceph_assert(m_scrub_job);
- auto pre_state = m_scrub_job->state_desc();
auto pre_reg = registration_state();
+ m_scrub_job->registered = true;
auto suggested = determine_scrub_time(m_pg->get_pgpool().info.opts);
- m_osds->get_scrub_services().register_with_osd(m_scrub_job, suggested);
+ m_osds->get_scrub_services().register_with_osd(*m_scrub_job, suggested);
dout(10) << fmt::format(
"{}: <flags:{}> {} <{:.5}>&<{:.10}> --> <{:.5}>&<{:.14}>",
__func__, m_planned_scrub,
(is_primary() ? "Primary" : "Replica/other"), pre_reg,
- pre_state, registration_state(), m_scrub_job->state_desc())
+ pre_reg, registration_state(), m_scrub_job->state_desc())
<< dendl;
}
*/
void PgScrubber::update_scrub_job(const requested_scrub_t& request_flags)
{
- dout(10) << fmt::format("{}: flags:<{}>", __func__, request_flags) << dendl;
- // verify that the 'in_q' status matches our "Primariority"
- if (m_scrub_job && is_primary() && !m_scrub_job->in_queues) {
- dout(1) << __func__ << " !!! primary but not scheduled! " << dendl;
+ if (!is_primary() || !m_scrub_job) {
+ dout(10) << fmt::format(
+ "{}: pg[{}]: not Primary or no scrub-job", __func__,
+ m_pg_id)
+ << dendl;
+ return;
}
- if (is_primary() && m_scrub_job) {
- ceph_assert(m_pg->is_locked());
- auto suggested = determine_scrub_time(m_pg->get_pgpool().info.opts);
- m_osds->get_scrub_services().update_job(m_scrub_job, suggested, true);
- m_pg->publish_stats_to_osd();
+ // if we were marked as 'not registered' - do not try to push into
+ // the queue. And if we are already in the queue - do not push again.
+ if (!m_scrub_job->registered) {
+ dout(10) << fmt::format("{}: PG[{}] not registered", __func__, m_pg_id)
+ << dendl;
+ return;
+ }
+
+ dout(15) << fmt::format(
+ "{}: flags:<{}> job on entry:{}", __func__, request_flags,
+ *m_scrub_job)
+ << dendl;
if (m_scrub_job->target_queued) {
m_osds->get_scrub_services().remove_from_osd_queue(m_pg_id);
m_scrub_job->target_queued = false;
dout(20) << fmt::format(
"{}: PG[{}] dequeuing for an update", __func__, m_pg_id)
<< dendl;
}
+
+ ceph_assert(m_pg->is_locked());
+ auto suggested = determine_scrub_time(m_pg->get_pgpool().info.opts);
+ // update the schedule and push a fresh copy of the job back into the
+ // OSD's queue (at this phase of the refactoring - via the 'register'
+ // interface, which also sets 'target_queued')
+ m_osds->get_scrub_services().register_with_osd(*m_scrub_job, suggested);
+ m_pg->publish_stats_to_osd();
+
dout(15) << __func__ << ": done " << registration_state() << dendl;
}
void PgScrubber::penalize_next_scrub(Scrub::delay_cause_t cause)
{
m_osds->get_scrub_services().delay_on_failure(
- m_scrub_job, 5s, cause, ceph_clock_now());
+ *m_scrub_job, 5s, cause, ceph_clock_now());
}
void PgScrubber::on_digest_updates()
false /* is periodic? unknown, actually */};
}
}
- if (m_scrub_job->state != Scrub::qu_state_t::registered) {
+ if (!m_scrub_job->is_registered()) {
return pg_scrubbing_status_t{utime_t{},
0,
pg_scrub_sched_status_t::not_queued,
m_fsm = std::make_unique<ScrubMachine>(m_pg, this);
m_fsm->initiate();
- m_scrub_job = ceph::make_ref<Scrub::ScrubJob>(
+ m_scrub_job = std::make_optional<Scrub::ScrubJob>(
m_osds->cct, m_pg->pg_id, m_osds->get_nodeid());
}
virtual void _scrub_clear_state() {}
utime_t m_scrub_reg_stamp; ///< stamp we registered for
- Scrub::ScrubJobRef m_scrub_job; ///< the scrub-job used by the OSD to
- ///< schedule us
+
+ /// the sub-object that manages this PG's scheduling parameters.
+ /// A std::optional rather than a plain member, as we wish to directly
+ /// control the order of construction and destruction.
+ std::optional<Scrub::ScrubJob> m_scrub_job;
ostream& show(ostream& out) const override;
#include "./scrub_job.h"
#include "pg_scrubber.h"
-using qu_state_t = Scrub::qu_state_t;
using must_scrub_t = Scrub::must_scrub_t;
using ScrubQContainer = Scrub::ScrubQContainer;
using sched_params_t = Scrub::sched_params_t;
using OSDRestrictions = Scrub::OSDRestrictions;
+using sched_conf_t = Scrub::sched_conf_t;
+using scrub_schedule_t = Scrub::scrub_schedule_t;
using ScrubJob = Scrub::ScrubJob;
}
ScrubJob::ScrubJob(CephContext* cct, const spg_t& pg, int node_id)
- : RefCountedObject{cct}
- , pgid{pg}
+ : pgid{pg}
, whoami{node_id}
, cct{cct}
, log_msg_prefix{fmt::format("osd.{}: scrub-job:pg[{}]:", node_id, pgid)}
"was: nb:{:s}({:s}). Called with: rest?{} {:s} ({})",
schedule.not_before, schedule.scheduled_at,
reset_failure_penalty, adjusted.scheduled_at,
- registration_state())
+ state_desc())
<< dendl;
schedule.scheduled_at = adjusted.scheduled_at;
schedule.deadline = adjusted.deadline;
if (reset_failure_penalty || (schedule.not_before < schedule.scheduled_at)) {
schedule.not_before = schedule.scheduled_at;
}
-
- updated = true;
dout(10) << fmt::format(
"adjusted: nb:{:s} ({:s}) ({})", schedule.not_before,
- schedule.scheduled_at, registration_state())
+ schedule.scheduled_at, state_desc())
<< dendl;
}
const
{
// if not in the OSD scheduling queues, not a candidate for scrubbing
- if (state != qu_state_t::registered) {
+ if (!registered) {
return "no scrub is scheduled";
}
return out << log_msg_prefix << fn << ": ";
}
-// clang-format off
-std::string_view ScrubJob::qu_state_text(qu_state_t st)
-{
- switch (st) {
- case qu_state_t::not_registered: return "not registered w/ OSD"sv;
- case qu_state_t::registered: return "registered"sv;
- case qu_state_t::unregistering: return "unregistering"sv;
- }
- // g++ (unlike CLANG), requires an extra 'return' here
- return "(unknown)"sv;
-}
-// clang-format on
-
void ScrubJob::dump(ceph::Formatter* f) const
{
f->open_object_section("scrub");
#include <atomic>
#include <chrono>
+#include <compare>
#include <iostream>
#include <memory>
#include <vector>
-#include "common/RefCountedObj.h"
#include "common/ceph_atomic.h"
#include "include/utime_fmt.h"
#include "osd/osd_types.h"
enum class must_scrub_t { not_mandatory, mandatory };
-enum class qu_state_t {
- not_registered, // not a primary, thus not considered for scrubbing by this
- // OSD (also the temporary state when just created)
- registered, // in either of the two queues ('to_scrub' or 'penalized')
- unregistering // in the process of being unregistered. Will be finalized
- // under lock
-};
-
struct scrub_schedule_t {
utime_t scheduled_at{};
utime_t deadline{0, 0};
utime_t not_before{utime_t::max()};
+ // when compared - the 'not_before' is ignored, assuming
+ // we never compare jobs with different eligibility status.
+ std::partial_ordering operator<=>(const scrub_schedule_t& rhs) const
+ {
+ auto cmp1 = scheduled_at <=> rhs.scheduled_at;
+ if (cmp1 != 0) {
+ return cmp1;
+ }
+ return deadline <=> rhs.deadline;
+ };
+ bool operator==(const scrub_schedule_t& rhs) const = default;
};
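
An illustrative reading of the ordering semantics (assuming utime_t's
(seconds, nanoseconds) constructor): with equal 'scheduled_at' values the
'deadline' breaks the tie, while 'not_before' plays no part.

```cpp
#include <cassert>

void schedule_ordering_example()
{
  Scrub::scrub_schedule_t a;
  a.scheduled_at = utime_t{100, 0};
  a.deadline = utime_t{200, 0};
  a.not_before = utime_t{150, 0};

  auto b = a;
  b.deadline = utime_t{300, 0};
  b.not_before = utime_t{90, 0};  // an earlier 'not_before' is irrelevant

  assert(a < b);  // same 'scheduled_at' -> 'deadline' decides
}
```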
struct sched_params_t {
must_scrub_t is_must{must_scrub_t::not_mandatory};
};
-class ScrubJob final : public RefCountedObject {
+/**
+ * A collection of the configuration parameters (pool & OSD) that affect
+ * scrub scheduling.
+ */
+struct sched_conf_t {
+ /// the desired interval between shallow scrubs
+ double shallow_interval{0.0};
+
+ /// the desired interval between deep scrubs
+ double deep_interval{0.0};
+
+ /**
+ * the maximum interval between shallow scrubs, as determined by either the
+ * OSD or the pool configuration. Empty if no limit is configured.
+ */
+ std::optional<double> max_shallow;
+
+ /**
+ * the maximum interval between deep scrubs.
+ * For deep scrubs - there is no equivalent of scrub_max_interval. Per the
+ * documentation, once deep_scrub_interval has passed, we are already
+ * "overdue", at least as far as the "ignore allowed load" window is
+ * concerned. \todo based on users complaints (and the fact that the
+ * interaction between the configuration parameters is clear to no one),
+ * this will be revised shortly.
+ */
+ double max_deep{0.0};
+
+ /**
+ * interval_randomize_ratio
+ *
+ * We add an extra random duration to the configured times when doing
+ * scheduling. An event configured with an interval of <interval> will
+ * actually be scheduled at a time selected uniformly from
+ * [<interval>, (1+<interval_randomize_ratio>) * <interval>)
+ */
+ double interval_randomize_ratio{0.0};
+
+ /**
+ * a randomization factor aimed at preventing 'thundering herd' problems
+ * upon deep-scrubs common intervals. If polling a random number smaller
+ * than that percentage, the next shallow scrub is upgraded to deep.
+ */
+ double deep_randomize_ratio{0.0};
+
+ /**
+ * must we schedule a scrub with high urgency if we do not have a valid
+ * last scrub stamp?
+ */
+ bool mandatory_on_invalid{true};
+};
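
A sketch of what interval_randomize_ratio means in practice (hypothetical
helper, for illustration only; in this code the actual adjustment is
performed by adjust_target_time()):

```cpp
#include <random>

// spread the next scheduled time uniformly over
// [interval, (1 + ratio) * interval)
double randomized_interval(double interval, double ratio)
{
  static std::mt19937 gen{std::random_device{}()};
  std::uniform_real_distribution<double> dist{0.0, ratio};
  return interval * (1.0 + dist(gen));
}
```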
+
+
+class ScrubJob {
public:
/**
* a time scheduled for scrub, and a deadline: The scrub could be delayed
scrub_schedule_t schedule;
/// pg to be scrubbed
- const spg_t pgid;
+ spg_t pgid;
/// the OSD id (for the log)
- const int whoami;
+ int whoami;
- ceph::atomic<qu_state_t> state{qu_state_t::not_registered};
+ /**
+ * Set whenever the PG scrubs are managed by the OSD (i.e. - from becoming
+ * an active Primary till the end of the interval).
+ */
+ bool registered{false};
/**
- * the old 'is_registered'. Set whenever the job is registered with the OSD,
- * i.e. is in 'to_scrub'.
+ * there is a scrub target for this PG in the queue.
+ * \attn: temporary. Will be replaced with a pair of flags in the
+ * two level-specific scheduling targets.
*/
- std::atomic_bool in_queues{false};
+ bool target_queued{false};
/// how the last attempt to scrub this PG ended
delay_cause_t last_issue{delay_cause_t::none};
- /**
- * 'updated' is a temporary flag, used to create a barrier after
- * 'sched_time' and 'deadline' (or any other job entry) were modified by
- * different task.
- */
- std::atomic_bool updated{false};
-
/**
* the scrubber is waiting for locked objects to be unlocked.
* Set after a grace period has passed.
utime_t get_sched_time() const { return schedule.not_before; }
- static std::string_view qu_state_text(qu_state_t st);
-
- /**
- * relatively low-cost(*) access to the scrub job's state, to be used in
- * logging.
- * (*) not a low-cost access on x64 architecture
- */
std::string_view state_desc() const
{
- return qu_state_text(state.load(std::memory_order_relaxed));
+ return registered ? (target_queued ? "queued" : "registered")
+ : "not-registered";
}
/**
void dump(ceph::Formatter* f) const;
- /*
- * as the atomic 'in_queues' appears in many log prints, accessing it for
- * display-only should be made less expensive (on ARM. On x86 the _relaxed
- * produces the same code as '_cs')
- */
- std::string_view registration_state() const
- {
- return in_queues.load(std::memory_order_relaxed) ? "in-queue"
- : "not-queued";
- }
- /**
- * access the 'state' directly, for when a distinction between 'registered'
- * and 'unregistering' is needed (both have in_queues() == true)
- */
- bool is_state_registered() const { return state == qu_state_t::registered; }
+ bool is_registered() const { return registered; }
/**
* is this a high priority scrub job?
std::string scheduling_state(utime_t now_is, bool is_deep_expected) const;
std::ostream& gen_prefix(std::ostream& out, std::string_view fn) const;
- const std::string log_msg_prefix;
-};
-
-using ScrubJobRef = ceph::ref_t<ScrubJob>;
-using ScrubQContainer = std::vector<ScrubJobRef>;
-
-/**
- * A collection of the configuration parameters (pool & OSD) that affect
- * scrub scheduling.
- */
-struct sched_conf_t {
- /// the desired interval between shallow scrubs
- double shallow_interval{0.0};
-
- /// the desired interval between deep scrubs
- double deep_interval{0.0};
-
- /**
- * the maximum interval between shallow scrubs, as determined by either the
- * OSD or the pool configuration. Empty if no limit is configured.
- */
- std::optional<double> max_shallow;
-
- /**
- * the maximum interval between deep scrubs.
- * For deep scrubs - there is no equivalent of scrub_max_interval. Per the
- * documentation, once deep_scrub_interval has passed, we are already
- * "overdue", at least as far as the "ignore allowed load" window is
- * concerned. \todo based on users complaints (and the fact that the
- * interaction between the configuration parameters is clear to no one),
- * this will be revised shortly.
- */
- double max_deep{0.0};
-
- /**
- * interval_randomize_ratio
- *
- * We add an extra random duration to the configured times when doing
- * scheduling. An event configured with an interval of <interval> will
- * actually be scheduled at a time selected uniformly from
- * [<interval>, (1+<interval_randomize_ratio>) * <interval>)
- */
- double interval_randomize_ratio{0.0};
-
- /**
- * a randomization factor aimed at preventing 'thundering herd' problems
- * upon deep-scrubs common intervals. If polling a random number smaller
- * than that percentage, the next shallow scrub is upgraded to deep.
- */
- double deep_randomize_ratio{0.0};
+ std::string log_msg_prefix;
- /**
- * must we schedule a scrub with high urgency if we do not have a valid
- * last scrub stamp?
- */
- bool mandatory_on_invalid{true};
+ // the comparison operator is used to sort the scrub jobs in the queue.
+ // Note that it would not be needed in the next iteration of this code, as
+ // the queue would *not* hold the full ScrubJob objects, but rather -
+ // SchedTarget(s).
+ std::partial_ordering operator<=>(const ScrubJob& rhs) const
+ {
+ return schedule <=> rhs.schedule;
+ };
};
+using ScrubQContainer = std::vector<std::unique_ptr<ScrubJob>>;
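
As the container owns the jobs through unique_ptr, a comparison-based
algorithm must dereference the pointers, so that the jobs' operator<=>
(above) is the one applied. A sketch (note that pop_ready_pg() selects
with min_element rather than keeping to_scrub sorted):

```cpp
std::sort(
    to_scrub.begin(), to_scrub.end(),
    [](const std::unique_ptr<Scrub::ScrubJob>& lhs,
       const std::unique_ptr<Scrub::ScrubJob>& rhs) { return *lhs < *rhs; });
```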
+
} // namespace Scrub
namespace std {
} // namespace std
namespace fmt {
-template <>
-struct formatter<Scrub::qu_state_t> : formatter<std::string_view> {
- template <typename FormatContext>
- auto format(const Scrub::qu_state_t& s, FormatContext& ctx)
- {
- auto out = ctx.out();
- out = fmt::formatter<string_view>::format(
- std::string{Scrub::ScrubJob::qu_state_text(s)}, ctx);
- return out;
- }
-};
template <>
struct formatter<Scrub::sched_params_t> {
auto format(const Scrub::ScrubJob& sjob, FormatContext& ctx)
{
return fmt::format_to(
- ctx.out(), "pg[{}] @ nb:{:s} ({:s}) (dl:{:s}) - <{}> queue state:{:.7}",
+ ctx.out(), "pg[{}] @ nb:{:s} ({:s}) (dl:{:s}) - <{}>",
sjob.pgid, sjob.schedule.not_before, sjob.schedule.scheduled_at,
- sjob.schedule.deadline, sjob.registration_state(),
- sjob.state.load(std::memory_order_relaxed));
+ sjob.schedule.deadline, sjob.state_desc());
}
};
{
return fmt::format_to(
ctx.out(),
- "periods: s:{}/{} d:{}/{} iv-ratio:{} deep-rand:{} on-inv:{}",
+ "periods:s:{}/{},d:{}/{},iv-ratio:{},deep-rand:{},on-inv:{}",
cf.shallow_interval, cf.max_shallow.value_or(-1.0), cf.deep_interval,
cf.max_deep, cf.interval_randomize_ratio, cf.deep_randomize_ratio,
cf.mandatory_on_invalid);