From: Mohit Agrawal Date: Mon, 28 Jul 2025 13:25:41 +0000 (+0530) Subject: crimson/mclock_scheduler: Support mclock for crimson X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b011424ea2a540fd1bb8136a42c2ec22e4fdb58a;p=ceph.git crimson/mclock_scheduler: Support mclock for crimson The patch is trying to sync mclock source of crimson similar to classic osds. Currently the feature is use by crimson only for background recovery operations but later we will use it for other OSD operations also.To use the same user need to configure crimson_osd_scheduler_concurrency parameter for osd. Replace item_t with WorkItem variant to maintain similarity with classic OSD. Introduce cost and priority as part of item_t. Fixes: https://tracker.ceph.com/issues/67367 Signed-off-by: Mohit Agrawal --- diff --git a/src/crimson/osd/scheduler/mclock_scheduler.cc b/src/crimson/osd/scheduler/mclock_scheduler.cc index c9342f9dd19c..185f75ffb801 100644 --- a/src/crimson/osd/scheduler/mclock_scheduler.cc +++ b/src/crimson/osd/scheduler/mclock_scheduler.cc @@ -20,113 +20,133 @@ #include "crimson/osd/scheduler/mclock_scheduler.h" #include "common/dout.h" -namespace dmc = crimson::dmclock; -using namespace std::placeholders; -using namespace std::string_literals; - #define dout_context cct -#define dout_subsys ceph_subsys_osd +#define dout_subsys ceph_subsys_mclock #undef dout_prefix -#define dout_prefix *_dout +#define dout_prefix *_dout << "mClockScheduler: " namespace crimson::osd::scheduler { -mClockScheduler::mClockScheduler(ConfigProxy &conf) : - scheduler( - std::bind(&mClockScheduler::ClientRegistry::get_info, - &client_registry, - _1), - dmc::AtLimit::Allow, - conf.get_val("osd_mclock_scheduler_anticipation_timeout")) -{ - conf.add_observer(this); - client_registry.update_from_config(conf); -} - -void mClockScheduler::ClientRegistry::update_from_config(const ConfigProxy &conf) +uint32_t mClockScheduler::calc_scaled_cost(int item_cost) { - default_external_client_info.update( - conf.get_val("osd_mclock_scheduler_client_res"), - conf.get_val("osd_mclock_scheduler_client_wgt"), - conf.get_val("osd_mclock_scheduler_client_lim")); - - internal_client_infos[ - static_cast(scheduler_class_t::background_recovery)].update( - conf.get_val("osd_mclock_scheduler_background_recovery_res"), - conf.get_val("osd_mclock_scheduler_background_recovery_wgt"), - conf.get_val("osd_mclock_scheduler_background_recovery_lim")); - - internal_client_infos[ - static_cast(scheduler_class_t::background_best_effort)].update( - conf.get_val("osd_mclock_scheduler_background_best_effort_res"), - conf.get_val("osd_mclock_scheduler_background_best_effort_wgt"), - conf.get_val("osd_mclock_scheduler_background_best_effort_lim")); -} - -const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_external_client( - const client_profile_id_t &client) const -{ - auto ret = external_client_infos.find(client); - if (ret == external_client_infos.end()) - return &default_external_client_info; - else - return &(ret->second); -} - -const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_info( - const scheduler_id_t &id) const { - switch (id.class_id) { - case scheduler_class_t::immediate: - ceph_assert(0 == "Cannot schedule immediate"); - return (dmc::ClientInfo*)nullptr; - case scheduler_class_t::repop: - case scheduler_class_t::client: - return get_external_client(id.client_profile_id); - default: - ceph_assert(static_cast(id.class_id) < internal_client_infos.size()); - return &internal_client_infos[static_cast(id.class_id)]; - } + return mclock_conf.calc_scaled_cost(item_cost); } void mClockScheduler::dump(ceph::Formatter &f) const { + // Display queue sizes + f.open_object_section("queue_sizes"); + f.dump_int("high_priority_queue", high_priority.size()); + f.dump_int("scheduler", scheduler.request_count()); + f.close_section(); + + // client map and queue tops (res, wgt, lim) + std::ostringstream out; + f.open_object_section("mClockClients"); + f.dump_int("client_count", scheduler.client_count()); + //out << scheduler; + f.dump_string("clients", out.str()); + f.close_section(); + + // Display sorted queues (res, wgt, lim) + f.open_object_section("mClockQueues"); + f.dump_string("queues", display_queues()); + f.close_section(); + + f.open_object_section("HighPriorityQueue"); + for (auto it = high_priority.begin(); + it != high_priority.end(); it++) { + f.dump_int("priority", it->first); + f.dump_int("queue_size", it->second.size()); + } + f.close_section(); } void mClockScheduler::enqueue(item_t&& item) { auto id = get_scheduler_id(item); - auto cost = item.params.cost; + unsigned priority = item.get_priority(); - if (scheduler_class_t::immediate == item.params.klass) { - immediate.push_front(std::move(item)); + // TODO: move this check into item, handle backwards compat + if (SchedulerClass::immediate == item.params.klass) { + enqueue_high(immediate_class_priority, std::move(item)); + } else if (priority >= cutoff_priority) { + enqueue_high(priority, std::move(item)); } else { + auto cost = calc_scaled_cost(item.get_cost()); + dout(20) << __func__ << " " << id + << " item_cost: " << item.get_cost() + << " scaled_cost: " << cost + << dendl; + + // Add item to scheduler queue scheduler.add_request( std::move(item), id, cost); } + + dout(20) << __func__ << ": sched client_count: " << scheduler.client_count() + << " sched queue size: " << scheduler.request_count() + << dendl; + + for (auto it = high_priority.begin();it != high_priority.end(); ++it) { + dout(20) << __func__ << " high_priority[" << it->first + << "]: " << it->second.size() + << dendl; + } + + dout(30) << __func__ << " mClockClients: " + << dendl; + dout(30) << __func__ << " mClockQueues: { " + << display_queues() << " }" + << dendl; } void mClockScheduler::enqueue_front(item_t&& item) { - immediate.push_back(std::move(item)); - // TODO: item may not be immediate, update mclock machinery to permit - // putting the item back in the queue + unsigned priority = item.get_priority(); + + if (SchedulerClass::immediate == item.params.klass) { + enqueue_high(immediate_class_priority, std::move(item), true); + } else if (priority >= cutoff_priority) { + enqueue_high(priority, std::move(item), true); + } else { + // mClock does not support enqueue at front, so we use + // the high queue with priority 0 + enqueue_high(0, std::move(item), true); + } +} + +void mClockScheduler::enqueue_high(unsigned priority, + item_t&& item, + bool front) +{ + if (front) { + high_priority[priority].push_back(std::move(item)); + } else { + high_priority[priority].push_front(std::move(item)); + } } -item_t mClockScheduler::dequeue() +WorkItem mClockScheduler::dequeue() { - if (!immediate.empty()) { - auto ret = std::move(immediate.back()); - immediate.pop_back(); + if (!high_priority.empty()) { + auto iter = high_priority.begin(); + // invariant: high_priority entries are never empty + assert(!iter->second.empty()); + WorkItem ret{std::move(iter->second.back())}; + iter->second.pop_back(); + if (iter->second.empty()) { + // maintain invariant, high priority entries are never empty + high_priority.erase(iter); + } return ret; } else { mclock_queue_t::PullReq result = scheduler.pull_request(); if (result.is_future()) { - ceph_abort_msg( - "Not implemented, user would have to be able to be woken up"); - return std::move(*(item_t*)nullptr); + return result.getTime(); } else if (result.is_none()) { ceph_abort_msg( "Impossible, must have checked empty() first"); @@ -140,27 +160,29 @@ item_t mClockScheduler::dequeue() } } -std::vector mClockScheduler::get_tracked_keys() const noexcept +std::string mClockScheduler::display_queues() const { - return { - "osd_mclock_scheduler_client_res"s, - "osd_mclock_scheduler_client_wgt"s, - "osd_mclock_scheduler_client_lim"s, - "osd_mclock_scheduler_background_recovery_res"s, - "osd_mclock_scheduler_background_recovery_wgt"s, - "osd_mclock_scheduler_background_recovery_lim"s, - "osd_mclock_scheduler_background_best_effort_res"s, - "osd_mclock_scheduler_background_best_effort_wgt"s, - "osd_mclock_scheduler_background_best_effort_lim"s - }; + std::ostringstream out; + scheduler.display_queues(out); + return out.str(); } +std::vector mClockScheduler::get_tracked_keys() const noexcept +{ + return mclock_conf.get_tracked_keys(); +} + void mClockScheduler::handle_conf_change( const ConfigProxy& conf, const std::set &changed) { - client_registry.update_from_config(conf); + mclock_conf.handle_conf_change(conf, changed); +} + +mClockScheduler::~mClockScheduler() +{ + cct->_conf.remove_observer(this); } } diff --git a/src/crimson/osd/scheduler/mclock_scheduler.h b/src/crimson/osd/scheduler/mclock_scheduler.h index 7a6424d44e7c..c3f4d58f0976 100644 --- a/src/crimson/osd/scheduler/mclock_scheduler.h +++ b/src/crimson/osd/scheduler/mclock_scheduler.h @@ -25,30 +25,19 @@ #include "boost/variant.hpp" #include "dmclock/src/dmclock_server.h" - #include "crimson/osd/scheduler/scheduler.h" +#include "crimson/mon/MonClient.h" + #include "common/config.h" #include "common/ceph_context.h" +#include "common/mclock_common.h" +namespace dmc = crimson::dmclock; +using namespace std::placeholders; +using namespace std::literals; namespace crimson::osd::scheduler { -using client_id_t = uint64_t; -using profile_id_t = uint64_t; - -struct client_profile_id_t { - client_id_t client_id; - profile_id_t profile_id; - auto operator<=>(const client_profile_id_t&) const = default; -}; - - -struct scheduler_id_t { - scheduler_class_t class_id; - client_profile_id_t client_profile_id; - auto operator<=>(const scheduler_id_t&) const = default; -}; - /** * Scheduler implementation based on mclock. * @@ -56,26 +45,12 @@ struct scheduler_id_t { */ class mClockScheduler : public Scheduler, md_config_obs_t { - class ClientRegistry { - std::array< - crimson::dmclock::ClientInfo, - static_cast(scheduler_class_t::client) - > internal_client_infos = { - // Placeholder, gets replaced with configured values - crimson::dmclock::ClientInfo(1, 1, 1), - crimson::dmclock::ClientInfo(1, 1, 1) - }; + crimson::common::CephContext *cct; + unsigned cutoff_priority; + PerfCounters *logger; - crimson::dmclock::ClientInfo default_external_client_info = {1, 1, 1}; - std::map external_client_infos; - const crimson::dmclock::ClientInfo *get_external_client( - const client_profile_id_t &client) const; - public: - void update_from_config(const ConfigProxy &conf); - const crimson::dmclock::ClientInfo *get_info( - const scheduler_id_t &id) const; - } client_registry; + ClientRegistry client_registry; + MclockConfig mclock_conf; class crimson_mclock_cleaning_job_t { struct job_control_t { @@ -127,47 +102,112 @@ class mClockScheduler : public Scheduler, md_config_obs_t { true, 2, crimson_mclock_cleaning_job_t>; + using priority_t = unsigned; + using SubQueue = std::map, + std::greater>; mclock_queue_t scheduler; - std::list immediate; + /** + * high_priority + * + * Holds entries to be dequeued in strict order ahead of mClock + * Invariant: entries are never empty + */ + SubQueue high_priority; + priority_t immediate_class_priority = std::numeric_limits::max(); static scheduler_id_t get_scheduler_id(const item_t &item) { return scheduler_id_t{ item.params.klass, - client_profile_id_t{ - item.params.owner, - 0 - } + client_profile_id_t() }; } public: - mClockScheduler(ConfigProxy &conf); + template + mClockScheduler(CephContext *cct, int whoami, uint32_t num_shards, + int shard_id, bool is_rotational, + std::chrono::duration idle_age, + std::chrono::duration erase_age, + std::chrono::duration check_time, + bool init_perfcounter=true, MonClient *monc=nullptr) + : cct(cct), + logger(nullptr), + mclock_conf(cct, client_registry, num_shards, is_rotational, shard_id, whoami), + scheduler( + std::bind(&ClientRegistry::get_info, + &client_registry, + _1), + idle_age, erase_age, check_time, + dmc::AtLimit::Wait, + cct->_conf.get_val("osd_mclock_scheduler_anticipation_timeout")) + { + cct->_conf.add_observer(this); + ceph_assert(num_shards > 0); + auto get_op_queue_cut_off = [&conf = cct->_conf]() { + if (conf.get_val("osd_op_queue_cut_off") == "debug_random") { + std::random_device rd; + std::mt19937 random_gen(rd()); + return (random_gen() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW; + } else if (conf.get_val("osd_op_queue_cut_off") == "high") { + return CEPH_MSG_PRIO_HIGH; + } else { + // default / catch-all is 'low' + return CEPH_MSG_PRIO_LOW; + } + }; + cutoff_priority = get_op_queue_cut_off(); +} + mClockScheduler(CephContext *cct, int whoami, uint32_t num_shards, + int shard_id, bool is_rotational, + bool init_perfcounter=true, MonClient *monc=nullptr) : + mClockScheduler( + cct, whoami, num_shards, shard_id, is_rotational, + crimson::dmclock::standard_idle_age, + crimson::dmclock::standard_erase_age, + crimson::dmclock::standard_check_time, + init_perfcounter, monc) {} + ~mClockScheduler() override; + + /// Calculate scaled cost per item + uint32_t calc_scaled_cost(int cost); + + // Helper method to display mclock queues + std::string display_queues() const; // Enqueue op in the back of the regular queue void enqueue(item_t &&item) final; - // Enqueue the op in the front of the regular queue + // Enqueue the op in the front of the high priority queue void enqueue_front(item_t &&item) final; // Return an op to be dispatch - item_t dequeue() final; + WorkItem dequeue() final; // Returns if the queue is empty bool empty() const final { - return immediate.empty() && scheduler.empty(); + return scheduler.empty() && high_priority.empty(); } // Formatted output of the queue void dump(ceph::Formatter &f) const final; void print(std::ostream &ostream) const final { - ostream << "mClockScheduler"; + ostream << "mClockScheduer "; + ostream << ", cutoff=" << cutoff_priority; } std::vector get_tracked_keys() const noexcept final; void handle_conf_change(const ConfigProxy& conf, const std::set &changed) final; + + double get_cost_per_io() const { + return mclock_conf.get_cost_per_io(); + } +private: + // Enqueue the op to the high priority queue + void enqueue_high(unsigned prio, item_t &&item, bool front = false); }; }