}
-// Moves every 'to_scrub' job that has 'resources_failure' set into the 'penalized' list. Used under jobs_lock.
-void ScrubQueue::move_failed_pgs(utime_t now_is)
-{
- int punished_cnt{0}; // number of jobs moved by this call; for log/debug only
-
- for (auto job = to_scrub.begin(); job != to_scrub.end();) {
- if ((*job)->resources_failure) {
- auto sjob = *job;
-
- // the last time it was scheduled for a scrub, this PG failed to secure
- // remote resources. Move it to the secondary (penalty) scrub queue.
-
- dout(15) << "moving " << sjob->pgid
- << " state: " << ScrubJob::qu_state_text(sjob->state) << dendl;
-
- // determine the penalty time, after which the job should be reinstated: 'now_is' + 2 * osd_scrub_sleep + 300'000ms
- utime_t after = now_is;
- after += conf()->osd_scrub_sleep * 2 + utime_t{300'000ms};
-
- // note: currently - not taking 'deadline' into account when determining
- // 'penalty_timeout'.
- sjob->penalty_timeout = after;
- sjob->resources_failure = false;
- sjob->updated = false; // as otherwise will be pardoned immediately
-
- // place in the penalty list, and remove from the to-scrub group ('job' is re-assigned from erase(), so it is not incremented here)
- penalized.push_back(sjob);
- job = to_scrub.erase(job);
- punished_cnt++;
- } else {
- job++;
- }
- }
-
- if (punished_cnt) {
- dout(15) << "# of jobs penalized: " << punished_cnt << dendl;
- }
-}
-
// Builds the list of scrub targets for this tick: under jobs_lock, clears the
// 'updated' flag of every 'to_scrub' entry and takes the collect_ripe_jobs()
// selection from 'to_scrub' (for the given restrictions and tick time); then,
// with the lock released, fills 'all_ready' from that copy (the visible
// lambda maps each job to its pgid) and returns it.
std::vector<ScrubTargetId> ScrubQueue::ready_to_scrub(
OSDRestrictions restrictions, // note: 4B in size! (copy)
utime_t scrub_tick)
{
// NOTE(review): the log line below reads to_scrub.size() before jobs_lock is
// taken further down - confirm that this unlocked read is acceptable.
dout(10) << fmt::format(
- " @{:s}: reg./pen. sizes: {} / {} ({})", scrub_tick,
- to_scrub.size(), penalized.size(), restrictions)
+ " @{:s}: registered: {} ({})", scrub_tick,
+ to_scrub.size(), restrictions)
<< dendl;
+
// create a list of candidates (copying, as otherwise creating a deadlock):
- // - possibly restore penalized
// - (if we didn't handle directly) remove invalid jobs
// - create a copy of the to_scrub (possibly up to first not-ripe)
- // - same for the penalized (although that usually be a waste)
// unlock, then try the lists
-
std::unique_lock lck{jobs_lock};
- // pardon all penalized jobs that have deadlined (or were updated)
- scan_penalized(restore_penalized, scrub_tick);
- restore_penalized = false;
-
// remove the 'updated' flag from all entries
std::for_each(
to_scrub.begin(), to_scrub.end(),
[](const auto& jobref) -> void { jobref->updated = false; });
- // add failed scrub attempts to the penalized list
- move_failed_pgs(scrub_tick);
-
- // collect all valid & ripe jobs from the two lists. Note that we must copy,
+ // collect all valid & ripe jobs. Note that we must copy,
// as when we use the lists we will not be holding jobs_lock (see
// explanation above)
// transformed into a vector of targets (which, in this phase, are
// the PG id-s).
auto to_scrub_copy = collect_ripe_jobs(to_scrub, restrictions, scrub_tick);
- auto penalized_copy = collect_ripe_jobs(penalized, restrictions, scrub_tick);
lck.unlock();
std::vector<ScrubTargetId> all_ready;
to_scrub_copy.cbegin(), to_scrub_copy.cend(),
std::back_inserter(all_ready),
[](const auto& jobref) -> ScrubTargetId { return jobref->pgid; });
- // not bothering to handle the "reached the penalized - so all should be
- // forgiven" case, as the penalty queue is destined to be removed in a
- // followup PR.
- std::transform(
- penalized_copy.cbegin(), penalized_copy.cend(),
- std::back_inserter(all_ready),
- [](const auto& jobref) -> ScrubTargetId { return jobref->pgid; });
return all_ready;
}
}
-// Moves jobs from 'penalized' back to 'to_scrub': all of them if 'forgive_all' is set, otherwise only those that are 'updated' or whose 'penalty_timeout' <= 'time_now'. Note: called with jobs_lock held.
-void ScrubQueue::scan_penalized(bool forgive_all, utime_t time_now)
-{
- dout(20) << time_now << (forgive_all ? " all " : " - ") << penalized.size()
- << dendl;
-
- // clear dead entries (deleted PGs, or PGs for which we are no longer the
- // primary)
- rm_unregistered_jobs(penalized);
-
- if (forgive_all) {
-
- std::copy(penalized.begin(), penalized.end(), std::back_inserter(to_scrub));
- penalized.clear();
-
- } else {
-
- auto forgiven_last = std::partition(
- penalized.begin(),
- penalized.end(),
- [time_now](const auto& e) {
- return (*e).updated || ((*e).penalty_timeout <= time_now);
- });
-
- std::copy(penalized.begin(), forgiven_last, std::back_inserter(to_scrub));
- penalized.erase(penalized.begin(), forgiven_last);
- dout(20) << "penalized after screening: " << penalized.size() << dendl;
- }
-}
-
// Emits, via the formatter 'f', an array section named "scrubs" holding the
// dump of each job in 'to_scrub'.
// 'f' must not be null (asserted). jobs_lock is held for the whole dump.
void ScrubQueue::dump_scrubs(ceph::Formatter* f) const
{
ceph_assert(f != nullptr);
std::lock_guard lck(jobs_lock);
f->open_array_section("scrubs");
-
// each job writes its own entry into the open section
std::for_each(
to_scrub.cbegin(), to_scrub.cend(),
[&f](const Scrub::ScrubJobRef& j) { j->dump(f); });
-
- std::for_each(
- penalized.cbegin(), penalized.cend(),
- [&f](const Scrub::ScrubJobRef& j) { j->dump(f); });
-
f->close_section();
}
// Returns a copy of the 'to_scrub' entries that satisfy the 'registered_job'
// predicate. The whole operation, including the sizing of the result
// container, is performed with jobs_lock held.
ScrubQContainer ScrubQueue::list_registered_jobs() const
{
ScrubQContainer all_jobs;
- all_jobs.reserve(to_scrub.size() + penalized.size());
- dout(20) << " size: " << all_jobs.capacity() << dendl;
- std::lock_guard lck{jobs_lock};
-
+ // take the lock before touching 'to_scrub': even reading its size is an
+ // unsynchronized access if another thread is modifying the queue
+ std::lock_guard lck{jobs_lock};
+ all_jobs.reserve(to_scrub.size());
+ dout(20) << " size: " << all_jobs.capacity() << dendl;
std::copy_if(to_scrub.begin(),
to_scrub.end(),
std::back_inserter(all_jobs),
registered_job);
- std::copy_if(penalized.begin(),
- penalized.end(),
- std::back_inserter(all_jobs),
- registered_job);
-
return all_jobs;
}
│ │
│ │
│ ScrubQContainer to_scrub <>────────┼────────┐
-│ ScrubQContainer penalized │ │
│ │ │
│ │ │
│ OSD_wide resource counters │ │
* the queue of PGs waiting to be scrubbed.
* Main operations are scheduling/unscheduling a PG to be scrubbed at a certain
* time.
- *
- * A "penalty" queue maintains those PGs that have failed to reserve the
- * resources of their replicas. The PGs in this list will be reinstated into the
- * scrub queue when all eligible PGs were already handled, or after a timeout
- * (or if their deadline has passed [[disabled at this time]]).
*/
class ScrubQueue {
public:
mutable ceph::mutex jobs_lock = ceph::make_mutex("ScrubQueue::jobs_lock");
Scrub::ScrubQContainer to_scrub; ///< scrub jobs (i.e. PGs) to scrub
- Scrub::ScrubQContainer penalized; ///< those that failed to reserve remote resources
- bool restore_penalized{false};
static inline constexpr auto registered_job = [](const auto& jobref) -> bool {
return jobref->state == Scrub::qu_state_t::registered;
return jobref->state == Scrub::qu_state_t::not_registered;
};
- /**
- * Are there scrub jobs that should be reinstated?
- */
- void scan_penalized(bool forgive_all, utime_t time_now);
-
/**
* clear dead entries (unregistered, or belonging to removed PGs) from a
* queue. Job state is changed to match new status.
Scrub::scrub_schedule_t adjust_target_time(
const Scrub::sched_params_t& recomputed_params) const;
- /**
- * Look for scrub jobs that have their 'resources_failure' set. These jobs
- * have failed to acquire remote resources last time we've initiated a scrub
- * session on them. They are now moved from the 'to_scrub' queue to the
- * 'penalized' set.
- *
- * locking: called with job_lock held
- */
- void move_failed_pgs(utime_t now_is);
-
protected: // used by the unit-tests
/**
* unit-tests will override this function to return a mock time
* 'updated' is a temporary flag, used to create a barrier after
* 'sched_time' and 'deadline' (or any other job entry) were modified by
* a different task.
- * 'updated' also signals the need to move a job back from the penalized
- * queue to the regular one.
*/
std::atomic_bool updated{false};
bool blocked{false};
utime_t blocked_since{};
- utime_t penalty_timeout{0, 0};
-
CephContext* cct;
bool high_priority{false};
{
return fmt::format_to(
ctx.out(),
- "pg[{}] @ {:s} (dl:{:s}) - <{}> / failure: {} / pen. t.o.: {:s} / "
- "queue "
- "state: {:.7}",
- sjob.pgid, sjob.schedule.scheduled_at, sjob.schedule.deadline,
- sjob.registration_state(), sjob.resources_failure, sjob.penalty_timeout,
- sjob.state.load(std::memory_order_relaxed));
+ "pg[{}] @ {:s} (dl:{:s}) - <{}> / failure: {} / queue state: "
+ "{:.7}",
+ sjob.pgid, sjob.schedule.scheduled_at,
+ sjob.schedule.deadline, sjob.registration_state(),
+ sjob.resources_failure, sjob.state.load(std::memory_order_relaxed));
}
};