]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/scrub: extracting scrub scheduling code from OSD.cc 40984/head
authorRonen Friedman <rfriedma@redhat.com>
Thu, 22 Apr 2021 07:56:12 +0000 (10:56 +0300)
committerRonen Friedman <rfriedma@redhat.com>
Sat, 18 Sep 2021 10:22:12 +0000 (10:22 +0000)
A separate object (ScrubQueue) now manages the collection of ScrubJob-s
that are to be scrubbed.

Notable changes:
- ScrubQueue keeps track of the CPU load (instead of the OSD main)
- ScrubJob-s are part of each PG's scrubber. They are registered with the
  ScrubQueue when eligible for scrubbing (i.e. - active Primary), and removed
  from it otherwise.
- "a PG is trying to secure Replica resources" was always treated as an OSD-wide
  state that delays new scrubs from starting. Here - it is also implemented as such
  (instead of per-PG state that must be queried)

Internally, the SQ maintains two groups of jobs: the regular ones, waiting to be
scrubbed, and the "penalized" - those PGs that failed in securing their
replicas' scrub resources (@dzafman changes, slightly modified to fit the
new environment)

Also included: minor fixes to the scrub state-machine and events priorities.

Signed-off-by: Ronen Friedman <rfriedma@redhat.com>
Co-authored-by: David Zafman <dzafman@redhat.com>
18 files changed:
src/crimson/osd/pg.h
src/osd/CMakeLists.txt
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/PeeringState.cc
src/osd/PeeringState.h
src/osd/PrimaryLogPG.cc
src/osd/scrubber/osd_scrub_sched.cc [new file with mode: 0644]
src/osd/scrubber/osd_scrub_sched.h [new file with mode: 0644]
src/osd/scrubber/pg_scrubber.cc
src/osd/scrubber/pg_scrubber.h
src/osd/scrubber/scrub_machine.cc
src/osd/scrubber/scrub_machine.h
src/osd/scrubber/scrub_machine_lstnr.h
src/osd/scrubber_common.h
src/test/osd/TestOSDScrub.cc

index 39f645fc918780d531b4cfcedff6edb99b992330..b78a44dd5538d9b40dc5d0c235772850aa53153d 100644 (file)
@@ -157,6 +157,14 @@ public:
     // Not needed yet -- mainly for scrub scheduling
   }
 
+  /// Notify PG that Primary/Replica status has changed (to update scrub registration)
+  void on_primary_status_change(bool was_primary, bool now_primary) final {
+  }
+
+  /// Need to reschedule next scrub. Assuming no change in role
+  void reschedule_scrub() final {
+  }
+
   void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) final;
 
   uint64_t get_snap_trimq_size() const final {
index 82a5451804a717c257feaba2722e38ff37a3b5b3..26a0dd519fc4d0f2e9695a33d8cad6e9665a838d 100644 (file)
@@ -22,6 +22,7 @@ set(osd_srcs
   PGBackend.cc
   OSDCap.cc
   scrubber/pg_scrubber.cc
+  scrubber/osd_scrub_sched.cc
   scrubber/PrimaryLogScrub.cc
   scrubber/scrub_machine.cc
   scrubber/ScrubStore.cc
index 62aa2f767a4501f3eb70a182d2257749a4845830..ab2f4a0ff5df22ee98bf26dcaa205a971997a1e6 100644 (file)
@@ -267,8 +267,7 @@ OSDService::OSDService(OSD *osd, ceph::async::io_context_pool& poolctx) :
   publish_lock{ceph::make_mutex("OSDService::publish_lock")},
   pre_publish_lock{ceph::make_mutex("OSDService::pre_publish_lock")},
   max_oldest_map(0),
-  scrubs_local(0),
-  scrubs_remote(0),
+  m_scrub_queue{cct, *this},
   agent_valid_iterator(false),
   agent_ops(0),
   flush_mode_high_count(0),
@@ -1267,79 +1266,6 @@ void OSDService::prune_pg_created()
 // --------------------------------------
 // dispatch
 
-bool OSDService::can_inc_scrubs()
-{
-  bool can_inc = false;
-  std::lock_guard l(sched_scrub_lock);
-
-  if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) {
-    dout(20) << __func__ << " == true " << scrubs_local << " local + " << scrubs_remote
-            << " remote < max " << cct->_conf->osd_max_scrubs << dendl;
-    can_inc = true;
-  } else {
-    dout(20) << __func__ << " == false " << scrubs_local << " local + " << scrubs_remote
-            << " remote >= max " << cct->_conf->osd_max_scrubs << dendl;
-  }
-
-  return can_inc;
-}
-
-bool OSDService::inc_scrubs_local()
-{
-  bool result = false;
-  std::lock_guard l{sched_scrub_lock};
-  if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) {
-    dout(20) << __func__ << " " << scrubs_local << " -> " << (scrubs_local+1)
-            << " (max " << cct->_conf->osd_max_scrubs << ", remote " << scrubs_remote << ")" << dendl;
-    result = true;
-    ++scrubs_local;
-  } else {
-    dout(20) << __func__ << " " << scrubs_local << " local + " << scrubs_remote << " remote >= max " << cct->_conf->osd_max_scrubs << dendl;
-  }
-  return result;
-}
-
-void OSDService::dec_scrubs_local()
-{
-  std::lock_guard l{sched_scrub_lock};
-  dout(20) << __func__ << " " << scrubs_local << " -> " << (scrubs_local-1)
-          << " (max " << cct->_conf->osd_max_scrubs << ", remote " << scrubs_remote << ")" << dendl;
-  --scrubs_local;
-  ceph_assert(scrubs_local >= 0);
-}
-
-bool OSDService::inc_scrubs_remote()
-{
-  bool result = false;
-  std::lock_guard l{sched_scrub_lock};
-  if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) {
-    dout(20) << __func__ << " " << scrubs_remote << " -> " << (scrubs_remote+1)
-            << " (max " << cct->_conf->osd_max_scrubs << ", local " << scrubs_local << ")" << dendl;
-    result = true;
-    ++scrubs_remote;
-  } else {
-    dout(20) << __func__ << " " << scrubs_local << " local + " << scrubs_remote << " remote >= max " << cct->_conf->osd_max_scrubs << dendl;
-  }
-  return result;
-}
-
-void OSDService::dec_scrubs_remote()
-{
-  std::lock_guard l{sched_scrub_lock};
-  dout(20) << __func__ << " " << scrubs_remote << " -> " << (scrubs_remote-1)
-          << " (max " << cct->_conf->osd_max_scrubs << ", local " << scrubs_local << ")" << dendl;
-  --scrubs_remote;
-  ceph_assert(scrubs_remote >= 0);
-}
-
-void OSDService::dump_scrub_reservations(Formatter *f)
-{
-  std::lock_guard l{sched_scrub_lock};
-  f->dump_int("scrubs_local", scrubs_local);
-  f->dump_int("scrubs_remote", scrubs_remote);
-  f->dump_int("osd_max_scrubs", cct->_conf->osd_max_scrubs);
-}
-
 void OSDService::retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
                                  epoch_t *_bind_epoch) const
 {
@@ -2793,7 +2719,7 @@ will start to track new ops received afterwards.";
     f->close_section();
   } else if (prefix == "dump_scrub_reservations") {
     f->open_object_section("scrub_reservations");
-    service.dump_scrub_reservations(f);
+    service.get_scrub_services().dump_scrub_reservations(f);
     f->close_section();
   } else if (prefix == "get_latest_osdmap") {
     get_latest_osdmap();
@@ -2843,7 +2769,7 @@ will start to track new ops received afterwards.";
   } else if (prefix == "dump_objectstore_kv_stats") {
     store->get_db_statistics(f);
   } else if (prefix == "dump_scrubs") {
-    service.dumps_scrub(f);
+    service.get_scrub_services().dump_scrubs(f);
   } else if (prefix == "calc_objectstore_db_histogram") {
     store->generate_db_histogram(f);
   } else if (prefix == "flush_store_cache") {
@@ -5004,8 +4930,6 @@ void OSD::load_pgs()
       store->set_collection_commit_queue(pg->coll, &(shards[shard_index]->context_queue));
     }
 
-    pg->reg_next_scrub();
-
     dout(10) << __func__ << " loaded " << *pg << dendl;
     pg->unlock();
 
@@ -5915,22 +5839,10 @@ void OSD::heartbeat()
   ceph_assert(ceph_mutex_is_locked_by_me(heartbeat_lock));
   dout(30) << "heartbeat" << dendl;
 
-  // get CPU load avg
-  double loadavgs[1];
-  int hb_interval = cct->_conf->osd_heartbeat_interval;
-  int n_samples = 86400;
-  if (hb_interval > 1) {
-    n_samples /= hb_interval;
-    if (n_samples < 1)
-      n_samples = 1;
-  }
-
-  if (getloadavg(loadavgs, 1) == 1) {
-    logger->set(l_osd_loadavg, 100 * loadavgs[0]);
-    daily_loadavg = (daily_loadavg * (n_samples - 1) + loadavgs[0]) / n_samples;
-    dout(30) << "heartbeat: daily_loadavg " << daily_loadavg << dendl;
+  auto load_for_logger = service.get_scrub_services().update_load_average();
+  if (load_for_logger) {
+    logger->set(l_osd_loadavg, load_for_logger.value());
   }
-
   dout(30) << "heartbeat checking stats" << dendl;
 
   // refresh peer list and osd stats
@@ -7576,264 +7488,104 @@ bool OSD::scrub_random_backoff()
   return false;
 }
 
