Merge backfill/recovery patch to mclock scheduler (branch: mclock_scheduler_backfill_recovery)
author    Mohit Agrawal <moagrawa@redhat.com>
          Thu, 3 Apr 2025 05:09:48 +0000 (05:09 +0000)
committer Mohit Agrawal <moagrawa@redhat.com>
          Thu, 3 Apr 2025 05:09:48 +0000 (05:09 +0000)
Signed-off-by: Mohit Agrawal <moagrawa@redhat.com>
17 files changed:
src/crimson/common/config_proxy.h
src/crimson/osd/backfill_state.cc
src/crimson/osd/backfill_state.h
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/osd_operation.cc
src/crimson/osd/osd_operation.h
src/crimson/osd/osd_operations/background_recovery.cc
src/crimson/osd/osd_operations/background_recovery.h
src/crimson/osd/pg.h
src/crimson/osd/pg_recovery.cc
src/crimson/osd/pg_recovery.h
src/crimson/osd/scheduler/mclock_scheduler.cc
src/crimson/osd/scheduler/mclock_scheduler.h
src/crimson/osd/scheduler/scheduler.cc
src/crimson/osd/scheduler/scheduler.h
src/crimson/osd/shard_services.h

index fce86f2389cb1ac3c0144136ccefe49cad9f9138..f0dda3760e89473a52762af5e674140d4fa6df1e 100644 (file)
@@ -152,9 +152,15 @@ public:
       }
     });
   }
+
   int get_val(std::string_view key, std::string *val) const {
     return get_config().get_val(*values, key, val);
   }
+
+  void set_val_default_sync(const std::string& key, const std::string& val) {
+    get_config().set_val_default(*values, obs_mgr, key, val);
+  }
+
   template<typename T>
   const T get_val(std::string_view key) const {
     return get_config().template get_val<T>(*values, key);
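A minimal usage sketch of the new synchronous default setter: unlike the future-returning set_val() path, set_val_default_sync() updates the stored default and notifies observers in-line, which is what set_config_defaults_from_profile() below relies on. The wrapper function and values here are illustrative only, not part of this patch:

    // Sketch: seed mClock defaults from a context that cannot await a future.
    void seed_profile_defaults(crimson::common::ConfigProxy &conf) {
      conf.set_val_default_sync("osd_mclock_scheduler_client_res", "0.5");
      conf.set_val_default_sync("osd_mclock_scheduler_client_wgt", "1");
    }
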
index a068c1fa8d41bd95602f865d8d34ceaa2a65e1c0..7fea4b7d5842316255f49b6723ca7d5446360fdf 100644 (file)
@@ -342,7 +342,6 @@ BackfillState::Enqueuing::Enqueuing(my_context ctx)
 
   do {
     if (!backfill_listener().budget_available()) {
-      DEBUGDPP("throttle failed, turning to Waiting", pg());
       post_event(RequestWaiting{});
       return;
     } else if (should_rescan_replicas(backfill_state().peer_backfill_info,
index ea4b159ded92f971e3580466ccb1a879f7a235ed..75129d39745014cbf4ac597259f43f4c97a2b39b 100644 (file)
@@ -62,8 +62,6 @@ struct BackfillState {
   struct SuspendBackfill : sc::event<SuspendBackfill> {
   };
 
-  struct ThrottleAcquired : sc::event<ThrottleAcquired> {
-  };
 private:
   // internal events
   struct RequestPrimaryScanning : sc::event<RequestPrimaryScanning> {
@@ -262,7 +260,6 @@ public:
       sc::transition<RequestDone, Done>,
       sc::custom_reaction<SuspendBackfill>,
       sc::custom_reaction<Triggered>,
-      sc::transition<ThrottleAcquired, Enqueuing>,
       sc::transition<sc::event_base, Crashed>>;
     explicit Waiting(my_context);
     sc::result react(ObjectPushed);
index 27511df85822ca3ccd52da3de232247f535fdcab..13a7d9fc5ab8528f6b35898ee2e54747124dfb85 100644 (file)
@@ -581,6 +581,12 @@ seastar::future<> OSD::start()
     return _add_me_to_crush();
   }).then([this] {
     return _add_device_class();
+  }).then([this] {
+    return _get_device_type();
+  }).then([this](auto rotational) {
+    return shard_services.invoke_on_all(
+      [this, rotational](auto &local_service) {
+        local_service.local_state.initialize_scheduler(
+          local_service.get_cct(), rotational);
+      });
    }).then([this] {
     monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0);
     monc->sub_want("mgrmap", 0, 0);
@@ -685,6 +691,24 @@ seastar::future<> OSD::_send_boot()
   m->metadata["osd_type"] = "crimson";
   return monc->send_message(std::move(m));
 }
+
+seastar::future<bool> OSD::_get_device_type()
+{
+  LOG_PREFIX(OSD::_get_device_type);
+  if (!local_conf().get_val<bool>("osd_class_update_on_start")) {
+    co_return false;
+  }
+
+  std::string device_class = co_await store.get_default_device_class();
+  if (device_class.empty()) {
+    INFO("Device class is empty; ");
+    co_return false;
+  }
+
+  bool rotational = (device_class != "ssd");
+
+  INFO("device_class is {} rotational: {}", device_class, rotational);
+  co_return rotational;
+}
 
 seastar::future<> OSD::_add_device_class()
 {
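A condensed sketch of the rotational heuristic applied by _get_device_type() (the helper name below is illustrative): an empty or unknown device class returns early as non-rotational, so only an explicit non-"ssd" class such as "hdd" selects the hdd capacity defaults later in the scheduler.

    #include <string_view>

    // Sketch only; mirrors the checks in _get_device_type() above.
    bool rotational_for_class(std::string_view device_class) {
      return !device_class.empty() && device_class != "ssd";
    }
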
index 5d36f7ad8cee16176cef9fd240cd83c1c2da245e..8f0c7a0dad3c36cfbe7e475ce1324959c733708e 100644 (file)
@@ -189,6 +189,7 @@ private:
   seastar::future<> _send_boot();
   seastar::future<> _add_me_to_crush();
   seastar::future<> _add_device_class();
+  seastar::future<bool> _get_device_type();
 
   seastar::future<> osdmap_subscribe(version_t epoch, bool force_request);
 
index 978009709761324fb48b9b6c2f8bcb374a686038..8fe07a8c163782a04992c05dfa8e75180c46f980 100644 (file)
@@ -14,6 +14,7 @@ namespace {
   }
 }
 
+using namespace crimson::osd::scheduler;
 namespace crimson::osd {
 
 void OSDOperationRegistry::do_stop()
@@ -149,21 +150,44 @@ void OSDOperationRegistry::visit_ops_in_flight(std::function<void(const ClientRe
   }
 }
 
 OperationThrottler::OperationThrottler(ConfigProxy &conf)
-  : scheduler(crimson::osd::scheduler::make_scheduler(conf))
 {
   conf.add_observer(this);
+}
+
+void OperationThrottler::initialize_scheduler(
+  CephContext *cct, ConfigProxy &conf, bool is_rotational, int whoami)
+{
+  scheduler = crimson::osd::scheduler::make_scheduler(
+    cct, conf, whoami, seastar::smp::count,
+    seastar::this_shard_id(), is_rotational, true);
   update_from_config(conf);
 }
 
 void OperationThrottler::wake()
 {
+  WorkItem work_item;
   while ((!max_in_progress || in_progress < max_in_progress) &&
         !scheduler->empty()) {
-    auto item = scheduler->dequeue();
-    item.wake.set_value();
-    ++in_progress;
-    --pending;
+    work_item = scheduler->dequeue();
+    if (auto when_ready = std::get_if<double>(&work_item)) {
+      ceph::real_clock::time_point future_time = ceph::real_clock::from_double(*when_ready);
+      auto now = ceph::real_clock::now();
+      auto wait_duration = std::chrono::duration_cast<std::chrono::milliseconds>(future_time - now);
+      if (wait_duration.count() > 0) {
+        logger().info("No items ready. Retrying wake() in {} ms", wait_duration.count());
+        std::ignore = seastar::sleep(wait_duration).then([this] {
+          wake();
+        });
+        return;
+      }
+    }
+    if (auto *item = std::get_if<crimson::osd::scheduler::item_t>(&work_item)) {
+      item->wake.set_value();
+      ++in_progress;
+      --pending;
+    }
   }
 }
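The variant handling in wake() relies on the following WorkItem contract: dequeue() yields either a ready item_t, or the absolute time (seconds since epoch, as a double) at which mClock will next release an item. A hedged sketch of that dispatch, written as a standalone helper for clarity (not part of the patch, which open-codes this with std::get_if):

    #include <functional>
    #include <variant>

    template <typename Item>
    void dispatch(std::variant<std::monostate, Item, double> &&wi,
                  std::function<void(double)> retry_at,
                  std::function<void(Item&)> run) {
      if (auto *when = std::get_if<double>(&wi)) {
        retry_at(*when);   // nothing runnable before this timestamp
      } else if (auto *item = std::get_if<Item>(&wi)) {
        run(*item);        // release one throttled operation
      }                    // std::monostate: queue produced nothing
    }
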
 
index d702919bd6fae6ca52eceb5086be9399d4751caa..fcac7c4ad6e38bdb7247db6b2daacfd85eed4c1d 100644 (file)
@@ -334,6 +334,22 @@ public:
                          const std::set<std::string> &changed) final;
   void update_from_config(const ConfigProxy &conf);
 
+  bool available() const {
+    return !max_in_progress || in_progress < max_in_progress;
+  }
+
+  template <typename F>
+  auto with_throttle(
+    crimson::osd::scheduler::params_t params,
+    F &&f) {
+    if (!max_in_progress) return f();
+    return acquire_throttle(params)
+      .then(std::forward<F>(f))
+      .finally([this] {
+       release_throttle();
+      });
+  }
+
   template <class OpT, class... Args>
   seastar::future<> with_throttle_while(
     BlockingEvent::Trigger<OpT>&& trigger,
@@ -342,18 +358,7 @@ public:
       with_throttle_while(std::forward<Args>(args)...), *this);
   }
 
-  // Returns std::nullopt if the throttle is acquired immediately,
-  // returns the future for the acquiring otherwise
-  std::optional<seastar::future<>>
-  try_acquire_throttle_now(crimson::osd::scheduler::params_t params) {
-    if (!max_in_progress || in_progress < max_in_progress) {
-      ++in_progress;
-      --pending;
-      return std::nullopt;
-    }
-    return acquire_throttle(params);
-  }
-
+  void initialize_scheduler(CephContext* cct, ConfigProxy &conf, bool is_rotational, int whoami);
 private:
   void dump_detail(Formatter *f) const final;
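A hypothetical caller of the new with_throttle() helper; the params_t field order ({cost, priority, owner, klass}) follows the updated scheduler.h in this patch, and the byte cost is an arbitrary example value:

    // Sketch: run one background task once the scheduler grants a slot.
    // release_throttle() runs via finally() even if the task fails.
    seastar::future<> run_throttled(OperationThrottler &throttler) {
      crimson::osd::scheduler::params_t params{
        4096,  // cost in bytes (illustrative)
        0,     // priority: below the cutoff, so routed through mClock
        0,     // owner
        crimson::osd::scheduler::scheduler_class_t::background_recovery};
      return throttler.with_throttle(params, [] {
        return seastar::make_ready_future<>();  // the throttled work
      });
    }
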
 
index 7f93e42d53d8effd9bff74e36fc66e09e0bf10d9..3a2142ca8341276f99c074d407eef999f2df1f58 100644 (file)
@@ -77,18 +77,19 @@ seastar::future<> BackgroundRecoveryT<T>::start()
       std::chrono::milliseconds(std::lround(delay * 1000)));
   }
   return maybe_delay.then([ref, this] {
-    return this->template with_blocking_event<OperationThrottler::BlockingEvent>(
-      [ref, this] (auto&& trigger) {
-      return ss.with_throttle_while(
-        std::move(trigger),
-        this, get_scheduler_params(), [this] {
-          return interruptor::with_interruption([this] {
-            return do_recovery();
-          }, [](std::exception_ptr) {
-            return seastar::make_ready_future<bool>(false);
-          }, pg, epoch_started);
-        });
+    return seastar::repeat([ref, this] {
+      return interruptor::with_interruption([this] {
+       return do_recovery();
+      }, [](std::exception_ptr) {
+       return seastar::make_ready_future<bool>(false);
+      }, pg, epoch_started).then([](bool goon) {
+       if (goon) {
+         return seastar::stop_iteration::no;
+       } else {
+         return seastar::stop_iteration::yes;
+       }
       });
+    });
   });
 }
 
@@ -117,7 +118,8 @@ UrgentRecovery::do_recovery()
   ).then_interruptible([this] {
     return with_blocking_event<RecoveryBackend::RecoveryBlockingEvent,
                               interruptor>([this] (auto&& trigger) {
-      return pg->get_recovery_handler()->recover_missing(trigger, soid, need);
+      return pg->get_recovery_handler()->recover_missing(
+       trigger, soid, need, false);
     }).then_interruptible([] {
       return seastar::make_ready_future<bool>(false);
     });
index 255a934cd49879263c3bc50ebaea9725af22dac9..bbb98bd8f7cb9f044bbdc6b0251f2310ce88c008 100644 (file)
@@ -38,8 +38,12 @@ protected:
 private:
   virtual void dump_detail(Formatter *f) const;
   crimson::osd::scheduler::params_t get_scheduler_params() const {
+    // Use the PG's average object size as the per-op cost; guard against
+    // PGs with no objects to avoid dividing by zero.
+    const auto &sum = pg->get_info().stats.stats.sum;
+    int cost = sum.num_objects > 0
+      ? std::max<int64_t>(1, sum.num_bytes / sum.num_objects)
+      : 1;
+    unsigned priority = pg->get_recovery_op_priority();
+
     return {
-      1, // cost
+      cost, // cost
+      priority, // priority
       0, // owner
       scheduler_class
     };
@@ -66,7 +70,6 @@ public:
   void print(std::ostream&) const final;
 
   std::tuple<
-    OperationThrottler::BlockingEvent,
     RecoveryBackend::RecoveryBlockingEvent
   > tracking_events;
 
@@ -86,7 +89,6 @@ public:
     float delay = 0);
 
   std::tuple<
-    OperationThrottler::BlockingEvent,
     RecoveryBackend::RecoveryBlockingEvent
   > tracking_events;
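A worked example of the cost computation in get_scheduler_params() above, with assumed PG statistics (not values from this patch):

    #include <algorithm>
    #include <cstdint>

    constexpr int64_t num_bytes   = 8LL << 30;  // 8 GiB in the PG (assumed)
    constexpr int64_t num_objects = 2048;
    // cost = max(1, 8 GiB / 2048) = 4 MiB: each op is billed at the PG's
    // average object size, so large-object pools consume budget faster.
    constexpr int64_t cost = std::max<int64_t>(1, num_bytes / num_objects);
    static_assert(cost == 4 << 20);
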
 
index b6b7b67262980c91128221502f85b7ce7e227497..996cbfb61ccf94e37e3eba2e246943f2f7a5531b 100644 (file)
@@ -882,9 +882,23 @@ public:
     int *return_code,
     std::vector<pg_log_op_return_item_t> *op_returns) const;
   int get_recovery_op_priority() const {
-    int64_t pri = 0;
-    get_pgpool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
-    return  pri > 0 ? pri : crimson::common::local_conf()->osd_recovery_op_priority;
+    const auto op_queue_type =
+      this->shard_services.get_cct()->_conf.get_val<std::string>("osd_op_queue");
+    if (op_queue_type == "mclock_scheduler") {
+      if (peering_state.is_forced_recovery_or_backfill()) {
+        return peering_state.recovery_msg_priority_t::FORCED;
+      } else if (peering_state.is_undersized()) {
+        return peering_state.recovery_msg_priority_t::UNDERSIZED;
+      } else if (peering_state.is_degraded()) {
+        return peering_state.recovery_msg_priority_t::DEGRADED;
+      } else {
+        return peering_state.recovery_msg_priority_t::BEST_EFFORT;
+      }
+    } else {
+      int64_t pri = 0;
+      get_pgpool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
+      return pri > 0 ? pri : crimson::common::local_conf()->osd_recovery_op_priority;
+    }
   }
   seastar::future<> mark_unfound_lost(int) {
     // TODO: see PrimaryLogPG::mark_all_unfound_lost()
index 735dd7f02e81fabaeaa10702352ccc902295382e..071b34c3c6b20f2a25f92802a2918810cc3bb825 100644 (file)
@@ -67,6 +67,8 @@ PGRecovery::start_recovery_ops(
   if (max_to_start > 0) {
     max_to_start -= start_replica_recovery_ops(trigger, max_to_start, &started);
   }
+  using interruptor =
+    crimson::interruptible::interruptor<crimson::osd::IOInterruptCondition>;
   return interruptor::parallel_for_each(started,
                                        [] (auto&& ifut) {
     return std::move(ifut);
@@ -194,10 +196,10 @@ size_t PGRecovery::start_primary_recovery_ops(
        auto it = missing.get_items().find(head);
        assert(it != missing.get_items().end());
        auto head_need = it->second.need;
-       out->emplace_back(recover_missing(trigger, head, head_need));
+       out->emplace_back(recover_missing(trigger, head, head_need, true));
        ++skipped;
       } else {
-       out->emplace_back(recover_missing(trigger, soid, item.need));
+       out->emplace_back(recover_missing(trigger, soid, item.need, true));
       }
       ++started;
     }
@@ -304,7 +306,9 @@ size_t PGRecovery::start_replica_recovery_ops(
 PGRecovery::interruptible_future<>
 PGRecovery::recover_missing(
   RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
-  const hobject_t &soid, eversion_t need)
+  const hobject_t &soid,
+  eversion_t need,
+  bool with_throttle)
 {
   logger().info("{} {} v {}", __func__, soid, need);
   auto [recovering, added] = pg->get_recovery_backend()->add_recovering(soid);
@@ -317,7 +321,9 @@ PGRecovery::recover_missing(
     } else {
       return recovering.wait_track_blocking(
        trigger,
-       pg->get_recovery_backend()->recover_object(soid, need)
+       (with_throttle
+         ? recover_object_with_throttle(soid, need)
+         : recover_object(soid, need))
        .handle_exception_interruptible(
          [=, this, soid = std::move(soid)] (auto e) {
          on_failed_recover({ pg->get_pg_whoami() }, soid, need);
@@ -365,7 +371,7 @@ RecoveryBackend::interruptible_future<> PGRecovery::prep_object_replica_pushes(
     logger().info("{} {} v {}, new recovery", __func__, soid, need);
     return recovering.wait_track_blocking(
       trigger,
-      pg->get_recovery_backend()->recover_object(soid, need)
+      recover_object_with_throttle(soid, need)
       .handle_exception_interruptible(
        [=, this, soid = std::move(soid)] (auto e) {
        on_failed_recover({ pg->get_pg_whoami() }, soid, need);
@@ -514,6 +520,23 @@ void PGRecovery::request_primary_scan(
   });
 }
 
+PGRecovery::interruptible_future<>
+PGRecovery::recover_object_with_throttle(
+  const hobject_t &soid,
+  eversion_t need)
+{
+  crimson::osd::scheduler::params_t params =
+    {1, 0, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort};
+  auto &ss = pg->get_shard_services();
+  return ss.with_throttle(
+    std::move(params),
+    [this, soid, need] {
+    auto backend = pg->get_recovery_backend();
+    assert(backend);
+    return backend->recover_object(soid, need);
+  });
+}
+
 void PGRecovery::enqueue_push(
   const hobject_t& obj,
   const eversion_t& v,
@@ -525,7 +548,7 @@ void PGRecovery::enqueue_push(
   if (!added)
     return;
   peering_state.prepare_backfill_for_missing(obj, v, peers);
-  std::ignore = pg->get_recovery_backend()->recover_object(obj, v).\
+  std::ignore = recover_object_with_throttle(obj, v).\
   handle_exception_interruptible([] (auto) {
     ceph_abort_msg("got exception on backfill's push");
     return seastar::make_ready_future<>();
@@ -603,21 +626,8 @@ void PGRecovery::update_peers_last_backfill(
 
 bool PGRecovery::budget_available() const
 {
-  crimson::osd::scheduler::params_t params =
-    {1, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort};
   auto &ss = pg->get_shard_services();
-  auto futopt = ss.try_acquire_throttle_now(std::move(params));
-  if (!futopt) {
-    return true;
-  }
-  std::ignore = interruptor::make_interruptible(std::move(*futopt)
-  ).then_interruptible([this] {
-    assert(!backfill_state->is_triggered());
-    using BackfillState = crimson::osd::BackfillState;
-    backfill_state->process_event(
-      BackfillState::ThrottleAcquired{}.intrusive_from_this());
-  });
-  return false;
+  return ss.throttle_available();
 }
 
 void PGRecovery::on_pg_clean()
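The rewritten budget_available() above is now a pure predicate with no side effects; a condensed sketch of what it reduces to (names taken from this patch):

    // Sketch: when this returns false, Enqueuing posts RequestWaiting and
    // backfill resumes via ObjectPushed/RequestDone, which is why the
    // ThrottleAcquired event and its re-entry into Enqueuing were removed.
    bool budget_available_sketch(const crimson::osd::OperationThrottler &t) {
      return t.available();  // !max_in_progress || in_progress < max_in_progress
    }
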
index a9a48cf465ce87b7367d0eb24bd8e9ea311d05a0..37fae278fa09f12cd64ba283a22a6451ede4537d 100644 (file)
@@ -25,8 +25,6 @@ class PGBackend;
 
 class PGRecovery : public crimson::osd::BackfillState::BackfillListener {
 public:
-  using interruptor =
-    crimson::interruptible::interruptor<crimson::osd::IOInterruptCondition>;
   template <typename T = void>
   using interruptible_future = RecoveryBackend::interruptible_future<T>;
   PGRecovery(PGRecoveryListener* pg) : pg(pg) {}
@@ -67,7 +65,9 @@ private:
   }
   RecoveryBackend::interruptible_future<> recover_missing(
     RecoveryBackend::RecoveryBlockingEvent::TriggerI&,
-    const hobject_t &soid, eversion_t need);
+    const hobject_t &soid,
+    eversion_t need,
+    bool with_throttle);
   RecoveryBackend::interruptible_future<> prep_object_replica_deletes(
     RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
     const hobject_t& soid,
@@ -99,6 +99,18 @@ private:
   friend class ReplicatedRecoveryBackend;
   friend class crimson::osd::UrgentRecovery;
 
+  interruptible_future<> recover_object_with_throttle(
+    const hobject_t &soid,
+    eversion_t need);
+
+  interruptible_future<> recover_object(
+    const hobject_t &soid,
+    eversion_t need) {
+    auto backend = pg->get_recovery_backend();
+    assert(backend);
+    return backend->recover_object(soid, need);
+  }
+
   // backfill begin
   std::unique_ptr<crimson::osd::BackfillState> backfill_state;
   std::map<pg_shard_t,
index 2fcf6d57db47fb01e39534bfb74e1a2165fe66dd..ff7b92dbd77f8593db3c8e891376d0026acb7c33 100644 (file)
 
 namespace dmc = crimson::dmclock;
 using namespace std::placeholders;
-using namespace std::string_literals;
+using namespace std::literals;
 
 #define dout_context cct
-#define dout_subsys ceph_subsys_osd
+#define dout_subsys ceph_subsys_mclock
 #undef dout_prefix
-#define dout_prefix *_dout
+#define dout_prefix *_dout << "mClockScheduler: "
 
 
 namespace crimson::osd::scheduler {
 
-mClockScheduler::mClockScheduler(ConfigProxy &conf) :
-  scheduler(
-    std::bind(&mClockScheduler::ClientRegistry::get_info,
-             &client_registry,
-             _1),
-    dmc::AtLimit::Allow,
-    conf.get_val<double>("osd_mclock_scheduler_anticipation_timeout"))
+mClockScheduler::mClockScheduler(CephContext *cct,
+  int whoami,
+  uint32_t num_shards,
+  int shard_id,
+  bool is_rotational,
+  bool init_perfcounter)
+  : cct(cct),
+    whoami(whoami),
+    num_shards(num_shards),
+    shard_id(shard_id),
+    is_rotational(is_rotational),
+    logger(nullptr),
+    scheduler(
+      std::bind(&mClockScheduler::ClientRegistry::get_info,
+                &client_registry,
+                _1),
+      dmc::AtLimit::Wait,
+      cct->_conf.get_val<double>("osd_mclock_scheduler_anticipation_timeout"))
 {
-  conf.add_observer(this);
-  client_registry.update_from_config(conf);
+  cct->_conf.add_observer(this);
+  ceph_assert(num_shards > 0);
+  auto get_op_queue_cut_off = [&conf = cct->_conf]() {
+    if (conf.get_val<std::string>("osd_op_queue_cut_off") == "debug_random") {
+      std::random_device rd;
+      std::mt19937 random_gen(rd());
+      return (random_gen() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
+    } else if (conf.get_val<std::string>("osd_op_queue_cut_off") == "high") {
+      return CEPH_MSG_PRIO_HIGH;
+    } else {
+      // default / catch-all is 'low'
+      return CEPH_MSG_PRIO_LOW;
+    }
+  };
+  cutoff_priority = get_op_queue_cut_off();
+  set_osd_capacity_params_from_config();
+  set_config_defaults_from_profile();
+  client_registry.update_from_config(
+    cct->_conf, osd_bandwidth_capacity_per_shard);
 }
 
-void mClockScheduler::ClientRegistry::update_from_config(const ConfigProxy &conf)
+/* ClientRegistry holds the dmclock::ClientInfo configuration parameters
+ * (reservation (bytes/second), weight (unitless), limit (bytes/second))
+ * for each IO class in the OSD (client, background_recovery,
+ * background_best_effort).
+ *
+ * mclock expects limit and reservation to have units of <cost>/second
+ * (bytes/second), but osd_mclock_scheduler_client_(lim|res) are provided
+ * as ratios of the OSD's capacity.  We convert from one to the other
+ * using the capacity_per_shard parameter.
+ *
+ * Note, mclock profile information will already have been set as a default
+ * for the osd_mclock_scheduler_client_* parameters prior to calling
+ * update_from_config -- see set_config_defaults_from_profile().
+ */
+void mClockScheduler::ClientRegistry::update_from_config(
+  const ConfigProxy &conf,
+  const double capacity_per_shard)
 {
+  auto get_res = [&](double res) {
+    if (res) {
+      return res * capacity_per_shard;
+    } else {
+      return default_min; // min reservation
+    }
+  };
+
+  auto get_lim = [&](double lim) {
+    if (lim) {
+      return lim * capacity_per_shard;
+    } else {
+      return default_max; // high limit
+    }
+  };
+
+  // Set external client infos
+  double res = conf.get_val<double>(
+    "osd_mclock_scheduler_client_res");
+  double lim = conf.get_val<double>(
+    "osd_mclock_scheduler_client_lim");
+  uint64_t wgt = conf.get_val<uint64_t>(
+    "osd_mclock_scheduler_client_wgt");
   default_external_client_info.update(
-    conf.get_val<double>("osd_mclock_scheduler_client_res"),
-    conf.get_val<uint64_t>("osd_mclock_scheduler_client_wgt"),
-    conf.get_val<double>("osd_mclock_scheduler_client_lim"));
+    get_res(res),
+    wgt,
+    get_lim(lim));
 
+  // Set background recovery client infos
+  res = conf.get_val<double>(
+    "osd_mclock_scheduler_background_recovery_res");
+  lim = conf.get_val<double>(
+    "osd_mclock_scheduler_background_recovery_lim");
+  wgt = conf.get_val<uint64_t>(
+    "osd_mclock_scheduler_background_recovery_wgt");
   internal_client_infos[
     static_cast<size_t>(scheduler_class_t::background_recovery)].update(
-    conf.get_val<double>("osd_mclock_scheduler_background_recovery_res"),
-    conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_wgt"),
-    conf.get_val<double>("osd_mclock_scheduler_background_recovery_lim"));
+      get_res(res),
+      wgt,
+      get_lim(lim));
 
+  // Set background best effort client infos
+  res = conf.get_val<double>(
+    "osd_mclock_scheduler_background_best_effort_res");
+  lim = conf.get_val<double>(
+    "osd_mclock_scheduler_background_best_effort_lim");
+  wgt = conf.get_val<uint64_t>(
+    "osd_mclock_scheduler_background_best_effort_wgt");
   internal_client_infos[
     static_cast<size_t>(scheduler_class_t::background_best_effort)].update(
-    conf.get_val<double>("osd_mclock_scheduler_background_best_effort_res"),
-    conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_wgt"),
-    conf.get_val<double>("osd_mclock_scheduler_background_best_effort_lim"));
+      get_res(res),
+      wgt,
+      get_lim(lim));
 }
 
 const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_external_client(
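A worked numeric example of the ratio-to-absolute conversion update_from_config() performs (capacity, shard count, and ratio are assumed values for illustration):

    constexpr double bandwidth          = 1.2e9;  // bytes/s, assumed SSD capacity
    constexpr double num_shards         = 8.0;
    constexpr double capacity_per_shard = bandwidth / num_shards;  // 1.5e8 bytes/s
    // osd_mclock_scheduler_client_res = 0.5 (a ratio) resolves to an absolute
    // dmclock reservation of 0.5 * 150 MB/s = 75 MB/s per shard; a ratio of 0
    // falls back to default_min (no reservation) or default_max (no limit).
    constexpr double client_res = 0.5 * capacity_per_shard;  // 7.5e7 bytes/s
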
@@ -79,7 +162,6 @@ const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_info(
   case scheduler_class_t::immediate:
     ceph_assert(0 == "Cannot schedule immediate");
     return (dmc::ClientInfo*)nullptr;
-  case scheduler_class_t::repop:
   case scheduler_class_t::client:
     return get_external_client(id.client_profile_id);
   default:
@@ -88,44 +170,306 @@ const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_info(
   }
 }
 
+void mClockScheduler::set_osd_capacity_params_from_config()
+{
+  uint64_t osd_bandwidth_capacity;
+  double osd_iop_capacity;
+
+  std::tie(osd_bandwidth_capacity, osd_iop_capacity) = [&, this] {
+    if (is_rotational) {
+      return std::make_tuple(
+        cct->_conf.get_val<Option::size_t>(
+          "osd_mclock_max_sequential_bandwidth_hdd"),
+        cct->_conf.get_val<double>("osd_mclock_max_capacity_iops_hdd"));
+    } else {
+      return std::make_tuple(
+        cct->_conf.get_val<Option::size_t>(
+          "osd_mclock_max_sequential_bandwidth_ssd"),
+        cct->_conf.get_val<double>("osd_mclock_max_capacity_iops_ssd"));
+    }
+  }();
+
+  osd_bandwidth_capacity = std::max<uint64_t>(1, osd_bandwidth_capacity);
+  osd_iop_capacity = std::max<double>(1.0, osd_iop_capacity);
+
+  osd_bandwidth_cost_per_io =
+    static_cast<double>(osd_bandwidth_capacity) / osd_iop_capacity;
+  osd_bandwidth_capacity_per_shard = static_cast<double>(osd_bandwidth_capacity)
+    / static_cast<double>(num_shards);
+
+  dout(1) << __func__ << ": osd_bandwidth_cost_per_io: "
+          << std::fixed << std::setprecision(2)
+          << osd_bandwidth_cost_per_io << " bytes/io"
+          << ", osd_bandwidth_capacity_per_shard "
+          << osd_bandwidth_capacity_per_shard << " bytes/second"
+          << dendl;
+}
+
+/**
+ * profile_t
+ *
+ * mclock profile -- 3 params for each of 3 client classes
+ * 0 (min): specifies no minimum reservation
+ * 0 (max): specifies no upper limit
+ */
+struct profile_t {
+  struct client_config_t {
+    double reservation;
+    uint64_t weight;
+    double limit;
+  };
+  client_config_t client;
+  client_config_t background_recovery;
+  client_config_t background_best_effort;
+};
+
+static std::ostream &operator<<(
+  std::ostream &lhs, const profile_t::client_config_t &rhs)
+{
+  return lhs << "{res: " << rhs.reservation
+             << ", wgt: " << rhs.weight
+             << ", lim: " << rhs.limit
+             << "}";
+}
+
+static std::ostream &operator<<(std::ostream &lhs, const profile_t &rhs)
+{
+  return lhs << "[client: " << rhs.client
+             << ", background_recovery: " << rhs.background_recovery
+             << ", background_best_effort: " << rhs.background_best_effort
+             << "]";
+}
+
+void mClockScheduler::set_config_defaults_from_profile()
+{
+  // Let only a single osd shard (id:0) set the profile configs
+  if (shard_id > 0) {
+    return;
+  }
+
+  /**
+   * high_client_ops
+   *
+   * Client Allocation:
+   *   reservation: 60% | weight: 2 | limit: 0 (max) |
+   * Background Recovery Allocation:
+   *   reservation: 40% | weight: 1 | limit: 0 (max) |
+   * Background Best Effort Allocation:
+   *   reservation: 0 (min) | weight: 1 | limit: 70% |
+   */
+  static constexpr profile_t high_client_ops_profile{
+    { .6, 2,  0 },
+    { .4, 1,  0 },
+    {  0, 1, .7 }
+  };
+
+  /**
+   * high_recovery_ops
+   *
+   * Client Allocation:
+   *   reservation: 30% | weight: 1 | limit: 0 (max) |
+   * Background Recovery Allocation:
+   *   reservation: 70% | weight: 2 | limit: 0 (max) |
+   * Background Best Effort Allocation:
+   *   reservation: 0 (min) | weight: 1 | limit: 0 (max) |
+   */
+  static constexpr profile_t high_recovery_ops_profile{
+    { .3, 1, 0 },
+    { .7, 2, 0 },
+    {  0, 1, 0 }
+  };
+
+  /**
+   * balanced
+   *
+   * Client Allocation:
+   *   reservation: 50% | weight: 1 | limit: 0 (max) |
+   * Background Recovery Allocation:
+   *   reservation: 50% | weight: 1 | limit: 0 (max) |
+   * Background Best Effort Allocation:
+   *   reservation: 0 (min) | weight: 1 | limit: 90% |
+   */
+  static constexpr profile_t balanced_profile{
+    { .5, 1, 0 },
+    { .5, 1, 0 },
+    {  0, 1, .9 }
+  };
+
+  const profile_t *profile = nullptr;
+  auto mclock_profile = cct->_conf.get_val<std::string>("osd_mclock_profile");
+  if (mclock_profile == "high_client_ops") {
+    profile = &high_client_ops_profile;
+    dout(10) << "Setting high_client_ops profile " << *profile << dendl;
+  } else if (mclock_profile == "high_recovery_ops") {
+    profile = &high_recovery_ops_profile;
+    dout(10) << "Setting high_recovery_ops profile " << *profile << dendl;
+  } else if (mclock_profile == "balanced") {
+    profile = &balanced_profile;
+    dout(10) << "Setting balanced profile " << *profile << dendl;
+  } else if (mclock_profile == "custom") {
+    dout(10) << "Profile set to custom, not setting defaults" << dendl;
+    return;
+  } else {
+    derr << "Invalid mclock profile: " << mclock_profile << dendl;
+    ceph_assert("Invalid choice of mclock profile" == 0);
+    return;
+  }
+  ceph_assert(nullptr != profile);
+
+  auto set_config = [&conf = cct->_conf](const char *key, auto val) {
+    return conf.set_val_default_sync(key, std::to_string(val));
+  };
+
+  set_config("osd_mclock_scheduler_client_res", profile->client.reservation);
+  set_config("osd_mclock_scheduler_client_res", profile->client.reservation);
+  set_config("osd_mclock_scheduler_client_wgt", profile->client.weight);
+  set_config("osd_mclock_scheduler_client_lim", profile->client.limit);
+
+
+  set_config(
+    "osd_mclock_scheduler_background_recovery_res",
+    profile->background_recovery.reservation);
+  set_config(
+    "osd_mclock_scheduler_background_recovery_wgt",
+    profile->background_recovery.weight);
+  set_config(
+    "osd_mclock_scheduler_background_recovery_lim",
+    profile->background_recovery.limit);
+
+  set_config(
+    "osd_mclock_scheduler_background_best_effort_res",
+    profile->background_best_effort.reservation);
+  set_config(
+    "osd_mclock_scheduler_background_best_effort_wgt",
+    profile->background_best_effort.weight);
+  set_config(
+    "osd_mclock_scheduler_background_best_effort_lim",
+    profile->background_best_effort.limit);
+}
+
+uint32_t mClockScheduler::calc_scaled_cost(int item_cost)
+{
+  auto cost = static_cast<uint32_t>(
+    std::max<int>(
+      1, // ensure cost is non-zero and positive
+      item_cost));
+  auto cost_per_io = static_cast<uint32_t>(osd_bandwidth_cost_per_io);
+
+  return std::max<uint32_t>(cost, cost_per_io);
+}
+
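A worked example of the scaling performed by calc_scaled_cost() (device parameters assumed, not taken from this patch):

    constexpr double bandwidth   = 150e6;             // hdd: 150 MB/s, assumed
    constexpr double iops        = 315.0;             // hdd max IOPS, assumed
    constexpr double cost_per_io = bandwidth / iops;  // ~476,190 bytes/io
    // calc_scaled_cost(4096) = max(4096, 476190) = 476190: a small random IO
    // is billed at the device's per-IO bandwidth cost, so it cannot undercut
    // large sequential work in the dmclock queue.
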
 void mClockScheduler::dump(ceph::Formatter &f) const
 {
+  // Display queue sizes
+  f.open_object_section("queue_sizes");
+  f.dump_int("high_priority_queue", high_priority.size());
+  f.dump_int("scheduler", scheduler.request_count());
+  f.close_section();
+
+  // client map (res, wgt, lim)
+  f.open_object_section("mClockClients");
+  f.dump_int("client_count", scheduler.client_count());
+  f.close_section();
+
+  // Display sorted queues (res, wgt, lim)
+  f.open_object_section("mClockQueues");
+  f.dump_string("queues", display_queues());
+  f.close_section();
+
+  f.open_object_section("HighPriorityQueue");
+  for (auto it = high_priority.begin();
+       it != high_priority.end(); it++) {
+    f.dump_int("priority", it->first);
+    f.dump_int("queue_size", it->second.size());
+  }
+  f.close_section();
 }
 
 void mClockScheduler::enqueue(item_t&& item)
 {
   auto id = get_scheduler_id(item);
-  auto cost = item.params.cost;
+  unsigned priority = item.get_priority();
 
+  // TODO: move this check into item, handle backwards compat
   if (scheduler_class_t::immediate == item.params.klass) {
-    immediate.push_front(std::move(item));
+    enqueue_high(immediate_class_priority, std::move(item));
+  } else if (priority >= cutoff_priority) {
+    enqueue_high(priority, std::move(item));
   } else {
+    auto cost = calc_scaled_cost(item.get_cost());
+    dout(20) << __func__ << " " << id
+             << " item_cost: " << item.get_cost()
+             << " scaled_cost: " << cost
+             << dendl;
+
+    // Add item to scheduler queue
     scheduler.add_request(
       std::move(item),
       id,
       cost);
   }
+
+  dout(10) << __func__ << " client_count: " << scheduler.client_count()
+           << " queue_sizes: [ high_priority_queue: " << high_priority.size()
+           << " sched: " << scheduler.request_count() << " ]"
+           << dendl;
+  dout(10) << __func__ << " mClockQueues: { "
+           << display_queues() << " }"
+           << dendl;
 }
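The routing rule shared by enqueue() and enqueue_front() reduces to a single predicate; a condensed sketch (names from this patch):

    // Sketch: immediate-class items and anything at or above cutoff_priority
    // bypass dmclock via the strict high_priority map; everything else gets
    // a scaled byte cost and is ordered by reservation/weight/limit.
    bool bypasses_mclock(const item_t &item, unsigned cutoff_priority) {
      return item.params.klass == scheduler_class_t::immediate ||
             item.get_priority() >= cutoff_priority;
    }
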
 
 void mClockScheduler::enqueue_front(item_t&& item)
 {
-  immediate.push_back(std::move(item));
-  // TODO: item may not be immediate, update mclock machinery to permit
-  // putting the item back in the queue
+  unsigned priority = item.get_priority();
+
+  if (scheduler_class_t::immediate == item.params.klass) {
+    enqueue_high(immediate_class_priority, std::move(item), true);
+  } else if (priority >= cutoff_priority) {
+    enqueue_high(priority, std::move(item), true);
+  } else {
+    // mClock does not support enqueue at front, so we use
+    // the high queue with priority 0
+    enqueue_high(0, std::move(item), true);
+  }
+}
+
+void mClockScheduler::enqueue_high(unsigned priority,
+                                   item_t&& item,
+                                   bool front)
+{
+  if (front) {
+    high_priority[priority].push_back(std::move(item));
+  } else {
+    high_priority[priority].push_front(std::move(item));
+  }
 }
 
-item_t mClockScheduler::dequeue()
+WorkItem mClockScheduler::dequeue()
 {
-  if (!immediate.empty()) {
-    auto ret = std::move(immediate.back());
-    immediate.pop_back();
+  if (!high_priority.empty()) {
+    auto iter = high_priority.begin();
+    // invariant: high_priority entries are never empty
+    assert(!iter->second.empty());
+    WorkItem ret{std::move(iter->second.back())};
+    iter->second.pop_back();
+    if (iter->second.empty()) {
+      // maintain invariant, high priority entries are never empty
+      high_priority.erase(iter);
+    }
     return ret;
   } else {
     mclock_queue_t::PullReq result = scheduler.pull_request();
     if (result.is_future()) {
-      ceph_assert(
-       0 == "Not implemented, user would have to be able to be woken up");
-      return std::move(*(item_t*)nullptr);
+      return result.getTime();
     } else if (result.is_none()) {
       ceph_assert(
        0 == "Impossible, must have checked empty() first");
@@ -139,27 +483,110 @@ item_t mClockScheduler::dequeue()
   }
 }
 
+std::string mClockScheduler::display_queues() const
+{
+  std::ostringstream out;
+  scheduler.display_queues(out);
+  return out.str();
+}
+
 std::vector<std::string> mClockScheduler::get_tracked_keys() const noexcept
 {
   return {
-    "osd_mclock_scheduler_client_res"s,
-    "osd_mclock_scheduler_client_wgt"s,
-    "osd_mclock_scheduler_client_lim"s,
-    "osd_mclock_scheduler_background_recovery_res"s,
-    "osd_mclock_scheduler_background_recovery_wgt"s,
-    "osd_mclock_scheduler_background_recovery_lim"s,
-    "osd_mclock_scheduler_background_best_effort_res"s,
-    "osd_mclock_scheduler_background_best_effort_wgt"s,
-    "osd_mclock_scheduler_background_best_effort_lim"s
-  };
+    "osd_mclock_scheduler_client_res",
+    "osd_mclock_scheduler_client_wgt",
+    "osd_mclock_scheduler_client_lim",
+    "osd_mclock_scheduler_background_recovery_res",
+    "osd_mclock_scheduler_background_recovery_wgt",
+    "osd_mclock_scheduler_background_recovery_lim",
+    "osd_mclock_scheduler_background_best_effort_res",
+    "osd_mclock_scheduler_background_best_effort_wgt",
+    "osd_mclock_scheduler_background_best_effort_lim",
+    "osd_mclock_max_capacity_iops_hdd",
+    "osd_mclock_max_capacity_iops_ssd",
+    "osd_mclock_max_sequential_bandwidth_hdd",
+    "osd_mclock_max_sequential_bandwidth_ssd",
+    "osd_mclock_profile"
+  }; 
 }
 
-
 void mClockScheduler::handle_conf_change(
   const ConfigProxy& conf,
   const std::set<std::string> &changed)
 {
-  client_registry.update_from_config(conf);
+  if (changed.count("osd_mclock_max_capacity_iops_hdd") ||
+      changed.count("osd_mclock_max_capacity_iops_ssd")) {
+    set_osd_capacity_params_from_config();
+    client_registry.update_from_config(
+      conf, osd_bandwidth_capacity_per_shard);
+  }
+  if (changed.count("osd_mclock_max_sequential_bandwidth_hdd") ||
+      changed.count("osd_mclock_max_sequential_bandwidth_ssd")) {
+    set_osd_capacity_params_from_config();
+    client_registry.update_from_config(
+      conf, osd_bandwidth_capacity_per_shard);
+  }
+  if (changed.count("osd_mclock_profile")) {
+    set_config_defaults_from_profile();
+    client_registry.update_from_config(
+      conf, osd_bandwidth_capacity_per_shard);
+  }
+
+  auto get_changed_key = [&changed]() -> std::optional<std::string> {
+    static const std::vector<std::string> qos_params = {
+      "osd_mclock_scheduler_client_res",
+      "osd_mclock_scheduler_client_wgt",
+      "osd_mclock_scheduler_client_lim",
+      "osd_mclock_scheduler_background_recovery_res",
+      "osd_mclock_scheduler_background_recovery_wgt",
+      "osd_mclock_scheduler_background_recovery_lim",
+      "osd_mclock_scheduler_background_best_effort_res",
+      "osd_mclock_scheduler_background_best_effort_wgt",
+      "osd_mclock_scheduler_background_best_effort_lim"
+    };
+
+    for (auto &qp : qos_params) {
+      if (changed.count(qp)) {
+        return qp;
+      }
+    }
+    return std::nullopt;
+  };
+
+  if (auto key = get_changed_key(); key.has_value()) {
+    auto mclock_profile = cct->_conf.get_val<std::string>("osd_mclock_profile");
+    if (mclock_profile == "custom") {
+      client_registry.update_from_config(
+        conf, osd_bandwidth_capacity_per_shard);
+    }
+  }
+}
+
+mClockScheduler::~mClockScheduler()
+{
+  cct->_conf.remove_observer(this);
 }
 
 }
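Runtime reconfiguration flows through handle_conf_change(); a hedged sketch of the interaction (the wrapper below is illustrative, assuming references to the scheduler and config proxy are available):

    #include <set>
    #include <string>

    // Sketch: a capacity change re-derives osd_bandwidth_cost_per_io and
    // osd_bandwidth_capacity_per_shard, then rescales each class's res/lim.
    // QoS ratio keys (osd_mclock_scheduler_*) only take effect when
    // osd_mclock_profile is "custom"; built-in profiles reseed their defaults.
    void on_iops_change(crimson::osd::scheduler::mClockScheduler &sched,
                        const crimson::common::ConfigProxy &conf) {
      std::set<std::string> changed{"osd_mclock_max_capacity_iops_ssd"};
      sched.handle_conf_change(conf, changed);
    }
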
index 16082ffd0954042207d430e5c23ebefea4bf976c..d8d3c324001f5ea9b9df48db08e009748ead8c9b 100644 (file)
@@ -15,6 +15,7 @@
 
 #pragma once
 
+#include <functional>
 #include <ostream>
 #include <map>
 #include <vector>
 #include "boost/variant.hpp"
 
 #include "dmclock/src/dmclock_server.h"
-
 #include "crimson/osd/scheduler/scheduler.h"
+#include "crimson/mon/MonClient.h"
+
 #include "common/config.h"
 #include "common/ceph_context.h"
 
 
 namespace crimson::osd::scheduler {
 
-using client_id_t = uint64_t;
-using profile_id_t = uint64_t;
+constexpr double default_min = 0.0;
+constexpr double default_max = std::numeric_limits<double>::is_iec559 ?
+  std::numeric_limits<double>::infinity() :
+  std::numeric_limits<double>::max();
 
+/**
+ * client_profile_id_t
+ *
+ * client_id - global id (client.####) for client QoS
+ * profile_id - id generated by client's QoS profile
+ *
+ * Currently (Reef and below), both members are set to
+ * 0 which ensures that all external clients share the
+ * mClock profile allocated reservation and limit
+ * bandwidth.
+ *
+ * Note: Post Reef, both members will be set to non-zero
+ * values when the distributed feature of the mClock
+ * algorithm is utilized.
+ */
 struct client_profile_id_t {
-  client_id_t client_id;
-  profile_id_t profile_id;
+  uint64_t client_id = 0;
+  uint64_t profile_id = 0;
+
+  client_profile_id_t(uint64_t _client_id, uint64_t _profile_id) :
+    client_id(_client_id),
+    profile_id(_profile_id) {}
+
+  client_profile_id_t() = default;
+
   auto operator<=>(const client_profile_id_t&) const = default;
+  friend std::ostream& operator<<(std::ostream& out,
+                                  const client_profile_id_t& client_profile) {
+    out << " client_id: " << client_profile.client_id
+        << " profile_id: " << client_profile.profile_id;
+    return out;
+  }
 };
 
-
 struct scheduler_id_t {
   scheduler_class_t class_id;
   client_profile_id_t client_profile_id;
+
   auto operator<=>(const scheduler_id_t&) const = default;
+  friend std::ostream& operator<<(std::ostream& out,
+                                  const scheduler_id_t& sched_id) {
+    out << "{ class_id: " << sched_id.class_id
+        << sched_id.client_profile_id;
+    return out << " }";
+  }
 };
 
 /**
@@ -53,6 +91,56 @@ struct scheduler_id_t {
  */
 class mClockScheduler : public Scheduler, md_config_obs_t {
 
+  crimson::common::CephContext *cct;
+  const int whoami;
+  const uint32_t num_shards;
+  const int shard_id;
+  const bool is_rotational;
+  unsigned cutoff_priority;
+  PerfCounters *logger;
+
+  /**
+   * osd_bandwidth_cost_per_io
+   *
+   * mClock expects all queued items to have a uniform expression of
+   * "cost".  However, IO devices generally have quite different capacity
+   * for sequential IO vs small random IO.  This implementation handles this
+   * by expressing all costs as a number of sequential bytes written adding
+   * additional cost for each random IO equal to osd_bandwidth_cost_per_io.
+   *
+   * Thus, an IO operation requiring a total of <size> bytes to be written
+   * across <iops> different locations will have a cost of
+   * <size> + (osd_bandwidth_cost_per_io * <iops>) bytes.
+   *
+   * Set in set_osd_capacity_params_from_config in the constructor and upon
+   * config change.
+   *
+   * Has units bytes/io.
+   */
+  double osd_bandwidth_cost_per_io;
+
+  /**
+   * osd_bandwidth_capacity_per_shard
+   *
+   * mClock expects reservation and limit parameters to be expressed in units
+   * of cost/second -- which means bytes/second for this implementation.
+   *
+   * Rather than expecting users to compute appropriate limit and reservation
+   * values for each class of OSDs in their cluster, we instead express
+   * reservation and limit parameters as ratios of the OSD's maximum capacity.
+   * osd_bandwidth_capacity_per_shard is that capacity divided by the number
+   * of shards.
+   *
+   * Set in set_osd_capacity_params_from_config in the constructor and upon
+   * config change.
+   *
+   * This value gets passed to ClientRegistry::update_from_config in order
+   * to resolve the full reservation and limit parameters for mclock from
+   * the configured ratios.
+   *
+   * Has units bytes/second.
+   */
+  double osd_bandwidth_capacity_per_shard;
   class ClientRegistry {
     std::array<
       crimson::dmclock::ClientInfo,
@@ -69,7 +157,16 @@ class mClockScheduler : public Scheduler, md_config_obs_t {
     const crimson::dmclock::ClientInfo *get_external_client(
       const client_profile_id_t &client) const;
   public:
-    void update_from_config(const ConfigProxy &conf);
+    /**
+     * update_from_config
+     *
+     * Sets the mclock parameters (reservation, weight, and limit)
+     * for each class of IO (background_recovery, background_best_effort,
+     * and client).
+     */
+    void update_from_config(
+      const ConfigProxy &conf,
+      double capacity_per_shard);
     const crimson::dmclock::ClientInfo *get_info(
       const scheduler_id_t &id) const;
   } client_registry;
@@ -80,47 +177,91 @@ class mClockScheduler : public Scheduler, md_config_obs_t {
     true,
     true,
     2>;
+  using priority_t = unsigned;
+  using SubQueue = std::map<priority_t,
+                            std::list<item_t>,
+                            std::greater<priority_t>>;
   mclock_queue_t scheduler;
-  std::list<item_t> immediate;
+  /**
+   * high_priority
+   *
+   * Holds entries to be dequeued in strict order ahead of mClock
+   * Invariant: entries are never empty
+   */
+  SubQueue high_priority;
+  priority_t immediate_class_priority = std::numeric_limits<priority_t>::max();
 
   static scheduler_id_t get_scheduler_id(const item_t &item) {
     return scheduler_id_t{
       item.params.klass,
-       client_profile_id_t{
-       item.params.owner,
-         0
-         }
+      client_profile_id_t{
+        item.params.owner,
+        0
+      }
     };
   }
 
+  /**
+   * set_osd_capacity_params_from_config
+   *
+   * mClockScheduler uses two parameters, osd_bandwidth_cost_per_io
+   * and osd_bandwidth_capacity_per_shard, internally.  These two
+   * parameters are derived from config parameters
+   * osd_mclock_max_capacity_iops_(hdd|ssd) and
+   * osd_mclock_max_sequential_bandwidth_(hdd|ssd) as well as num_shards.
+   * Invoking set_osd_capacity_params_from_config() resets those derived
+   * params based on the current config and should be invoked any time they
+   * are modified as well as in the constructor.  See handle_conf_change().
+   */
+  void set_osd_capacity_params_from_config();
+
+  // Set the mclock related config params based on the profile
+  void set_config_defaults_from_profile();
+
 public:
-  mClockScheduler(ConfigProxy &conf);
+  mClockScheduler(CephContext *cct, int whoami, uint32_t num_shards,
+    int shard_id, bool is_rotational, bool init_perfcounter=true);
+  ~mClockScheduler() override;
+
+  /// Calculate scaled cost per item
+  uint32_t calc_scaled_cost(int cost);
+
+  // Helper method to display mclock queues
+  std::string display_queues() const;
 
   // Enqueue op in the back of the regular queue
   void enqueue(item_t &&item) final;
 
-  // Enqueue the op in the front of the regular queue
+  // Enqueue the op in the front of the high priority queue
   void enqueue_front(item_t &&item) final;
 
   // Return an op to be dispatch
-  item_t dequeue() final;
+  WorkItem dequeue() final;
 
   // Returns if the queue is empty
   bool empty() const final {
-    return immediate.empty() && scheduler.empty();
+    return scheduler.empty() && high_priority.empty();
   }
 
   // Formatted output of the queue
   void dump(ceph::Formatter &f) const final;
 
   void print(std::ostream &ostream) const final {
-    ostream << "mClockScheduler";
+    ostream << "mClockScheduer ";
+    ostream << ", cutoff=" << cutoff_priority;
   }
 
   std::vector<std::string> get_tracked_keys() const noexcept final;
 
   void handle_conf_change(const ConfigProxy& conf,
                          const std::set<std::string> &changed) final;
+
+  double get_cost_per_io() const {
+    return osd_bandwidth_cost_per_io;
+  }
+private:
+  // Enqueue the op to the high priority queue
+  void enqueue_high(unsigned prio, item_t &&item, bool front = false);
 };
 
 }
index c85cb388ef36e255407cd447e096fc433e0f97ba..70ca8002f9afdc53dd10e994c3b5f654e9815236 100644 (file)
@@ -66,7 +66,7 @@ class ClassedOpQueueScheduler final : public Scheduler {
     if (conf.get_val<std::string>("osd_op_queue_cut_off") == "debug_random") {
       srand(time(NULL));
       return (rand() % 2 < 1) ?
-       scheduler_class_t::repop : scheduler_class_t::immediate;
+        scheduler_class_t::repop : scheduler_class_t::immediate;
     } else if (conf.get_val<std::string>("osd_op_queue_cut_off") == "high") {
       return scheduler_class_t::immediate;
     } else {
@@ -128,7 +128,7 @@ public:
     return queue.empty();
   }
 
-  item_t dequeue() final {
+  WorkItem dequeue() final {
     return queue.dequeue();
   }
 
@@ -145,7 +145,8 @@ public:
   ~ClassedOpQueueScheduler() final {};
 };
 
-SchedulerRef make_scheduler(ConfigProxy &conf)
+SchedulerRef make_scheduler(CephContext *cct, ConfigProxy &conf, int whoami, uint32_t nshards, int sid,
+                            bool is_rotational, bool perf_cnt)
 {
   const std::string _type = conf.get_val<std::string>("osd_op_queue");
   const std::string *type = &_type;
@@ -166,7 +167,7 @@ SchedulerRef make_scheduler(ConfigProxy &conf)
        conf->osd_op_pq_min_cost
       );
   } else if (*type == "mclock_scheduler") {
-    return std::make_unique<mClockScheduler>(conf);
+    return std::make_unique<mClockScheduler>(cct, whoami, nshards, sid, is_rotational, perf_cnt);
   } else {
     ceph_assert("Invalid choice of wq" == 0);
     return std::unique_ptr<mClockScheduler>();
index a014991abcd053a95aad58deca425dbccc0a0d01..0d3c147a43e382501c5287da986b70078656ec46 100644 (file)
@@ -22,8 +22,8 @@
 namespace crimson::osd::scheduler {
 
 enum class scheduler_class_t : uint8_t {
-  background_best_effort = 0,
-  background_recovery,
+  background_recovery = 0,
+  background_best_effort,
   client,
   repop,
   immediate,
@@ -32,10 +32,10 @@ enum class scheduler_class_t : uint8_t {
 std::ostream &operator<<(std::ostream &, const scheduler_class_t &);
 
 using client_t = uint64_t;
-using cost_t = uint64_t;
 
 struct params_t {
-  cost_t cost = 1;
+  int cost = 1;
+  unsigned priority = 0;
   client_t owner;
   scheduler_class_t klass;
 };
@@ -43,8 +43,11 @@ struct params_t {
 struct item_t {
   params_t params;
   seastar::promise<> wake;
+  int get_cost() const { return params.cost; }
+  unsigned get_priority() const { return params.priority; }
 };
 
+using WorkItem = std::variant<std::monostate, item_t, double>;
+
 /**
  * Base interface for classes responsible for choosing
  * op processing order in the OSD.
@@ -62,7 +65,7 @@ public:
   virtual bool empty() const = 0;
 
   // Return next op to be processed
-  virtual item_t dequeue() = 0;
+  virtual WorkItem dequeue() = 0;
 
   // Dump formatted representation for the queue
   virtual void dump(ceph::Formatter &f) const = 0;
@@ -77,6 +80,7 @@ public:
 std::ostream &operator<<(std::ostream &lhs, const Scheduler &);
 using SchedulerRef = std::unique_ptr<Scheduler>;
 
-SchedulerRef make_scheduler(ConfigProxy &);
+SchedulerRef make_scheduler(CephContext *cct, ConfigProxy &, int whoami, uint32_t num_shards,
+                            int shard_id, bool is_rotational, bool perf_cnt);
 
 }
index 400976d0db9544b2de8d3ed4a00604c4c267bde8..caf3123b72e4b39b68abb851d2564d7124ec2622 100644 (file)
@@ -209,6 +209,10 @@ public:
     PerfCounters *recoverystate_perf,
     crimson::os::FuturizedStore &store,
     OSDState& osd_state);
+
+  void initialize_scheduler(CephContext* cct, bool is_rotational) {
+    throttler.initialize_scheduler(
+      cct, crimson::common::local_conf(), is_rotational, whoami);
+  }
 };
 
 /**
@@ -593,8 +597,7 @@ public:
   }
 
   FORWARD_TO_OSD_SINGLETON(get_pool_info)
-  FORWARD(with_throttle_while, with_throttle_while, local_state.throttler)
-  FORWARD(try_acquire_throttle_now, try_acquire_throttle_now, local_state.throttler)
+  FORWARD(with_throttle, with_throttle, local_state.throttler)
 
   FORWARD_TO_OSD_SINGLETON(build_incremental_map_msg)
   FORWARD_TO_OSD_SINGLETON(send_incremental_map)
@@ -618,6 +621,9 @@ public:
     snap_dump_reservations,
     snap_reserver.dump)
 
+  bool throttle_available() const {
+    return local_state.throttler.available();
+  }
 
   auto local_update_priority(
     singleton_orderer_t &orderer,