From: Mohit Agrawal Date: Thu, 3 Apr 2025 05:09:48 +0000 (+0000) Subject: Merge backfill/recovery patch to mclock scheduler X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fheads%2Fmclock_scheduler_backfill_recovery;p=ceph.git Merge backfill/recovery patch to mclock scheduler Signed-off-by: Mohit Agrawal --- diff --git a/src/crimson/common/config_proxy.h b/src/crimson/common/config_proxy.h index fce86f2389cb1..f0dda3760e894 100644 --- a/src/crimson/common/config_proxy.h +++ b/src/crimson/common/config_proxy.h @@ -152,9 +152,15 @@ public: } }); } + int get_val(std::string_view key, std::string *val) const { return get_config().get_val(*values, key, val); } + + void set_val_default_sync(const std::string& key, const std::string& val) { + get_config().set_val_default(*values, obs_mgr, key, val); + } + template const T get_val(std::string_view key) const { return get_config().template get_val(*values, key); diff --git a/src/crimson/osd/backfill_state.cc b/src/crimson/osd/backfill_state.cc index a068c1fa8d41b..7fea4b7d58423 100644 --- a/src/crimson/osd/backfill_state.cc +++ b/src/crimson/osd/backfill_state.cc @@ -342,7 +342,6 @@ BackfillState::Enqueuing::Enqueuing(my_context ctx) do { if (!backfill_listener().budget_available()) { - DEBUGDPP("throttle failed, turning to Waiting", pg()); post_event(RequestWaiting{}); return; } else if (should_rescan_replicas(backfill_state().peer_backfill_info, diff --git a/src/crimson/osd/backfill_state.h b/src/crimson/osd/backfill_state.h index ea4b159ded92f..75129d3974501 100644 --- a/src/crimson/osd/backfill_state.h +++ b/src/crimson/osd/backfill_state.h @@ -62,8 +62,6 @@ struct BackfillState { struct SuspendBackfill : sc::event { }; - struct ThrottleAcquired : sc::event { - }; private: // internal events struct RequestPrimaryScanning : sc::event { @@ -262,7 +260,6 @@ public: sc::transition, sc::custom_reaction, sc::custom_reaction, - sc::transition, sc::transition>; explicit Waiting(my_context); sc::result react(ObjectPushed); diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 27511df85822c..13a7d9fc5ab85 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -581,6 +581,12 @@ seastar::future<> OSD::start() return _add_me_to_crush(); }).then([this] { return _add_device_class(); + }).then([this] { + return _get_device_type(); + }).then([this](auto rotational) { + return shard_services.invoke_on_all([this, rotational](auto &local_service) { + local_service.local_state.initialize_scheduler(local_service.get_cct(), rotational); + }); }).then([this] { monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0); monc->sub_want("mgrmap", 0, 0); @@ -685,6 +691,24 @@ seastar::future<> OSD::_send_boot() m->metadata["osd_type"] = "crimson"; return monc->send_message(std::move(m)); } +seastar::future OSD::_get_device_type() +{ + LOG_PREFIX(OSD::_get_device_type); + if (!local_conf().get_val("osd_class_update_on_start")) { + co_return false; + } + + std::string device_class = co_await store.get_default_device_class(); + if (device_class.empty()) { + INFO("Device class is empty; "); + co_return false; + } + + bool rotational = (device_class != "ssd"); + + INFO("device_class is {} rotational: {}", device_class, rotational); + co_return rotational; +} seastar::future<> OSD::_add_device_class() { diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index 5d36f7ad8cee1..8f0c7a0dad3c3 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -189,6 +189,7 @@ private: seastar::future<> _send_boot(); 
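
The osd.cc hunk above wires device detection into scheduler setup: _get_device_type() asks the store for its default device class, treats anything other than "ssd" as rotational, and the result is broadcast to every shard's initialize_scheduler(). A minimal standalone sketch of that decision logic (a simplified stand-in function for illustration, not the actual crimson code):

    #include <iostream>
    #include <string>

    // Simplified model of the patch's device-type probe: anything not reported
    // as "ssd" is treated as rotational; an empty device class (or
    // osd_class_update_on_start = false) falls back to non-rotational.
    bool is_rotational_device(const std::string& device_class,
                              bool class_update_on_start = true)
    {
      if (!class_update_on_start || device_class.empty()) {
        return false;
      }
      return device_class != "ssd";
    }

    int main()
    {
      std::cout << is_rotational_device("hdd") << "\n";  // hdd -> rotational
      std::cout << is_rotational_device("ssd") << "\n";  // ssd -> non-rotational
      std::cout << is_rotational_device("") << "\n";     // unknown -> default (non-rotational)
    }
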
seastar::future<> _add_me_to_crush(); seastar::future<> _add_device_class(); + seastar::future _get_device_type(); seastar::future<> osdmap_subscribe(version_t epoch, bool force_request); diff --git a/src/crimson/osd/osd_operation.cc b/src/crimson/osd/osd_operation.cc index 9780097097613..8fe07a8c16378 100644 --- a/src/crimson/osd/osd_operation.cc +++ b/src/crimson/osd/osd_operation.cc @@ -14,6 +14,7 @@ namespace { } } +using namespace crimson::osd::scheduler; namespace crimson::osd { void OSDOperationRegistry::do_stop() @@ -149,21 +150,44 @@ void OSDOperationRegistry::visit_ops_in_flight(std::functionempty()) { - auto item = scheduler->dequeue(); - item.wake.set_value(); - ++in_progress; - --pending; + work_item = scheduler->dequeue(); + if (auto when_ready = std::get_if(&work_item)) { + ceph::real_clock::time_point future_time = ceph::real_clock::from_double(*when_ready); + auto now = ceph::real_clock::now(); + auto wait_duration = std::chrono::duration_cast(future_time - now); + if (wait_duration.count() > 0) { + logger().info("No items ready. Retrying wake() in {} ms", wait_duration.count()); + seastar::sleep(wait_duration).then([this] { + wake(); + }); + return; + } + } + if (auto *item = std::get_if(&work_item)) { + item->wake.set_value(); + ++in_progress; + --pending; + + } } } diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h index d702919bd6fae..fcac7c4ad6e38 100644 --- a/src/crimson/osd/osd_operation.h +++ b/src/crimson/osd/osd_operation.h @@ -334,6 +334,22 @@ public: const std::set &changed) final; void update_from_config(const ConfigProxy &conf); + bool available() const { + return !max_in_progress || in_progress < max_in_progress; + } + + template + auto with_throttle( + crimson::osd::scheduler::params_t params, + F &&f) { + if (!max_in_progress) return f(); + return acquire_throttle(params) + .then(std::forward(f)) + .finally([this] { + release_throttle(); + }); + } + template seastar::future<> with_throttle_while( BlockingEvent::Trigger&& trigger, @@ -342,18 +358,7 @@ public: with_throttle_while(std::forward(args)...), *this); } - // Returns std::nullopt if the throttle is acquired immediately, - // returns the future for the acquiring otherwise - std::optional> - try_acquire_throttle_now(crimson::osd::scheduler::params_t params) { - if (!max_in_progress || in_progress < max_in_progress) { - ++in_progress; - --pending; - return std::nullopt; - } - return acquire_throttle(params); - } - + void initialize_scheduler(CephContext* cct, ConfigProxy &conf, bool is_rotational, int whoami); private: void dump_detail(Formatter *f) const final; diff --git a/src/crimson/osd/osd_operations/background_recovery.cc b/src/crimson/osd/osd_operations/background_recovery.cc index 7f93e42d53d8e..3a2142ca83412 100644 --- a/src/crimson/osd/osd_operations/background_recovery.cc +++ b/src/crimson/osd/osd_operations/background_recovery.cc @@ -77,18 +77,19 @@ seastar::future<> BackgroundRecoveryT::start() std::chrono::milliseconds(std::lround(delay * 1000))); } return maybe_delay.then([ref, this] { - return this->template with_blocking_event( - [ref, this] (auto&& trigger) { - return ss.with_throttle_while( - std::move(trigger), - this, get_scheduler_params(), [this] { - return interruptor::with_interruption([this] { - return do_recovery(); - }, [](std::exception_ptr) { - return seastar::make_ready_future(false); - }, pg, epoch_started); - }); + return seastar::repeat([ref, this] { + return interruptor::with_interruption([this] { + return do_recovery(); + }, 
[](std::exception_ptr) { + return seastar::make_ready_future(false); + }, pg, epoch_started).then([](bool goon) { + if (goon) { + return seastar::stop_iteration::no; + } else { + return seastar::stop_iteration::yes; + } }); + }); }); } @@ -117,7 +118,8 @@ UrgentRecovery::do_recovery() ).then_interruptible([this] { return with_blocking_event([this] (auto&& trigger) { - return pg->get_recovery_handler()->recover_missing(trigger, soid, need); + return pg->get_recovery_handler()->recover_missing( + trigger, soid, need, false); }).then_interruptible([] { return seastar::make_ready_future(false); }); diff --git a/src/crimson/osd/osd_operations/background_recovery.h b/src/crimson/osd/osd_operations/background_recovery.h index 255a934cd4987..bbb98bd8f7cb9 100644 --- a/src/crimson/osd/osd_operations/background_recovery.h +++ b/src/crimson/osd/osd_operations/background_recovery.h @@ -38,8 +38,12 @@ protected: private: virtual void dump_detail(Formatter *f) const; crimson::osd::scheduler::params_t get_scheduler_params() const { + int cost = std::max(1, (pg->get_info().stats.stats.sum.num_bytes / pg->get_info().stats.stats.sum.num_objects)); + unsigned priority = pg->get_recovery_op_priority(); + return { - 1, // cost + cost, // cost + priority, // priority 0, // owner scheduler_class }; @@ -66,7 +70,6 @@ public: void print(std::ostream&) const final; std::tuple< - OperationThrottler::BlockingEvent, RecoveryBackend::RecoveryBlockingEvent > tracking_events; @@ -86,7 +89,6 @@ public: float delay = 0); std::tuple< - OperationThrottler::BlockingEvent, RecoveryBackend::RecoveryBlockingEvent > tracking_events; diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index b6b7b67262980..996cbfb61ccf9 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -882,9 +882,23 @@ public: int *return_code, std::vector *op_returns) const; int get_recovery_op_priority() const { - int64_t pri = 0; - get_pgpool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri); - return pri > 0 ? pri : crimson::common::local_conf()->osd_recovery_op_priority; + const std::string _type = this->shard_services.get_cct()->_conf.get_val("osd_op_queue"); + const std::string *type = &_type; + if (*type == "mclock_scheduler") { + if (peering_state.is_forced_recovery_or_backfill()) { + return peering_state.recovery_msg_priority_t::FORCED; + } else if (peering_state.is_undersized()) { + return peering_state.recovery_msg_priority_t::UNDERSIZED; + } else if (peering_state.is_degraded()) { + return peering_state.recovery_msg_priority_t::DEGRADED; + } else { + return peering_state.recovery_msg_priority_t::BEST_EFFORT; + } + } else { + int64_t pri = 0; + get_pgpool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri); + return pri > 0 ? 
pri : crimson::common::local_conf()->osd_recovery_op_priority; + } } seastar::future<> mark_unfound_lost(int) { // TODO: see PrimaryLogPG::mark_all_unfound_lost() diff --git a/src/crimson/osd/pg_recovery.cc b/src/crimson/osd/pg_recovery.cc index 735dd7f02e81f..071b34c3c6b20 100644 --- a/src/crimson/osd/pg_recovery.cc +++ b/src/crimson/osd/pg_recovery.cc @@ -67,6 +67,8 @@ PGRecovery::start_recovery_ops( if (max_to_start > 0) { max_to_start -= start_replica_recovery_ops(trigger, max_to_start, &started); } + using interruptor = + crimson::interruptible::interruptor; return interruptor::parallel_for_each(started, [] (auto&& ifut) { return std::move(ifut); @@ -194,10 +196,10 @@ size_t PGRecovery::start_primary_recovery_ops( auto it = missing.get_items().find(head); assert(it != missing.get_items().end()); auto head_need = it->second.need; - out->emplace_back(recover_missing(trigger, head, head_need)); + out->emplace_back(recover_missing(trigger, head, head_need, true)); ++skipped; } else { - out->emplace_back(recover_missing(trigger, soid, item.need)); + out->emplace_back(recover_missing(trigger, soid, item.need, true)); } ++started; } @@ -304,7 +306,9 @@ size_t PGRecovery::start_replica_recovery_ops( PGRecovery::interruptible_future<> PGRecovery::recover_missing( RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger, - const hobject_t &soid, eversion_t need) + const hobject_t &soid, + eversion_t need, + bool with_throttle) { logger().info("{} {} v {}", __func__, soid, need); auto [recovering, added] = pg->get_recovery_backend()->add_recovering(soid); @@ -317,7 +321,9 @@ PGRecovery::recover_missing( } else { return recovering.wait_track_blocking( trigger, - pg->get_recovery_backend()->recover_object(soid, need) + with_throttle + ? recover_object_with_throttle(soid, need) + : recover_object(soid, need) .handle_exception_interruptible( [=, this, soid = std::move(soid)] (auto e) { on_failed_recover({ pg->get_pg_whoami() }, soid, need); @@ -365,7 +371,7 @@ RecoveryBackend::interruptible_future<> PGRecovery::prep_object_replica_pushes( logger().info("{} {} v {}, new recovery", __func__, soid, need); return recovering.wait_track_blocking( trigger, - pg->get_recovery_backend()->recover_object(soid, need) + recover_object_with_throttle(soid, need) .handle_exception_interruptible( [=, this, soid = std::move(soid)] (auto e) { on_failed_recover({ pg->get_pg_whoami() }, soid, need); @@ -514,6 +520,23 @@ void PGRecovery::request_primary_scan( }); } +PGRecovery::interruptible_future<> +PGRecovery::recover_object_with_throttle( + const hobject_t &soid, + eversion_t need) +{ + crimson::osd::scheduler::params_t params = + {1, 0, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort}; + auto &ss = pg->get_shard_services(); + return ss.with_throttle( + std::move(params), + [this, soid, need] { + auto backend = pg->get_recovery_backend(); + assert(backend); + return backend->recover_object(soid, need); + }); +} + void PGRecovery::enqueue_push( const hobject_t& obj, const eversion_t& v, @@ -525,7 +548,7 @@ void PGRecovery::enqueue_push( if (!added) return; peering_state.prepare_backfill_for_missing(obj, v, peers); - std::ignore = pg->get_recovery_backend()->recover_object(obj, v).\ + std::ignore = recover_object_with_throttle(obj, v).\ handle_exception_interruptible([] (auto) { ceph_abort_msg("got exception on backfill's push"); return seastar::make_ready_future<>(); @@ -603,21 +626,8 @@ void PGRecovery::update_peers_last_backfill( bool PGRecovery::budget_available() const { - 
crimson::osd::scheduler::params_t params = - {1, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort}; auto &ss = pg->get_shard_services(); - auto futopt = ss.try_acquire_throttle_now(std::move(params)); - if (!futopt) { - return true; - } - std::ignore = interruptor::make_interruptible(std::move(*futopt) - ).then_interruptible([this] { - assert(!backfill_state->is_triggered()); - using BackfillState = crimson::osd::BackfillState; - backfill_state->process_event( - BackfillState::ThrottleAcquired{}.intrusive_from_this()); - }); - return false; + return ss.throttle_available(); } void PGRecovery::on_pg_clean() diff --git a/src/crimson/osd/pg_recovery.h b/src/crimson/osd/pg_recovery.h index a9a48cf465ce8..37fae278fa09f 100644 --- a/src/crimson/osd/pg_recovery.h +++ b/src/crimson/osd/pg_recovery.h @@ -25,8 +25,6 @@ class PGBackend; class PGRecovery : public crimson::osd::BackfillState::BackfillListener { public: - using interruptor = - crimson::interruptible::interruptor; template using interruptible_future = RecoveryBackend::interruptible_future; PGRecovery(PGRecoveryListener* pg) : pg(pg) {} @@ -67,7 +65,9 @@ private: } RecoveryBackend::interruptible_future<> recover_missing( RecoveryBackend::RecoveryBlockingEvent::TriggerI&, - const hobject_t &soid, eversion_t need); + const hobject_t &soid, + eversion_t need, + bool with_throttle); RecoveryBackend::interruptible_future<> prep_object_replica_deletes( RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger, const hobject_t& soid, @@ -99,6 +99,18 @@ private: friend class ReplicatedRecoveryBackend; friend class crimson::osd::UrgentRecovery; + interruptible_future<> recover_object_with_throttle( + const hobject_t &soid, + eversion_t need); + + interruptible_future<> recover_object( + const hobject_t &soid, + eversion_t need) { + auto backend = pg->get_recovery_backend(); + assert(backend); + return backend->recover_object(soid, need); + } + // backfill begin std::unique_ptr backfill_state; std::map("osd_mclock_scheduler_anticipation_timeout")) +mClockScheduler::mClockScheduler(CephContext *cct, + int whoami, + uint32_t num_shards, + int shard_id, + bool is_rotational, + bool init_perfcounter) + : cct(cct), + whoami(whoami), + num_shards(num_shards), + shard_id(shard_id), + is_rotational(is_rotational), + logger(nullptr), + scheduler( + std::bind(&mClockScheduler::ClientRegistry::get_info, + &client_registry, + _1), + dmc::AtLimit::Wait, + cct->_conf.get_val("osd_mclock_scheduler_anticipation_timeout")) { - conf.add_observer(this); - client_registry.update_from_config(conf); + cct->_conf.add_observer(this); + ceph_assert(num_shards > 0); + auto get_op_queue_cut_off = [&conf = cct->_conf]() { + if (conf.get_val("osd_op_queue_cut_off") == "debug_random") { + std::random_device rd; + std::mt19937 random_gen(rd()); + return (random_gen() % 2 < 1) ? 
CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW; + } else if (conf.get_val("osd_op_queue_cut_off") == "high") { + return CEPH_MSG_PRIO_HIGH; + } else { + // default / catch-all is 'low' + return CEPH_MSG_PRIO_LOW; + } + }; + cutoff_priority = get_op_queue_cut_off(); + set_osd_capacity_params_from_config(); + set_config_defaults_from_profile(); + client_registry.update_from_config( + cct->_conf, osd_bandwidth_capacity_per_shard); + } -void mClockScheduler::ClientRegistry::update_from_config(const ConfigProxy &conf) +/* ClientRegistry holds the dmclock::ClientInfo configuration parameters + * (reservation (bytes/second), weight (unitless), limit (bytes/second)) + * for each IO class in the OSD (client, background_recovery, + * background_best_effort). + * + * mclock expects limit and reservation to have units of /second + * (bytes/second), but osd_mclock_scheduler_client_(lim|res) are provided + * as ratios of the OSD's capacity. We convert from the one to the other + * using the capacity_per_shard parameter. + * + * Note, mclock profile information will already have been set as a default + * for the osd_mclock_scheduler_client_* parameters prior to calling + * update_from_config -- see set_config_defaults_from_profile(). + */ +void mClockScheduler::ClientRegistry::update_from_config( + const ConfigProxy &conf, + const double capacity_per_shard) { + + auto get_res = [&](double res) { + if (res) { + return res * capacity_per_shard; + } else { + return default_min; // min reservation + } + }; + + auto get_lim = [&](double lim) { + if (lim) { + return lim * capacity_per_shard; + } else { + return default_max; // high limit + } + }; + + // Set external client infos + double res = conf.get_val( + "osd_mclock_scheduler_client_res"); + double lim = conf.get_val( + "osd_mclock_scheduler_client_lim"); + uint64_t wgt = conf.get_val( + "osd_mclock_scheduler_client_wgt"); default_external_client_info.update( - conf.get_val("osd_mclock_scheduler_client_res"), - conf.get_val("osd_mclock_scheduler_client_wgt"), - conf.get_val("osd_mclock_scheduler_client_lim")); + get_res(res), + wgt, + get_lim(lim)); + // Set background recovery client infos + res = conf.get_val( + "osd_mclock_scheduler_background_recovery_res"); + lim = conf.get_val( + "osd_mclock_scheduler_background_recovery_lim"); + wgt = conf.get_val( + "osd_mclock_scheduler_background_recovery_wgt"); internal_client_infos[ static_cast(scheduler_class_t::background_recovery)].update( - conf.get_val("osd_mclock_scheduler_background_recovery_res"), - conf.get_val("osd_mclock_scheduler_background_recovery_wgt"), - conf.get_val("osd_mclock_scheduler_background_recovery_lim")); + get_res(res), + wgt, + get_lim(lim)); + // Set background best effort client infos + res = conf.get_val( + "osd_mclock_scheduler_background_best_effort_res"); + lim = conf.get_val( + "osd_mclock_scheduler_background_best_effort_lim"); + wgt = conf.get_val( + "osd_mclock_scheduler_background_best_effort_wgt"); internal_client_infos[ static_cast(scheduler_class_t::background_best_effort)].update( - conf.get_val("osd_mclock_scheduler_background_best_effort_res"), - conf.get_val("osd_mclock_scheduler_background_best_effort_wgt"), - conf.get_val("osd_mclock_scheduler_background_best_effort_lim")); + get_res(res), + wgt, + get_lim(lim)); } const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_external_client( @@ -79,7 +162,6 @@ const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_info( case scheduler_class_t::immediate: ceph_assert(0 == "Cannot schedule immediate"); return 
(dmc::ClientInfo*)nullptr; - case scheduler_class_t::repop: case scheduler_class_t::client: return get_external_client(id.client_profile_id); default: @@ -88,44 +170,306 @@ const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_info( } } +void mClockScheduler::set_osd_capacity_params_from_config() +{ + uint64_t osd_bandwidth_capacity; + double osd_iop_capacity; + + std::tie(osd_bandwidth_capacity, osd_iop_capacity) = [&, this] { + if (is_rotational) { + return std::make_tuple( + cct->_conf.get_val( + "osd_mclock_max_sequential_bandwidth_hdd"), + cct->_conf.get_val("osd_mclock_max_capacity_iops_hdd")); + } else { + return std::make_tuple( + cct->_conf.get_val( + "osd_mclock_max_sequential_bandwidth_ssd"), + cct->_conf.get_val("osd_mclock_max_capacity_iops_ssd")); + } + }(); + + osd_bandwidth_capacity = std::max(1, osd_bandwidth_capacity); + osd_iop_capacity = std::max(1.0, osd_iop_capacity); + + osd_bandwidth_cost_per_io = + static_cast(osd_bandwidth_capacity) / osd_iop_capacity; + osd_bandwidth_capacity_per_shard = static_cast(osd_bandwidth_capacity) + / static_cast(num_shards); + + dout(1) << __func__ << ": osd_bandwidth_cost_per_io: " + << std::fixed << std::setprecision(2) + << osd_bandwidth_cost_per_io << " bytes/io" + << ", osd_bandwidth_capacity_per_shard " + << osd_bandwidth_capacity_per_shard << " bytes/second" + << dendl; +} + +/** + * profile_t + * + * mclock profile -- 3 params for each of 3 client classes + * 0 (min): specifies no minimum reservation + * 0 (max): specifies no upper limit + */ +struct profile_t { + struct client_config_t { + double reservation; + uint64_t weight; + double limit; + }; + client_config_t client; + client_config_t background_recovery; + client_config_t background_best_effort; +}; + +static std::ostream &operator<<( + std::ostream &lhs, const profile_t::client_config_t &rhs) +{ + return lhs << "{res: " << rhs.reservation + << ", wgt: " << rhs.weight + << ", lim: " << rhs.limit + << "}"; +} + +static std::ostream &operator<<(std::ostream &lhs, const profile_t &rhs) +{ + return lhs << "[client: " << rhs.client + << ", background_recovery: " << rhs.background_recovery + << ", background_best_effort: " << rhs.background_best_effort + << "]"; +} + +void mClockScheduler::set_config_defaults_from_profile() +{ + // Let only a single osd shard (id:0) set the profile configs + if (shard_id > 0) { + return; + } + + /** + * high_client_ops + * + * Client Allocation: + * reservation: 60% | weight: 2 | limit: 0 (max) | + * Background Recovery Allocation: + * reservation: 40% | weight: 1 | limit: 0 (max) | + * Background Best Effort Allocation: + * reservation: 0 (min) | weight: 1 | limit: 70% | + */ + static constexpr profile_t high_client_ops_profile{ + { .6, 2, 0 }, + { .4, 1, 0 }, + { 0, 1, .7 } + }; + + /** + * high_recovery_ops + * + * Client Allocation: + * reservation: 30% | weight: 1 | limit: 0 (max) | + * Background Recovery Allocation: + * reservation: 70% | weight: 2 | limit: 0 (max) | + * Background Best Effort Allocation: + * reservation: 0 (min) | weight: 1 | limit: 0 (max) | + */ + static constexpr profile_t high_recovery_ops_profile{ + { .3, 1, 0 }, + { .7, 2, 0 }, + { 0, 1, 0 } + }; + + /** + * balanced + * + * Client Allocation: + * reservation: 50% | weight: 1 | limit: 0 (max) | + * Background Recovery Allocation: + * reservation: 50% | weight: 1 | limit: 0 (max) | + * Background Best Effort Allocation: + * reservation: 0 (min) | weight: 1 | limit: 90% | + */ + static constexpr profile_t balanced_profile{ + { .5, 1, 0 }, + { .5, 1, 0 }, + { 
0, 1, .9 } + }; + + const profile_t *profile = nullptr; + auto mclock_profile = cct->_conf.get_val("osd_mclock_profile"); + if (mclock_profile == "high_client_ops") { + profile = &high_client_ops_profile; + dout(10) << "Setting high_client_ops profile " << *profile << dendl; + } else if (mclock_profile == "high_recovery_ops") { + profile = &high_recovery_ops_profile; + dout(10) << "Setting high_recovery_ops profile " << *profile << dendl; + } else if (mclock_profile == "balanced") { + profile = &balanced_profile; + dout(10) << "Setting balanced profile " << *profile << dendl; + } else if (mclock_profile == "custom") { + dout(10) << "Profile set to custom, not setting defaults" << dendl; + return; + } else { + derr << "Invalid mclock profile: " << mclock_profile << dendl; + ceph_assert("Invalid choice of mclock profile" == 0); + return; + } + ceph_assert(nullptr != profile); + + auto set_config = [&conf = cct->_conf](const char *key, auto val) { + return conf.set_val_default_sync(key, std::to_string(val)); + }; + + set_config("osd_mclock_scheduler_client_res", profile->client.reservation); + set_config("osd_mclock_scheduler_client_res", profile->client.reservation); + set_config("osd_mclock_scheduler_client_wgt", profile->client.weight); + set_config("osd_mclock_scheduler_client_lim", profile->client.limit); + + + set_config( + "osd_mclock_scheduler_background_recovery_res", + profile->background_recovery.reservation); + set_config( + "osd_mclock_scheduler_background_recovery_wgt", + profile->background_recovery.weight); + set_config( + "osd_mclock_scheduler_background_recovery_lim", + profile->background_recovery.limit); + + set_config( + "osd_mclock_scheduler_background_best_effort_res", + profile->background_best_effort.reservation); + set_config( + "osd_mclock_scheduler_background_best_effort_wgt", + profile->background_best_effort.weight); + set_config( + "osd_mclock_scheduler_background_best_effort_lim", + profile->background_best_effort.limit); + +} + +uint32_t mClockScheduler::calc_scaled_cost(int item_cost) +{ + auto cost = static_cast( + std::max( + 1, // ensure cost is non-zero and positive + item_cost)); + auto cost_per_io = static_cast(osd_bandwidth_cost_per_io); + + return std::max(cost, cost_per_io); +} + void mClockScheduler::dump(ceph::Formatter &f) const { + // Display queue sizes + f.open_object_section("queue_sizes"); + f.dump_int("high_priority_queue", high_priority.size()); + f.dump_int("scheduler", scheduler.request_count()); + f.close_section(); + + // client map and queue tops (res, wgt, lim) + std::ostringstream out; + f.open_object_section("mClockClients"); + f.dump_int("client_count", scheduler.client_count()); + //out << scheduler; + f.dump_string("clients", out.str()); + f.close_section(); + + // Display sorted queues (res, wgt, lim) + f.open_object_section("mClockQueues"); + f.dump_string("queues", display_queues()); + f.close_section(); + + f.open_object_section("HighPriorityQueue"); + for (auto it = high_priority.begin(); + it != high_priority.end(); it++) { + f.dump_int("priority", it->first); + f.dump_int("queue_size", it->second.size()); + } + f.close_section(); } void mClockScheduler::enqueue(item_t&& item) { auto id = get_scheduler_id(item); - auto cost = item.params.cost; + unsigned priority = item.get_priority(); + // TODO: move this check into item, handle backwards compat if (scheduler_class_t::immediate == item.params.klass) { - immediate.push_front(std::move(item)); + enqueue_high(immediate_class_priority, std::move(item)); + } else if (priority 
>= cutoff_priority) { + enqueue_high(priority, std::move(item)); } else { + auto cost = calc_scaled_cost(item.get_cost()); + dout(20) << __func__ << " " << id + << " item_cost: " << item.get_cost() + << " scaled_cost: " << cost + << dendl; + + // Add item to scheduler queue scheduler.add_request( std::move(item), id, cost); + //_get_mclock_counter(id); } + + dout(10) << __func__ << " client_count: " << scheduler.client_count() + << " queue_sizes: [ " + << " high_priority_queue: " << high_priority.size() + << " sched: " << scheduler.request_count() << " ]" + << dendl; + dout(30) << __func__ << " mClockClients: " + << dendl; + dout(10) << __func__ << " mClockQueues: { " + << display_queues() << " }" + << dendl; } void mClockScheduler::enqueue_front(item_t&& item) { - immediate.push_back(std::move(item)); - // TODO: item may not be immediate, update mclock machinery to permit - // putting the item back in the queue + unsigned priority = item.get_priority(); + + if (scheduler_class_t::immediate == item.params.klass) { + enqueue_high(immediate_class_priority, std::move(item), true); + } else if (priority >= cutoff_priority) { + enqueue_high(priority, std::move(item), true); + } else { + // mClock does not support enqueue at front, so we use + // the high queue with priority 0 + enqueue_high(0, std::move(item), true); + } +} + +void mClockScheduler::enqueue_high(unsigned priority, + item_t&& item, + bool front) +{ + if (front) { + high_priority[priority].push_back(std::move(item)); + } else { + high_priority[priority].push_front(std::move(item)); + } } -item_t mClockScheduler::dequeue() +WorkItem mClockScheduler::dequeue() { - if (!immediate.empty()) { - auto ret = std::move(immediate.back()); - immediate.pop_back(); + if (!high_priority.empty()) { + auto iter = high_priority.begin(); + // invariant: high_priority entries are never empty + assert(!iter->second.empty()); + WorkItem ret{std::move(iter->second.back())}; + iter->second.pop_back(); + if (iter->second.empty()) { + // maintain invariant, high priority entries are never empty + high_priority.erase(iter); + } + return ret; } else { mclock_queue_t::PullReq result = scheduler.pull_request(); if (result.is_future()) { - ceph_assert( - 0 == "Not implemented, user would have to be able to be woken up"); - return std::move(*(item_t*)nullptr); + return result.getTime(); } else if (result.is_none()) { ceph_assert( 0 == "Impossible, must have checked empty() first"); @@ -139,27 +483,110 @@ item_t mClockScheduler::dequeue() } } +std::string mClockScheduler::display_queues() const +{ + std::ostringstream out; + scheduler.display_queues(out); + return out.str(); +} + +/*const char** mClockScheduler::get_tracked_conf_keys() const +{ + static const char* KEYS[] = { + "osd_mclock_scheduler_client_res", + "osd_mclock_scheduler_client_wgt", + "osd_mclock_scheduler_client_lim", + "osd_mclock_scheduler_background_recovery_res", + "osd_mclock_scheduler_background_recovery_wgt", + "osd_mclock_scheduler_background_recovery_lim", + "osd_mclock_scheduler_background_best_effort_res", + "osd_mclock_scheduler_background_best_effort_wgt", + "osd_mclock_scheduler_background_best_effort_lim", + "osd_mclock_max_capacity_iops_hdd", + "osd_mclock_max_capacity_iops_ssd", + "osd_mclock_max_sequential_bandwidth_hdd", + "osd_mclock_max_sequential_bandwidth_ssd", + "osd_mclock_profile", + NULL + }; + return KEYS; +}*/ + std::vector mClockScheduler::get_tracked_keys() const noexcept { return { - "osd_mclock_scheduler_client_res"s, - "osd_mclock_scheduler_client_wgt"s, - 
"osd_mclock_scheduler_client_lim"s, - "osd_mclock_scheduler_background_recovery_res"s, - "osd_mclock_scheduler_background_recovery_wgt"s, - "osd_mclock_scheduler_background_recovery_lim"s, - "osd_mclock_scheduler_background_best_effort_res"s, - "osd_mclock_scheduler_background_best_effort_wgt"s, - "osd_mclock_scheduler_background_best_effort_lim"s - }; + "osd_mclock_scheduler_client_res", + "osd_mclock_scheduler_client_wgt", + "osd_mclock_scheduler_client_lim", + "osd_mclock_scheduler_background_recovery_res", + "osd_mclock_scheduler_background_recovery_wgt", + "osd_mclock_scheduler_background_recovery_lim", + "osd_mclock_scheduler_background_best_effort_res", + "osd_mclock_scheduler_background_best_effort_wgt", + "osd_mclock_scheduler_background_best_effort_lim", + "osd_mclock_max_capacity_iops_hdd", + "osd_mclock_max_capacity_iops_ssd", + "osd_mclock_max_sequential_bandwidth_hdd", + "osd_mclock_max_sequential_bandwidth_ssd", + "osd_mclock_profile" + }; } - void mClockScheduler::handle_conf_change( const ConfigProxy& conf, const std::set &changed) { - client_registry.update_from_config(conf); + if (changed.count("osd_mclock_max_capacity_iops_hdd") || + changed.count("osd_mclock_max_capacity_iops_ssd")) { + set_osd_capacity_params_from_config(); + client_registry.update_from_config( + conf, osd_bandwidth_capacity_per_shard); + } + if (changed.count("osd_mclock_max_sequential_bandwidth_hdd") || + changed.count("osd_mclock_max_sequential_bandwidth_ssd")) { + set_osd_capacity_params_from_config(); + client_registry.update_from_config( + conf, osd_bandwidth_capacity_per_shard); + } + if (changed.count("osd_mclock_profile")) { + set_config_defaults_from_profile(); + client_registry.update_from_config( + conf, osd_bandwidth_capacity_per_shard); + } + + auto get_changed_key = [&changed]() -> std::optional { + static const std::vector qos_params = { + "osd_mclock_scheduler_client_res", + "osd_mclock_scheduler_client_wgt", + "osd_mclock_scheduler_client_lim", + "osd_mclock_scheduler_background_recovery_res", + "osd_mclock_scheduler_background_recovery_wgt", + "osd_mclock_scheduler_background_recovery_lim", + "osd_mclock_scheduler_background_best_effort_res", + "osd_mclock_scheduler_background_best_effort_wgt", + "osd_mclock_scheduler_background_best_effort_lim" + }; + + for (auto &qp : qos_params) { + if (changed.count(qp)) { + return qp; + } + } + return std::nullopt; + }; + + if (auto key = get_changed_key(); key.has_value()) { + auto mclock_profile = cct->_conf.get_val("osd_mclock_profile"); + if (mclock_profile == "custom") { + client_registry.update_from_config( + conf, osd_bandwidth_capacity_per_shard); + } + } +} + +mClockScheduler::~mClockScheduler() +{ + cct->_conf.remove_observer(this); } } diff --git a/src/crimson/osd/scheduler/mclock_scheduler.h b/src/crimson/osd/scheduler/mclock_scheduler.h index 16082ffd09540..d8d3c324001f5 100644 --- a/src/crimson/osd/scheduler/mclock_scheduler.h +++ b/src/crimson/osd/scheduler/mclock_scheduler.h @@ -15,6 +15,7 @@ #pragma once +#include #include #include #include @@ -22,28 +23,65 @@ #include "boost/variant.hpp" #include "dmclock/src/dmclock_server.h" - #include "crimson/osd/scheduler/scheduler.h" +#include "crimson/mon/MonClient.h" + #include "common/config.h" #include "common/ceph_context.h" namespace crimson::osd::scheduler { -using client_id_t = uint64_t; -using profile_id_t = uint64_t; +constexpr double default_min = 0.0; +constexpr double default_max = std::numeric_limits::is_iec559 ? 
+ std::numeric_limits::infinity() : + std::numeric_limits::max(); +/** + * client_profile_id_t + * + * client_id - global id (client.####) for client QoS + * profile_id - id generated by client's QoS profile + * + * Currently (Reef and below), both members are set to + * 0 which ensures that all external clients share the + * mClock profile allocated reservation and limit + * bandwidth. + * + * Note: Post Reef, both members will be set to non-zero + * values when the distributed feature of the mClock + * algorithm is utilized. + */ struct client_profile_id_t { - client_id_t client_id; - profile_id_t profile_id; + uint64_t client_id = 0; + uint64_t profile_id = 0; + + client_profile_id_t(uint64_t _client_id, uint64_t _profile_id) : + client_id(_client_id), + profile_id(_profile_id) {} + + client_profile_id_t() = default; + auto operator<=>(const client_profile_id_t&) const = default; + friend std::ostream& operator<<(std::ostream& out, + const client_profile_id_t& client_profile) { + out << " client_id: " << client_profile.client_id + << " profile_id: " << client_profile.profile_id; + return out; + } }; - struct scheduler_id_t { scheduler_class_t class_id; client_profile_id_t client_profile_id; + auto operator<=>(const scheduler_id_t&) const = default; + friend std::ostream& operator<<(std::ostream& out, + const scheduler_id_t& sched_id) { + out << "{ class_id: " << sched_id.class_id + << sched_id.client_profile_id; + return out << " }"; + } }; /** @@ -53,6 +91,56 @@ struct scheduler_id_t { */ class mClockScheduler : public Scheduler, md_config_obs_t { + crimson::common::CephContext *cct; + const int whoami; + const uint32_t num_shards; + const int shard_id; + const bool is_rotational; + unsigned cutoff_priority; + PerfCounters *logger; + + /** + * osd_bandwidth_cost_per_io + * + * mClock expects all queued items to have a uniform expression of + * "cost". However, IO devices generally have quite different capacity + * for sequential IO vs small random IO. This implementation handles this + * by expressing all costs as a number of sequential bytes written adding + * additional cost for each random IO equal to osd_bandwidth_cost_per_io. + * + * Thus, an IO operation requiring a total of bytes to be written + * accross different locations will have a cost of + * + (osd_bandwidth_cost_per_io * ) bytes. + * + * Set in set_osd_capacity_params_from_config in the constructor and upon + * config change. + * + * Has units bytes/io. + */ + double osd_bandwidth_cost_per_io; + + /** + * osd_bandwidth_capacity_per_shard + * + * mClock expects reservation and limit paramters to be expressed in units + * of cost/second -- which means bytes/second for this implementation. + * + * Rather than expecting users to compute appropriate limit and reservation + * values for each class of OSDs in their cluster, we instead express + * reservation and limit paramaters as ratios of the OSD's maxmimum capacity. + * osd_bandwidth_capacity_per_shard is that capacity divided by the number + * of shards. + * + * Set in set_osd_capacity_params_from_config in the constructor and upon + * config change. + * + * This value gets passed to ClientRegistry::update_from_config in order + * to resolve the full reservaiton and limit parameters for mclock from + * the configured ratios. + * + * Has units bytes/second. 
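
As a worked example of the two derived parameters documented here, a small standalone program (illustrative input values; the real ones come from the osd_mclock_max_sequential_bandwidth_(hdd|ssd) and osd_mclock_max_capacity_iops_(hdd|ssd) options and the shard count) could compute them like this:

    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    int main()
    {
      // Illustrative inputs for a rotational (HDD-class) OSD with 5 shards.
      const uint64_t max_sequential_bandwidth = 150ull * 1024 * 1024;  // bytes/second
      const double   max_capacity_iops        = 315.0;                 // IOPS
      const uint32_t num_shards               = 5;

      // Mirrors set_osd_capacity_params_from_config().
      const double cost_per_io =
          static_cast<double>(max_sequential_bandwidth) / max_capacity_iops;
      const double capacity_per_shard =
          static_cast<double>(max_sequential_bandwidth) / num_shards;

      // Mirrors calc_scaled_cost(): every queued item costs at least one
      // "average random IO" worth of bandwidth.
      auto scaled_cost = [&](int item_cost) {
        const auto cost = static_cast<uint32_t>(std::max(1, item_cost));
        return std::max(cost, static_cast<uint32_t>(cost_per_io));
      };

      std::cout << "cost_per_io: " << cost_per_io << " bytes/io\n";
      std::cout << "capacity_per_shard: " << capacity_per_shard << " bytes/second\n";
      std::cout << "scaled cost of a 4 KiB item: " << scaled_cost(4096) << " bytes\n";
    }
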
+ */ + double osd_bandwidth_capacity_per_shard; class ClientRegistry { std::array< crimson::dmclock::ClientInfo, @@ -69,7 +157,16 @@ class mClockScheduler : public Scheduler, md_config_obs_t { const crimson::dmclock::ClientInfo *get_external_client( const client_profile_id_t &client) const; public: - void update_from_config(const ConfigProxy &conf); + /** + * update_from_config + * + * Sets the mclock paramaters (reservation, weight, and limit) + * for each class of IO (background_recovery, background_best_effort, + * and client). + */ + void update_from_config( + const ConfigProxy &conf, + double capacity_per_shard); const crimson::dmclock::ClientInfo *get_info( const scheduler_id_t &id) const; } client_registry; @@ -80,47 +177,91 @@ class mClockScheduler : public Scheduler, md_config_obs_t { true, true, 2>; + using priority_t = unsigned; + using SubQueue = std::map, + std::greater>; mclock_queue_t scheduler; - std::list immediate; + /** + * high_priority + * + * Holds entries to be dequeued in strict order ahead of mClock + * Invariant: entries are never empty + */ + SubQueue high_priority; + priority_t immediate_class_priority = std::numeric_limits::max(); static scheduler_id_t get_scheduler_id(const item_t &item) { return scheduler_id_t{ item.params.klass, - client_profile_id_t{ - item.params.owner, - 0 - } + client_profile_id_t{ + item.params.owner, + 0 + } }; } + /** + * set_osd_capacity_params_from_config + * + * mClockScheduler uses two parameters, osd_bandwidth_cost_per_io + * and osd_bandwidth_capacity_per_shard, internally. These two + * parameters are derived from config parameters + * osd_mclock_max_capacity_iops_(hdd|ssd) and + * osd_mclock_max_sequential_bandwidth_(hdd|ssd) as well as num_shards. + * Invoking set_osd_capacity_params_from_config() resets those derived + * params based on the current config and should be invoked any time they + * are modified as well as in the constructor. See handle_conf_change(). 
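
The ratio-to-absolute conversion that ClientRegistry::update_from_config() performs with this per-shard capacity can be summarized by a small standalone helper (simplified types for illustration; dmclock's ClientInfo and the osd_mclock_scheduler_* options are not reproduced here):

    #include <iostream>

    // Stand-in for dmclock's ClientInfo: reservation and limit in bytes/second,
    // weight unitless.
    struct client_info {
      double reservation;
      double weight;
      double limit;
    };

    // A ratio of 0 means "no minimum reservation" / "no upper limit", matching
    // the get_res()/get_lim() lambdas added to update_from_config().
    client_info resolve(double res_ratio, double wgt, double lim_ratio,
                        double capacity_per_shard)
    {
      const double default_min = 0.0;   // minimum reservation
      const double default_max = 1e18;  // stand-in for "unlimited" (the patch uses infinity)
      return {
        res_ratio ? res_ratio * capacity_per_shard : default_min,
        wgt,
        lim_ratio ? lim_ratio * capacity_per_shard : default_max,
      };
    }

    int main()
    {
      // e.g. the balanced profile's client class: res 0.5, wgt 1, lim 0 (max),
      // on a shard with ~30 MiB/s of bandwidth capacity.
      const auto c = resolve(0.5, 1, 0, 31457280.0);
      std::cout << c.reservation << " " << c.weight << " " << c.limit << "\n";
    }
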
+ */ + void set_osd_capacity_params_from_config(); + + // Set the mclock related config params based on the profile + void set_config_defaults_from_profile(); + public: - mClockScheduler(ConfigProxy &conf); + mClockScheduler(CephContext *cct, int whoami, uint32_t num_shards, + int shard_id, bool is_rotational, bool init_perfcounter=true); + ~mClockScheduler() override; + + /// Calculate scaled cost per item + uint32_t calc_scaled_cost(int cost); + + // Helper method to display mclock queues + std::string display_queues() const; // Enqueue op in the back of the regular queue void enqueue(item_t &&item) final; - // Enqueue the op in the front of the regular queue + // Enqueue the op in the front of the high priority queue void enqueue_front(item_t &&item) final; // Return an op to be dispatch - item_t dequeue() final; + WorkItem dequeue() final; // Returns if the queue is empty bool empty() const final { - return immediate.empty() && scheduler.empty(); + return scheduler.empty() && high_priority.empty(); } // Formatted output of the queue void dump(ceph::Formatter &f) const final; void print(std::ostream &ostream) const final { - ostream << "mClockScheduler"; + ostream << "mClockScheduer "; + ostream << ", cutoff=" << cutoff_priority; } std::vector get_tracked_keys() const noexcept final; void handle_conf_change(const ConfigProxy& conf, const std::set &changed) final; + + double get_cost_per_io() const { + return osd_bandwidth_cost_per_io; + } +private: + // Enqueue the op to the high priority queue + void enqueue_high(unsigned prio, item_t &&item, bool front = false); }; } diff --git a/src/crimson/osd/scheduler/scheduler.cc b/src/crimson/osd/scheduler/scheduler.cc index c85cb388ef36e..70ca8002f9afd 100644 --- a/src/crimson/osd/scheduler/scheduler.cc +++ b/src/crimson/osd/scheduler/scheduler.cc @@ -66,7 +66,7 @@ class ClassedOpQueueScheduler final : public Scheduler { if (conf.get_val("osd_op_queue_cut_off") == "debug_random") { srand(time(NULL)); return (rand() % 2 < 1) ? 
- scheduler_class_t::repop : scheduler_class_t::immediate; + scheduler_class_t::repop : scheduler_class_t::immediate; } else if (conf.get_val("osd_op_queue_cut_off") == "high") { return scheduler_class_t::immediate; } else { @@ -128,7 +128,7 @@ public: return queue.empty(); } - item_t dequeue() final { + WorkItem dequeue() final { return queue.dequeue(); } @@ -145,7 +145,8 @@ public: ~ClassedOpQueueScheduler() final {}; }; -SchedulerRef make_scheduler(ConfigProxy &conf) +SchedulerRef make_scheduler(CephContext *cct, ConfigProxy &conf, int whoami, uint32_t nshards, int sid, + bool is_rotational, bool perf_cnt) { const std::string _type = conf.get_val("osd_op_queue"); const std::string *type = &_type; @@ -166,7 +167,7 @@ SchedulerRef make_scheduler(ConfigProxy &conf) conf->osd_op_pq_min_cost ); } else if (*type == "mclock_scheduler") { - return std::make_unique(conf); + return std::make_unique(cct, whoami, nshards, sid, is_rotational, perf_cnt); } else { ceph_assert("Invalid choice of wq" == 0); return std::unique_ptr(); diff --git a/src/crimson/osd/scheduler/scheduler.h b/src/crimson/osd/scheduler/scheduler.h index a014991abcd05..0d3c147a43e38 100644 --- a/src/crimson/osd/scheduler/scheduler.h +++ b/src/crimson/osd/scheduler/scheduler.h @@ -22,8 +22,8 @@ namespace crimson::osd::scheduler { enum class scheduler_class_t : uint8_t { - background_best_effort = 0, - background_recovery, + background_recovery = 0, + background_best_effort, client, repop, immediate, @@ -32,10 +32,10 @@ enum class scheduler_class_t : uint8_t { std::ostream &operator<<(std::ostream &, const scheduler_class_t &); using client_t = uint64_t; -using cost_t = uint64_t; struct params_t { - cost_t cost = 1; + int cost = 1; + unsigned priority = 0; client_t owner; scheduler_class_t klass; }; @@ -43,8 +43,11 @@ struct params_t { struct item_t { params_t params; seastar::promise<> wake; + int get_cost() const { return params.cost; } + unsigned get_priority() const { return params.priority; } }; +using WorkItem = std::variant; /** * Base interface for classes responsible for choosing * op processing order in the OSD. 
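
Because dequeue() now returns a WorkItem rather than an item_t, callers have to branch on the variant: either a ready item or, for mClock, the time at which the next request becomes eligible. The variant's alternatives are stripped in this rendering; assuming it holds an item_t and a double ready-time (plus a monostate for the empty case, as in the classic OSD scheduler), a simplified standalone model of the consuming side looks like:

    #include <iostream>
    #include <variant>

    struct item { int cost; };                                     // stand-in for item_t
    using work_item = std::variant<std::monostate, item, double>;  // assumed WorkItem shape

    // Simplified model of the consumer (OperationThrottler::wake_pending in this
    // patch): dispatch a ready item, or compute how long to sleep before retrying
    // when mClock only offers a future eligibility time.
    void handle(const work_item& wi, double now)
    {
      if (auto when_ready = std::get_if<double>(&wi)) {
        std::cout << "not ready, retry in " << (*when_ready - now) << "s\n";
      } else if (auto it = std::get_if<item>(&wi)) {
        std::cout << "dispatch item, cost " << it->cost << "\n";
      } else {
        std::cout << "nothing queued\n";
      }
    }

    int main()
    {
      handle(item{4096}, 100.0);
      handle(105.5, 100.0);   // scheduler says: eligible at t = 105.5
      handle({}, 100.0);
    }
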
@@ -62,7 +65,7 @@ public: virtual bool empty() const = 0; // Return next op to be processed - virtual item_t dequeue() = 0; + virtual WorkItem dequeue() = 0; // Dump formatted representation for the queue virtual void dump(ceph::Formatter &f) const = 0; @@ -77,6 +80,7 @@ public: std::ostream &operator<<(std::ostream &lhs, const Scheduler &); using SchedulerRef = std::unique_ptr; -SchedulerRef make_scheduler(ConfigProxy &); +SchedulerRef make_scheduler(CephContext *cct, ConfigProxy &, int whoami, uint32_t num_shards, + int shard_id, bool is_rotational, bool perf_cnt); } diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h index 400976d0db954..caf3123b72e4b 100644 --- a/src/crimson/osd/shard_services.h +++ b/src/crimson/osd/shard_services.h @@ -209,6 +209,10 @@ public: PerfCounters *recoverystate_perf, crimson::os::FuturizedStore &store, OSDState& osd_state); + + void initialize_scheduler(CephContext* cct, bool is_rotational) { + throttler.initialize_scheduler(cct, crimson::common::local_conf(), is_rotational, whoami); + } }; /** @@ -593,8 +597,7 @@ public: } FORWARD_TO_OSD_SINGLETON(get_pool_info) - FORWARD(with_throttle_while, with_throttle_while, local_state.throttler) - FORWARD(try_acquire_throttle_now, try_acquire_throttle_now, local_state.throttler) + FORWARD(with_throttle, with_throttle, local_state.throttler) FORWARD_TO_OSD_SINGLETON(build_incremental_map_msg) FORWARD_TO_OSD_SINGLETON(send_incremental_map) @@ -618,6 +621,9 @@ public: snap_dump_reservations, snap_reserver.dump) + bool throttle_available() const { + return local_state.throttler.available(); + } auto local_update_priority( singleton_orderer_t &orderer,
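
Taken together, backfill and recovery now go through a much simpler throttle interface: PGRecovery::budget_available() just asks ShardServices::throttle_available(), and per-object work is wrapped in OperationThrottler::with_throttle(), which runs the functor immediately when max_in_progress is unset and otherwise acquires the throttle and releases it in a finally step. A rough synchronous sketch of that shape (illustrative only, not the seastar/future-based implementation in the patch):

    #include <iostream>
    #include <utility>

    // Synchronous model of the throttler interface this patch settles on:
    // available() reports whether another background op may start, and
    // with_throttle() brackets the work with acquire/release (the real code is
    // future-based and releases in a .finally()).
    struct throttler {
      unsigned max_in_progress;      // 0 means "no throttling"
      unsigned in_progress = 0;

      bool available() const {
        return !max_in_progress || in_progress < max_in_progress;
      }

      template <typename F>
      void with_throttle(F&& f) {
        if (!max_in_progress) {      // throttling disabled: run immediately
          std::forward<F>(f)();
          return;
        }
        ++in_progress;
        try {
          std::forward<F>(f)();
        } catch (...) {
          --in_progress;             // always release, even on failure
          throw;
        }
        --in_progress;
      }
    };

    int main()
    {
      throttler t{2};
      std::cout << "available: " << t.available() << "\n";
      t.with_throttle([] { std::cout << "recover one object\n"; });
      std::cout << "available: " << t.available() << "\n";
    }
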