-OSDService::ScrubJob::ScrubJob(CephContext* cct,
-                              const spg_t& pg, const utime_t& timestamp,
-                              double pool_scrub_min_interval,
-                              double pool_scrub_max_interval, bool must)
-  : cct(cct),
-    pgid(pg),
-    sched_time(timestamp),
-    deadline(timestamp)
-{
-  // if not explicitly requested, postpone the scrub with a random delay
-  if (!must) {
-    double scrub_min_interval = pool_scrub_min_interval > 0 ?
-      pool_scrub_min_interval : cct->_conf->osd_scrub_min_interval;
-    double scrub_max_interval = pool_scrub_max_interval > 0 ?
-      pool_scrub_max_interval : cct->_conf->osd_scrub_max_interval;
-
-    sched_time += scrub_min_interval;
-    double r = rand() / (double)RAND_MAX;
-    sched_time +=
-      scrub_min_interval * cct->_conf->osd_scrub_interval_randomize_ratio * r;
-    if (scrub_max_interval == 0) {
-      deadline = utime_t();
-    } else {
-      deadline += scrub_max_interval;
-    }
 
-  }
-}
-
-bool OSDService::ScrubJob::ScrubJob::operator<(const OSDService::ScrubJob& rhs) const {
-  if (sched_time < rhs.sched_time)
-    return true;
-  if (sched_time > rhs.sched_time)
-    return false;
-  return pgid < rhs.pgid;
-}
-
-void OSDService::dumps_scrub(ceph::Formatter *f)
+void OSD::sched_scrub()
 {
-  ceph_assert(f != nullptr);
-  std::lock_guard l(sched_scrub_lock);
+  auto& scrub_scheduler = service.get_scrub_services();
 
-  f->open_array_section("scrubs");
-  for (const auto &i: sched_scrub_pg) {
-    f->open_object_section("scrub");
-    f->dump_stream("pgid") << i.pgid;
-    f->dump_stream("sched_time") << i.sched_time;
-    f->dump_stream("deadline") << i.deadline;
-    f->dump_bool("forced", i.sched_time == PgScrubber::scrub_must_stamp());
-    f->close_section();
+  // fail fast if no resources are available
+  if (!scrub_scheduler.can_inc_scrubs()) {
+    dout(20) << __func__ << ": OSD cannot inc scrubs" << dendl;
+    return;
   }
-  f->close_section();
-}
 
-double OSD::scrub_sleep_time(bool must_scrub)
-{
-  if (must_scrub) {
-    return cct->_conf->osd_scrub_sleep;
-  }
-  utime_t now = ceph_clock_now();
-  if (scrub_time_permit(now)) {
-    return cct->_conf->osd_scrub_sleep;
+  // if there is a PG that is just now trying to reserve scrub replica resources -
+  // we should wait and not initiate a new scrub
+  if (scrub_scheduler.is_reserving_now()) {
+    dout(20) << __func__ << ": scrub resources reservation in progress" << dendl;
+    return;
   }
-  double normal_sleep = cct->_conf->osd_scrub_sleep;
-  double extended_sleep = cct->_conf->osd_scrub_extended_sleep;
-  return std::max(extended_sleep, normal_sleep);
-}
 
-bool OSD::scrub_time_permit(utime_t now)
-{
-  struct tm bdt;
-  time_t tt = now.sec();
-  localtime_r(&tt, &bdt);
+  Scrub::ScrubPreconds env_conditions;
 
-  bool day_permit = false;
-  if (cct->_conf->osd_scrub_begin_week_day < cct->_conf->osd_scrub_end_week_day) {
-    if (bdt.tm_wday >= cct->_conf->osd_scrub_begin_week_day && bdt.tm_wday < cct->_conf->osd_scrub_end_week_day) {
-      day_permit = true;
-    }
-  } else {
-    if (bdt.tm_wday >= cct->_conf->osd_scrub_begin_week_day || bdt.tm_wday < cct->_conf->osd_scrub_end_week_day) {
-      day_permit = true;
+  if (service.is_recovery_active() && !cct->_conf->osd_scrub_during_recovery) {
+    if (!cct->_conf->osd_repair_during_recovery) {
+      dout(15) << __func__ << ": not scheduling scrubs due to active recovery"
+              << dendl;
+      return;
     }
+    dout(10) << __func__
+      << " will only schedule explicitly requested repair due to active recovery"
+      << dendl;
+    env_conditions.allow_requested_repair_only = true;
   }
 
-  if (!day_permit) {
-    dout(20) << __func__ << " should run between week day " << cct->_conf->osd_scrub_begin_week_day
-            << " - " << cct->_conf->osd_scrub_end_week_day
-            << " now " << bdt.tm_wday << " = no" << dendl;
-    return false;
-  }
-
-  bool time_permit = false;
-  if (cct->_conf->osd_scrub_begin_hour < cct->_conf->osd_scrub_end_hour) {
-    if (bdt.tm_hour >= cct->_conf->osd_scrub_begin_hour && bdt.tm_hour < cct->_conf->osd_scrub_end_hour) {
-      time_permit = true;
-    }
-  } else {
-    if (bdt.tm_hour >= cct->_conf->osd_scrub_begin_hour || bdt.tm_hour < cct->_conf->osd_scrub_end_hour) {
-      time_permit = true;
+  if (g_conf()->subsys.should_gather<ceph_subsys_osd, 20>()) {
+    dout(20) << __func__ << " sched_scrub starts" << dendl;
+    auto all_jobs = scrub_scheduler.list_registered_jobs();
+    for (const auto& sj : all_jobs) {
+      dout(20) << "sched_scrub scrub-queue jobs: " << *sj << dendl;
     }
   }
-  if (time_permit) {
-    dout(20) << __func__ << " should run between " << cct->_conf->osd_scrub_begin_hour
-            << " - " << cct->_conf->osd_scrub_end_hour
-            << " now " << bdt.tm_hour << " = yes" << dendl;
-  } else {
-    dout(20) << __func__ << " should run between " << cct->_conf->osd_scrub_begin_hour
-            << " - " << cct->_conf->osd_scrub_end_hour
-            << " now " << bdt.tm_hour << " = no" << dendl;
-  }
-  return time_permit;
+
+  auto was_started = scrub_scheduler.select_pg_and_scrub(env_conditions);
+  dout(20) << "sched_scrub done (" << ScrubQueue::attempt_res_text(was_started)
+          << ")" << dendl;
 }
 
-bool OSD::scrub_load_below_threshold()
+Scrub::schedule_result_t OSDService::initiate_a_scrub(spg_t pgid,
+                                                     bool allow_requested_repair_only)
 {
-  double loadavgs[3];
-  if (getloadavg(loadavgs, 3) != 3) {
-    dout(10) << __func__ << " couldn't read loadavgs\n" << dendl;
-    return false;
-  }
+  dout(20) << __func__ << " trying " << pgid << dendl;
 
-  // allow scrub if below configured threshold
-  long cpus = sysconf(_SC_NPROCESSORS_ONLN);
-  double loadavg_per_cpu = cpus > 0 ? loadavgs[0] / cpus : loadavgs[0];
-  if (loadavg_per_cpu < cct->_conf->osd_scrub_load_threshold) {
-    dout(20) << __func__ << " loadavg per cpu " << loadavg_per_cpu
-            << " < max " << cct->_conf->osd_scrub_load_threshold
-            << " = yes" << dendl;
-    return true;
-  }
+  // we have a candidate to scrub. We need some PG information to know if scrubbing is
+  // allowed
 
-  // allow scrub if below daily avg and currently decreasing
-  if (loadavgs[0] < daily_loadavg && loadavgs[0] < loadavgs[2]) {
-    dout(20) << __func__ << " loadavg " << loadavgs[0]
-            << " < daily_loadavg " << daily_loadavg
-            << " and < 15m avg " << loadavgs[2]
-            << " = yes" << dendl;
-    return true;
+  PGRef pg = osd->lookup_lock_pg(pgid);
+  if (!pg) {
+    // the PG was dequeued in the short timespan between creating the candidates list
+    // (collect_ripe_jobs()) and here
+    dout(5) << __func__ << " pg  " << pgid << " not found" << dendl;
+    return Scrub::schedule_result_t::no_such_pg;
   }
 
-  dout(20) << __func__ << " loadavg " << loadavgs[0]
-          << " >= max " << cct->_conf->osd_scrub_load_threshold
-          << " and ( >= daily_loadavg " << daily_loadavg
-          << " or >= 15m avg " << loadavgs[2]
-          << ") = no" << dendl;
-  return false;
-}
-
-void OSD::sched_scrub()
-{
-  dout(20) << __func__ << " sched_scrub starts" << dendl;
-
-  // if not permitted, fail fast
-  if (!service.can_inc_scrubs()) {
-    dout(20) << __func__ << ": OSD cannot inc scrubs" << dendl;
-    return;
+  // This has already started, so go on to the next scrub job
+  if (pg->is_scrub_active()) {
+    pg->unlock();
+    dout(20) << __func__ << ": already in progress pgid " << pgid << dendl;
+    return Scrub::schedule_result_t::already_started;
   }
-  bool allow_requested_repair_only = false;
-  if (service.is_recovery_active() && !cct->_conf->osd_scrub_during_recovery) {
-    if (!cct->_conf->osd_repair_during_recovery) {
-      dout(15) << __func__ << ": not scheduling scrubs due to active recovery" << dendl;
-      return;
-    }
-    dout(10) << __func__
-             << " will only schedule explicitly requested repair due to active recovery"
-             << dendl;
-    allow_requested_repair_only = true;
+  // Skip other kinds of scrubbing if only explicitly requested repairing is allowed
+  if (allow_requested_repair_only && !pg->m_planned_scrub.must_repair) {
+    pg->unlock();
+    dout(10) << __func__ << " skip " << pgid
+            << " because repairing is not explicitly requested on it" << dendl;
+    return Scrub::schedule_result_t::preconditions;
   }
 
-  utime_t now = ceph_clock_now();
-  bool time_permit = scrub_time_permit(now);
-  bool load_is_low = scrub_load_below_threshold();
-  dout(20) << "sched_scrub load_is_low=" << (int)load_is_low << dendl;
-
-  OSDService::ScrubJob scrub_job;
-  if (service.first_scrub_stamp(&scrub_job)) {
-    do {
-      dout(30) << "sched_scrub examine " << scrub_job.pgid << " at " << scrub_job.sched_time << dendl;
-
-      if (scrub_job.sched_time > now) {
-       // save ourselves some effort
-       dout(20) << "sched_scrub " << scrub_job.pgid << " scheduled at " << scrub_job.sched_time
-                << " > " << now << dendl;
-       break;
-      }
-
-      if ((scrub_job.deadline.is_zero() || scrub_job.deadline >= now) && !(time_permit && load_is_low)) {
-        dout(15) << __func__ << " not scheduling scrub for " << scrub_job.pgid << " due to "
-                 << (!time_permit ? "time not permit" : "high load") << dendl;
-        continue;
-      }
-
-      PGRef pg = _lookup_lock_pg(scrub_job.pgid);
-      if (!pg) {
-       dout(20) << __func__ << " pg  " << scrub_job.pgid << " not found" << dendl;
-       continue;
-      }
-
-      // This has already started, so go on to the next scrub job
-      if (pg->is_scrub_active()) {
-       pg->unlock();
-       dout(20) << __func__ << ": already in progress pgid " << scrub_job.pgid << dendl;
-       continue;
-      }
-      // Skip other kinds of scrubbing if only explicitly requested repairing is allowed
-      if (allow_requested_repair_only && !pg->m_planned_scrub.must_repair) {
-        pg->unlock();
-        dout(10) << __func__ << " skip " << scrub_job.pgid
-                 << " because repairing is not explicitly requested on it"
-                 << dendl;
-        continue;
-      }
-
-      // If it is reserving, let it resolve before going to the next scrub job
-      if (pg->m_scrubber->is_reserving()) {
-       pg->unlock();
-       dout(10) << __func__ << ": reserve in progress pgid " << scrub_job.pgid << dendl;
-       break;
-      }
-      dout(15) << "sched_scrub scrubbing " << scrub_job.pgid << " at " << scrub_job.sched_time
-              << (pg->get_must_scrub() ? ", explicitly requested" :
-                  (load_is_low ? ", load_is_low" : " deadline < now"))
-              << dendl;
-      if (pg->sched_scrub()) {
-       pg->unlock();
-        dout(10) << __func__ << " scheduled a scrub!" << " (~" << scrub_job.pgid << "~)" << dendl;
-       break;
-      }
-      pg->unlock();
-    } while (service.next_scrub_stamp(scrub_job, &scrub_job));
-  }
-  dout(20) << "sched_scrub done" << dendl;
+  auto scrub_attempt = pg->sched_scrub();
+  pg->unlock();
+  return scrub_attempt;
 }
 
 void OSD::resched_all_scrubs()
 {
   dout(10) << __func__ << ": start" << dendl;
-  const vector<spg_t> pgs = [this] {
-    vector<spg_t> pgs;
-    OSDService::ScrubJob job;
-    if (service.first_scrub_stamp(&job)) {
-      do {
-        pgs.push_back(job.pgid);
-      } while (service.next_scrub_stamp(job, &job));
-    }
-    return pgs;
-  }();
-  for (auto& pgid : pgs) {
-      dout(20) << __func__ << ": examine " << pgid << dendl;
-      PGRef pg = _lookup_lock_pg(pgid);
-      if (!pg)
-       continue;
-      if (!pg->m_planned_scrub.must_scrub && !pg->m_planned_scrub.need_auto) {
-        dout(15) << __func__ << ": reschedule " << pgid << dendl;
-        pg->on_info_history_change();
-      }
-      pg->unlock();
+  auto all_jobs = service.get_scrub_services().list_registered_jobs();
+  for (auto& e : all_jobs) {
+
+    auto& job = *e;
+    dout(20) << __func__ << ": examine " << job.pgid << dendl;
+
+    PGRef pg = _lookup_lock_pg(job.pgid);
+    if (!pg)
+      continue;
+
+    if (!pg->m_planned_scrub.must_scrub && !pg->m_planned_scrub.need_auto) {
+      dout(15) << __func__ << ": reschedule " << job.pgid << dendl;
+      pg->reschedule_scrub();
+    }
+    pg->unlock();
   }
   dout(10) << __func__ << ": done" << dendl;
 }
index 53e2433a0334b25eccea01bd4d3cc2ec7f1a90b0..ad85d224a09e924b917ad915d99495aac6070410 100644 (file)
@@ -53,6 +53,7 @@
 #include "common/EventTrace.h"
 #include "osd/osd_perf_counters.h"
 #include "common/Finisher.h"
+#include "scrubber/osd_scrub_sched.h"
 
 #define CEPH_OSD_PROTOCOL    10 /* cluster internal */
 
@@ -260,87 +261,39 @@ public:
   }
   entity_name_t get_cluster_msgr_name() const;
 
-private:
-  // -- scrub scheduling --
-  ceph::mutex sched_scrub_lock = ceph::make_mutex("OSDService::sched_scrub_lock");
-  int scrubs_local;
-  int scrubs_remote;
 
 public:
-  struct ScrubJob {
-    CephContext* cct;
-    /// pg to be scrubbed
-    spg_t pgid;
-    /// a time scheduled for scrub. but the scrub could be delayed if system
-    /// load is too high or it fails to fall in the scrub hours
-    utime_t sched_time;
-    /// the hard upper bound of scrub time
-    utime_t deadline;
-    ScrubJob() : cct(nullptr) {}
-    explicit ScrubJob(CephContext* cct, const spg_t& pg,
-                     const utime_t& timestamp,
-                     double pool_scrub_min_interval = 0,
-                     double pool_scrub_max_interval = 0, bool must = true);
-    /// order the jobs by sched_time
-    bool operator<(const ScrubJob& rhs) const;
-  };
-  std::set<ScrubJob> sched_scrub_pg;
-
-  /// @returns the scrub_reg_stamp used for unregistering the scrub job
-  utime_t reg_pg_scrub(spg_t pgid,
-                      utime_t t,
-                      double pool_scrub_min_interval,
-                      double pool_scrub_max_interval,
-                      bool must) {
-    ScrubJob scrub_job(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval,
-                      must);
-    std::lock_guard l(OSDService::sched_scrub_lock);
-    sched_scrub_pg.insert(scrub_job);
-    return scrub_job.sched_time;
-  }
-
-  void unreg_pg_scrub(spg_t pgid, utime_t t) {
-    std::lock_guard l(sched_scrub_lock);
-    size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t));
-    ceph_assert(removed);
-  }
-
-  bool first_scrub_stamp(ScrubJob *out) {
-    std::lock_guard l(sched_scrub_lock);
-    if (sched_scrub_pg.empty())
-      return false;
-    std::set<ScrubJob>::iterator iter = sched_scrub_pg.begin();
-    *out = *iter;
-    return true;
-  }
-  bool next_scrub_stamp(const ScrubJob& next,
-                       ScrubJob *out) {
-    std::lock_guard l(sched_scrub_lock);
-    if (sched_scrub_pg.empty())
-      return false;
-    std::set<ScrubJob>::const_iterator iter = sched_scrub_pg.upper_bound(next);
-    if (iter == sched_scrub_pg.cend())
-      return false;
-    *out = *iter;
-    return true;
-  }
-
-  void dumps_scrub(ceph::Formatter* f);
-
-  bool can_inc_scrubs();
-  bool inc_scrubs_local();
-  void dec_scrubs_local();
-  bool inc_scrubs_remote();
-  void dec_scrubs_remote();
-  void dump_scrub_reservations(ceph::Formatter *f);
 
   void reply_op_error(OpRequestRef op, int err);
   void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv,
                      std::vector<pg_log_op_return_item_t> op_returns);
   void handle_misdirected_op(PG *pg, OpRequestRef op);
 
+ private:
+  /**
+   * The entity that maintains the set of PGs we may scrub (i.e. - those that we
+   * are their primary), and schedules their scrubbing.
+   */
+  ScrubQueue m_scrub_queue;
+
+ public:
+  ScrubQueue& get_scrub_services() { return m_scrub_queue; }
 
-private:
+  /**
+   * A callback used by the ScrubQueue object to initiate a scrub on a specific PG.
+   *
+   * The request might fail for multiple reasons, as ScrubQueue cannot by its own
+   * check some of the PG-specific preconditions and those are checked here. See
+   * attempt_t definition.
+   *
+   * @param pgid to scrub
+   * @param allow_requested_repair_only
+   * @return a Scrub::attempt_t detailing either a success, or the failure reason.
+   */
+  Scrub::schedule_result_t initiate_a_scrub(spg_t pgid, bool allow_requested_repair_only);
+
+
+ private:
   // -- agent shared state --
   ceph::mutex agent_lock = ceph::make_mutex("OSDService::agent_lock");
   ceph::condition_variable agent_cond;
@@ -1929,8 +1882,6 @@ protected:
     return service.get_tid();
   }
 
-  double scrub_sleep_time(bool must_scrub);
-
   // -- generic pg peering --
   void dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
                         ThreadPool::TPHandle *handle = NULL);
@@ -1981,8 +1932,6 @@ protected:
   void sched_scrub();
   void resched_all_scrubs();
   bool scrub_random_backoff();
-  bool scrub_load_below_threshold();
-  bool scrub_time_permit(utime_t now);
 
   // -- status reporting --
   MPGStats *collect_pg_stats();
index 0f992d652834d9a3ce052e62cd6ea811caad30cf..fa109e257cc6b7e6432d07c343de3da5aa5ea4bd 100644 (file)
@@ -1324,26 +1324,22 @@ unsigned int PG::scrub_requeue_priority(Scrub::scrub_prio_t with_priority, unsig
  *  Unless failing to start scrubbing, the 'planned scrub' flag-set is 'frozen' into
  *  PgScrubber's m_flags, then cleared.
  */
-bool PG::sched_scrub()
+Scrub::schedule_result_t PG::sched_scrub()
 {
   dout(15) << __func__ << " pg(" << info.pgid
          << (is_active() ? ") <active>" : ") <not-active>")
          << (is_clean() ? " <clean>" : " <not-clean>") << dendl;
   ceph_assert(ceph_mutex_is_locked(_lock));
 
-  if (m_scrubber && m_scrubber->is_scrub_active()) {
-    return false;
-  }
-  
   if (!is_primary() || !is_active() || !is_clean()) {
-    return false;
+    return Scrub::schedule_result_t::bad_pg_state;
   }
 
   if (scrub_queued) {
     // only applicable to the very first time a scrub event is queued
     // (until handled and posted to the scrub FSM)
     dout(10) << __func__ << ": already queued" << dendl;
-    return false;
+    return Scrub::schedule_result_t::already_started;
   }
 
   // analyse the combination of the requested scrub flags, the osd/pool configuration
@@ -1355,14 +1351,14 @@ bool PG::sched_scrub()
     // (due to configuration or priority issues)
     // The reason was already reported by the callee.
     dout(10) << __func__ << ": failed to initiate a scrub" << dendl;
-    return false;
+    return Scrub::schedule_result_t::preconditions;
   }
 
   // try to reserve the local OSD resources. If failing: no harm. We will
   // be retried by the OSD later on.
   if (!m_scrubber->reserve_local()) {
     dout(10) << __func__ << ": failed to reserve locally" << dendl;
-    return false;
+    return Scrub::schedule_result_t::no_local_resources;
   }
 
   // can commit to the updated flags now, as nothing will stop the scrub
@@ -1379,7 +1375,7 @@ bool PG::sched_scrub()
 
   scrub_queued = true;
   osd->queue_for_scrub(this, Scrub::scrub_prio_t::low_priority);
-  return true;
+  return Scrub::schedule_result_t::scrub_initiated;
 }
 
 double PG::next_deepscrub_interval() const
@@ -1532,17 +1528,37 @@ std::optional<requested_scrub_t> PG::verify_scrub_mode() const
   return upd_flags;
 }
 
-void PG::reg_next_scrub()
+/*
+ * Note: on_info_history_change() is used in those two cases where we're not sure
+ * whether the role of the PG was changed, and if so - was this change relayed to the
+ * scrub-queue.
+ */
+void PG::on_info_history_change()
+{
+  dout(20) << __func__ << " for a " << (is_primary() ? "Primary" : "non-primary") <<dendl;
+
+  if (m_scrubber) {
+    m_scrubber->on_maybe_registration_change(m_planned_scrub);
+  }
+}
+
+void PG::reschedule_scrub()
 {
-  m_scrubber->reg_next_scrub(m_planned_scrub);
+  dout(20) << __func__ << " for a " << (is_primary() ? "Primary" : "non-primary") <<dendl;
+
+  // we are assuming no change in primary status
+  if (is_primary() && m_scrubber) {
+    m_scrubber->update_scrub_job(m_planned_scrub);
+  }
 }
 
-void PG::on_info_history_change()
+void PG::on_primary_status_change(bool was_primary, bool now_primary)
 {
-  dout(20) << __func__ << dendl;
-  if (m_scrubber) {
-    m_scrubber->unreg_next_scrub();
-    m_scrubber->reg_next_scrub(m_planned_scrub);
+  // make sure we have a working scrubber when becoming a primary
+  ceph_assert(m_scrubber || !now_primary);
+
+  if ((was_primary != now_primary) && m_scrubber) {
+    m_scrubber->on_primary_change(m_planned_scrub);
   }
 }
 
@@ -1575,6 +1591,9 @@ void PG::on_new_interval() {
   scrub_queued = false;
   projected_last_update = eversion_t();
   cancel_recovery();
+  if (m_scrubber) {
+    m_scrubber->on_maybe_registration_change(m_planned_scrub);
+  }
 }
 
 epoch_t PG::oldest_stored_osdmap() {
index 95cd29a4104585fa492df00c46ffde73c4b92d41..63bcb7c11d0c7b551d790c3a0edb5b6e2c8e94f8 100644 (file)
@@ -68,6 +68,7 @@ namespace Scrub {
   class ReplicaReservations;
   class LocalReservation;
   class ReservedByRemotePrimary;
+  enum class schedule_result_t;
 }
 
 #ifdef PG_DEBUG_REFS
@@ -496,8 +497,6 @@ public:
     forward_scrub_event(&ScrubPgIF::send_chunk_busy, queued, "ChunkIsBusy");
   }
 
-  void reg_next_scrub();
-
   void queue_want_pg_temp(const std::vector<int> &wanted) override;
   void clear_want_pg_temp() override;
 
@@ -512,6 +511,10 @@ public:
 
   void on_info_history_change() override;
 
+  void on_primary_status_change(bool was_primary, bool now_primary) override;
+
+  void reschedule_scrub() override;
+
   void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) override;
 
   uint64_t get_snap_trimq_size() const override {
@@ -635,7 +638,7 @@ public:
   virtual void on_shutdown() = 0;
 
   bool get_must_scrub() const;
-  bool sched_scrub();
+  Scrub::schedule_result_t sched_scrub();
 
   unsigned int scrub_requeue_priority(Scrub::scrub_prio_t with_priority, unsigned int suggested_priority) const;
   /// the version that refers to flags_.priority
index 698b767de8240add286c3d7f0b0d957dda4a512c..ce4d88a88c76c44e16f27ebbc1d34f055099eac1 100644 (file)
@@ -711,6 +711,8 @@ void PeeringState::start_peering_interval(
     // did primary change?
     if (was_old_primary != is_primary()) {
       state_clear(PG_STATE_CLEAN);
+      // queue/dequeue the scrubber
+      pl->on_primary_status_change(was_old_primary, is_primary());
     }
 
     pl->on_role_change();
@@ -734,6 +736,10 @@ void PeeringState::start_peering_interval(
     }
   }
 
+  if (is_primary() && was_old_primary) {
+    pl->reschedule_scrub();
+  }
+
   if (acting.empty() && !up.empty() && up_primary == pg_whoami) {
     psdout(10) << " acting empty, but i am up[0], clearing pg_temp" << dendl;
     pl->queue_want_pg_temp(acting);
@@ -3969,7 +3975,7 @@ void PeeringState::update_stats(
   if (f(info.history, info.stats)) {
     pl->publish_stats_to_osd();
   }
-  pl->on_info_history_change();
+  pl->reschedule_scrub();
 
   if (t) {
     dirty_info = true;
index 992af4c576804a52a2c64d9552d15cb436d89305..2225fcd3f4ab02e9d6625b5420d129c44f588a1a 100644 (file)
@@ -264,6 +264,13 @@ public:
 
     /// Notify that info/history changed (generally to update scrub registration)
     virtual void on_info_history_change() = 0;
+
+    /// Notify PG that Primary/Replica status has changed (to update scrub registration)
+    virtual void on_primary_status_change(bool was_primary, bool now_primary) = 0;
+
+    /// Need to reschedule next scrub. Assuming no change in role
+    virtual void reschedule_scrub() = 0;
+
     /// Notify that a scrub has been requested
     virtual void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) = 0;
 
index 6899c5ea6b05d46f7b9c83d640db9b75809689e4..0e318c12c85b62c29b32135ffd29ead3167e9527 100644 (file)
  * Foundation.  See file COPYING.
  *
  */
-
 #include "PrimaryLogPG.h"
 
+#include <errno.h>
+
+#include <charconv>
+#include <sstream>
+#include <utility>
+
 #include <boost/intrusive_ptr.hpp>
+#include <boost/tuple/tuple.hpp>
 
 #include "cls/cas/cls_cas_ops.h"
+#include "common/CDC.h"
 #include "common/EventTrace.h"
 #include "common/ceph_crypto.h"
-#include "common/CDC.h"
 #include "common/config.h"
 #include "common/errno.h"
-#include "common/EventTrace.h"
 #include "common/perf_counters.h"
 #include "common/scrub_types.h"
 #include "include/compat.h"
+#include "json_spirit/json_spirit_reader.h"
+#include "json_spirit/json_spirit_value.h"
 #include "messages/MCommandReply.h"
 #include "messages/MOSDBackoff.h"
 #include "messages/MOSDOp.h"
 #include "mon/MonClient.h"
 #include "objclass/objclass.h"
 #include "osd/ClassHandler.h"
-#include "osd/OpRequest.h"
-#include "osd/Session.h"
 #include "osdc/Objecter.h"
 #include "scrubber/PrimaryLogScrub.h"
+#include "scrubber/ScrubStore.h"
+#include "scrubber/pg_scrubber.h"
+
+#include "OSD.h"
+#include "OpRequest.h"
+#include "PG.h"
+#include "Session.h"
 
 // required includes order:
 #include "json_spirit/json_spirit_value.h"
@@ -12855,8 +12867,7 @@ void PrimaryLogPG::on_shutdown()
   }
 
   m_scrubber->scrub_clear_state();
-
-  m_scrubber->unreg_next_scrub();
+  m_scrubber->rm_from_osd_scrubbing();
 
   vector<ceph_tid_t> tids;
   cancel_copy_ops(false, &tids);
diff --git a/src/osd/scrubber/osd_scrub_sched.cc b/src/osd/scrubber/osd_scrub_sched.cc
new file mode 100644 (file)
index 0000000..6cba890
--- /dev/null
@@ -0,0 +1,719 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#include "./osd_scrub_sched.h"
+
+#include "include/utime.h"
+#include "osd/OSD.h"
+
+#include "pg_scrubber.h"
+
+using namespace ::std::literals;
+
+// ////////////////////////////////////////////////////////////////////////// //
+// ScrubJob
+
+#define dout_context (cct)
+#define dout_subsys ceph_subsys_osd
+#undef dout_prefix
+#define dout_prefix *_dout << "osd." << whoami << "  "
+
+ScrubQueue::ScrubJob::ScrubJob(CephContext* cct, const spg_t& pg, int node_id)
+    : RefCountedObject{cct}, pgid{pg}, whoami{node_id}, cct{cct}
+{}
+
+// debug usage only
+ostream& operator<<(ostream& out, const ScrubQueue::ScrubJob& sjob)
+{
+  out << sjob.pgid << ",  " << sjob.schedule.scheduled_at
+      << " dead: " << sjob.schedule.deadline << " - " << sjob.registration_state()
+      << " / failure: " << sjob.resources_failure
+      << " / pen. t.o.: " << sjob.penalty_timeout
+      << " / queue state: " << ScrubQueue::qu_state_text(sjob.state);
+
+  return out;
+}
+
+void ScrubQueue::ScrubJob::update_schedule(
+  const ScrubQueue::scrub_schedule_t& adjusted)
+{
+  schedule = adjusted;
+  penalty_timeout = utime_t(0, 0);  // helps with debugging
+
+  // 'updated' is changed here while not holding jobs_lock. That's OK, as
+  // the (atomic) flag will only be cleared by select_pg_and_scrub() after
+  // scan_penalized() is called and the job was moved to the to_scrub queue.
+  updated = true;
+
+  dout(10) << " pg[" << pgid << "] adjusted: " << schedule.scheduled_at << "  "
+          << registration_state() << dendl;
+}
+
+// ////////////////////////////////////////////////////////////////////////// //
+// ScrubQueue
+
+#undef dout_context
+#define dout_context (cct)
+#undef dout_prefix
+#define dout_prefix \
+  *_dout << "osd." << osd_service.whoami << " scrub-queue::" << __func__ << " "
+
+
+ScrubQueue::ScrubQueue(CephContext* cct, OSDService& osds)
+    : cct{cct}, osd_service{osds}
+{
+  // initialize the daily loadavg with current 15min loadavg
+  if (double loadavgs[3]; getloadavg(loadavgs, 3) == 3) {
+    daily_loadavg = loadavgs[2];
+  } else {
+    derr << "OSD::init() : couldn't read loadavgs\n" << dendl;
+    daily_loadavg = 1.0;
+  }
+}
+
+std::optional<double> ScrubQueue::update_load_average()
+{
+  int hb_interval = cct->_conf->osd_heartbeat_interval;
+  int n_samples = 60 * 24 * 24;
+  if (hb_interval > 1) {
+    n_samples /= hb_interval;
+    if (n_samples < 1)
+      n_samples = 1;
+  }
+
+  // get CPU load avg
+  double loadavg;
+  if (getloadavg(&loadavg, 1) == 1) {
+    daily_loadavg = (daily_loadavg * (n_samples - 1) + loadavg) / n_samples;
+    dout(17) << "heartbeat: daily_loadavg " << daily_loadavg << dendl;
+    return 100 * loadavg;
+  }
+
+  return std::nullopt;
+}
+
+/*
+ * Modify the scrub job state:
+ * - if 'registered' (as expected): mark as 'unregistering'. The job will be
+ *   dequeued the next time sched_scrub() is called.
+ * - if already 'not_registered': shouldn't really happen, but not a problem.
+ *   The state will not be modified.
+ * - same for 'unregistering'.
+ *
+ * Note: not holding the jobs lock
+ */
+void ScrubQueue::remove_from_osd_queue(ScrubJobRef scrub_job)
+{
+  dout(15) << "removing pg[" << scrub_job->pgid << "] from OSD scrub queue"
+          << dendl;
+
+  qu_state_t expected_state{qu_state_t::registered};
+  auto ret = scrub_job->state.compare_exchange_strong(expected_state,
+                                                     qu_state_t::unregistering);
+
+  if (ret) {
+
+    dout(10) << "pg[" << scrub_job->pgid << "] sched-state changed from "
+            << qu_state_text(expected_state) << " to "
+            << qu_state_text(scrub_job->state) << dendl;
+
+  } else {
+
+    // job wasn't in state 'registered' coming in
+    dout(5) << "removing pg[" << scrub_job->pgid
+           << "] failed. State was: " << qu_state_text(expected_state) << dendl;
+  }
+}
+
+void ScrubQueue::register_with_osd(ScrubJobRef scrub_job,
+                                  const ScrubQueue::sched_params_t& suggested)
+{
+  qu_state_t state_at_entry = scrub_job->state.load();
+
+  dout(15) << "pg[" << scrub_job->pgid << "] was "
+          << qu_state_text(state_at_entry) << dendl;
+
+  switch (state_at_entry) {
+    case qu_state_t::registered:
+      // just updating the schedule?
+      update_job(scrub_job, suggested);
+      break;
+
+    case qu_state_t::not_registered:
+      // insertion under lock
+      {
+       std::unique_lock lck{jobs_lock};
+
+       if (state_at_entry != scrub_job->state) {
+         lck.unlock();
+         dout(5) << " scrub job state changed" << dendl;
+         // retry
+         register_with_osd(scrub_job, suggested);
+         break;
+       }
+
+       update_job(scrub_job, suggested);
+       to_scrub.push_back(scrub_job);
+       scrub_job->in_queues = true;
+       scrub_job->state = qu_state_t::registered;
+      }
+
+      break;
+
+    case qu_state_t::unregistering:
+      // restore to the to_sched queue
+      {
+       // must be under lock, as the job might be removed from the queue
+       // at any minute
+       std::lock_guard lck{jobs_lock};
+
+       update_job(scrub_job, suggested);
+       if (scrub_job->state == qu_state_t::not_registered) {
+         dout(5) << " scrub job state changed to 'not registered'" << dendl;
+         to_scrub.push_back(scrub_job);
+       }
+       scrub_job->in_queues = true;
+       scrub_job->state = qu_state_t::registered;
+      }
+      break;
+  }
+
+  dout(10) << "pg(" << scrub_job->pgid << ") sched-state changed from "
+          << qu_state_text(state_at_entry) << " to "
+          << qu_state_text(scrub_job->state)
+          << " at: " << scrub_job->schedule.scheduled_at << dendl;
+}
+
+// look mommy - no locks!
+void ScrubQueue::update_job(ScrubJobRef scrub_job,
+                           const ScrubQueue::sched_params_t& suggested)
+{
+  // adjust the suggested scrub time according to OSD-wide status
+  auto adjusted = adjust_target_time(suggested);
+  scrub_job->update_schedule(adjusted);
+}
+
+// used under jobs_lock
+void ScrubQueue::move_failed_pgs(utime_t now_is)
+{
+  int punished_cnt{0}; // for log/debug only
+
+  for (auto job = to_scrub.begin(); job != to_scrub.end();) {
+    if ((*job)->resources_failure) {
+      auto sjob = *job;
+
+      // last time it was scheduled for a scrub, this PG failed in securing
+      // remote resources. Move it to the secondary scrub queue.
+
+      dout(15) << "moving " << sjob->pgid
+              << " state: " << ScrubQueue::qu_state_text(sjob->state) << dendl;
+
+      // determine the penalty time, after which the job should be reinstated
+      utime_t after = now_is;
+      after += cct->_conf->osd_scrub_sleep * 2 + utime_t{300'000ms};
+
+      // note: currently - not taking 'deadline' into account when determining
+      // 'penalty_timeout'.
+      sjob->penalty_timeout = after;
+      sjob->resources_failure = false;
+      sjob->updated = false;  // as otherwise will be pardoned immediately
+
+      // place in the penalty list, and remove from the to-scrub group
+      penalized.push_back(sjob);
+      job = to_scrub.erase(job);
+      punished_cnt++;
+    } else {
+      job++;
+    }
+  }
+
+  if (punished_cnt) {
+    dout(15) << "# of jobs penalized: " << punished_cnt << dendl;
+  }
+}
+
+// clang-format off
+/*
+ * Implementation note:
+ * Clang (10 & 11) produces here efficient table-based code, comparable to using
+ * a direct index into an array of strings.
+ * Gcc (11, trunk) is almost as efficient.
+ */
+std::string_view ScrubQueue::attempt_res_text(Scrub::schedule_result_t v)
+{
+  switch (v) {
+    case Scrub::schedule_result_t::scrub_initiated: return "scrubbing"sv;
+    case Scrub::schedule_result_t::none_ready: return "no ready job"sv;
+    case Scrub::schedule_result_t::no_local_resources: return "local resources shortage"sv;
+    case Scrub::schedule_result_t::already_started: return "denied as already started"sv;
+    case Scrub::schedule_result_t::no_such_pg: return "pg not found"sv;
+    case Scrub::schedule_result_t::bad_pg_state: return "prevented by pg state"sv;
+    case Scrub::schedule_result_t::preconditions: return "preconditions not met"sv;
+  }
+  // g++ (unlike CLANG), requires an extra 'return' here
+  return "(unknown)"sv;
+}
+
+std::string_view ScrubQueue::qu_state_text(qu_state_t st)
+{
+  switch (st) {
+    case qu_state_t::not_registered: return "not registered w/ OSD"sv;
+    case qu_state_t::registered: return "registered"sv;
+    case qu_state_t::unregistering: return "unregistering"sv;
+  }
+  // g++ (unlike CLANG), requires an extra 'return' here
+  return "(unknown)"sv;
+}
+// clang-format on
+
+/**
+ *  a note regarding 'to_scrub_copy':
+ *  'to_scrub_copy' is a sorted set of all the ripe jobs from to_copy.
+ *  As we usually expect to refer to only the first job in this set, we could
+ *  consider an alternative implementation:
+ *  - have collect_ripe_jobs() return the copied set without sorting it;
+ *  - loop, performing:
+ *    - use std::min_element() to find a candidate;
+ *    - try that one. If not suitable, discard from 'to_scrub_copy'
+ */
+Scrub::schedule_result_t ScrubQueue::select_pg_and_scrub(
+  Scrub::ScrubPreconds& preconds)
+{
+  dout(10) << " reg./pen. sizes: " << to_scrub.size() << " / " << penalized.size()
+          << dendl;
+
+  utime_t now_is = ceph_clock_now();
+
+  preconds.time_permit = scrub_time_permit(now_is);
+  preconds.load_is_low = scrub_load_below_threshold();
+  preconds.only_deadlined = !preconds.time_permit || !preconds.load_is_low;
+
+  //  create a list of candidates (copying, as otherwise creating a deadlock):
+  //  - possibly restore penalized
+  //  - (if we didn't handle directly) remove invalid jobs
+  //  - create a copy of the to_scrub (possibly up to first not-ripe)
+  //  - same for the penalized (although that usually be a waste)
+  //  unlock, then try the lists
+
+  std::unique_lock lck{jobs_lock};
+
+  // pardon all penalized jobs that have deadlined (or were updated)
+  scan_penalized(restore_penalized, now_is);
+  restore_penalized = false;
+
+  // remove the 'updated' flag from all entries
+  std::for_each(to_scrub.begin(), to_scrub.end(),
+               [](const auto& jobref) -> void { jobref->updated = false; });
+
+  // add failed scrub attempts to the penalized list
+  move_failed_pgs(now_is);
+
+  //  collect all valid & ripe jobs from the two lists. Note that we must copy,
+  //  as when we use the lists we will not be holding jobs_lock (see
+  //  explanation above)
+
+  auto to_scrub_copy = collect_ripe_jobs(to_scrub, now_is);
+  auto penalized_copy = collect_ripe_jobs(penalized, now_is);
+  lck.unlock();
+
+  // try the regular queue first
+  auto res = select_from_group(to_scrub_copy, preconds, now_is);
+
+  // in the sole scenario in which we've gone over all ripe jobs without success
+  // - we will try the penalized
+  if (res == Scrub::schedule_result_t::none_ready && !penalized_copy.empty()) {
+    res = select_from_group(penalized_copy, preconds, now_is);
+    dout(10) << "tried the penalized. Res: " << ScrubQueue::attempt_res_text(res)
+            << dendl;
+    restore_penalized = true;
+  }
+
+  dout(15) << dendl;
+  return res;
+}
+
+// must be called under lock
+void ScrubQueue::rm_unregistered_jobs(ScrubQContainer& group)
+{
+  std::for_each(group.begin(), group.end(), [](auto& job) {
+    if (job->state == qu_state_t::unregistering) {
+      job->in_queues = false;
+      job->state = qu_state_t::not_registered;
+    } else if (job->state == qu_state_t::not_registered) {
+      job->in_queues = false;
+    }
+  });
+
+  group.erase(std::remove_if(group.begin(), group.end(), invalid_state),
+             group.end());
+}
+
+namespace {
+struct cmp_sched_time_t {
+  bool operator()(const ScrubQueue::ScrubJobRef& lhs,
+                 const ScrubQueue::ScrubJobRef& rhs) const
+  {
+    return lhs->schedule.scheduled_at < rhs->schedule.scheduled_at;
+  }
+};
+}  // namespace
+
+// called under lock
+ScrubQueue::ScrubQContainer ScrubQueue::collect_ripe_jobs(ScrubQContainer& group,
+                                                         utime_t time_now)
+{
+  rm_unregistered_jobs(group);
+
+  // copy ripe jobs
+  ScrubQueue::ScrubQContainer ripes;
+  ripes.reserve(group.size());
+
+  std::copy_if(group.begin(), group.end(), std::back_inserter(ripes),
+              [time_now](const auto& jobref) -> bool {
+                return jobref->schedule.scheduled_at <= time_now;
+              });
+  std::sort(ripes.begin(), ripes.end(), cmp_sched_time_t{});
+
+  if (g_conf()->subsys.should_gather<ceph_subsys_osd, 20>()) {
+    for (const auto& jobref : group) {
+      if (jobref->schedule.scheduled_at > time_now) {
+       dout(20) << " not ripe: " << jobref->pgid << " @ "
+                << jobref->schedule.scheduled_at << dendl;
+      }
+    }
+  }
+
+  return ripes;
+}
+
+// not holding jobs_lock. 'group' is a copy of the actual list.
+Scrub::schedule_result_t ScrubQueue::select_from_group(
+  ScrubQContainer& group, const Scrub::ScrubPreconds& preconds, utime_t now_is)
+{
+  dout(15) << "jobs #: " << group.size() << dendl;
+
+  for (auto& candidate : group) {
+
+    // we expect the first job in the list to be a good candidate (if any)
+
+    dout(20) << "try initiating scrub for " << candidate->pgid << dendl;
+
+    if (preconds.only_deadlined && (candidate->schedule.deadline.is_zero() ||
+                                   candidate->schedule.deadline >= now_is)) {
+      dout(15) << " not scheduling scrub for " << candidate->pgid << " due to "
+              << (preconds.time_permit ? "high load" : "time not permitting")
+              << dendl;
+      continue;
+    }
+
+    // we have a candidate to scrub. We turn to the OSD to verify that the PG
+    // configuration allows the specified type of scrub, and to initiate the
+    // scrub.
+    switch (osd_service.initiate_a_scrub(candidate->pgid,
+                                        preconds.allow_requested_repair_only)) {
+
+      case Scrub::schedule_result_t::scrub_initiated:
+       // the happy path. We are done
+       dout(20) << " initiated for " << candidate->pgid << dendl;
+       return Scrub::schedule_result_t::scrub_initiated;
+
+      case Scrub::schedule_result_t::already_started:
+      case Scrub::schedule_result_t::preconditions:
+      case Scrub::schedule_result_t::bad_pg_state:
+       // continue with the next job
+       dout(20) << "failed (state/cond/started) " << candidate->pgid << dendl;
+       break;
+
+      case Scrub::schedule_result_t::no_such_pg:
+       // The pg is no longer there
+       dout(20) << "failed (no pg) " << candidate->pgid << dendl;
+       break;
+
+      case Scrub::schedule_result_t::no_local_resources:
+       // failure to secure local resources. No point in trying the other
+       // PGs at this time. Note that this is not the same as replica resources
+       // failure!
+       dout(20) << "failed (local) " << candidate->pgid << dendl;
+       return Scrub::schedule_result_t::no_local_resources;
+
+      case Scrub::schedule_result_t::none_ready:
+       // can't happen. Just for the compiler.
+       dout(5) << "failed !!! " << candidate->pgid << dendl;
+       return Scrub::schedule_result_t::none_ready;
+    }
+  }
+
+  dout(20) << " returning 'none ready' " << dendl;
+  return Scrub::schedule_result_t::none_ready;
+}
+
+ScrubQueue::scrub_schedule_t ScrubQueue::adjust_target_time(
+  const sched_params_t& times) const
+{
+  ScrubQueue::scrub_schedule_t sched_n_dead{times.proposed_time,
+                                           times.proposed_time};
+
+  if (g_conf()->subsys.should_gather<ceph_subsys_osd, 20>()) {
+    dout(20) << "min t: " << times.min_interval
+            << " osd: " << cct->_conf->osd_scrub_min_interval
+            << " max t: " << times.max_interval
+            << " osd: " << cct->_conf->osd_scrub_max_interval << dendl;
+
+    dout(20) << "at " << sched_n_dead.scheduled_at << " ratio "
+            << cct->_conf->osd_scrub_interval_randomize_ratio << dendl;
+  }
+
+  if (times.is_must == ScrubQueue::must_scrub_t::not_mandatory) {
+
+    // if not explicitly requested, postpone the scrub with a random delay
+    double scrub_min_interval = times.min_interval > 0
+                                 ? times.min_interval
+                                 : cct->_conf->osd_scrub_min_interval;
+    double scrub_max_interval = times.max_interval > 0
+                                 ? times.max_interval
+                                 : cct->_conf->osd_scrub_max_interval;
+
+    sched_n_dead.scheduled_at += scrub_min_interval;
+    double r = rand() / (double)RAND_MAX;
+    sched_n_dead.scheduled_at +=
+      scrub_min_interval * cct->_conf->osd_scrub_interval_randomize_ratio * r;
+
+    if (scrub_max_interval <= 0) {
+      sched_n_dead.deadline = utime_t{};
+    } else {
+      sched_n_dead.deadline += scrub_max_interval;
+    }
+  }
+
+  dout(17) << "at (final) " << sched_n_dead.scheduled_at << " - "
+          << sched_n_dead.deadline << dendl;
+  return sched_n_dead;
+}
+
+double ScrubQueue::scrub_sleep_time(bool must_scrub) const
+{
+  double regular_sleep_period = cct->_conf->osd_scrub_sleep;
+
+  if (must_scrub || scrub_time_permit(ceph_clock_now())) {
+    return regular_sleep_period;
+  }
+
+  // relevant if scrubbing started during allowed time, but continued into
+  // forbidden hours
+  double extended_sleep = cct->_conf->osd_scrub_extended_sleep;
+  dout(20) << "w/ extended sleep (" << extended_sleep << ")" << dendl;
+  return std::max(extended_sleep, regular_sleep_period);
+}
+
+bool ScrubQueue::scrub_load_below_threshold() const
+{
+  double loadavgs[3];
+  if (getloadavg(loadavgs, 3) != 3) {
+    dout(10) << __func__ << " couldn't read loadavgs\n" << dendl;
+    return false;
+  }
+
+  // allow scrub if below configured threshold
+  long cpus = sysconf(_SC_NPROCESSORS_ONLN);
+  double loadavg_per_cpu = cpus > 0 ? loadavgs[0] / cpus : loadavgs[0];
+  if (loadavg_per_cpu < cct->_conf->osd_scrub_load_threshold) {
+    dout(20) << "loadavg per cpu " << loadavg_per_cpu << " < max "
+            << cct->_conf->osd_scrub_load_threshold << " = yes" << dendl;
+    return true;
+  }
+
+  // allow scrub if below daily avg and currently decreasing
+  if (loadavgs[0] < daily_loadavg && loadavgs[0] < loadavgs[2]) {
+    dout(20) << "loadavg " << loadavgs[0] << " < daily_loadavg " << daily_loadavg
+            << " and < 15m avg " << loadavgs[2] << " = yes" << dendl;
+    return true;
+  }
+
+  dout(20) << "loadavg " << loadavgs[0] << " >= max "
+          << cct->_conf->osd_scrub_load_threshold << " and ( >= daily_loadavg "
+          << daily_loadavg << " or >= 15m avg " << loadavgs[2] << ") = no"
+          << dendl;
+  return false;
+}
+
+
+// note: called with jobs_lock held
+void ScrubQueue::scan_penalized(bool forgive_all, utime_t time_now)
+{
+  dout(20) << time_now << (forgive_all ? " all " : " - ") << penalized.size()
+          << dendl;
+
+  // clear dead entries (deleted PGs, or those PGs we are no longer their
+  // primary)
+  rm_unregistered_jobs(penalized);
+
+  if (forgive_all) {
+
+    std::copy(penalized.begin(), penalized.end(), std::back_inserter(to_scrub));
+    penalized.clear();
+
+  } else {
+
+    auto forgiven_last = std::partition(
+      penalized.begin(), penalized.end(), [time_now](const auto& e) {
+       return (*e).updated || ((*e).penalty_timeout <= time_now);
+      });
+
+    std::copy(penalized.begin(), forgiven_last, std::back_inserter(to_scrub));
+    penalized.erase(penalized.begin(), forgiven_last);
+    dout(20) << "penalized after screening: " << penalized.size() << dendl;
+  }
+}
+
+// checks for half-closed ranges. Modify the (p<till)to '<=' to check for
+// closed.
+static inline bool isbetween_modulo(int64_t from, int64_t till, int p)
+{
+  // the 1st condition is because we have defined from==till as "always true"
+  return (till == from) || ((till >= from) ^ (p >= from) ^ (p < till));
+}
+
+bool ScrubQueue::scrub_time_permit(utime_t now) const
+{
+  tm bdt;
+  time_t tt = now.sec();
+  localtime_r(&tt, &bdt);
+
+  bool day_permit =
+    isbetween_modulo(cct->_conf->osd_scrub_begin_week_day,
+                    cct->_conf->osd_scrub_end_week_day, bdt.tm_wday);
+  if (!day_permit) {
+    dout(20) << "should run between week day "
+            << cct->_conf->osd_scrub_begin_week_day << " - "
+            << cct->_conf->osd_scrub_end_week_day << " now " << bdt.tm_wday
+            << " - no" << dendl;
+    return false;
+  }
+
+  bool time_permit =
+    isbetween_modulo(cct->_conf->osd_scrub_begin_hour,
+                    cct->_conf->osd_scrub_end_hour, bdt.tm_hour);
+  dout(20) << "should run between " << cct->_conf->osd_scrub_begin_hour << " - "
+          << cct->_conf->osd_scrub_end_hour << " now (" << bdt.tm_hour
+          << ") = " << (time_permit ? "yes" : "no") << dendl;
+  return time_permit;
+}
+
+void ScrubQueue::ScrubJob::dump(ceph::Formatter* f) const
+{
+  f->open_object_section("scrub");
+  f->dump_stream("pgid") << pgid;
+  f->dump_stream("sched_time") << schedule.scheduled_at;
+  f->dump_stream("deadline") << schedule.deadline;
+  f->dump_bool("forced", schedule.scheduled_at == PgScrubber::scrub_must_stamp());
+  f->close_section();
+}
+
+void ScrubQueue::dump_scrubs(ceph::Formatter* f) const
+{
+  ceph_assert(f != nullptr);
+  std::lock_guard lck(jobs_lock);
+
+  f->open_array_section("scrubs");
+
+  std::for_each(to_scrub.cbegin(), to_scrub.cend(),
+               [&f](const ScrubJobRef& j) { j->dump(f); });
+
+  std::for_each(penalized.cbegin(), penalized.cend(),
+               [&f](const ScrubJobRef& j) { j->dump(f); });
+
+  f->close_section();
+}
+
+ScrubQueue::ScrubQContainer ScrubQueue::list_registered_jobs() const
+{
+  ScrubQueue::ScrubQContainer all_jobs;
+  all_jobs.reserve(to_scrub.size() + penalized.size());
+  dout(20) << " size: " << all_jobs.capacity() << dendl;
+
+  std::lock_guard lck{jobs_lock};
+
+  std::copy_if(to_scrub.begin(), to_scrub.end(), std::back_inserter(all_jobs),
+              registered_job);
+  std::copy_if(penalized.begin(), penalized.end(), std::back_inserter(all_jobs),
+              registered_job);
+
+  return all_jobs;
+}
+
+// ////////////////////////////////////////////////////////////////////////// //
+// ScrubJob - scrub resource management
+
+bool ScrubQueue::can_inc_scrubs() const
+{
+  // consider removing the lock here. Caller already handles delayed
+  // inc_scrubs_local() failures
+  std::lock_guard lck{resource_lock};
+
+  if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) {
+    return true;
+  }
+
+  dout(20) << " == false. " << scrubs_local << " local + " << scrubs_remote
+          << " remote >= max " << cct->_conf->osd_max_scrubs << dendl;
+  return false;
+}
+
+bool ScrubQueue::inc_scrubs_local()
+{
+  std::lock_guard lck{resource_lock};
+
+  if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) {
+    ++scrubs_local;
+    return true;
+  }
+
+  dout(20) << ": " << scrubs_local << " local + " << scrubs_remote
+          << " remote >= max " << cct->_conf->osd_max_scrubs << dendl;
+  return false;
+}
+
+void ScrubQueue::dec_scrubs_local()
+{
+  std::lock_guard lck{resource_lock};
+  dout(20) << ": " << scrubs_local << " -> " << (scrubs_local - 1) << " (max "
+          << cct->_conf->osd_max_scrubs << ", remote " << scrubs_remote << ")"
+          << dendl;
+
+  --scrubs_local;
+  ceph_assert(scrubs_local >= 0);
+}
+
+bool ScrubQueue::inc_scrubs_remote()
+{
+  std::lock_guard lck{resource_lock};
+
+  if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) {
+    dout(20) << ": " << scrubs_remote << " -> " << (scrubs_remote + 1) << " (max "
+            << cct->_conf->osd_max_scrubs << ", local " << scrubs_local << ")"
+            << dendl;
+    ++scrubs_remote;
+    return true;
+  }
+
+  dout(20) << ": " << scrubs_local << " local + " << scrubs_remote
+          << " remote >= max " << cct->_conf->osd_max_scrubs << dendl;
+  return false;
+}
+
+void ScrubQueue::dec_scrubs_remote()
+{
+  std::lock_guard lck{resource_lock};
+  dout(20) << ": " << scrubs_remote << " -> " << (scrubs_remote - 1) << " (max "
+          << cct->_conf->osd_max_scrubs << ", local " << scrubs_local << ")"
+          << dendl;
+  --scrubs_remote;
+  ceph_assert(scrubs_remote >= 0);
+}
+
+void ScrubQueue::dump_scrub_reservations(ceph::Formatter* f) const
+{
+  std::lock_guard lck{resource_lock};
+  f->dump_int("scrubs_local", scrubs_local);
+  f->dump_int("scrubs_remote", scrubs_remote);
+  f->dump_int("osd_max_scrubs", cct->_conf->osd_max_scrubs);
+}
diff --git a/src/osd/scrubber/osd_scrub_sched.h b/src/osd/scrubber/osd_scrub_sched.h
new file mode 100644 (file)
index 0000000..f689425
--- /dev/null
@@ -0,0 +1,346 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <atomic>
+#include <chrono>
+#include <memory>
+#include <optional>
+#include <vector>
+
+#include "common/RefCountedObj.h"
+#include "common/ceph_atomic.h"
+#include "osd/osd_types.h"
+#include "osd/scrubber_common.h"
+
+#include "utime.h"
+
+class PG;
+
+namespace Scrub {
+
+using namespace ::std::literals;
+
+// possible outcome when trying to select a PG and scrub it
+enum class schedule_result_t {
+  scrub_initiated,     // successfully started a scrub
+  none_ready,         // no pg to scrub
+  no_local_resources,  // failure to secure local OSD scrub resource
+  already_started,     // failed, as already started scrubbing this pg
+  no_such_pg,         // can't find this pg
+  bad_pg_state,               // pg state (clean, active, etc.)
+  preconditions               // time, configuration, etc.
+};
+
+}  // namespace Scrub
+
+/**
+ * the queue of PGs waiting to be scrubbed.
+ * Main operations are scheduling/unscheduling a PG to be scrubbed at a certain
+ * time.
+ *
+ * A "penalty" queue maintains those PGs that have failed to reserve the
+ * resources of their replicas. The PGs in this list will be reinstated into the
+ * scrub queue when all eligible PGs were already handled, or after a timeout
+ * (or if their deadline has passed [[disabled at this time]]).
+ */
+class ScrubQueue {
+ public:
+  enum class must_scrub_t { not_mandatory, mandatory };
+
+  enum class qu_state_t {
+    not_registered,  // not a primary, thus not considered for scrubbing by this
+                    // OSD (also the temporary state when just created)
+    registered,             // in either of the two queues ('to_scrub' or 'penalized')
+    unregistering    // in the process of being unregistered. Will be finalized
+                    // under lock
+  };
+
+  ScrubQueue(CephContext* cct, OSDService& osds);
+
+  struct scrub_schedule_t {
+    utime_t scheduled_at{};
+    utime_t deadline{0, 0};
+  };
+
+  struct sched_params_t {
+    utime_t proposed_time{};
+    double min_interval{0.0};
+    double max_interval{0.0};
+    must_scrub_t is_must{ScrubQueue::must_scrub_t::not_mandatory};
+  };
+
+  struct ScrubJob final : public RefCountedObject {
+
+    /**
+     *  a time scheduled for scrub, and a deadline: The scrub could be delayed if
+     * system load is too high (but not if after the deadline),or if trying to
+     * scrub out of scrub hours.
+     */
+    scrub_schedule_t schedule;
+
+    /// pg to be scrubbed
+    const spg_t pgid;
+
+    /// the OSD id (for the log)
+    const int whoami;
+
+    ceph::atomic<qu_state_t> state{qu_state_t::not_registered};
+
+    /**
+     * the old 'is_registered'. Set whenever the job is registered with the OSD,
+     * i.e. is in either the 'to_scrub' or the 'penalized' vectors.
+     */
+    std::atomic_bool in_queues{false};
+
+    /// last scrub attempt failed to secure replica resources
+    bool resources_failure{false};
+
+    /**
+     *  'updated' is a temporary flag, used to create a barrier after
+     *  'sched_time' and 'deadline' (or any other job entry) were modified by
+     *  different task.
+     *  'updated' also signals the need to move a job back from the penalized
+     *  queue to the regular one.
+     */
+    std::atomic_bool updated{false};
+
+    utime_t penalty_timeout{0, 0};
+
+    CephContext* cct;
+
+    ScrubJob(CephContext* cct, const spg_t& pg, int node_id);
+
+    utime_t get_sched_time() const { return schedule.scheduled_at; }
+
+    /**
+     * relatively low-cost(*) access to the scrub job's state, to be used in
+     * logging.
+     *  (*) not a low-cost access on x64 architecture
+     */
+    std::string_view state_desc() const
+    {
+      return ScrubQueue::qu_state_text(state.load(std::memory_order_relaxed));
+    }
+
+    void update_schedule(const ScrubQueue::scrub_schedule_t& adjusted);
+
+    void dump(ceph::Formatter* f) const;
+
+    /*
+     * as the atomic 'in_queues' appears in many log prints, accessing it for
+     * display-only should be made less expensive (on ARM. On x86 the _relaxed
+     * produces the same code as '_cs')
+     */
+    std::string_view registration_state() const
+    {
+      return in_queues.load(std::memory_order_relaxed) ? " in-queue"
+                                                      : " not-queued";
+    }
+
+    friend std::ostream& operator<<(std::ostream& out, const ScrubJob& pg);
+  };
+
+  friend class TestOSDScrub;
+
+  using ScrubJobRef = ceph::ref_t<ScrubJob>;
+  using ScrubQContainer = std::vector<ScrubJobRef>;
+
+  static std::string_view qu_state_text(qu_state_t st);
+
+  /**
+   * called periodically by the OSD to select the first scrub-eligible PG
+   * and scrub it.
+   *
+   * Selection is affected by:
+   * - time of day: scheduled scrubbing might be configured to only happen
+   *   during certain hours;
+   * - same for days of the week, and for the system load;
+   *
+   * @param preconds: what types of scrub are allowed, given system status &
+   *                  config. Some of the preconditions are calculated here.
+   * @return Scrub::attempt_t::scrubbing if a scrub session was successfully
+   *         initiated. Otherwise - the failure cause.
+   *
+   * locking: locks jobs_lock
+   */
+  Scrub::schedule_result_t select_pg_and_scrub(Scrub::ScrubPreconds& preconds);
+
+  /**
+   * Translate attempt_ values into readable text
+   */
+  static std::string_view attempt_res_text(Scrub::schedule_result_t v);
+
+  /**
+   * remove the pg from set of PGs to be scanned for scrubbing.
+   * To be used if we are no longer the PG's primary, or if the PG is removed.
+   */
+  void remove_from_osd_queue(ScrubJobRef sjob);
+
+  /**
+   * @return the list (not std::set!) of all scrub jobs registered
+   *   (apart from PGs in the process of being removed)
+   */
+  ScrubQContainer list_registered_jobs() const;
+
+  /**
+   * Add the scrub job to the list of jobs (i.e. list of PGs) to be periodically
+   * scrubbed by the OSD.
+   * The registration is active as long as the PG exists and the OSD is its
+   * primary.
+   *
+   * See update_job() for the handling of the 'suggested' parameter.
+   *
+   * locking: might lock jobs_lock
+   */
+  void register_with_osd(ScrubJobRef sjob, const sched_params_t& suggested);
+
+  /**
+   * modify a scrub-job's schduled time and deadline
+   *
+   * There are 3 argument combinations to consider:
+   * - 'must' is asserted, and the suggested time is 'scrub_must_stamp':
+   *   the registration will be with "beginning of time" target, making the
+   *   scrub-job eligible to immediate scrub (given that external conditions
+   *   do not prevent scrubbing)
+   *
+   * - 'must' is asserted, and the suggested time is 'now':
+   *   This happens if our stats are unknown. The results are similar to the
+   *   previous scenario.
+   *
+   * - not a 'must': we take the suggested time as a basis, and add to it some
+   *   configuration / random delays.
+   *
+   *  ('must' is sched_params_t.is_must)
+   *
+   *  locking: not using the jobs_lock
+   */
+  void update_job(ScrubJobRef sjob, const sched_params_t& suggested);
+
+ public:
+  void dump_scrubs(ceph::Formatter* f) const;
+
+  /**
+   * No new scrub session will start while a scrub was initiated on a PG,
+   * and that PG is trying to acquire replica resources.
+   */
+  void set_reserving_now() { a_pg_is_reserving = true; }
+  void clear_reserving_now() { a_pg_is_reserving = false; }
+  bool is_reserving_now() const { return a_pg_is_reserving; }
+
+  bool can_inc_scrubs() const;
+  bool inc_scrubs_local();
+  void dec_scrubs_local();
+  bool inc_scrubs_remote();
+  void dec_scrubs_remote();
+  void dump_scrub_reservations(ceph::Formatter* f) const;
+
+  /**
+   * Pacing the scrub operation by inserting delays (mostly between chunks)
+   *
+   * Special handling for regular scrubs that continued into "no scrub" times.
+   * Scrubbing will continue, but the delays will be controlled by a separate
+   * (read - with higher value) configuration element
+   * (osd_scrub_extended_sleep).
+   */
+  double scrub_sleep_time(
+    bool must_scrub) const;  /// \todo (future) return milliseconds
+
+  /**
+   *  called every heartbeat to update the "daily" load average
+   *
+   *  @returns a load value for the logger
+   */
+  [[nodiscard]] std::optional<double> update_load_average();
+
+ private:
+  CephContext* cct;
+  OSDService& osd_service;
+
+  /**
+   *  jobs_lock protects the job containers and the relevant scrub-jobs state
+   *  variables. Specifically, the following are guaranteed:
+   *  - 'in_queues' is asserted only if the job is in one of the queues;
+   *  - a job will only be in state 'registered' if in one of the queues;
+   *  - no job will be in the two queues simulatenously
+   *
+   *  Note that PG locks should not be acquired while holding jobs_lock.
+   */
+  mutable ceph::mutex jobs_lock = ceph::make_mutex("ScrubQueue::jobs_lock");
+
+  ScrubQContainer to_scrub;   ///< scrub jobs (i.e. PGs) to scrub
+  ScrubQContainer penalized;  ///< those that failed to reserve remote resources
+  bool restore_penalized{false};
+
+  double daily_loadavg{0.0};
+
+  static inline constexpr auto registered_job = [](const auto& jobref) -> bool {
+    return jobref->state == qu_state_t::registered;
+  };
+
+  static inline constexpr auto invalid_state = [](const auto& jobref) -> bool {
+    return jobref->state == qu_state_t::not_registered;
+  };
+
+  /**
+   * Are there scrub jobs that should be reinstated?
+   */
+  void scan_penalized(bool forgive_all, utime_t time_now);
+
+  /**
+   * clear dead entries (unregistered, or belonging to removed PGs) from a
+   * queue. Job state is changed to match new status.
+   */
+  void rm_unregistered_jobs(ScrubQContainer& group);
+
+  /**
+   * the set of all scrub jobs in 'group' which are ready to be scrubbed
+   * (ready = their scheduled time has passed).
+   * The scrub jobs in the new collection are sorted according to
+   * their scheduled time.
+   *
+   * Note that the returned container holds independent refs to the
+   * scrub jobs.
+   */
+  ScrubQContainer collect_ripe_jobs(ScrubQContainer& group, utime_t time_now);
+
+
+  /// scrub resources management lock (guarding scrubs_local & scrubs_remote)
+  mutable ceph::mutex resource_lock =
+    ceph::make_mutex("ScrubQueue::resource_lock");
+
+  // the counters used to manage scrub activity parallelism:
+  int scrubs_local{0};
+  int scrubs_remote{0};
+
+  std::atomic_bool a_pg_is_reserving{false};
+
+  [[nodiscard]] bool scrub_load_below_threshold() const;
+  [[nodiscard]] bool scrub_time_permit(utime_t now) const;
+
+  /**
+   * If the scrub job was not explicitly requested, we postpone it by some
+   * random length of time.
+   * And if delaying the scrub - we calculate, based on pool parameters, a
+   * deadline we should scrub before.
+   *
+   * @return a pair of values: the determined scrub time, and the deadline
+   */
+  scrub_schedule_t adjust_target_time(
+    const sched_params_t& recomputed_params) const;
+
+  /**
+   * Look for scrub jobs that have their 'resources_failure' set. These jobs
+   * have failed to acquire remote resources last time we've initiated a scrub
+   * session on them. They are now moved from the 'to_scrub' queue to the
+   * 'penalized' set.
+   *
+   * locking: called with job_lock held
+   */
+  void move_failed_pgs(utime_t now_is);
+
+  Scrub::schedule_result_t select_from_group(ScrubQContainer& group,
+                                            const Scrub::ScrubPreconds& preconds,
+                                            utime_t now_is);
+};
index a9405ad82af8a8d539e3c9b1e91940be596bee23..dce25081a3a780b7fa4de1c3a8259f9f2d82bd58 100644 (file)
@@ -443,62 +443,118 @@ unsigned int PgScrubber::scrub_requeue_priority(Scrub::scrub_prio_t with_priorit
 // ///////////////////////////////////////////////////////////////////// //
 // scrub-op registration handling
 
+void PgScrubber::unregister_from_osd()
+{
+  if (m_scrub_job) {
+    dout(15) << __func__ << " prev. state: " << registration_state() << dendl;
+    m_osds->get_scrub_services().remove_from_osd_queue(m_scrub_job);
+  }
+}
+
 bool PgScrubber::is_scrub_registered() const
 {
-  return !m_scrub_reg_stamp.is_zero();
+  return m_scrub_job && m_scrub_job->in_queues;
 }
 
-void PgScrubber::reg_next_scrub(const requested_scrub_t& request_flags)
+std::string_view PgScrubber::registration_state() const
 {
-  if (!is_primary()) {
-    // normal. No warning is required.
-    return;
+  if (m_scrub_job) {
+    return m_scrub_job->registration_state();
   }
+  return "(no sched job)"sv;
+}
 
-  dout(10) << __func__ << " planned: must? " << request_flags.must_scrub << " need-auto? "
-          << request_flags.need_auto << " stamp: " << m_pg->info.history.last_scrub_stamp
-          << dendl;
+void PgScrubber::rm_from_osd_scrubbing()
+{
+  // make sure the OSD won't try to scrub this one just now
+  unregister_from_osd();
+}
 
-  ceph_assert(!is_scrub_registered());
+void PgScrubber::on_primary_change(const requested_scrub_t& request_flags)
+{
+  dout(10) << __func__ << (is_primary() ? " Primary " : " Replica ")
+               << " flags: " << request_flags << dendl;
 
-  utime_t reg_stamp;
-  bool must = false;
+  if (!m_scrub_job) {
+    return;
+  }
 
-  if (request_flags.must_scrub || request_flags.need_auto) {
-    // Set the smallest time that isn't utime_t()
-    reg_stamp = PgScrubber::scrub_must_stamp();
-    must = true;
-  } else if (m_pg->info.stats.stats_invalid &&
-            m_pg->cct->_conf->osd_scrub_invalid_stats) {
-    reg_stamp = ceph_clock_now();
-    must = true;
+  dout(15) << __func__ << " scrub-job state: " << m_scrub_job->state_desc() << dendl;
+
+  if (is_primary()) {
+    auto suggested =  determine_scrub_time(request_flags);
+    m_osds->get_scrub_services().register_with_osd(m_scrub_job, suggested);
   } else {
-    reg_stamp = m_pg->info.history.last_scrub_stamp;
+    m_osds->get_scrub_services().remove_from_osd_queue(m_scrub_job);
   }
 
-  dout(15) << __func__ << " pg(" << m_pg_id << ") must: " << must
-          << " required:" << m_flags.required << " flags: " << request_flags
-          << " stamp: " << reg_stamp << dendl;
+  dout(15) << __func__ << " done " << registration_state() << dendl;
+}
 
-  const double scrub_min_interval =
-    m_pg->pool.info.opts.value_or(pool_opts_t::SCRUB_MIN_INTERVAL, 0.0);
-  const double scrub_max_interval =
-    m_pg->pool.info.opts.value_or(pool_opts_t::SCRUB_MAX_INTERVAL, 0.0);
+void PgScrubber::on_maybe_registration_change(const requested_scrub_t& request_flags)
+{
+  dout(10) << __func__ << (is_primary() ? " Primary " : " Replica/other ")
+          << registration_state() << " flags: " << request_flags << dendl;
 
-  // note the sched_time, so we can locate this scrub, and remove it later
-  m_scrub_reg_stamp = m_osds->reg_pg_scrub(m_pg->info.pgid, reg_stamp, scrub_min_interval,
-                                          scrub_max_interval, must);
-  dout(15) << __func__ << " pg(" << m_pg_id << ") register next scrub, scrub time "
-          << m_scrub_reg_stamp << ", must = " << (int)must << dendl;
+  on_primary_change(request_flags);
+  dout(15) << __func__ << " done " << registration_state() << dendl;
 }
 
-void PgScrubber::unreg_next_scrub()
+void PgScrubber::update_scrub_job(const requested_scrub_t& request_flags)
 {
-  if (is_scrub_registered()) {
-    dout(15) << __func__ << " existing-" << m_scrub_reg_stamp << dendl;
-    m_osds->unreg_pg_scrub(m_pg->info.pgid, m_scrub_reg_stamp);
-    m_scrub_reg_stamp = utime_t{};
+  dout(10) << __func__ << " flags: " << request_flags << dendl;
+
+  {
+    // verify that the 'in_q' status matches our "Primariority"
+    if (m_scrub_job && is_primary() &&  !m_scrub_job->in_queues) {
+      dout(1) << __func__ << " !!! primary but not scheduled! " << dendl;
+    }
+  }
+
+  if (is_primary() && m_scrub_job) {
+    auto suggested = determine_scrub_time(request_flags);
+    m_osds->get_scrub_services().update_job(m_scrub_job, suggested);
   }
+
+  dout(15) << __func__ << " done " << registration_state() << dendl;
+}
+
+ScrubQueue::sched_params_t
+PgScrubber::determine_scrub_time(const requested_scrub_t& request_flags)
+{
+  ScrubQueue::sched_params_t res;
+
+  if (!is_primary()) {
+    return res; // with ok_to_scrub set to 'false'
+  }
+
+  if (request_flags.must_scrub || request_flags.need_auto) {
+
+    // Set the smallest time that isn't utime_t()
+    res.proposed_time = PgScrubber::scrub_must_stamp();
+    res.is_must = ScrubQueue::must_scrub_t::mandatory;
+    // we do not need the interval data in this case
+
+  } else if (m_pg->info.stats.stats_invalid &&
+           m_pg->cct->_conf->osd_scrub_invalid_stats) {
+    res.proposed_time = ceph_clock_now();
+    res.is_must = ScrubQueue::must_scrub_t::mandatory;
+
+  } else {
+    res.proposed_time = m_pg->info.history.last_scrub_stamp;
+    res.min_interval =
+      m_pg->get_pool().info.opts.value_or(pool_opts_t::SCRUB_MIN_INTERVAL, 0.0);
+    res.max_interval =
+      m_pg->get_pool().info.opts.value_or(pool_opts_t::SCRUB_MAX_INTERVAL, 0.0);
+  }
+
+  dout(15) << __func__ << " suggested: " << res.proposed_time << " hist: "
+    << m_pg->info.history.last_scrub_stamp << " v:" << m_pg->info.stats.stats_invalid
+    << " / " << m_pg->cct->_conf->osd_scrub_invalid_stats << " must:"
+    << (res.is_must==ScrubQueue::must_scrub_t::mandatory ? "y" : "n" )
+    << " pool min: " << res.min_interval
+    << dendl;
+  return res;
 }
 
 void PgScrubber::scrub_requested(scrub_level_t scrub_level,
@@ -507,11 +563,10 @@ void PgScrubber::scrub_requested(scrub_level_t scrub_level,
 {
   dout(10) << __func__ << (scrub_level == scrub_level_t::deep ? " deep " : " shallow ")
           << (scrub_type == scrub_type_t::do_repair ? " repair-scrub " : " not-repair ")
-          << " prev stamp: " << m_scrub_reg_stamp << " " << is_scrub_registered()
+          << " prev stamp: " << m_scrub_job->get_sched_time()
+          << " registered? " << registration_state()
           << dendl;
 
-  unreg_next_scrub();
-
   req_flags.must_scrub = true;
   req_flags.must_deep_scrub =
     (scrub_level == scrub_level_t::deep) || (scrub_type == scrub_type_t::do_repair);
@@ -522,17 +577,16 @@ void PgScrubber::scrub_requested(scrub_level_t scrub_level,
 
   dout(20) << __func__ << " pg(" << m_pg_id << ") planned:" << req_flags << dendl;
 
-  reg_next_scrub(req_flags);
+  update_scrub_job(req_flags);
 }
 
-void PgScrubber::request_rescrubbing(requested_scrub_t& req_flags)
+
+void PgScrubber::request_rescrubbing(requested_scrub_t& request_flags)
 {
-  dout(10) << __func__ << " existing-" << m_scrub_reg_stamp << ". was registered? "
-          << is_scrub_registered() << dendl;
+  dout(10) << __func__ << " flags: " << request_flags << dendl;
 
-  unreg_next_scrub();
-  req_flags.need_auto = true;
-  reg_next_scrub(req_flags);
+  request_flags.need_auto = true;
+  update_scrub_job(request_flags);
 }
 
 bool PgScrubber::reserve_local()
@@ -606,11 +660,11 @@ bool PgScrubber::select_range()
    * left end of the range if we are a tier because they may legitimately
    * not exist (see _scrub).
    */
-  int min_idx = std::max<int64_t>(
-    3, m_pg->get_cct()->_conf->osd_scrub_chunk_min / preemption_data.chunk_divisor());
+  int min_idx = static_cast<int>(std::max<int64_t>(
+    3, m_pg->get_cct()->_conf->osd_scrub_chunk_min / (int)preemption_data.chunk_divisor()));
 
-  int max_idx = std::max<int64_t>(min_idx, m_pg->get_cct()->_conf->osd_scrub_chunk_max /
-                                            preemption_data.chunk_divisor());
+  int max_idx = static_cast<int>(std::max<int64_t>(min_idx, m_pg->get_cct()->_conf->osd_scrub_chunk_max /
+    (int)preemption_data.chunk_divisor()));
 
   dout(10) << __func__ << " Min: " << min_idx << " Max: " << max_idx
           << " Div: " << preemption_data.chunk_divisor() << dendl;
@@ -738,7 +792,8 @@ void PgScrubber::add_delayed_scheduling()
 
   milliseconds sleep_time{0ms};
   if (m_needs_sleep) {
-    double scrub_sleep = 1000.0 * m_osds->osd->scrub_sleep_time(m_flags.required);
+    double scrub_sleep =
+      1000.0 * m_osds->get_scrub_services().scrub_sleep_time(m_flags.required);
     sleep_time = milliseconds{long(scrub_sleep)};
   }
   dout(15) << __func__ << " sleep: " << sleep_time.count() << "ms. needed? "
@@ -756,8 +811,8 @@ void PgScrubber::add_delayed_scheduling()
     // the 'delayer' for crimson is different. Will be factored out.
 
     spg_t pgid = m_pg->get_pgid();
-    auto callbk = new LambdaContext([osds = m_osds, pgid,
-                                    scrbr = this]([[maybe_unused]] int r) mutable {
+    auto callbk = new LambdaContext([osds = m_osds, pgid, scrbr = this](
+                                     [[maybe_unused]] int r) mutable {
       PGRef pg = osds->osd->lookup_lock_pg(pgid);
       if (!pg) {
        lgeneric_subdout(g_ceph_context, osd, 10)
@@ -1623,6 +1678,17 @@ void PgScrubber::unreserve_replicas()
   m_reservations.reset();
 }
 
+void PgScrubber::set_reserving_now()
+{
+  m_osds->get_scrub_services().set_reserving_now();
+}
+
+void PgScrubber::clear_reserving_now()
+{
+  m_osds->get_scrub_services().clear_reserving_now();
+}
+
+
 [[nodiscard]] bool PgScrubber::scrub_process_inconsistent()
 {
   dout(10) << __func__ << ": checking authoritative (mode="
@@ -1877,7 +1943,7 @@ void PgScrubber::dump(ceph::Formatter* f) const
     f->dump_bool("auto_repair", m_flags.auto_repair);
     f->dump_bool("check_repair", m_flags.check_repair);
     f->dump_bool("deep_scrub_on_error", m_flags.deep_scrub_on_error);
-    f->dump_stream("scrub_reg_stamp") << m_scrub_reg_stamp;  // utime_t
+    f->dump_stream("scrub_reg_stamp") << m_scrub_job->get_sched_time();  // utime_t
     f->dump_unsigned("priority", m_flags.priority);
     f->dump_int("shallow_errors", m_shallow_errors);
     f->dump_int("deep_errors", m_deep_errors);
@@ -1919,7 +1985,14 @@ void PgScrubber::handle_query_state(ceph::Formatter* f)
   f->close_section();
 }
 
-PgScrubber::~PgScrubber() = default;
+PgScrubber::~PgScrubber()
+{
+  if (m_scrub_job) {
+    // make sure the OSD won't try to scrub this one just now
+    rm_from_osd_scrubbing();
+    m_scrub_job.reset();
+  }
+}
 
 PgScrubber::PgScrubber(PG* pg)
     : m_pg{pg}
@@ -1930,12 +2003,15 @@ PgScrubber::PgScrubber(PG* pg)
 {
   m_fsm = std::make_unique<ScrubMachine>(m_pg, this);
   m_fsm->initiate();
+
+  m_scrub_job = ceph::make_ref<ScrubQueue::ScrubJob>(m_osds->cct, m_pg->pg_id,
+                                                    m_osds->get_nodeid());
 }
 
 void PgScrubber::reserve_replicas()
 {
   dout(10) << __func__ << dendl;
-  m_reservations.emplace(m_pg, m_pg_whoami);
+  m_reservations.emplace(m_pg, m_pg_whoami, m_scrub_job);
 }
 
 void PgScrubber::cleanup_on_finish()
@@ -2129,12 +2205,13 @@ void ReplicaReservations::release_replica(pg_shard_t peer, epoch_t epoch)
   m_osds->send_message_osd_cluster(peer.osd, m, epoch);
 }
 
-ReplicaReservations::ReplicaReservations(PG* pg, pg_shard_t whoami)
+ReplicaReservations::ReplicaReservations(PG* pg, pg_shard_t whoami, ScrubQueue::ScrubJobRef scrubjob)
     : m_pg{pg}
     , m_acting_set{pg->get_actingset()}
     , m_osds{m_pg->get_pg_osd(ScrubberPasskey())}
     , m_pending{static_cast<int>(m_acting_set.size()) - 1}
     , m_pg_info{m_pg->get_pg_info(ScrubberPasskey())}
+    , m_scrub_job{scrubjob}
 {
   epoch_t epoch = m_pg->get_osdmap_epoch();
 
@@ -2164,6 +2241,7 @@ void ReplicaReservations::send_all_done()
 
 void ReplicaReservations::send_reject()
 {
+  m_scrub_job->resources_failure = true;
   m_osds->queue_for_scrub_denied(m_pg, scrub_prio_t::low_priority);
 }
 
@@ -2274,7 +2352,7 @@ LocalReservation::LocalReservation(PG* pg, OSDService* osds)
     : m_pg{pg} // holding the "whole PG" for dout() sake
     , m_osds{osds}
 {
-  if (!m_osds->inc_scrubs_local()) {
+  if (!m_osds->get_scrub_services().inc_scrubs_local()) {
     dout(10) << __func__ << ": failed to reserve locally " << dendl;
     // the failure is signalled by not having m_holding_local_reservation set
     return;
@@ -2288,7 +2366,7 @@ LocalReservation::~LocalReservation()
 {
   if (m_holding_local_reservation) {
     m_holding_local_reservation = false;
-    m_osds->dec_scrubs_local();
+    m_osds->get_scrub_services().dec_scrubs_local();
   }
 }
 
@@ -2298,7 +2376,7 @@ LocalReservation::~LocalReservation()
 ReservedByRemotePrimary::ReservedByRemotePrimary(PG* pg, OSDService* osds, epoch_t epoch)
     : m_pg{pg}, m_osds{osds}, m_reserved_at{epoch}
 {
-  if (!m_osds->inc_scrubs_remote()) {
+  if (!m_osds->get_scrub_services().inc_scrubs_remote()) {
     dout(10) << __func__ << ": failed to reserve at Primary request" << dendl;
     // the failure is signalled by not having m_reserved_by_remote_primary set
     return;
@@ -2317,7 +2395,7 @@ ReservedByRemotePrimary::~ReservedByRemotePrimary()
 {
   if (m_reserved_by_remote_primary) {
     m_reserved_by_remote_primary = false;
-    m_osds->dec_scrubs_remote();
+    m_osds->get_scrub_services().dec_scrubs_remote();
   }
 }
 
index c08279efb61e4cd1bc8b01f506d6e84b85048e69..9077bfcf3bcf694a88752380c7fe49761835188c 100644 (file)
@@ -16,6 +16,7 @@
 #include "ScrubStore.h"
 #include "scrub_machine_lstnr.h"
 #include "osd/scrubber_common.h"
+#include "osd_scrub_sched.h"
 
 class Callback;
 
@@ -46,6 +47,7 @@ class ReplicaReservations {
   bool m_had_rejections{false};
   int m_pending{-1};
   const pg_info_t& m_pg_info;
+  ScrubQueue::ScrubJobRef m_scrub_job; ///< a ref to this PG's scrub job
 
   void release_replica(pg_shard_t peer, epoch_t epoch);
 
@@ -63,7 +65,7 @@ class ReplicaReservations {
    */
   void discard_all();
 
-  ReplicaReservations(PG* pg, pg_shard_t whoami);
+  ReplicaReservations(PG* pg, pg_shard_t whoami, ScrubQueue::ScrubJobRef scrubjob);
 
   ~ReplicaReservations();
 
@@ -131,7 +133,7 @@ class MapsCollectionStatus {
   /// @returns true if indeed waiting for this one. Otherwise: an error string
   auto mark_arriving_map(pg_shard_t from) -> std::tuple<bool, std::string_view>;
 
-  std::vector<pg_shard_t> get_awaited() const { return m_maps_awaited_for; }
+  [[nodiscard]] std::vector<pg_shard_t> get_awaited() const { return m_maps_awaited_for; }
 
   void reset();
 
@@ -262,9 +264,13 @@ class PgScrubber : public ScrubPgIF, public ScrubMachineListener {
 
   // managing scrub op registration
 
-  void reg_next_scrub(const requested_scrub_t& request_flags) final;
+  void update_scrub_job(const requested_scrub_t& request_flags) final;
 
-  void unreg_next_scrub() final;
+  void rm_from_osd_scrubbing() final;
+
+  void on_primary_change(const requested_scrub_t& request_flags) final;
+
+  void on_maybe_registration_change(const requested_scrub_t& request_flags) final;
 
   void scrub_requested(scrub_level_t scrub_level,
                       scrub_type_t scrub_type,
@@ -312,7 +318,7 @@ class PgScrubber : public ScrubPgIF, public ScrubMachineListener {
   /**
    *  add to scrub statistics, but only if the soid is below the scrub start
    */
-  virtual void stats_of_handled_objects(const object_stat_sum_t& delta_stats,
+  void stats_of_handled_objects(const object_stat_sum_t& delta_stats,
                                        const hobject_t& soid) override
   {
     ceph_assert(false);
@@ -404,6 +410,9 @@ class PgScrubber : public ScrubPgIF, public ScrubMachineListener {
 
   void reserve_replicas() final;
 
+  void set_reserving_now() final;
+  void clear_reserving_now() final;
+
   [[nodiscard]] bool was_epoch_changed() const final;
 
   void mark_local_map_ready() final;
@@ -419,9 +428,13 @@ class PgScrubber : public ScrubPgIF, public ScrubMachineListener {
 
   [[nodiscard]] bool is_scrub_registered() const;
 
+  /// the 'is-in-scheduling-queue' status, using relaxed-semantics access to the status
+  std::string_view registration_state() const;
+
   virtual void _scrub_clear_state() {}
 
   utime_t m_scrub_reg_stamp;  ///< stamp we registered for
+  ScrubQueue::ScrubJobRef m_scrub_job; ///< the scrub-job used by the OSD to schedule us
 
   ostream& show(ostream& out) const override;
 
@@ -646,6 +659,11 @@ private:
    */
   void request_rescrubbing(requested_scrub_t& req_flags);
 
+  ScrubQueue::sched_params_t
+  determine_scrub_time(const requested_scrub_t& request_flags);
+
+  void unregister_from_osd();
+
   /*
    * Select a range of objects to scrub.
    *
index 41e3cd1f162a5bc77511707cb12048a991a75e73..4f9ed5e7f8300858f18330345bad3db70250da9e 100644 (file)
@@ -90,9 +90,19 @@ ReservingReplicas::ReservingReplicas(my_context ctx) : my_base(ctx)
 {
   dout(10) << "-- state -->> ReservingReplicas" << dendl;
   DECLARE_LOCALS;  // 'scrbr' & 'pg_id' aliases
+
+  // prevent the OSD from starting another scrub while we are trying to secure
+  // replicas resources
+  scrbr->set_reserving_now();
   scrbr->reserve_replicas();
 }
 
+ReservingReplicas::~ReservingReplicas()
+{
+  DECLARE_LOCALS;  // 'scrbr' & 'pg_id' aliases
+  scrbr->clear_reserving_now();
+}
+
 sc::result ReservingReplicas::react(const ReservationFailure&)
 {
   DECLARE_LOCALS;  // 'scrbr' & 'pg_id' aliases
index 7f1870056098147b4756b42a34921d1195df37e3..f75c5acdc2ecbf0b6fe2e19fe4dfcedfe4aa178b 100644 (file)
@@ -164,6 +164,7 @@ struct NotActive : sc::state<NotActive, ScrubMachine> {
 struct ReservingReplicas : sc::state<ReservingReplicas, ScrubMachine> {
 
   explicit ReservingReplicas(my_context ctx);
+  ~ReservingReplicas();
   using reactions = mpl::list<sc::custom_reaction<FullReset>,
                              // all replicas granted our resources request
                              sc::transition<RemotesReserved, ActiveScrubbing>,
index 25bd080fbcada86859fd39b45400040fff17d886..28745d469d998638525e9306ad8767d210a90995 100644 (file)
@@ -148,6 +148,15 @@ struct ScrubMachineListener {
 
   virtual void unreserve_replicas() = 0;
 
+  /**
+   * No new scrub session will start while a scrub was initiate on a PG,
+   * and that PG is trying to acquire replica resources.
+   * set_reserving_now()/clear_reserving_now() let's the OSD scrub-queue know
+   * we are busy reserving.
+   */
+  virtual void set_reserving_now() = 0;
+  virtual void clear_reserving_now() = 0;
+
   /**
    * the FSM interface into the "are we waiting for maps, either our own or from
    * replicas" state.
index 668ae282fd4eabb49029c867ad6115211ffd9083..2f4376a8b7bfda82c78d461caa23b96c18d857cb 100644 (file)
@@ -21,6 +21,14 @@ enum class scrub_prio_t : bool { low_priority = false, high_priority = true };
 /// see ScrubPGgIF::m_current_token
 using act_token_t = uint32_t;
 
+/// "environment" preconditions affecting which PGs are eligible for scrubbing
+struct ScrubPreconds {
+  bool allow_requested_repair_only{false};
+  bool load_is_low{true};
+  bool time_permit{true};
+  bool only_deadlined{false};
+};
+
 }  // namespace Scrub
 
 
@@ -266,6 +274,23 @@ struct ScrubPgIF {
    */
   virtual bool reserve_local() = 0;
 
+  /**
+   * Register/de-register with the OSD scrub queue
+   *
+   * Following our status as Primary or replica.
+   */
+  virtual void on_primary_change(const requested_scrub_t& request_flags) = 0;
+
+  /**
+   * Recalculate the required scrub time.
+   *
+   * This function assumes that the queue registration status is up-to-date,
+   * i.e. the OSD "knows our name" if-f we are the Primary.
+   */
+  virtual void update_scrub_job(const requested_scrub_t& request_flags) = 0;
+
+  virtual void on_maybe_registration_change(const requested_scrub_t& request_flags) = 0;
+
   // on the replica:
   virtual void handle_scrub_reserve_request(OpRequestRef op) = 0;
   virtual void handle_scrub_reserve_release(OpRequestRef op) = 0;
@@ -274,8 +299,8 @@ struct ScrubPgIF {
   virtual void handle_scrub_reserve_grant(OpRequestRef op, pg_shard_t from) = 0;
   virtual void handle_scrub_reserve_reject(OpRequestRef op, pg_shard_t from) = 0;
 
-  virtual void reg_next_scrub(const requested_scrub_t& request_flags) = 0;
-  virtual void unreg_next_scrub() = 0;
+  virtual void rm_from_osd_scrubbing() = 0;
+
   virtual void scrub_requested(scrub_level_t scrub_level,
                               scrub_type_t scrub_type,
                               requested_scrub_t& req_flags) = 0;
index 7b81f84738a37ed3d7715fc6eabde6dfa9e7d352..4c6d4cceedf552393e5e26081f34bbcc12d10f03 100644 (file)
@@ -52,7 +52,7 @@ public:
   }
 
   bool scrub_time_permit(utime_t now) {
-    return OSD::scrub_time_permit(now);
+    return service.get_scrub_services().scrub_time_permit(now);
   }
 };