From b99b27ac374515a5b1c53def84c43c77aecaab12 Mon Sep 17 00:00:00 2001 From: Mohit Agrawal Date: Wed, 26 Feb 2025 15:02:21 +0530 Subject: [PATCH] crimson: Support mclock for crimson The patch is trying to sync mclock source of crimson similar to classic osds. Currently the feature is use by crimson only for background recovery operations but later we will use it for other OSD operations also.To use the same user need to configure crimson_osd_scheduler_concurrency parameter for osd and the schedule is enabled for the recovery related operations. Signed-off-by: Mohit Agrawal --- src/crimson/common/config_proxy.h | 6 + src/crimson/osd/backfill_state.cc | 83 ++- src/crimson/osd/backfill_state.h | 7 +- src/crimson/osd/osd.cc | 24 + src/crimson/osd/osd.h | 1 + src/crimson/osd/osd_operation.cc | 40 +- src/crimson/osd/osd_operation.h | 9 +- .../osd/osd_operations/background_recovery.h | 6 +- src/crimson/osd/pg.h | 20 +- src/crimson/osd/pg_recovery.cc | 46 +- src/crimson/osd/pg_recovery.h | 3 + src/crimson/osd/scheduler/mclock_scheduler.cc | 502 ++++++++++++++++-- src/crimson/osd/scheduler/mclock_scheduler.h | 177 +++++- src/crimson/osd/scheduler/scheduler.cc | 9 +- src/crimson/osd/scheduler/scheduler.h | 16 +- src/crimson/osd/shard_services.h | 5 + 16 files changed, 857 insertions(+), 97 deletions(-) diff --git a/src/crimson/common/config_proxy.h b/src/crimson/common/config_proxy.h index fce86f2389c..f0dda3760e8 100644 --- a/src/crimson/common/config_proxy.h +++ b/src/crimson/common/config_proxy.h @@ -152,9 +152,15 @@ public: } }); } + int get_val(std::string_view key, std::string *val) const { return get_config().get_val(*values, key, val); } + + void set_val_default_sync(const std::string& key, const std::string& val) { + get_config().set_val_default(*values, obs_mgr, key, val); + } + template const T get_val(std::string_view key) const { return get_config().template get_val(*values, key); diff --git a/src/crimson/osd/backfill_state.cc b/src/crimson/osd/backfill_state.cc index a068c1fa8d4..cc6df427e1a 100644 --- a/src/crimson/osd/backfill_state.cc +++ b/src/crimson/osd/backfill_state.cc @@ -7,6 +7,7 @@ #include "common/hobject.h" #include "crimson/osd/backfill_state.h" #include "osd/osd_types_fmt.h" +#include "crimson/osd/pg_interval_interrupt_condition.h" SET_SUBSYS(osd); @@ -310,11 +311,58 @@ bool BackfillState::Enqueuing::Enqueuing::all_emptied( return local_backfill_info.empty() && replicas_emptied; } +int BackfillState::Enqueuing::process_backfill_attempt(auto & primary_bi) +{ + + LOG_PREFIX(BackfillState::Enqueuing::process_backfill_attempt); + if (!backfill_listener().budget_available()) { + DEBUGDPP("throttle failed, turning to Waiting", pg()); + post_event(RequestWaiting{}); + return 1; + } else if (should_rescan_replicas(backfill_state().peer_backfill_info, + primary_bi)) { + // Count simultaneous scans as a single op and let those complete + post_event(RequestReplicasScanning{}); + return 1; + } + + if (all_emptied(primary_bi, backfill_state().peer_backfill_info)) { + return 2; + } + // Get object within set of peers to operate on and the set of targets + // for which that object applies. + if (const hobject_t check = \ + earliest_peer_backfill(backfill_state().peer_backfill_info); + check < primary_bi.begin) { + // Don't increment ops here because deletions + // are cheap and not replied to unlike real recovery_ops, + // and we can't increment ops without requeueing ourself + // for recovery. + auto result = remove_on_peers(check); + trim_backfilled_object_from_intervals(std::move(result), + backfill_state().last_backfill_started, + backfill_state().peer_backfill_info); + backfill_listener().maybe_flush(); + } else if (!primary_bi.empty()) { + auto result = update_on_peers(check); + trim_backfilled_object_from_intervals(std::move(result), + backfill_state().last_backfill_started, + backfill_state().peer_backfill_info); + primary_bi.pop_front(); + backfill_listener().maybe_flush(); + } else { + return 2; + } + return 0; +} + BackfillState::Enqueuing::Enqueuing(my_context ctx) : my_base(ctx) { LOG_PREFIX(BackfillState::Enqueuing::Enqueuing); auto& primary_bi = backfill_state().backfill_info; + using interruptor = + crimson::interruptible::interruptor; // update our local interval to cope with recent changes primary_bi.begin = backfill_state().last_backfill_started; @@ -339,7 +387,8 @@ BackfillState::Enqueuing::Enqueuing(my_context ctx) post_event(RequestPrimaryScanning{}); return; } - + + /* do { if (!backfill_listener().budget_available()) { DEBUGDPP("throttle failed, turning to Waiting", pg()); @@ -347,7 +396,6 @@ BackfillState::Enqueuing::Enqueuing(my_context ctx) return; } else if (should_rescan_replicas(backfill_state().peer_backfill_info, primary_bi)) { - // Count simultaneous scans as a single op and let those complete post_event(RequestReplicasScanning{}); return; } @@ -355,15 +403,9 @@ BackfillState::Enqueuing::Enqueuing(my_context ctx) if (all_emptied(primary_bi, backfill_state().peer_backfill_info)) { break; } - // Get object within set of peers to operate on and the set of targets - // for which that object applies. if (const hobject_t check = \ earliest_peer_backfill(backfill_state().peer_backfill_info); check < primary_bi.begin) { - // Don't increment ops here because deletions - // are cheap and not replied to unlike real recovery_ops, - // and we can't increment ops without requeueing ourself - // for recovery. auto result = remove_on_peers(check); trim_backfilled_object_from_intervals(std::move(result), backfill_state().last_backfill_started, @@ -380,7 +422,32 @@ BackfillState::Enqueuing::Enqueuing(my_context ctx) break; } } while (!all_emptied(primary_bi, backfill_state().peer_backfill_info)); + */ + do { + seastar::logger& logger = crimson::get_logger(0); + logger.info("Calling acq_throttle "); + std::optional backfill_result; + auto futopt = backfill_listener().acq_throttle(); + if (!futopt) { + seastar::logger& logger = crimson::get_logger(0); + logger.info("throtteling future is disabled"); + process_backfill_attempt(primary_bi); + } else { + logger.info(" throttle is acquired"); + backfill_listener().throttle_acquired = true; + std::ignore = interruptor::make_interruptible(std::move(*futopt) + ).then_interruptible([this, &primary_bi, &backfill_result] { + seastar::logger& logger = crimson::get_logger(0); + logger.info("throtteling future is loaded "); + backfill_result = process_backfill_attempt(primary_bi); + backfill_listener().release_throttle(); + }); + } + if(backfill_result.has_value()) + logger.info("backfill_result value is {}", backfill_result.value()); + } while (!all_emptied(primary_bi, backfill_state().peer_backfill_info)); + if (should_rescan_primary(backfill_state().peer_backfill_info, primary_bi)) { // need to grab one another chunk of the object namespace and restart diff --git a/src/crimson/osd/backfill_state.h b/src/crimson/osd/backfill_state.h index ea4b159ded9..909d112c998 100644 --- a/src/crimson/osd/backfill_state.h +++ b/src/crimson/osd/backfill_state.h @@ -170,7 +170,7 @@ public: const PeeringFacade& peering_state, const BackfillInterval& backfill_info, const std::map& peer_backfill_info); - + int process_backfill_attempt(auto &primary_bi); private: void maybe_update_range(); void trim_backfill_infos(); @@ -320,6 +320,8 @@ public: backfill_machine.process_event(RequestDone{}); } } + //void process_backfill_attempt(auto &primary_bi); + private: struct backfill_suspend_state_t { bool suspended = false; @@ -383,6 +385,9 @@ struct BackfillState::BackfillListener { virtual void backfilled() = 0; + virtual std::optional> acq_throttle() = 0; + virtual void release_throttle() = 0; + bool throttle_acquired = false; virtual ~BackfillListener() = default; }; diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 27511df8582..13a7d9fc5ab 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -581,6 +581,12 @@ seastar::future<> OSD::start() return _add_me_to_crush(); }).then([this] { return _add_device_class(); + }).then([this] { + return _get_device_type(); + }).then([this](auto rotational) { + return shard_services.invoke_on_all([this, rotational](auto &local_service) { + local_service.local_state.initialize_scheduler(local_service.get_cct(), rotational); + }); }).then([this] { monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0); monc->sub_want("mgrmap", 0, 0); @@ -685,6 +691,24 @@ seastar::future<> OSD::_send_boot() m->metadata["osd_type"] = "crimson"; return monc->send_message(std::move(m)); } +seastar::future OSD::_get_device_type() +{ + LOG_PREFIX(OSD::_get_device_type); + if (!local_conf().get_val("osd_class_update_on_start")) { + co_return false; + } + + std::string device_class = co_await store.get_default_device_class(); + if (device_class.empty()) { + INFO("Device class is empty; "); + co_return false; + } + + bool rotational = (device_class != "ssd"); + + INFO("device_class is {} rotational: {}", device_class, rotational); + co_return rotational; +} seastar::future<> OSD::_add_device_class() { diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index 5d36f7ad8ce..8f0c7a0dad3 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -189,6 +189,7 @@ private: seastar::future<> _send_boot(); seastar::future<> _add_me_to_crush(); seastar::future<> _add_device_class(); + seastar::future _get_device_type(); seastar::future<> osdmap_subscribe(version_t epoch, bool force_request); diff --git a/src/crimson/osd/osd_operation.cc b/src/crimson/osd/osd_operation.cc index 97800970976..52f2a32db5b 100644 --- a/src/crimson/osd/osd_operation.cc +++ b/src/crimson/osd/osd_operation.cc @@ -14,6 +14,7 @@ namespace { } } +using namespace crimson::osd::scheduler; namespace crimson::osd { void OSDOperationRegistry::do_stop() @@ -149,21 +150,50 @@ void OSDOperationRegistry::visit_ops_in_flight(std::functionempty()) { - auto item = scheduler->dequeue(); - item.wake.set_value(); - ++in_progress; - --pending; + work_item = scheduler->dequeue(); + if (auto when_ready = std::get_if(&work_item)) { + ceph::real_clock::time_point future_time = ceph::real_clock::from_double(*when_ready); + auto now = ceph::real_clock::now(); + auto wait_duration = std::chrono::duration_cast(future_time - now); + if (wait_duration.count() > 0) { + logger().info("No items ready. Retrying wake() in {} ms", wait_duration.count()); + seastar::sleep(wait_duration).then([this] { + wake(); + }); + return; + } + } + if (auto *item = std::get_if(&work_item)) { + logger().info("MY_LOG Processing item and setting future"); + item->wake.set_value(); + ++in_progress; + --pending; + + } + } + if (scheduler->empty()) { + logger().warn("MY_LOG wake() exited early: scheduler is empty"); } } diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h index d702919bd6f..1df3eaeea99 100644 --- a/src/crimson/osd/osd_operation.h +++ b/src/crimson/osd/osd_operation.h @@ -346,14 +346,17 @@ public: // returns the future for the acquiring otherwise std::optional> try_acquire_throttle_now(crimson::osd::scheduler::params_t params) { - if (!max_in_progress || in_progress < max_in_progress) { - ++in_progress; - --pending; + if (!max_in_progress) { return std::nullopt; } return acquire_throttle(params); } + void try_release_throttle() { + return release_throttle(); + } + + void initialize_scheduler(CephContext* cct, ConfigProxy &conf, bool is_rotational, int whoami); private: void dump_detail(Formatter *f) const final; diff --git a/src/crimson/osd/osd_operations/background_recovery.h b/src/crimson/osd/osd_operations/background_recovery.h index 255a934cd49..d4e46075466 100644 --- a/src/crimson/osd/osd_operations/background_recovery.h +++ b/src/crimson/osd/osd_operations/background_recovery.h @@ -38,8 +38,12 @@ protected: private: virtual void dump_detail(Formatter *f) const; crimson::osd::scheduler::params_t get_scheduler_params() const { + int cost = std::max(1, (pg->get_info().stats.stats.sum.num_bytes / pg->get_info().stats.stats.sum.num_objects)); + unsigned priority = pg->get_recovery_op_priority(); + return { - 1, // cost + cost, // cost + priority, // priority 0, // owner scheduler_class }; diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 1168c56933d..7e713ed124a 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -867,9 +867,23 @@ public: int *return_code, std::vector *op_returns) const; int get_recovery_op_priority() const { - int64_t pri = 0; - get_pgpool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri); - return pri > 0 ? pri : crimson::common::local_conf()->osd_recovery_op_priority; + const std::string _type = this->shard_services.get_cct()->_conf.get_val("osd_op_queue"); + const std::string *type = &_type; + if (*type == "mclock_scheduler") { + if (peering_state.is_forced_recovery_or_backfill()) { + return peering_state.recovery_msg_priority_t::FORCED; + } else if (peering_state.is_undersized()) { + return peering_state.recovery_msg_priority_t::UNDERSIZED; + } else if (peering_state.is_degraded()) { + return peering_state.recovery_msg_priority_t::DEGRADED; + } else { + return peering_state.recovery_msg_priority_t::BEST_EFFORT; + } + } else { + int64_t pri = 0; + get_pgpool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri); + return pri > 0 ? pri : crimson::common::local_conf()->osd_recovery_op_priority; + } } seastar::future<> mark_unfound_lost(int) { // TODO: see PrimaryLogPG::mark_all_unfound_lost() diff --git a/src/crimson/osd/pg_recovery.cc b/src/crimson/osd/pg_recovery.cc index 735dd7f02e8..a8c10dfcfdc 100644 --- a/src/crimson/osd/pg_recovery.cc +++ b/src/crimson/osd/pg_recovery.cc @@ -603,8 +603,9 @@ void PGRecovery::update_peers_last_backfill( bool PGRecovery::budget_available() const { + return true; crimson::osd::scheduler::params_t params = - {1, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort}; + {1, 0, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort}; auto &ss = pg->get_shard_services(); auto futopt = ss.try_acquire_throttle_now(std::move(params)); if (!futopt) { @@ -620,11 +621,35 @@ bool PGRecovery::budget_available() const return false; } +std::optional> PGRecovery::acq_throttle() { + auto &ss = pg->get_shard_services(); + crimson::osd::scheduler::params_t params = + {1, 0, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort}; + auto futopt = ss.try_acquire_throttle_now(std::move(params)); + if (!futopt) { + logger().info(" throttling is disabled for pg {} ", *static_cast(pg)); + return std::nullopt; + } + return futopt; +} + void PGRecovery::on_pg_clean() { backfill_state.reset(); } +void PGRecovery::release_throttle() +{ + if (this->throttle_acquired) { + seastar::logger& logger = crimson::get_logger(0); + logger.info("Calling release_throttle pg_recovery"); + + auto &ss = pg->get_shard_services(); + ss.try_release_throttle(); + this->throttle_acquired = false; + } +} + void PGRecovery::backfilled() { using LocalPeeringEvent = crimson::osd::LocalPeeringEvent; @@ -635,6 +660,7 @@ void PGRecovery::backfilled() pg->get_osdmap_epoch(), pg->get_osdmap_epoch(), PeeringState::Backfilled{}); + release_throttle(); } void PGRecovery::backfill_suspended() @@ -679,9 +705,27 @@ void PGRecovery::on_backfill_reserved() std::make_unique( *static_cast(pg))); } + /*auto &ss = pg->get_shard_services(); + crimson::osd::scheduler::params_t params = + {1, 0, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort}; + auto futopt = ss.try_acquire_throttle_now(std::move(params)); + if (!futopt) { + logger().info(" throttling is disabled for pg {} ", *static_cast(pg)); + backfill_state->process_event(BackfillState::Triggered{}.intrusive_from_this()); + return; + } + std::ignore = interruptor::make_interruptible(std::move(*futopt) + ).then_interruptible([this] { + this->throttle_acquired = true; + logger().info(" throttling is acquired for pg {} ", *static_cast(pg)); + backfill_state->process_event( + BackfillState::Triggered{}.intrusive_from_this()); + }); + */ // it may be we either start a completely new backfill (first // event since last on_activate_complete()) or to resume already // (but stopped one). + logger().info(" Triggered backfillState event ", *static_cast(pg)); backfill_state->process_event( BackfillState::Triggered{}.intrusive_from_this()); } diff --git a/src/crimson/osd/pg_recovery.h b/src/crimson/osd/pg_recovery.h index a9a48cf465c..3101cc7b5c7 100644 --- a/src/crimson/osd/pg_recovery.h +++ b/src/crimson/osd/pg_recovery.h @@ -51,6 +51,8 @@ public: const hobject_t& obj, const eversion_t& v, const std::vector &peers) final; + void release_throttle(); + //bool throttle_acquired; private: PGRecoveryListener* pg; size_t start_primary_recovery_ops( @@ -122,6 +124,7 @@ private: void update_peers_last_backfill( const hobject_t& new_last_backfill) final; bool budget_available() const final; + std::optional> acq_throttle() final; void backfilled() final; friend crimson::osd::BackfillState::PGFacade; friend crimson::osd::PG; diff --git a/src/crimson/osd/scheduler/mclock_scheduler.cc b/src/crimson/osd/scheduler/mclock_scheduler.cc index 2fcf6d57db4..af0412e4f13 100644 --- a/src/crimson/osd/scheduler/mclock_scheduler.cc +++ b/src/crimson/osd/scheduler/mclock_scheduler.cc @@ -21,46 +21,129 @@ namespace dmc = crimson::dmclock; using namespace std::placeholders; -using namespace std::string_literals; +using namespace std::literals; #define dout_context cct -#define dout_subsys ceph_subsys_osd +#define dout_subsys ceph_subsys_mclock #undef dout_prefix -#define dout_prefix *_dout +#define dout_prefix *_dout << "mClockScheduler: " namespace crimson::osd::scheduler { -mClockScheduler::mClockScheduler(ConfigProxy &conf) : - scheduler( - std::bind(&mClockScheduler::ClientRegistry::get_info, - &client_registry, - _1), - dmc::AtLimit::Allow, - conf.get_val("osd_mclock_scheduler_anticipation_timeout")) +mClockScheduler::mClockScheduler(CephContext *cct, + int whoami, + uint32_t num_shards, + int shard_id, + bool is_rotational, + bool init_perfcounter) + : cct(cct), + whoami(whoami), + num_shards(num_shards), + shard_id(shard_id), + is_rotational(is_rotational), + logger(nullptr), + scheduler( + std::bind(&mClockScheduler::ClientRegistry::get_info, + &client_registry, + _1), + dmc::AtLimit::Wait, + cct->_conf.get_val("osd_mclock_scheduler_anticipation_timeout")) { - conf.add_observer(this); - client_registry.update_from_config(conf); + cct->_conf.add_observer(this); + ceph_assert(num_shards > 0); + auto get_op_queue_cut_off = [&conf = cct->_conf]() { + if (conf.get_val("osd_op_queue_cut_off") == "debug_random") { + std::random_device rd; + std::mt19937 random_gen(rd()); + return (random_gen() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW; + } else if (conf.get_val("osd_op_queue_cut_off") == "high") { + return CEPH_MSG_PRIO_HIGH; + } else { + // default / catch-all is 'low' + return CEPH_MSG_PRIO_LOW; + } + }; + cutoff_priority = get_op_queue_cut_off(); + set_osd_capacity_params_from_config(); + set_config_defaults_from_profile(); + client_registry.update_from_config( + cct->_conf, osd_bandwidth_capacity_per_shard); + } -void mClockScheduler::ClientRegistry::update_from_config(const ConfigProxy &conf) +/* ClientRegistry holds the dmclock::ClientInfo configuration parameters + * (reservation (bytes/second), weight (unitless), limit (bytes/second)) + * for each IO class in the OSD (client, background_recovery, + * background_best_effort). + * + * mclock expects limit and reservation to have units of /second + * (bytes/second), but osd_mclock_scheduler_client_(lim|res) are provided + * as ratios of the OSD's capacity. We convert from the one to the other + * using the capacity_per_shard parameter. + * + * Note, mclock profile information will already have been set as a default + * for the osd_mclock_scheduler_client_* parameters prior to calling + * update_from_config -- see set_config_defaults_from_profile(). + */ +void mClockScheduler::ClientRegistry::update_from_config( + const ConfigProxy &conf, + const double capacity_per_shard) { + + auto get_res = [&](double res) { + if (res) { + return res * capacity_per_shard; + } else { + return default_min; // min reservation + } + }; + + auto get_lim = [&](double lim) { + if (lim) { + return lim * capacity_per_shard; + } else { + return default_max; // high limit + } + }; + + // Set external client infos + double res = conf.get_val( + "osd_mclock_scheduler_client_res"); + double lim = conf.get_val( + "osd_mclock_scheduler_client_lim"); + uint64_t wgt = conf.get_val( + "osd_mclock_scheduler_client_wgt"); default_external_client_info.update( - conf.get_val("osd_mclock_scheduler_client_res"), - conf.get_val("osd_mclock_scheduler_client_wgt"), - conf.get_val("osd_mclock_scheduler_client_lim")); + get_res(res), + wgt, + get_lim(lim)); + // Set background recovery client infos + res = conf.get_val( + "osd_mclock_scheduler_background_recovery_res"); + lim = conf.get_val( + "osd_mclock_scheduler_background_recovery_lim"); + wgt = conf.get_val( + "osd_mclock_scheduler_background_recovery_wgt"); internal_client_infos[ static_cast(scheduler_class_t::background_recovery)].update( - conf.get_val("osd_mclock_scheduler_background_recovery_res"), - conf.get_val("osd_mclock_scheduler_background_recovery_wgt"), - conf.get_val("osd_mclock_scheduler_background_recovery_lim")); + get_res(res), + wgt, + get_lim(lim)); + // Set background best effort client infos + res = conf.get_val( + "osd_mclock_scheduler_background_best_effort_res"); + lim = conf.get_val( + "osd_mclock_scheduler_background_best_effort_lim"); + wgt = conf.get_val( + "osd_mclock_scheduler_background_best_effort_wgt"); internal_client_infos[ static_cast(scheduler_class_t::background_best_effort)].update( - conf.get_val("osd_mclock_scheduler_background_best_effort_res"), - conf.get_val("osd_mclock_scheduler_background_best_effort_wgt"), - conf.get_val("osd_mclock_scheduler_background_best_effort_lim")); + get_res(res), + wgt, + get_lim(lim)); } const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_external_client( @@ -79,7 +162,6 @@ const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_info( case scheduler_class_t::immediate: ceph_assert(0 == "Cannot schedule immediate"); return (dmc::ClientInfo*)nullptr; - case scheduler_class_t::repop: case scheduler_class_t::client: return get_external_client(id.client_profile_id); default: @@ -88,44 +170,306 @@ const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_info( } } +void mClockScheduler::set_osd_capacity_params_from_config() +{ + uint64_t osd_bandwidth_capacity; + double osd_iop_capacity; + + std::tie(osd_bandwidth_capacity, osd_iop_capacity) = [&, this] { + if (is_rotational) { + return std::make_tuple( + cct->_conf.get_val( + "osd_mclock_max_sequential_bandwidth_hdd"), + cct->_conf.get_val("osd_mclock_max_capacity_iops_hdd")); + } else { + return std::make_tuple( + cct->_conf.get_val( + "osd_mclock_max_sequential_bandwidth_ssd"), + cct->_conf.get_val("osd_mclock_max_capacity_iops_ssd")); + } + }(); + + osd_bandwidth_capacity = std::max(1, osd_bandwidth_capacity); + osd_iop_capacity = std::max(1.0, osd_iop_capacity); + + osd_bandwidth_cost_per_io = + static_cast(osd_bandwidth_capacity) / osd_iop_capacity; + osd_bandwidth_capacity_per_shard = static_cast(osd_bandwidth_capacity) + / static_cast(num_shards); + + dout(1) << __func__ << ": osd_bandwidth_cost_per_io: " + << std::fixed << std::setprecision(2) + << osd_bandwidth_cost_per_io << " bytes/io" + << ", osd_bandwidth_capacity_per_shard " + << osd_bandwidth_capacity_per_shard << " bytes/second" + << dendl; +} + +/** + * profile_t + * + * mclock profile -- 3 params for each of 3 client classes + * 0 (min): specifies no minimum reservation + * 0 (max): specifies no upper limit + */ +struct profile_t { + struct client_config_t { + double reservation; + uint64_t weight; + double limit; + }; + client_config_t client; + client_config_t background_recovery; + client_config_t background_best_effort; +}; + +static std::ostream &operator<<( + std::ostream &lhs, const profile_t::client_config_t &rhs) +{ + return lhs << "{res: " << rhs.reservation + << ", wgt: " << rhs.weight + << ", lim: " << rhs.limit + << "}"; +} + +static std::ostream &operator<<(std::ostream &lhs, const profile_t &rhs) +{ + return lhs << "[client: " << rhs.client + << ", background_recovery: " << rhs.background_recovery + << ", background_best_effort: " << rhs.background_best_effort + << "]"; +} + +void mClockScheduler::set_config_defaults_from_profile() +{ + // Let only a single osd shard (id:0) set the profile configs + if (shard_id > 0) { + return; + } + + /** + * high_client_ops + * + * Client Allocation: + * reservation: 60% | weight: 2 | limit: 0 (max) | + * Background Recovery Allocation: + * reservation: 40% | weight: 1 | limit: 0 (max) | + * Background Best Effort Allocation: + * reservation: 0 (min) | weight: 1 | limit: 70% | + */ + static constexpr profile_t high_client_ops_profile{ + { .6, 2, 0 }, + { .4, 1, 0 }, + { 0, 1, .7 } + }; + + /** + * high_recovery_ops + * + * Client Allocation: + * reservation: 30% | weight: 1 | limit: 0 (max) | + * Background Recovery Allocation: + * reservation: 70% | weight: 2 | limit: 0 (max) | + * Background Best Effort Allocation: + * reservation: 0 (min) | weight: 1 | limit: 0 (max) | + */ + static constexpr profile_t high_recovery_ops_profile{ + { .3, 1, 0 }, + { .7, 2, 0 }, + { 0, 1, 0 } + }; + + /** + * balanced + * + * Client Allocation: + * reservation: 50% | weight: 1 | limit: 0 (max) | + * Background Recovery Allocation: + * reservation: 50% | weight: 1 | limit: 0 (max) | + * Background Best Effort Allocation: + * reservation: 0 (min) | weight: 1 | limit: 90% | + */ + static constexpr profile_t balanced_profile{ + { .5, 1, 0 }, + { .5, 1, 0 }, + { 0, 1, .9 } + }; + + const profile_t *profile = nullptr; + auto mclock_profile = cct->_conf.get_val("osd_mclock_profile"); + if (mclock_profile == "high_client_ops") { + profile = &high_client_ops_profile; + dout(10) << "Setting high_client_ops profile " << *profile << dendl; + } else if (mclock_profile == "high_recovery_ops") { + profile = &high_recovery_ops_profile; + dout(10) << "Setting high_recovery_ops profile " << *profile << dendl; + } else if (mclock_profile == "balanced") { + profile = &balanced_profile; + dout(10) << "Setting balanced profile " << *profile << dendl; + } else if (mclock_profile == "custom") { + dout(10) << "Profile set to custom, not setting defaults" << dendl; + return; + } else { + derr << "Invalid mclock profile: " << mclock_profile << dendl; + ceph_assert("Invalid choice of mclock profile" == 0); + return; + } + ceph_assert(nullptr != profile); + + auto set_config = [&conf = cct->_conf](const char *key, auto val) { + return conf.set_val_default_sync(key, std::to_string(val)); + }; + + set_config("osd_mclock_scheduler_client_res", profile->client.reservation); + set_config("osd_mclock_scheduler_client_res", profile->client.reservation); + set_config("osd_mclock_scheduler_client_wgt", profile->client.weight); + set_config("osd_mclock_scheduler_client_lim", profile->client.limit); + + + set_config( + "osd_mclock_scheduler_background_recovery_res", + profile->background_recovery.reservation); + set_config( + "osd_mclock_scheduler_background_recovery_wgt", + profile->background_recovery.weight); + set_config( + "osd_mclock_scheduler_background_recovery_lim", + profile->background_recovery.limit); + + set_config( + "osd_mclock_scheduler_background_best_effort_res", + profile->background_best_effort.reservation); + set_config( + "osd_mclock_scheduler_background_best_effort_wgt", + profile->background_best_effort.weight); + set_config( + "osd_mclock_scheduler_background_best_effort_lim", + profile->background_best_effort.limit); + +} + +uint32_t mClockScheduler::calc_scaled_cost(int item_cost) +{ + auto cost = static_cast( + std::max( + 1, // ensure cost is non-zero and positive + item_cost)); + auto cost_per_io = static_cast(osd_bandwidth_cost_per_io); + + return std::max(cost, cost_per_io); +} + void mClockScheduler::dump(ceph::Formatter &f) const { + // Display queue sizes + f.open_object_section("queue_sizes"); + f.dump_int("high_priority_queue", high_priority.size()); + f.dump_int("scheduler", scheduler.request_count()); + f.close_section(); + + // client map and queue tops (res, wgt, lim) + std::ostringstream out; + f.open_object_section("mClockClients"); + f.dump_int("client_count", scheduler.client_count()); + //out << scheduler; + f.dump_string("clients", out.str()); + f.close_section(); + + // Display sorted queues (res, wgt, lim) + f.open_object_section("mClockQueues"); + f.dump_string("queues", display_queues()); + f.close_section(); + + f.open_object_section("HighPriorityQueue"); + for (auto it = high_priority.begin(); + it != high_priority.end(); it++) { + f.dump_int("priority", it->first); + f.dump_int("queue_size", it->second.size()); + } + f.close_section(); } void mClockScheduler::enqueue(item_t&& item) { auto id = get_scheduler_id(item); - auto cost = item.params.cost; + unsigned priority = item.get_priority(); + // TODO: move this check into item, handle backwards compat if (scheduler_class_t::immediate == item.params.klass) { - immediate.push_front(std::move(item)); + enqueue_high(immediate_class_priority, std::move(item)); + } else if (priority >= cutoff_priority) { + enqueue_high(priority, std::move(item)); } else { + auto cost = calc_scaled_cost(item.get_cost()); + dout(20) << __func__ << " " << id + << " item_cost: " << item.get_cost() + << " scaled_cost: " << cost + << dendl; + + // Add item to scheduler queue scheduler.add_request( std::move(item), id, cost); + //_get_mclock_counter(id); } + + dout(10) << __func__ << " client_count: " << scheduler.client_count() + << " queue_sizes: [ " + << " high_priority_queue: " << high_priority.size() + << " sched: " << scheduler.request_count() << " ]" + << dendl; + dout(30) << __func__ << " mClockClients: " + << dendl; + dout(10) << __func__ << " mClockQueues: { " + << display_queues() << " }" + << dendl; } void mClockScheduler::enqueue_front(item_t&& item) { - immediate.push_back(std::move(item)); - // TODO: item may not be immediate, update mclock machinery to permit - // putting the item back in the queue + unsigned priority = item.get_priority(); + + if (scheduler_class_t::immediate == item.params.klass) { + enqueue_high(immediate_class_priority, std::move(item), true); + } else if (priority >= cutoff_priority) { + enqueue_high(priority, std::move(item), true); + } else { + // mClock does not support enqueue at front, so we use + // the high queue with priority 0 + enqueue_high(0, std::move(item), true); + } } -item_t mClockScheduler::dequeue() +void mClockScheduler::enqueue_high(unsigned priority, + item_t&& item, + bool front) { - if (!immediate.empty()) { - auto ret = std::move(immediate.back()); - immediate.pop_back(); + if (front) { + high_priority[priority].push_back(std::move(item)); + } else { + high_priority[priority].push_front(std::move(item)); + } +} + +WorkItem mClockScheduler::dequeue() +{ + if (!high_priority.empty()) { + auto iter = high_priority.begin(); + // invariant: high_priority entries are never empty + assert(!iter->second.empty()); + WorkItem ret{std::move(iter->second.back())}; + iter->second.pop_back(); + if (iter->second.empty()) { + // maintain invariant, high priority entries are never empty + high_priority.erase(iter); + } + return ret; } else { mclock_queue_t::PullReq result = scheduler.pull_request(); if (result.is_future()) { - ceph_assert( - 0 == "Not implemented, user would have to be able to be woken up"); - return std::move(*(item_t*)nullptr); + return result.getTime(); } else if (result.is_none()) { ceph_assert( 0 == "Impossible, must have checked empty() first"); @@ -139,19 +483,33 @@ item_t mClockScheduler::dequeue() } } -std::vector mClockScheduler::get_tracked_keys() const noexcept +std::string mClockScheduler::display_queues() const +{ + std::ostringstream out; + scheduler.display_queues(out); + return out.str(); +} + +const char** mClockScheduler::get_tracked_conf_keys() const { - return { - "osd_mclock_scheduler_client_res"s, - "osd_mclock_scheduler_client_wgt"s, - "osd_mclock_scheduler_client_lim"s, - "osd_mclock_scheduler_background_recovery_res"s, - "osd_mclock_scheduler_background_recovery_wgt"s, - "osd_mclock_scheduler_background_recovery_lim"s, - "osd_mclock_scheduler_background_best_effort_res"s, - "osd_mclock_scheduler_background_best_effort_wgt"s, - "osd_mclock_scheduler_background_best_effort_lim"s + static const char* KEYS[] = { + "osd_mclock_scheduler_client_res", + "osd_mclock_scheduler_client_wgt", + "osd_mclock_scheduler_client_lim", + "osd_mclock_scheduler_background_recovery_res", + "osd_mclock_scheduler_background_recovery_wgt", + "osd_mclock_scheduler_background_recovery_lim", + "osd_mclock_scheduler_background_best_effort_res", + "osd_mclock_scheduler_background_best_effort_wgt", + "osd_mclock_scheduler_background_best_effort_lim", + "osd_mclock_max_capacity_iops_hdd", + "osd_mclock_max_capacity_iops_ssd", + "osd_mclock_max_sequential_bandwidth_hdd", + "osd_mclock_max_sequential_bandwidth_ssd", + "osd_mclock_profile", + NULL }; + return KEYS; } @@ -159,7 +517,57 @@ void mClockScheduler::handle_conf_change( const ConfigProxy& conf, const std::set &changed) { - client_registry.update_from_config(conf); + if (changed.count("osd_mclock_max_capacity_iops_hdd") || + changed.count("osd_mclock_max_capacity_iops_ssd")) { + set_osd_capacity_params_from_config(); + client_registry.update_from_config( + conf, osd_bandwidth_capacity_per_shard); + } + if (changed.count("osd_mclock_max_sequential_bandwidth_hdd") || + changed.count("osd_mclock_max_sequential_bandwidth_ssd")) { + set_osd_capacity_params_from_config(); + client_registry.update_from_config( + conf, osd_bandwidth_capacity_per_shard); + } + if (changed.count("osd_mclock_profile")) { + set_config_defaults_from_profile(); + client_registry.update_from_config( + conf, osd_bandwidth_capacity_per_shard); + } + + auto get_changed_key = [&changed]() -> std::optional { + static const std::vector qos_params = { + "osd_mclock_scheduler_client_res", + "osd_mclock_scheduler_client_wgt", + "osd_mclock_scheduler_client_lim", + "osd_mclock_scheduler_background_recovery_res", + "osd_mclock_scheduler_background_recovery_wgt", + "osd_mclock_scheduler_background_recovery_lim", + "osd_mclock_scheduler_background_best_effort_res", + "osd_mclock_scheduler_background_best_effort_wgt", + "osd_mclock_scheduler_background_best_effort_lim" + }; + + for (auto &qp : qos_params) { + if (changed.count(qp)) { + return qp; + } + } + return std::nullopt; + }; + + if (auto key = get_changed_key(); key.has_value()) { + auto mclock_profile = cct->_conf.get_val("osd_mclock_profile"); + if (mclock_profile == "custom") { + client_registry.update_from_config( + conf, osd_bandwidth_capacity_per_shard); + } + } +} + +mClockScheduler::~mClockScheduler() +{ + cct->_conf.remove_observer(this); } } diff --git a/src/crimson/osd/scheduler/mclock_scheduler.h b/src/crimson/osd/scheduler/mclock_scheduler.h index 16082ffd095..5b180272179 100644 --- a/src/crimson/osd/scheduler/mclock_scheduler.h +++ b/src/crimson/osd/scheduler/mclock_scheduler.h @@ -15,6 +15,7 @@ #pragma once +#include #include #include #include @@ -22,28 +23,65 @@ #include "boost/variant.hpp" #include "dmclock/src/dmclock_server.h" - #include "crimson/osd/scheduler/scheduler.h" +#include "crimson/mon/MonClient.h" + #include "common/config.h" #include "common/ceph_context.h" namespace crimson::osd::scheduler { -using client_id_t = uint64_t; -using profile_id_t = uint64_t; +constexpr double default_min = 0.0; +constexpr double default_max = std::numeric_limits::is_iec559 ? + std::numeric_limits::infinity() : + std::numeric_limits::max(); +/** + * client_profile_id_t + * + * client_id - global id (client.####) for client QoS + * profile_id - id generated by client's QoS profile + * + * Currently (Reef and below), both members are set to + * 0 which ensures that all external clients share the + * mClock profile allocated reservation and limit + * bandwidth. + * + * Note: Post Reef, both members will be set to non-zero + * values when the distributed feature of the mClock + * algorithm is utilized. + */ struct client_profile_id_t { - client_id_t client_id; - profile_id_t profile_id; + uint64_t client_id = 0; + uint64_t profile_id = 0; + + client_profile_id_t(uint64_t _client_id, uint64_t _profile_id) : + client_id(_client_id), + profile_id(_profile_id) {} + + client_profile_id_t() = default; + auto operator<=>(const client_profile_id_t&) const = default; + friend std::ostream& operator<<(std::ostream& out, + const client_profile_id_t& client_profile) { + out << " client_id: " << client_profile.client_id + << " profile_id: " << client_profile.profile_id; + return out; + } }; - struct scheduler_id_t { scheduler_class_t class_id; client_profile_id_t client_profile_id; + auto operator<=>(const scheduler_id_t&) const = default; + friend std::ostream& operator<<(std::ostream& out, + const scheduler_id_t& sched_id) { + out << "{ class_id: " << sched_id.class_id + << sched_id.client_profile_id; + return out << " }"; + } }; /** @@ -53,6 +91,56 @@ struct scheduler_id_t { */ class mClockScheduler : public Scheduler, md_config_obs_t { + crimson::common::CephContext *cct; + const int whoami; + const uint32_t num_shards; + const int shard_id; + const bool is_rotational; + unsigned cutoff_priority; + PerfCounters *logger; + + /** + * osd_bandwidth_cost_per_io + * + * mClock expects all queued items to have a uniform expression of + * "cost". However, IO devices generally have quite different capacity + * for sequential IO vs small random IO. This implementation handles this + * by expressing all costs as a number of sequential bytes written adding + * additional cost for each random IO equal to osd_bandwidth_cost_per_io. + * + * Thus, an IO operation requiring a total of bytes to be written + * accross different locations will have a cost of + * + (osd_bandwidth_cost_per_io * ) bytes. + * + * Set in set_osd_capacity_params_from_config in the constructor and upon + * config change. + * + * Has units bytes/io. + */ + double osd_bandwidth_cost_per_io; + + /** + * osd_bandwidth_capacity_per_shard + * + * mClock expects reservation and limit paramters to be expressed in units + * of cost/second -- which means bytes/second for this implementation. + * + * Rather than expecting users to compute appropriate limit and reservation + * values for each class of OSDs in their cluster, we instead express + * reservation and limit paramaters as ratios of the OSD's maxmimum capacity. + * osd_bandwidth_capacity_per_shard is that capacity divided by the number + * of shards. + * + * Set in set_osd_capacity_params_from_config in the constructor and upon + * config change. + * + * This value gets passed to ClientRegistry::update_from_config in order + * to resolve the full reservaiton and limit parameters for mclock from + * the configured ratios. + * + * Has units bytes/second. + */ + double osd_bandwidth_capacity_per_shard; class ClientRegistry { std::array< crimson::dmclock::ClientInfo, @@ -69,7 +157,16 @@ class mClockScheduler : public Scheduler, md_config_obs_t { const crimson::dmclock::ClientInfo *get_external_client( const client_profile_id_t &client) const; public: - void update_from_config(const ConfigProxy &conf); + /** + * update_from_config + * + * Sets the mclock paramaters (reservation, weight, and limit) + * for each class of IO (background_recovery, background_best_effort, + * and client). + */ + void update_from_config( + const ConfigProxy &conf, + double capacity_per_shard); const crimson::dmclock::ClientInfo *get_info( const scheduler_id_t &id) const; } client_registry; @@ -80,47 +177,91 @@ class mClockScheduler : public Scheduler, md_config_obs_t { true, true, 2>; + using priority_t = unsigned; + using SubQueue = std::map, + std::greater>; mclock_queue_t scheduler; - std::list immediate; + /** + * high_priority + * + * Holds entries to be dequeued in strict order ahead of mClock + * Invariant: entries are never empty + */ + SubQueue high_priority; + priority_t immediate_class_priority = std::numeric_limits::max(); static scheduler_id_t get_scheduler_id(const item_t &item) { return scheduler_id_t{ item.params.klass, - client_profile_id_t{ - item.params.owner, - 0 - } + client_profile_id_t{ + item.params.owner, + 0 + } }; } + /** + * set_osd_capacity_params_from_config + * + * mClockScheduler uses two parameters, osd_bandwidth_cost_per_io + * and osd_bandwidth_capacity_per_shard, internally. These two + * parameters are derived from config parameters + * osd_mclock_max_capacity_iops_(hdd|ssd) and + * osd_mclock_max_sequential_bandwidth_(hdd|ssd) as well as num_shards. + * Invoking set_osd_capacity_params_from_config() resets those derived + * params based on the current config and should be invoked any time they + * are modified as well as in the constructor. See handle_conf_change(). + */ + void set_osd_capacity_params_from_config(); + + // Set the mclock related config params based on the profile + void set_config_defaults_from_profile(); + public: - mClockScheduler(ConfigProxy &conf); + mClockScheduler(CephContext *cct, int whoami, uint32_t num_shards, + int shard_id, bool is_rotational, bool init_perfcounter=true); + ~mClockScheduler() override; + + /// Calculate scaled cost per item + uint32_t calc_scaled_cost(int cost); + + // Helper method to display mclock queues + std::string display_queues() const; // Enqueue op in the back of the regular queue void enqueue(item_t &&item) final; - // Enqueue the op in the front of the regular queue + // Enqueue the op in the front of the high priority queue void enqueue_front(item_t &&item) final; // Return an op to be dispatch - item_t dequeue() final; + WorkItem dequeue() final; // Returns if the queue is empty bool empty() const final { - return immediate.empty() && scheduler.empty(); + return scheduler.empty() && high_priority.empty(); } // Formatted output of the queue void dump(ceph::Formatter &f) const final; void print(std::ostream &ostream) const final { - ostream << "mClockScheduler"; + ostream << "mClockScheduer "; + ostream << ", cutoff=" << cutoff_priority; } - std::vector get_tracked_keys() const noexcept final; + const char** get_tracked_conf_keys() const final; void handle_conf_change(const ConfigProxy& conf, const std::set &changed) final; + + double get_cost_per_io() const { + return osd_bandwidth_cost_per_io; + } +private: + // Enqueue the op to the high priority queue + void enqueue_high(unsigned prio, item_t &&item, bool front = false); }; } diff --git a/src/crimson/osd/scheduler/scheduler.cc b/src/crimson/osd/scheduler/scheduler.cc index c85cb388ef3..70ca8002f9a 100644 --- a/src/crimson/osd/scheduler/scheduler.cc +++ b/src/crimson/osd/scheduler/scheduler.cc @@ -66,7 +66,7 @@ class ClassedOpQueueScheduler final : public Scheduler { if (conf.get_val("osd_op_queue_cut_off") == "debug_random") { srand(time(NULL)); return (rand() % 2 < 1) ? - scheduler_class_t::repop : scheduler_class_t::immediate; + scheduler_class_t::repop : scheduler_class_t::immediate; } else if (conf.get_val("osd_op_queue_cut_off") == "high") { return scheduler_class_t::immediate; } else { @@ -128,7 +128,7 @@ public: return queue.empty(); } - item_t dequeue() final { + WorkItem dequeue() final { return queue.dequeue(); } @@ -145,7 +145,8 @@ public: ~ClassedOpQueueScheduler() final {}; }; -SchedulerRef make_scheduler(ConfigProxy &conf) +SchedulerRef make_scheduler(CephContext *cct, ConfigProxy &conf, int whoami, uint32_t nshards, int sid, + bool is_rotational, bool perf_cnt) { const std::string _type = conf.get_val("osd_op_queue"); const std::string *type = &_type; @@ -166,7 +167,7 @@ SchedulerRef make_scheduler(ConfigProxy &conf) conf->osd_op_pq_min_cost ); } else if (*type == "mclock_scheduler") { - return std::make_unique(conf); + return std::make_unique(cct, whoami, nshards, sid, is_rotational, perf_cnt); } else { ceph_assert("Invalid choice of wq" == 0); return std::unique_ptr(); diff --git a/src/crimson/osd/scheduler/scheduler.h b/src/crimson/osd/scheduler/scheduler.h index a014991abcd..0d3c147a43e 100644 --- a/src/crimson/osd/scheduler/scheduler.h +++ b/src/crimson/osd/scheduler/scheduler.h @@ -22,8 +22,8 @@ namespace crimson::osd::scheduler { enum class scheduler_class_t : uint8_t { - background_best_effort = 0, - background_recovery, + background_recovery = 0, + background_best_effort, client, repop, immediate, @@ -32,10 +32,10 @@ enum class scheduler_class_t : uint8_t { std::ostream &operator<<(std::ostream &, const scheduler_class_t &); using client_t = uint64_t; -using cost_t = uint64_t; struct params_t { - cost_t cost = 1; + int cost = 1; + unsigned priority = 0; client_t owner; scheduler_class_t klass; }; @@ -43,8 +43,11 @@ struct params_t { struct item_t { params_t params; seastar::promise<> wake; + int get_cost() const { return params.cost; } + unsigned get_priority() const { return params.priority; } }; +using WorkItem = std::variant; /** * Base interface for classes responsible for choosing * op processing order in the OSD. @@ -62,7 +65,7 @@ public: virtual bool empty() const = 0; // Return next op to be processed - virtual item_t dequeue() = 0; + virtual WorkItem dequeue() = 0; // Dump formatted representation for the queue virtual void dump(ceph::Formatter &f) const = 0; @@ -77,6 +80,7 @@ public: std::ostream &operator<<(std::ostream &lhs, const Scheduler &); using SchedulerRef = std::unique_ptr; -SchedulerRef make_scheduler(ConfigProxy &); +SchedulerRef make_scheduler(CephContext *cct, ConfigProxy &, int whoami, uint32_t num_shards, + int shard_id, bool is_rotational, bool perf_cnt); } diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h index 400976d0db9..0ae59a74e66 100644 --- a/src/crimson/osd/shard_services.h +++ b/src/crimson/osd/shard_services.h @@ -209,6 +209,10 @@ public: PerfCounters *recoverystate_perf, crimson::os::FuturizedStore &store, OSDState& osd_state); + + void initialize_scheduler(CephContext* cct, bool is_rotational) { + throttler.initialize_scheduler(cct, crimson::common::local_conf(), is_rotational, whoami); + } }; /** @@ -595,6 +599,7 @@ public: FORWARD_TO_OSD_SINGLETON(get_pool_info) FORWARD(with_throttle_while, with_throttle_while, local_state.throttler) FORWARD(try_acquire_throttle_now, try_acquire_throttle_now, local_state.throttler) + FORWARD(try_release_throttle, try_release_throttle, local_state.throttler) FORWARD_TO_OSD_SINGLETON(build_incremental_map_msg) FORWARD_TO_OSD_SINGLETON(send_incremental_map) -- 2.47.3