crimson: Support mclock for crimson backfill_try_patch
author     Mohit Agrawal <moagrawa@redhat.com>
           Wed, 26 Feb 2025 09:32:21 +0000 (15:02 +0530)
committer  Super User <root@shakthi5.lab.eng.blr.redhat.com>
           Thu, 27 Mar 2025 09:48:54 +0000 (15:18 +0530)
This patch brings the crimson mclock scheduler sources in sync with the
classic OSDs. Currently the feature is used by crimson only for background
recovery operations; later it will be extended to other OSD operations as
well. To use it, configure the crimson_osd_scheduler_concurrency parameter
for the OSD; the scheduler is then enabled for recovery-related operations.

Signed-off-by: Mohit Agrawal <moagrawa@redhat.com>
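
The message above ties backfill throttling to the crimson_osd_scheduler_concurrency option. As a rough, standalone illustration only (this is not the patch's OperationThrottler; the class and variable names below are invented), a concurrency limit of this kind gates how many operations may be in flight at once:

#include <cstdint>
#include <iostream>

// Illustrative only: a minimal model of a concurrency throttle, not the
// patch's OperationThrottler.
struct SimpleThrottler {
  uint64_t max_in_progress;      // 0 means unlimited, mirroring the OSD option
  uint64_t in_progress = 0;

  // Returns true if the caller may start another operation right away.
  bool try_acquire() {
    if (max_in_progress == 0 || in_progress < max_in_progress) {
      ++in_progress;
      return true;
    }
    return false;                // caller has to wait until release() is called
  }

  void release() {
    if (in_progress > 0) {
      --in_progress;
    }
  }
};

int main() {
  SimpleThrottler throttler{2}; // e.g. a concurrency limit of 2
  std::cout << throttler.try_acquire() << throttler.try_acquire()
            << throttler.try_acquire() << "\n";   // prints 110
  throttler.release();
  std::cout << throttler.try_acquire() << "\n";   // prints 1 again
}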
16 files changed:
src/crimson/common/config_proxy.h
src/crimson/osd/backfill_state.cc
src/crimson/osd/backfill_state.h
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/osd_operation.cc
src/crimson/osd/osd_operation.h
src/crimson/osd/osd_operations/background_recovery.h
src/crimson/osd/pg.h
src/crimson/osd/pg_recovery.cc
src/crimson/osd/pg_recovery.h
src/crimson/osd/scheduler/mclock_scheduler.cc
src/crimson/osd/scheduler/mclock_scheduler.h
src/crimson/osd/scheduler/scheduler.cc
src/crimson/osd/scheduler/scheduler.h
src/crimson/osd/shard_services.h

index fce86f2389cb1ac3c0144136ccefe49cad9f9138..f0dda3760e89473a52762af5e674140d4fa6df1e 100644 (file)
@@ -152,9 +152,15 @@ public:
       }
     });
   }
+
   int get_val(std::string_view key, std::string *val) const {
     return get_config().get_val(*values, key, val);
   }
+
+  void set_val_default_sync(const std::string& key, const std::string& val) {
+    get_config().set_val_default(*values, obs_mgr, key, val);
+  }
+
   template<typename T>
   const T get_val(std::string_view key) const {
     return get_config().template get_val<T>(*values, key);
index a068c1fa8d41bd95602f865d8d34ceaa2a65e1c0..cc6df427e1ab9fafe6d849823f7d9af3f5540c7a 100644 (file)
@@ -7,6 +7,7 @@
 #include "common/hobject.h"
 #include "crimson/osd/backfill_state.h"
 #include "osd/osd_types_fmt.h"
+#include "crimson/osd/pg_interval_interrupt_condition.h"
 
 SET_SUBSYS(osd);
 
@@ -310,11 +311,58 @@ bool BackfillState::Enqueuing::Enqueuing::all_emptied(
   return local_backfill_info.empty() && replicas_emptied;
 }
 
+int BackfillState::Enqueuing::process_backfill_attempt(auto& primary_bi)
+{
+  LOG_PREFIX(BackfillState::Enqueuing::process_backfill_attempt);
+  if (!backfill_listener().budget_available()) {
+    DEBUGDPP("throttle failed, turning to Waiting", pg());
+    post_event(RequestWaiting{});
+    return 1;
+  } else if (should_rescan_replicas(backfill_state().peer_backfill_info,
+                                    primary_bi)) {
+    // Count simultaneous scans as a single op and let those complete
+    post_event(RequestReplicasScanning{});
+    return 1;
+  }
+
+  if (all_emptied(primary_bi, backfill_state().peer_backfill_info)) {
+    return 2;
+  }
+  // Get object within set of peers to operate on and the set of targets
+  // for which that object applies.
+  if (const hobject_t check = \
+      earliest_peer_backfill(backfill_state().peer_backfill_info);
+      check < primary_bi.begin) {
+    // Don't increment ops here because deletions
+    // are cheap and not replied to unlike real recovery_ops,
+    // and we can't increment ops without requeueing ourself
+    // for recovery.
+    auto result = remove_on_peers(check);
+    trim_backfilled_object_from_intervals(std::move(result),
+                                          backfill_state().last_backfill_started,
+                                          backfill_state().peer_backfill_info);
+    backfill_listener().maybe_flush();
+  } else if (!primary_bi.empty()) {
+    auto result = update_on_peers(check);
+    trim_backfilled_object_from_intervals(std::move(result),
+                                          backfill_state().last_backfill_started,
+                                          backfill_state().peer_backfill_info);
+    primary_bi.pop_front();
+    backfill_listener().maybe_flush();
+  } else {
+    return 2;
+  }
+  return 0;
+}
+
 BackfillState::Enqueuing::Enqueuing(my_context ctx)
   : my_base(ctx)
 {
   LOG_PREFIX(BackfillState::Enqueuing::Enqueuing);
   auto& primary_bi = backfill_state().backfill_info;
+  using interruptor =
+    crimson::interruptible::interruptor<crimson::osd::IOInterruptCondition>;
 
   // update our local interval to cope with recent changes
   primary_bi.begin = backfill_state().last_backfill_started;
@@ -339,7 +387,8 @@ BackfillState::Enqueuing::Enqueuing(my_context ctx)
     post_event(RequestPrimaryScanning{});
     return;
   }
-
+
+  /*
   do {
     if (!backfill_listener().budget_available()) {
       DEBUGDPP("throttle failed, turning to Waiting", pg());
@@ -347,7 +396,6 @@ BackfillState::Enqueuing::Enqueuing(my_context ctx)
       return;
     } else if (should_rescan_replicas(backfill_state().peer_backfill_info,
                                      primary_bi)) {
-      // Count simultaneous scans as a single op and let those complete
       post_event(RequestReplicasScanning{});
       return;
     }
@@ -355,15 +403,9 @@ BackfillState::Enqueuing::Enqueuing(my_context ctx)
     if (all_emptied(primary_bi, backfill_state().peer_backfill_info)) {
       break;
     }
-    // Get object within set of peers to operate on and the set of targets
-    // for which that object applies.
     if (const hobject_t check = \
           earliest_peer_backfill(backfill_state().peer_backfill_info);
         check < primary_bi.begin) {
-      // Don't increment ops here because deletions
-      // are cheap and not replied to unlike real recovery_ops,
-      // and we can't increment ops without requeueing ourself
-      // for recovery.
       auto result = remove_on_peers(check);
       trim_backfilled_object_from_intervals(std::move(result),
                                            backfill_state().last_backfill_started,
@@ -380,7 +422,32 @@ BackfillState::Enqueuing::Enqueuing(my_context ctx)
       break;
     }
   } while (!all_emptied(primary_bi, backfill_state().peer_backfill_info));
+   */
 
+  do {
+    seastar::logger& logger = crimson::get_logger(0);
+    logger.info("Calling acq_throttle");
+    std::optional<int> backfill_result;
+    auto futopt = backfill_listener().acq_throttle();
+    if (!futopt) {
+      logger.info("throttling future is disabled");
+      process_backfill_attempt(primary_bi);
+    } else {
+      logger.info("throttle is acquired");
+      backfill_listener().throttle_acquired = true;
+      std::ignore = interruptor::make_interruptible(std::move(*futopt)
+      ).then_interruptible([this, &primary_bi, &backfill_result] {
+        seastar::logger& logger = crimson::get_logger(0);
+        logger.info("throttling future is loaded");
+        backfill_result = process_backfill_attempt(primary_bi);
+        backfill_listener().release_throttle();
+      });
+    }
+    if (backfill_result.has_value()) {
+      logger.info("backfill_result value is {}", backfill_result.value());
+    }
+  } while (!all_emptied(primary_bi, backfill_state().peer_backfill_info));
+
   if (should_rescan_primary(backfill_state().peer_backfill_info,
                                   primary_bi)) {
     // need to grab one another chunk of the object namespace and restart
index ea4b159ded92f971e3580466ccb1a879f7a235ed..909d112c99834ba64a8f6c1311b146b23d3511fe 100644 (file)
@@ -170,7 +170,7 @@ public:
       const PeeringFacade& peering_state,
       const BackfillInterval& backfill_info,
       const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info);
-
+    int process_backfill_attempt(auto &primary_bi);
   private:
     void maybe_update_range();
     void trim_backfill_infos();
@@ -320,6 +320,8 @@ public:
       backfill_machine.process_event(RequestDone{});
     }
   }
+  //void process_backfill_attempt(auto &primary_bi);
+
 private:
   struct backfill_suspend_state_t {
     bool suspended = false;
@@ -383,6 +385,9 @@ struct BackfillState::BackfillListener {
 
   virtual void backfilled() = 0;
 
+  virtual std::optional<seastar::future<>> acq_throttle() = 0;
+  virtual void release_throttle() = 0;
+  bool throttle_acquired = false;
   virtual ~BackfillListener() = default;
 };
 
index 27511df85822ca3ccd52da3de232247f535fdcab..13a7d9fc5ab8528f6b35898ee2e54747124dfb85 100644 (file)
@@ -581,6 +581,12 @@ seastar::future<> OSD::start()
     return _add_me_to_crush();
   }).then([this] {
     return _add_device_class();
+   }).then([this] {
+    return _get_device_type();
+   }).then([this](auto rotational) {
+    return shard_services.invoke_on_all([this, rotational](auto &local_service) {
+      local_service.local_state.initialize_scheduler(local_service.get_cct(), rotational);
+    });
    }).then([this] {
     monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0);
     monc->sub_want("mgrmap", 0, 0);
@@ -685,6 +691,24 @@ seastar::future<> OSD::_send_boot()
   m->metadata["osd_type"] = "crimson";
   return monc->send_message(std::move(m));
 }
+seastar::future<bool> OSD::_get_device_type()
+{
+  LOG_PREFIX(OSD::_get_device_type);
+  if (!local_conf().get_val<bool>("osd_class_update_on_start")) {
+    co_return false;
+  }
+
+  std::string device_class = co_await store.get_default_device_class();
+  if (device_class.empty()) {
+    INFO("Device class is empty; ");
+    co_return false;
+  }
+
+  bool rotational = (device_class != "ssd");
+
+  INFO("device_class is {} rotational: {}", device_class, rotational);
+  co_return rotational;
+}
 
 seastar::future<> OSD::_add_device_class()
 {
index 5d36f7ad8cee16176cef9fd240cd83c1c2da245e..8f0c7a0dad3c36cfbe7e475ce1324959c733708e 100644 (file)
@@ -189,6 +189,7 @@ private:
   seastar::future<> _send_boot();
   seastar::future<> _add_me_to_crush();
   seastar::future<> _add_device_class();
+  seastar::future<bool> _get_device_type();
 
   seastar::future<> osdmap_subscribe(version_t epoch, bool force_request);
 
index 978009709761324fb48b9b6c2f8bcb374a686038..52f2a32db5baa47e5f7e022399ca2cc4abbd140e 100644 (file)
@@ -14,6 +14,7 @@ namespace {
   }
 }
 
+using namespace crimson::osd::scheduler;
 namespace crimson::osd {
 
 void OSDOperationRegistry::do_stop()
@@ -149,21 +150,50 @@ void OSDOperationRegistry::visit_ops_in_flight(std::function<void(const ClientRe
   }
 }
 
+
 OperationThrottler::OperationThrottler(ConfigProxy &conf)
-  : scheduler(crimson::osd::scheduler::make_scheduler(conf))
 {
   conf.add_observer(this);
+}
+
+void OperationThrottler::initialize_scheduler(CephContext *cct, ConfigProxy &conf, bool is_rotational, int whoami)
+{
+  scheduler = crimson::osd::scheduler::make_scheduler(cct, conf, whoami, seastar::smp::count,
+            seastar::this_shard_id(), is_rotational, true);
   update_from_config(conf);
 }
 
 void OperationThrottler::wake()
 {
+  WorkItem work_item;
+  logger().info("MY_LOG wake() called. in_progress: {}, max_in_progress: {}", 
+                in_progress, max_in_progress);
+
   while ((!max_in_progress || in_progress < max_in_progress) &&
         !scheduler->empty()) {
-    auto item = scheduler->dequeue();
-    item.wake.set_value();
-    ++in_progress;
-    --pending;
+    work_item = scheduler->dequeue();
+    if (auto when_ready = std::get_if<double>(&work_item)) {
+      ceph::real_clock::time_point future_time = ceph::real_clock::from_double(*when_ready);
+      auto now = ceph::real_clock::now();
+      auto wait_duration = std::chrono::duration_cast<std::chrono::milliseconds>(future_time - now);
+      if (wait_duration.count() > 0) {
+        logger().info("No items ready. Retrying wake() in {} ms", wait_duration.count());
+        std::ignore = seastar::sleep(wait_duration).then([this] {
+          wake();
+        });
+        return;
+      }
+    }
+    if (auto *item = std::get_if<crimson::osd::scheduler::item_t>(&work_item)) {
+      logger().info("MY_LOG Processing item and setting future");
+      item->wake.set_value();
+      ++in_progress;
+      --pending;
+    }
+  }
+  if (scheduler->empty()) {
+    logger().warn("MY_LOG wake() exited early: scheduler is empty");
   }
 }
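
For orientation, the reworked wake() above follows the new dequeue() contract: a WorkItem is either a ready item or a double naming the time at which the next item becomes schedulable, in which case wake() re-arms itself via seastar::sleep(). The hedged sketch below models only that branching, using plain std::thread sleeping and a hypothetical Item type; it is not the patch's code.

#include <chrono>
#include <iostream>
#include <thread>
#include <variant>

struct Item { int id; };                       // hypothetical queued item
using WorkItem = std::variant<std::monostate, Item, double>;

// Either dispatch a ready item or wait until the scheduler says the next
// item becomes ready (the real code reschedules wake() with seastar::sleep).
void handle(const WorkItem &w, double now_secs) {
  if (auto when_ready = std::get_if<double>(&w)) {
    std::chrono::duration<double> wait(*when_ready - now_secs);
    if (wait.count() > 0) {
      std::cout << "nothing ready, retrying in " << wait.count() << "s\n";
      std::this_thread::sleep_for(wait);
    }
  } else if (auto item = std::get_if<Item>(&w)) {
    std::cout << "dispatching item " << item->id << "\n";
  }
}

int main() {
  handle(WorkItem{Item{1}}, 0.0);    // ready item
  handle(WorkItem{0.001}, 0.0);      // future ready time, 1 ms ahead
}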
 
index d702919bd6fae6ca52eceb5086be9399d4751caa..1df3eaeea991f2a651018f1dd12586702ddc17f6 100644 (file)
@@ -346,14 +346,17 @@ public:
   // returns the future for the acquiring otherwise
   std::optional<seastar::future<>>
   try_acquire_throttle_now(crimson::osd::scheduler::params_t params) {
-    if (!max_in_progress || in_progress < max_in_progress) {
-      ++in_progress;
-      --pending;
+    if (!max_in_progress) {
       return std::nullopt;
     }
     return acquire_throttle(params);
   }
 
+  void try_release_throttle() {
+    return release_throttle();
+  }
+
+  void initialize_scheduler(CephContext* cct, ConfigProxy &conf, bool is_rotational, int whoami);
 private:
   void dump_detail(Formatter *f) const final;
 
index 255a934cd49879263c3bc50ebaea9725af22dac9..d4e46075466fdf343bd0172acb8cc47ef97e6893 100644 (file)
@@ -38,8 +38,12 @@ protected:
 private:
   virtual void dump_detail(Formatter *f) const;
   crimson::osd::scheduler::params_t get_scheduler_params() const {
+    int cost = std::max<int64_t>(
+      1,
+      pg->get_info().stats.stats.sum.num_bytes /
+        std::max<int64_t>(1, pg->get_info().stats.stats.sum.num_objects));
+    unsigned priority = pg->get_recovery_op_priority();
+
     return {
-      1, // cost
+      cost, // cost
+      priority, // priority
       0, // owner
       scheduler_class
     };
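
As a hedged worked example of the cost chosen above: the recovery op's cost is roughly the PG's average object size, and mClockScheduler::calc_scaled_cost() (see the mclock_scheduler.cc hunk further down) never lets it drop below osd_bandwidth_cost_per_io. All concrete numbers in this sketch are invented for illustration.

#include <algorithm>
#include <cstdint>
#include <iostream>

// Cost of one recovery op, approximated as the PG's average object size.
int64_t average_object_cost(int64_t num_bytes, int64_t num_objects) {
  return std::max<int64_t>(1, num_bytes / std::max<int64_t>(1, num_objects));
}

// Mirrors the idea of calc_scaled_cost(): never charge less than the
// per-IO bandwidth cost.
uint32_t scaled_cost(int64_t item_cost, double cost_per_io) {
  auto cost = static_cast<uint32_t>(std::max<int64_t>(1, item_cost));
  return std::max<uint32_t>(cost, static_cast<uint32_t>(cost_per_io));
}

int main() {
  // 4 MiB of data spread over 1024 objects -> 4096-byte average objects.
  int64_t cost = average_object_cost(4LL << 20, 1024);
  // Example per-IO cost, e.g. ~150 MiB/s divided by ~315 IOPS.
  std::cout << "item cost: " << cost
            << " bytes, scaled cost: " << scaled_cost(cost, 499321.0)
            << " bytes\n";
}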
index 1168c56933de2814a007f70d9ef71f1f147916d5..7e713ed124a7f905303802c1de1ed7d6eb211023 100644 (file)
@@ -867,9 +867,23 @@ public:
     int *return_code,
     std::vector<pg_log_op_return_item_t> *op_returns) const;
   int get_recovery_op_priority() const {
-    int64_t pri = 0;
-    get_pgpool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
-    return  pri > 0 ? pri : crimson::common::local_conf()->osd_recovery_op_priority;
+    const std::string type =
+      this->shard_services.get_cct()->_conf.get_val<std::string>("osd_op_queue");
+    if (type == "mclock_scheduler") {
+      if (peering_state.is_forced_recovery_or_backfill()) {
+        return peering_state.recovery_msg_priority_t::FORCED;
+      } else if (peering_state.is_undersized()) {
+        return peering_state.recovery_msg_priority_t::UNDERSIZED;
+      } else if (peering_state.is_degraded()) {
+        return peering_state.recovery_msg_priority_t::DEGRADED;
+      } else {
+        return peering_state.recovery_msg_priority_t::BEST_EFFORT;
+      }
+    } else {
+      int64_t pri = 0;
+      get_pgpool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
+      return  pri > 0 ? pri : crimson::common::local_conf()->osd_recovery_op_priority;
+    }
   }
   seastar::future<> mark_unfound_lost(int) {
     // TODO: see PrimaryLogPG::mark_all_unfound_lost()
index 735dd7f02e81fabaeaa10702352ccc902295382e..a8c10dfcfdcfd3e2940115fcc384719210ad89b0 100644 (file)
@@ -603,8 +603,9 @@ void PGRecovery::update_peers_last_backfill(
 
 bool PGRecovery::budget_available() const
 {
+  // Budget checks are bypassed here; throttling is now handled via acq_throttle().
+  return true;
   crimson::osd::scheduler::params_t params =
-    {1, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort};
+    {1, 0, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort};
   auto &ss = pg->get_shard_services();
   auto futopt = ss.try_acquire_throttle_now(std::move(params));
   if (!futopt) {
@@ -620,11 +621,35 @@ bool PGRecovery::budget_available() const
   return false;
 }
 
+std::optional<seastar::future<>> PGRecovery::acq_throttle() {
+  auto &ss = pg->get_shard_services();
+  crimson::osd::scheduler::params_t params =
+    {1, 0, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort};
+  auto futopt = ss.try_acquire_throttle_now(std::move(params));
+  if (!futopt) {
+    logger().info(" throttling is disabled for pg {} ", *static_cast<crimson::osd::PG*>(pg));
+    return std::nullopt;
+  }
+  return futopt;
+}
+
 void PGRecovery::on_pg_clean()
 {
   backfill_state.reset();
 }
 
+void PGRecovery::release_throttle()
+{
+  if (this->throttle_acquired) {
+    seastar::logger& logger = crimson::get_logger(0);
+    logger.info("Calling release_throttle pg_recovery");
+
+    auto &ss = pg->get_shard_services();
+    ss.try_release_throttle();
+    this->throttle_acquired = false;
+  }
+}
+
 void PGRecovery::backfilled()
 {
   using LocalPeeringEvent = crimson::osd::LocalPeeringEvent;
@@ -635,6 +660,7 @@ void PGRecovery::backfilled()
     pg->get_osdmap_epoch(),
     pg->get_osdmap_epoch(),
     PeeringState::Backfilled{});
+    release_throttle();
 }
 
 void PGRecovery::backfill_suspended()
@@ -679,9 +705,27 @@ void PGRecovery::on_backfill_reserved()
       std::make_unique<crimson::osd::PGFacade>(
         *static_cast<crimson::osd::PG*>(pg)));
   }
+  /*auto &ss = pg->get_shard_services();
+  crimson::osd::scheduler::params_t params =
+    {1, 0, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort};
+  auto futopt = ss.try_acquire_throttle_now(std::move(params));
+  if (!futopt) {
+    logger().info(" throttling is disabled for pg {} ", *static_cast<crimson::osd::PG*>(pg));
+    backfill_state->process_event(BackfillState::Triggered{}.intrusive_from_this());
+    return;
+  }
+  std::ignore = interruptor::make_interruptible(std::move(*futopt)
+  ).then_interruptible([this] {
+    this->throttle_acquired = true;
+    logger().info(" throttling is acquired for pg {} ", *static_cast<crimson::osd::PG*>(pg));
+    backfill_state->process_event(
+      BackfillState::Triggered{}.intrusive_from_this());
+  });
+  */
   // it may be we either start a completely new backfill (first
   // event since last on_activate_complete()) or to resume already
   // (but stopped one).
+  logger().info(" Triggered backfillState event ", *static_cast<crimson::osd::PG*>(pg));
   backfill_state->process_event(
     BackfillState::Triggered{}.intrusive_from_this());
 }
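
A hedged sketch of the contract behind the acq_throttle()/release_throttle() pair added above: std::nullopt means the throttler lets the caller proceed immediately, otherwise the caller waits on the returned future before performing the backfill step and releases the slot afterwards. std::future stands in for seastar::future purely so the sketch is self-contained; the helper names are illustrative, not from the patch.

#include <future>
#include <iostream>
#include <optional>

// std::nullopt: throttling disabled, run immediately.
// Otherwise: a future that becomes ready once a scheduler slot is granted.
std::optional<std::future<void>> acq_throttle(bool throttling_enabled,
                                              std::promise<void> &slot) {
  if (!throttling_enabled) {
    return std::nullopt;
  }
  return slot.get_future();
}

int main() {
  std::promise<void> slot;
  auto fut = acq_throttle(true, slot);
  if (!fut) {
    std::cout << "throttling disabled, proceed with the backfill step\n";
  } else {
    slot.set_value();    // the scheduler grants the slot
    fut->wait();         // the backfill step would run after this point
    std::cout << "slot acquired, backfill step done, releasing\n";
    // release_throttle() would be called here in the real flow
  }
}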
index a9a48cf465ce87b7367d0eb24bd8e9ea311d05a0..3101cc7b5c74514e1b5199064ac6661746ec0994 100644 (file)
@@ -51,6 +51,8 @@ public:
     const hobject_t& obj,
     const eversion_t& v,
     const std::vector<pg_shard_t> &peers) final;
+  void release_throttle();
+  //bool throttle_acquired;
 private:
   PGRecoveryListener* pg;
   size_t start_primary_recovery_ops(
@@ -122,6 +124,7 @@ private:
   void update_peers_last_backfill(
     const hobject_t& new_last_backfill) final;
   bool budget_available() const final;
+  std::optional<seastar::future<>> acq_throttle() final;
   void backfilled() final;
   friend crimson::osd::BackfillState::PGFacade;
   friend crimson::osd::PG;
index 2fcf6d57db47fb01e39534bfb74e1a2165fe66dd..af0412e4f131192700741954133f0dbb7cc1adcf 100644 (file)
 
 namespace dmc = crimson::dmclock;
 using namespace std::placeholders;
-using namespace std::string_literals;
+using namespace std::literals;
 
 #define dout_context cct
-#define dout_subsys ceph_subsys_osd
+#define dout_subsys ceph_subsys_mclock
 #undef dout_prefix
-#define dout_prefix *_dout
+#define dout_prefix *_dout << "mClockScheduler: "
 
 
 namespace crimson::osd::scheduler {
 
-mClockScheduler::mClockScheduler(ConfigProxy &conf) :
-  scheduler(
-    std::bind(&mClockScheduler::ClientRegistry::get_info,
-             &client_registry,
-             _1),
-    dmc::AtLimit::Allow,
-    conf.get_val<double>("osd_mclock_scheduler_anticipation_timeout"))
+mClockScheduler::mClockScheduler(CephContext *cct,
+  int whoami,
+  uint32_t num_shards,
+  int shard_id,
+  bool is_rotational,
+  bool init_perfcounter)
+  : cct(cct),
+    whoami(whoami),
+    num_shards(num_shards),
+    shard_id(shard_id),
+    is_rotational(is_rotational),
+    logger(nullptr),
+    scheduler(
+      std::bind(&mClockScheduler::ClientRegistry::get_info,
+                &client_registry,
+                _1),
+      dmc::AtLimit::Wait,
+      cct->_conf.get_val<double>("osd_mclock_scheduler_anticipation_timeout"))
 {
-  conf.add_observer(this);
-  client_registry.update_from_config(conf);
+  cct->_conf.add_observer(this);
+  ceph_assert(num_shards > 0);
+  auto get_op_queue_cut_off = [&conf = cct->_conf]() {
+    if (conf.get_val<std::string>("osd_op_queue_cut_off") == "debug_random") {
+      std::random_device rd;
+      std::mt19937 random_gen(rd());
+      return (random_gen() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
+    } else if (conf.get_val<std::string>("osd_op_queue_cut_off") == "high") {
+      return CEPH_MSG_PRIO_HIGH;
+    } else {
+      // default / catch-all is 'low'
+      return CEPH_MSG_PRIO_LOW;
+    }
+  };
+  cutoff_priority = get_op_queue_cut_off();
+  set_osd_capacity_params_from_config();
+  set_config_defaults_from_profile();
+  client_registry.update_from_config(
+    cct->_conf, osd_bandwidth_capacity_per_shard);
+
 }
 
-void mClockScheduler::ClientRegistry::update_from_config(const ConfigProxy &conf)
+/* ClientRegistry holds the dmclock::ClientInfo configuration parameters
+ * (reservation (bytes/second), weight (unitless), limit (bytes/second))
+ * for each IO class in the OSD (client, background_recovery,
+ * background_best_effort).
+ *
+ * mclock expects limit and reservation to have units of <cost>/second
+ * (bytes/second), but osd_mclock_scheduler_client_(lim|res) are provided
+ * as ratios of the OSD's capacity.  We convert from the one to the other
+ * using the capacity_per_shard parameter.
+ *
+ * Note, mclock profile information will already have been set as a default
+ * for the osd_mclock_scheduler_client_* parameters prior to calling
+ * update_from_config -- see set_config_defaults_from_profile().
+ */
+void mClockScheduler::ClientRegistry::update_from_config(
+  const ConfigProxy &conf,
+  const double capacity_per_shard)
 {
+
+  auto get_res = [&](double res) {
+    if (res) {
+      return res * capacity_per_shard;
+    } else {
+      return default_min; // min reservation
+    }
+  };
+
+  auto get_lim = [&](double lim) {
+    if (lim) {
+      return lim * capacity_per_shard;
+    } else {
+      return default_max; // high limit
+    }
+  };
+
+  // Set external client infos
+  double res = conf.get_val<double>(
+    "osd_mclock_scheduler_client_res");
+  double lim = conf.get_val<double>(
+    "osd_mclock_scheduler_client_lim");
+  uint64_t wgt = conf.get_val<uint64_t>(
+    "osd_mclock_scheduler_client_wgt");
   default_external_client_info.update(
-    conf.get_val<double>("osd_mclock_scheduler_client_res"),
-    conf.get_val<uint64_t>("osd_mclock_scheduler_client_wgt"),
-    conf.get_val<double>("osd_mclock_scheduler_client_lim"));
+    get_res(res),
+    wgt,
+    get_lim(lim));
 
+  // Set background recovery client infos
+  res = conf.get_val<double>(
+    "osd_mclock_scheduler_background_recovery_res");
+  lim = conf.get_val<double>(
+    "osd_mclock_scheduler_background_recovery_lim");
+  wgt = conf.get_val<uint64_t>(
+    "osd_mclock_scheduler_background_recovery_wgt");
   internal_client_infos[
     static_cast<size_t>(scheduler_class_t::background_recovery)].update(
-    conf.get_val<double>("osd_mclock_scheduler_background_recovery_res"),
-    conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_wgt"),
-    conf.get_val<double>("osd_mclock_scheduler_background_recovery_lim"));
+      get_res(res),
+      wgt,
+      get_lim(lim));
 
+  // Set background best effort client infos
+  res = conf.get_val<double>(
+    "osd_mclock_scheduler_background_best_effort_res");
+  lim = conf.get_val<double>(
+    "osd_mclock_scheduler_background_best_effort_lim");
+  wgt = conf.get_val<uint64_t>(
+    "osd_mclock_scheduler_background_best_effort_wgt");
   internal_client_infos[
     static_cast<size_t>(scheduler_class_t::background_best_effort)].update(
-    conf.get_val<double>("osd_mclock_scheduler_background_best_effort_res"),
-    conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_wgt"),
-    conf.get_val<double>("osd_mclock_scheduler_background_best_effort_lim"));
+      get_res(res),
+      wgt,
+      get_lim(lim));
 }
 
 const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_external_client(
@@ -79,7 +162,6 @@ const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_info(
   case scheduler_class_t::immediate:
     ceph_assert(0 == "Cannot schedule immediate");
     return (dmc::ClientInfo*)nullptr;
-  case scheduler_class_t::repop:
   case scheduler_class_t::client:
     return get_external_client(id.client_profile_id);
   default:
@@ -88,44 +170,306 @@ const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_info(
   }
 }
 
+void mClockScheduler::set_osd_capacity_params_from_config()
+{
+  uint64_t osd_bandwidth_capacity;
+  double osd_iop_capacity;
+
+  std::tie(osd_bandwidth_capacity, osd_iop_capacity) = [&, this] {
+    if (is_rotational) {
+      return std::make_tuple(
+        cct->_conf.get_val<Option::size_t>(
+          "osd_mclock_max_sequential_bandwidth_hdd"),
+        cct->_conf.get_val<double>("osd_mclock_max_capacity_iops_hdd"));
+    } else {
+      return std::make_tuple(
+        cct->_conf.get_val<Option::size_t>(
+          "osd_mclock_max_sequential_bandwidth_ssd"),
+        cct->_conf.get_val<double>("osd_mclock_max_capacity_iops_ssd"));
+    }
+  }();
+
+  osd_bandwidth_capacity = std::max<uint64_t>(1, osd_bandwidth_capacity);
+  osd_iop_capacity = std::max<double>(1.0, osd_iop_capacity);
+
+  osd_bandwidth_cost_per_io =
+    static_cast<double>(osd_bandwidth_capacity) / osd_iop_capacity;
+  osd_bandwidth_capacity_per_shard = static_cast<double>(osd_bandwidth_capacity)
+    / static_cast<double>(num_shards);
+
+  dout(1) << __func__ << ": osd_bandwidth_cost_per_io: "
+          << std::fixed << std::setprecision(2)
+          << osd_bandwidth_cost_per_io << " bytes/io"
+          << ", osd_bandwidth_capacity_per_shard "
+          << osd_bandwidth_capacity_per_shard << " bytes/second"
+          << dendl;
+}
+
+/**
+ * profile_t
+ *
+ * mclock profile -- 3 params for each of 3 client classes
+ * 0 (min): specifies no minimum reservation
+ * 0 (max): specifies no upper limit
+ */
+struct profile_t {
+  struct client_config_t {
+    double reservation;
+    uint64_t weight;
+    double limit;
+  };
+  client_config_t client;
+  client_config_t background_recovery;
+  client_config_t background_best_effort;
+};
+
+static std::ostream &operator<<(
+  std::ostream &lhs, const profile_t::client_config_t &rhs)
+{
+  return lhs << "{res: " << rhs.reservation
+             << ", wgt: " << rhs.weight
+             << ", lim: " << rhs.limit
+             << "}";
+}
+
+static std::ostream &operator<<(std::ostream &lhs, const profile_t &rhs)
+{
+  return lhs << "[client: " << rhs.client
+             << ", background_recovery: " << rhs.background_recovery
+             << ", background_best_effort: " << rhs.background_best_effort
+             << "]";
+}
+
+void mClockScheduler::set_config_defaults_from_profile()
+{
+  // Let only a single osd shard (id:0) set the profile configs
+  if (shard_id > 0) {
+    return;
+  }
+
+  /**
+   * high_client_ops
+   *
+   * Client Allocation:
+   *   reservation: 60% | weight: 2 | limit: 0 (max) |
+   * Background Recovery Allocation:
+   *   reservation: 40% | weight: 1 | limit: 0 (max) |
+   * Background Best Effort Allocation:
+   *   reservation: 0 (min) | weight: 1 | limit: 70% |
+   */
+  static constexpr profile_t high_client_ops_profile{
+    { .6, 2,  0 },
+    { .4, 1,  0 },
+    {  0, 1, .7 }
+  };
+
+  /**
+   * high_recovery_ops
+   *
+   * Client Allocation:
+   *   reservation: 30% | weight: 1 | limit: 0 (max) |
+   * Background Recovery Allocation:
+   *   reservation: 70% | weight: 2 | limit: 0 (max) |
+   * Background Best Effort Allocation:
+   *   reservation: 0 (min) | weight: 1 | limit: 0 (max) |
+   */
+  static constexpr profile_t high_recovery_ops_profile{
+    { .3, 1, 0 },
+    { .7, 2, 0 },
+    {  0, 1, 0 }
+  };
+
+  /**
+   * balanced
+   *
+   * Client Allocation:
+   *   reservation: 50% | weight: 1 | limit: 0 (max) |
+   * Background Recovery Allocation:
+   *   reservation: 50% | weight: 1 | limit: 0 (max) |
+   * Background Best Effort Allocation:
+   *   reservation: 0 (min) | weight: 1 | limit: 90% |
+   */
+  static constexpr profile_t balanced_profile{
+    { .5, 1, 0 },
+    { .5, 1, 0 },
+    {  0, 1, .9 }
+  };
+
+  const profile_t *profile = nullptr;
+  auto mclock_profile = cct->_conf.get_val<std::string>("osd_mclock_profile");
+  if (mclock_profile == "high_client_ops") {
+    profile = &high_client_ops_profile;
+    dout(10) << "Setting high_client_ops profile " << *profile << dendl;
+  } else if (mclock_profile == "high_recovery_ops") {
+    profile = &high_recovery_ops_profile;
+    dout(10) << "Setting high_recovery_ops profile " << *profile << dendl;
+  } else if (mclock_profile == "balanced") {
+    profile = &balanced_profile;
+    dout(10) << "Setting balanced profile " << *profile << dendl;
+  } else if (mclock_profile == "custom") {
+    dout(10) << "Profile set to custom, not setting defaults" << dendl;
+    return;
+  } else {
+    derr << "Invalid mclock profile: " << mclock_profile << dendl;
+    ceph_assert("Invalid choice of mclock profile" == 0);
+    return;
+  }
+  ceph_assert(nullptr != profile);
+
+  auto set_config = [&conf = cct->_conf](const char *key, auto val) {
+    return conf.set_val_default_sync(key, std::to_string(val));
+  };
+
+  set_config("osd_mclock_scheduler_client_res", profile->client.reservation);
+  set_config("osd_mclock_scheduler_client_res", profile->client.reservation);
+  set_config("osd_mclock_scheduler_client_wgt", profile->client.weight);
+  set_config("osd_mclock_scheduler_client_lim", profile->client.limit);
+
+  set_config(
+    "osd_mclock_scheduler_background_recovery_res",
+    profile->background_recovery.reservation);
+  set_config(
+    "osd_mclock_scheduler_background_recovery_wgt",
+    profile->background_recovery.weight);
+  set_config(
+    "osd_mclock_scheduler_background_recovery_lim",
+    profile->background_recovery.limit);
+
+  set_config(
+    "osd_mclock_scheduler_background_best_effort_res",
+    profile->background_best_effort.reservation);
+  set_config(
+    "osd_mclock_scheduler_background_best_effort_wgt",
+    profile->background_best_effort.weight);
+  set_config(
+    "osd_mclock_scheduler_background_best_effort_lim",
+    profile->background_best_effort.limit);
+
+}
+
+uint32_t mClockScheduler::calc_scaled_cost(int item_cost)
+{
+  auto cost = static_cast<uint32_t>(
+    std::max<int>(
+      1, // ensure cost is non-zero and positive
+      item_cost));
+  auto cost_per_io = static_cast<uint32_t>(osd_bandwidth_cost_per_io);
+
+  return std::max<uint32_t>(cost, cost_per_io);
+}
+
 void mClockScheduler::dump(ceph::Formatter &f) const
 {
+  // Display queue sizes
+  f.open_object_section("queue_sizes");
+  f.dump_int("high_priority_queue", high_priority.size());
+  f.dump_int("scheduler", scheduler.request_count());
+  f.close_section();
+
+  // client map and queue tops (res, wgt, lim)
+  std::ostringstream out;
+  f.open_object_section("mClockClients");
+  f.dump_int("client_count", scheduler.client_count());
+  //out << scheduler;
+  f.dump_string("clients", out.str());
+  f.close_section();
+
+  // Display sorted queues (res, wgt, lim)
+  f.open_object_section("mClockQueues");
+  f.dump_string("queues", display_queues());
+  f.close_section();
+
+  f.open_object_section("HighPriorityQueue");
+  for (auto it = high_priority.begin();
+       it != high_priority.end(); it++) {
+    f.dump_int("priority", it->first);
+    f.dump_int("queue_size", it->second.size());
+  }
+  f.close_section();
 }
 
 void mClockScheduler::enqueue(item_t&& item)
 {
   auto id = get_scheduler_id(item);
-  auto cost = item.params.cost;
+  unsigned priority = item.get_priority();
 
+  // TODO: move this check into item, handle backwards compat
   if (scheduler_class_t::immediate == item.params.klass) {
-    immediate.push_front(std::move(item));
+    enqueue_high(immediate_class_priority, std::move(item));
+  } else if (priority >= cutoff_priority) {
+    enqueue_high(priority, std::move(item));
   } else {
+    auto cost = calc_scaled_cost(item.get_cost());
+    dout(20) << __func__ << " " << id
+             << " item_cost: " << item.get_cost()
+             << " scaled_cost: " << cost
+             << dendl;
+
+    // Add item to scheduler queue
     scheduler.add_request(
       std::move(item),
       id,
       cost);
+    //_get_mclock_counter(id);
   }
+
+  dout(10) << __func__ << " client_count: " << scheduler.client_count()
+           << " queue_sizes: [ "
+           << " high_priority_queue: " << high_priority.size()
+           << " sched: " << scheduler.request_count() << " ]"
+           << dendl;
+  dout(30) << __func__ << " mClockClients: "
+           << dendl;
+  dout(10) << __func__ << " mClockQueues: { "
+           << display_queues() << " }"
+           << dendl;
 }
 
 void mClockScheduler::enqueue_front(item_t&& item)
 {
-  immediate.push_back(std::move(item));
-  // TODO: item may not be immediate, update mclock machinery to permit
-  // putting the item back in the queue
+  unsigned priority = item.get_priority();
+
+  if (scheduler_class_t::immediate == item.params.klass) {
+    enqueue_high(immediate_class_priority, std::move(item), true);
+  } else if (priority >= cutoff_priority) {
+    enqueue_high(priority, std::move(item), true);
+  } else {
+    // mClock does not support enqueue at front, so we use
+    // the high queue with priority 0
+    enqueue_high(0, std::move(item), true);
+  }
 }
 
-item_t mClockScheduler::dequeue()
+void mClockScheduler::enqueue_high(unsigned priority,
+                                   item_t&& item,
+                                   bool front)
 {
-  if (!immediate.empty()) {
-    auto ret = std::move(immediate.back());
-    immediate.pop_back();
+  if (front) {
+    high_priority[priority].push_back(std::move(item));
+  } else {
+    high_priority[priority].push_front(std::move(item));
+  }
+}
+
+WorkItem mClockScheduler::dequeue()
+{
+  if (!high_priority.empty()) {
+    auto iter = high_priority.begin();
+    // invariant: high_priority entries are never empty
+    assert(!iter->second.empty());
+    WorkItem ret{std::move(iter->second.back())};
+    iter->second.pop_back();
+    if (iter->second.empty()) {
+      // maintain invariant, high priority entries are never empty
+      high_priority.erase(iter);
+    }
+
     return ret;
   } else {
     mclock_queue_t::PullReq result = scheduler.pull_request();
     if (result.is_future()) {
-      ceph_assert(
-       0 == "Not implemented, user would have to be able to be woken up");
-      return std::move(*(item_t*)nullptr);
+      return result.getTime();
     } else if (result.is_none()) {
       ceph_assert(
        0 == "Impossible, must have checked empty() first");
@@ -139,19 +483,33 @@ item_t mClockScheduler::dequeue()
   }
 }
 
-std::vector<std::string> mClockScheduler::get_tracked_keys() const noexcept
+std::string mClockScheduler::display_queues() const
+{
+  std::ostringstream out;
+  scheduler.display_queues(out);
+  return out.str();
+}
+
+const char** mClockScheduler::get_tracked_conf_keys() const
 {
-  return {
-    "osd_mclock_scheduler_client_res"s,
-    "osd_mclock_scheduler_client_wgt"s,
-    "osd_mclock_scheduler_client_lim"s,
-    "osd_mclock_scheduler_background_recovery_res"s,
-    "osd_mclock_scheduler_background_recovery_wgt"s,
-    "osd_mclock_scheduler_background_recovery_lim"s,
-    "osd_mclock_scheduler_background_best_effort_res"s,
-    "osd_mclock_scheduler_background_best_effort_wgt"s,
-    "osd_mclock_scheduler_background_best_effort_lim"s
+  static const char* KEYS[] = {
+    "osd_mclock_scheduler_client_res",
+    "osd_mclock_scheduler_client_wgt",
+    "osd_mclock_scheduler_client_lim",
+    "osd_mclock_scheduler_background_recovery_res",
+    "osd_mclock_scheduler_background_recovery_wgt",
+    "osd_mclock_scheduler_background_recovery_lim",
+    "osd_mclock_scheduler_background_best_effort_res",
+    "osd_mclock_scheduler_background_best_effort_wgt",
+    "osd_mclock_scheduler_background_best_effort_lim",
+    "osd_mclock_max_capacity_iops_hdd",
+    "osd_mclock_max_capacity_iops_ssd",
+    "osd_mclock_max_sequential_bandwidth_hdd",
+    "osd_mclock_max_sequential_bandwidth_ssd",
+    "osd_mclock_profile",
+    NULL
   };
+  return KEYS;
 }
 
 
@@ -159,7 +517,57 @@ void mClockScheduler::handle_conf_change(
   const ConfigProxy& conf,
   const std::set<std::string> &changed)
 {
-  client_registry.update_from_config(conf);
+  if (changed.count("osd_mclock_max_capacity_iops_hdd") ||
+      changed.count("osd_mclock_max_capacity_iops_ssd")) {
+    set_osd_capacity_params_from_config();
+    client_registry.update_from_config(
+      conf, osd_bandwidth_capacity_per_shard);
+  }
+  if (changed.count("osd_mclock_max_sequential_bandwidth_hdd") ||
+      changed.count("osd_mclock_max_sequential_bandwidth_ssd")) {
+    set_osd_capacity_params_from_config();
+    client_registry.update_from_config(
+      conf, osd_bandwidth_capacity_per_shard);
+  }
+  if (changed.count("osd_mclock_profile")) {
+    set_config_defaults_from_profile();
+    client_registry.update_from_config(
+      conf, osd_bandwidth_capacity_per_shard);
+  }
+
+  auto get_changed_key = [&changed]() -> std::optional<std::string> {
+    static const std::vector<std::string> qos_params = {
+      "osd_mclock_scheduler_client_res",
+      "osd_mclock_scheduler_client_wgt",
+      "osd_mclock_scheduler_client_lim",
+      "osd_mclock_scheduler_background_recovery_res",
+      "osd_mclock_scheduler_background_recovery_wgt",
+      "osd_mclock_scheduler_background_recovery_lim",
+      "osd_mclock_scheduler_background_best_effort_res",
+      "osd_mclock_scheduler_background_best_effort_wgt",
+      "osd_mclock_scheduler_background_best_effort_lim"
+    };
+
+    for (auto &qp : qos_params) {
+      if (changed.count(qp)) {
+        return qp;
+      }
+    }
+    return std::nullopt;
+  };
+
+  if (auto key = get_changed_key(); key.has_value()) {
+    auto mclock_profile = cct->_conf.get_val<std::string>("osd_mclock_profile");
+    if (mclock_profile == "custom") {
+      client_registry.update_from_config(
+        conf, osd_bandwidth_capacity_per_shard);
+    }
+  }
+}
+
+mClockScheduler::~mClockScheduler()
+{
+  cct->_conf.remove_observer(this);
 }
 
 }
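
To make the ratio-to-bytes conversion above concrete, here is a small self-contained sketch of the arithmetic done by set_osd_capacity_params_from_config() and ClientRegistry::update_from_config(). The capacity, IOPS and profile numbers are placeholders for illustration, not claims about the option defaults.

#include <algorithm>
#include <cstdint>
#include <iostream>

int main() {
  // Placeholder capacity figures for one OSD.
  uint64_t bandwidth_bytes_per_sec = 150ull * 1024 * 1024;
  double iops = 315.0;
  uint32_t num_shards = 1;

  // Derived values, as in set_osd_capacity_params_from_config().
  double cost_per_io =
    static_cast<double>(bandwidth_bytes_per_sec) / std::max(1.0, iops);
  double capacity_per_shard =
    static_cast<double>(bandwidth_bytes_per_sec) / num_shards;

  // A reservation ratio (e.g. 0.5) becomes an absolute bytes/second
  // reservation, as in ClientRegistry::update_from_config().
  double client_res_ratio = 0.5;
  double client_res_bytes_per_sec = client_res_ratio * capacity_per_shard;

  std::cout << "cost_per_io: " << cost_per_io << " bytes/io, "
            << "client reservation: " << client_res_bytes_per_sec
            << " bytes/s\n";
}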
index 16082ffd0954042207d430e5c23ebefea4bf976c..5b180272179b647fab89b3ffa9c21adbc75fb2f7 100644 (file)
@@ -15,6 +15,7 @@
 
 #pragma once
 
+#include <functional>
 #include <ostream>
 #include <map>
 #include <vector>
 #include "boost/variant.hpp"
 
 #include "dmclock/src/dmclock_server.h"
-
 #include "crimson/osd/scheduler/scheduler.h"
+#include "crimson/mon/MonClient.h"
+
 #include "common/config.h"
 #include "common/ceph_context.h"
 
 
 namespace crimson::osd::scheduler {
 
-using client_id_t = uint64_t;
-using profile_id_t = uint64_t;
+constexpr double default_min = 0.0;
+constexpr double default_max = std::numeric_limits<double>::is_iec559 ?
+  std::numeric_limits<double>::infinity() :
+  std::numeric_limits<double>::max();
 
+/**
+ * client_profile_id_t
+ *
+ * client_id - global id (client.####) for client QoS
+ * profile_id - id generated by client's QoS profile
+ *
+ * Currently (Reef and below), both members are set to
+ * 0 which ensures that all external clients share the
+ * mClock profile allocated reservation and limit
+ * bandwidth.
+ *
+ * Note: Post Reef, both members will be set to non-zero
+ * values when the distributed feature of the mClock
+ * algorithm is utilized.
+ */
 struct client_profile_id_t {
-  client_id_t client_id;
-  profile_id_t profile_id;
+  uint64_t client_id = 0;
+  uint64_t profile_id = 0;
+
+  client_profile_id_t(uint64_t _client_id, uint64_t _profile_id) :
+    client_id(_client_id),
+    profile_id(_profile_id) {}
+
+  client_profile_id_t() = default;
+
   auto operator<=>(const client_profile_id_t&) const = default;
+  friend std::ostream& operator<<(std::ostream& out,
+                                  const client_profile_id_t& client_profile) {
+    out << " client_id: " << client_profile.client_id
+        << " profile_id: " << client_profile.profile_id;
+    return out;
+  }
 };
 
-
 struct scheduler_id_t {
   scheduler_class_t class_id;
   client_profile_id_t client_profile_id;
+
   auto operator<=>(const scheduler_id_t&) const = default;
+  friend std::ostream& operator<<(std::ostream& out,
+                                  const scheduler_id_t& sched_id) {
+    out << "{ class_id: " << sched_id.class_id
+        << sched_id.client_profile_id;
+    return out << " }";
+  }
 };
 
 /**
@@ -53,6 +91,56 @@ struct scheduler_id_t {
  */
 class mClockScheduler : public Scheduler, md_config_obs_t {
 
+  crimson::common::CephContext *cct;
+  const int whoami;
+  const uint32_t num_shards;
+  const int shard_id;
+  const bool is_rotational;
+  unsigned cutoff_priority;
+  PerfCounters *logger;
+
+  /**
+   * osd_bandwidth_cost_per_io
+   *
+   * mClock expects all queued items to have a uniform expression of
+   * "cost".  However, IO devices generally have quite different capacity
+   * for sequential IO vs small random IO.  This implementation handles this
+   * by expressing all costs as a number of sequential bytes written, adding
+   * additional cost for each random IO equal to osd_bandwidth_cost_per_io.
+   *
+   * Thus, an IO operation requiring a total of <size> bytes to be written
+   * across <iops> different locations will have a cost of
+   * <size> + (osd_bandwidth_cost_per_io * <iops>) bytes.
+   *
+   * Set in set_osd_capacity_params_from_config in the constructor and upon
+   * config change.
+   *
+   * Has units bytes/io.
+   */
+  double osd_bandwidth_cost_per_io;
+
+  /**
+   * osd_bandwidth_capacity_per_shard
+   *
+   * mClock expects reservation and limit parameters to be expressed in units
+   * of cost/second -- which means bytes/second for this implementation.
+   *
+   * Rather than expecting users to compute appropriate limit and reservation
+   * values for each class of OSDs in their cluster, we instead express
+   * reservation and limit parameters as ratios of the OSD's maximum capacity.
+   * osd_bandwidth_capacity_per_shard is that capacity divided by the number
+   * of shards.
+   *
+   * Set in set_osd_capacity_params_from_config in the constructor and upon
+   * config change.
+   *
+   * This value gets passed to ClientRegistry::update_from_config in order
+   * to resolve the full reservation and limit parameters for mclock from
+   * the configured ratios.
+   *
+   * Has units bytes/second.
+   */
+  double osd_bandwidth_capacity_per_shard;
   class ClientRegistry {
     std::array<
       crimson::dmclock::ClientInfo,
@@ -69,7 +157,16 @@ class mClockScheduler : public Scheduler, md_config_obs_t {
     const crimson::dmclock::ClientInfo *get_external_client(
       const client_profile_id_t &client) const;
   public:
-    void update_from_config(const ConfigProxy &conf);
+    /**
+     * update_from_config
+     *
+     * Sets the mclock parameters (reservation, weight, and limit)
+     * for each class of IO (background_recovery, background_best_effort,
+     * and client).
+     */
+    void update_from_config(
+      const ConfigProxy &conf,
+      double capacity_per_shard);
     const crimson::dmclock::ClientInfo *get_info(
       const scheduler_id_t &id) const;
   } client_registry;
@@ -80,47 +177,91 @@ class mClockScheduler : public Scheduler, md_config_obs_t {
     true,
     true,
     2>;
+  using priority_t = unsigned;
+  using SubQueue = std::map<priority_t,
+       std::list<item_t>,
+       std::greater<priority_t>>;
   mclock_queue_t scheduler;
-  std::list<item_t> immediate;
+  /**
+   * high_priority
+   *
+   * Holds entries to be dequeued in strict order ahead of mClock
+   * Invariant: entries are never empty
+   */
+  SubQueue high_priority;
+  priority_t immediate_class_priority = std::numeric_limits<priority_t>::max();
 
   static scheduler_id_t get_scheduler_id(const item_t &item) {
     return scheduler_id_t{
       item.params.klass,
-       client_profile_id_t{
-       item.params.owner,
-         0
-         }
+      client_profile_id_t{
+        item.params.owner,
+        0
+      }
     };
   }
 
+  /**
+   * set_osd_capacity_params_from_config
+   *
+   * mClockScheduler uses two parameters, osd_bandwidth_cost_per_io
+   * and osd_bandwidth_capacity_per_shard, internally.  These two
+   * parameters are derived from config parameters
+   * osd_mclock_max_capacity_iops_(hdd|ssd) and
+   * osd_mclock_max_sequential_bandwidth_(hdd|ssd) as well as num_shards.
+   * Invoking set_osd_capacity_params_from_config() resets those derived
+   * params based on the current config and should be invoked any time they
+   * are modified as well as in the constructor.  See handle_conf_change().
+   */
+  void set_osd_capacity_params_from_config();
+
+  // Set the mclock related config params based on the profile
+  void set_config_defaults_from_profile();
+
 public:
-  mClockScheduler(ConfigProxy &conf);
+  mClockScheduler(CephContext *cct, int whoami, uint32_t num_shards,
+    int shard_id, bool is_rotational, bool init_perfcounter=true);
+  ~mClockScheduler() override;
+
+  /// Calculate scaled cost per item
+  uint32_t calc_scaled_cost(int cost);
+
+  // Helper method to display mclock queues
+  std::string display_queues() const;
 
   // Enqueue op in the back of the regular queue
   void enqueue(item_t &&item) final;
 
-  // Enqueue the op in the front of the regular queue
+  // Enqueue the op in the front of the high priority queue
   void enqueue_front(item_t &&item) final;
 
   // Return an op to be dispatch
-  item_t dequeue() final;
+  WorkItem dequeue() final;
 
   // Returns if the queue is empty
   bool empty() const final {
-    return immediate.empty() && scheduler.empty();
+    return scheduler.empty() && high_priority.empty();
   }
 
   // Formatted output of the queue
   void dump(ceph::Formatter &f) const final;
 
   void print(std::ostream &ostream) const final {
-    ostream << "mClockScheduler";
+    ostream << "mClockScheduer ";
+    ostream << ", cutoff=" << cutoff_priority;
   }
 
-  std::vector<std::string> get_tracked_keys() const noexcept final;
+  const char** get_tracked_conf_keys() const final;
 
   void handle_conf_change(const ConfigProxy& conf,
                          const std::set<std::string> &changed) final;
+
+  double get_cost_per_io() const {
+    return osd_bandwidth_cost_per_io;
+  }
+private:
+  // Enqueue the op to the high priority queue
+  void enqueue_high(unsigned prio, item_t &&item, bool front = false);
 };
 
 }
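
A hedged sketch of the SubQueue ordering declared above: because the map comparator is std::greater, begin() always names the highest-priority bucket, which is why dequeue() pops from begin() before consulting mClock. The priorities and payload strings below are arbitrary examples, not values from the patch.

#include <functional>
#include <iostream>
#include <list>
#include <map>
#include <string>

int main() {
  // Same shape as SubQueue: priorities sorted descending.
  std::map<unsigned, std::list<std::string>, std::greater<unsigned>> high_priority;
  high_priority[200].push_back("forced recovery item");   // arbitrary priority
  high_priority[255].push_back("immediate item");         // arbitrary priority

  auto it = high_priority.begin();           // bucket with the largest priority
  std::cout << "dequeue from priority " << it->first
            << ": " << it->second.back() << "\n";
  it->second.pop_back();
  if (it->second.empty()) {
    high_priority.erase(it);                 // keep the "never empty" invariant
  }
}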
index c85cb388ef36e255407cd447e096fc433e0f97ba..70ca8002f9afdc53dd10e994c3b5f654e9815236 100644 (file)
@@ -66,7 +66,7 @@ class ClassedOpQueueScheduler final : public Scheduler {
     if (conf.get_val<std::string>("osd_op_queue_cut_off") == "debug_random") {
       srand(time(NULL));
       return (rand() % 2 < 1) ?
-       scheduler_class_t::repop : scheduler_class_t::immediate;
+        scheduler_class_t::repop : scheduler_class_t::immediate;
     } else if (conf.get_val<std::string>("osd_op_queue_cut_off") == "high") {
       return scheduler_class_t::immediate;
     } else {
@@ -128,7 +128,7 @@ public:
     return queue.empty();
   }
 
-  item_t dequeue() final {
+  WorkItem dequeue() final {
     return queue.dequeue();
   }
 
@@ -145,7 +145,8 @@ public:
   ~ClassedOpQueueScheduler() final {};
 };
 
-SchedulerRef make_scheduler(ConfigProxy &conf)
+SchedulerRef make_scheduler(CephContext *cct, ConfigProxy &conf, int whoami, uint32_t nshards, int sid,
+                            bool is_rotational, bool perf_cnt)
 {
   const std::string _type = conf.get_val<std::string>("osd_op_queue");
   const std::string *type = &_type;
@@ -166,7 +167,7 @@ SchedulerRef make_scheduler(ConfigProxy &conf)
        conf->osd_op_pq_min_cost
       );
   } else if (*type == "mclock_scheduler") {
-    return std::make_unique<mClockScheduler>(conf);
+    return std::make_unique<mClockScheduler>(cct, whoami, nshards, sid, is_rotational, perf_cnt);
   } else {
     ceph_assert("Invalid choice of wq" == 0);
     return std::unique_ptr<mClockScheduler>();
index a014991abcd053a95aad58deca425dbccc0a0d01..0d3c147a43e382501c5287da986b70078656ec46 100644 (file)
@@ -22,8 +22,8 @@
 namespace crimson::osd::scheduler {
 
 enum class scheduler_class_t : uint8_t {
-  background_best_effort = 0,
-  background_recovery,
+  background_recovery = 0,
+  background_best_effort,
   client,
   repop,
   immediate,
@@ -32,10 +32,10 @@ enum class scheduler_class_t : uint8_t {
 std::ostream &operator<<(std::ostream &, const scheduler_class_t &);
 
 using client_t = uint64_t;
-using cost_t = uint64_t;
 
 struct params_t {
-  cost_t cost = 1;
+  int cost = 1;
+  unsigned priority = 0;
   client_t owner;
   scheduler_class_t klass;
 };
@@ -43,8 +43,11 @@ struct params_t {
 struct item_t {
   params_t params;
   seastar::promise<> wake;
+  int get_cost() const { return params.cost; }
+  unsigned get_priority() const { return params.priority; }
 };
 
+using WorkItem = std::variant<std::monostate, item_t, double>;
 /**
  * Base interface for classes responsible for choosing
  * op processing order in the OSD.
@@ -62,7 +65,7 @@ public:
   virtual bool empty() const = 0;
 
   // Return next op to be processed
-  virtual item_t dequeue() = 0;
+  virtual WorkItem dequeue() = 0;
 
   // Dump formatted representation for the queue
   virtual void dump(ceph::Formatter &f) const = 0;
@@ -77,6 +80,7 @@ public:
 std::ostream &operator<<(std::ostream &lhs, const Scheduler &);
 using SchedulerRef = std::unique_ptr<Scheduler>;
 
-SchedulerRef make_scheduler(ConfigProxy &);
+SchedulerRef make_scheduler(CephContext *cct, ConfigProxy &, int whoami, uint32_t num_shards,
+                            int shard_id, bool is_rotational, bool perf_cnt);
 
 }
index 400976d0db9544b2de8d3ed4a00604c4c267bde8..0ae59a74e6631359d05deb4bff132dc94d556a3d 100644 (file)
@@ -209,6 +209,10 @@ public:
     PerfCounters *recoverystate_perf,
     crimson::os::FuturizedStore &store,
     OSDState& osd_state);
+
+  void initialize_scheduler(CephContext* cct, bool is_rotational) {
+    throttler.initialize_scheduler(cct, crimson::common::local_conf(), is_rotational, whoami);
+  }
 };
 
 /**
@@ -595,6 +599,7 @@ public:
   FORWARD_TO_OSD_SINGLETON(get_pool_info)
   FORWARD(with_throttle_while, with_throttle_while, local_state.throttler)
   FORWARD(try_acquire_throttle_now, try_acquire_throttle_now, local_state.throttler)
+  FORWARD(try_release_throttle, try_release_throttle, local_state.throttler)
 
   FORWARD_TO_OSD_SINGLETON(build_incremental_map_msg)
   FORWARD_TO_OSD_SINGLETON(send_incremental_map)