From: Mohit Agrawal Date: Mon, 28 Jul 2025 12:48:57 +0000 (+0530) Subject: crimson/osd,osd_operation: initialize mClock scheduler, detect rotational devices... X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=888aaae41680fe1d7e71c559d25faa8c8e698769;p=ceph.git crimson/osd,osd_operation: initialize mClock scheduler, detect rotational devices, and run OperationThrottler background task Initialize the mClock scheduler on all shards when the device class is non-rotational. If the device is rotational throw an exception to prevent unsupported configurations. In addition, introduce a background task in OperationThrottler that continuously dequeues and schedules client requests from the mClock scheduler based on available credits and throttling limits. Signed-off-by: Mohit Agrawal --- diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 87a69ab18c1..b914b3fc42b 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -581,6 +581,14 @@ seastar::future<> OSD::start() return _add_me_to_crush(); }).then([this] { return _add_device_class(); + }).then([this] { + if (is_rotational.has_value()) { + return shard_services.invoke_on_all([this](auto &local_service) { + local_service.local_state.initialize_scheduler(local_service.get_cct(), *is_rotational); + }); + } else { + throw std::runtime_error("No device class is set"); + } }).then([this] { monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0); monc->sub_want("mgrmap", 0, 0); @@ -701,6 +709,7 @@ seastar::future<> OSD::_add_device_class() INFO("device_class is {} ", device_class); + is_rotational = (device_class != "ssd"); std::string cmd = fmt::format( R"({{"prefix": "osd crush set-device-class", "class": "{}", "ids": ["{}"]}})", device_class, stringify(whoami) diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index 401b65022c1..3ec1a3636c9 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -66,6 +66,7 @@ class OSD final : public crimson::net::Dispatcher, public md_config_obs_t { const int whoami; const uint32_t nonce; + std::optional is_rotational; seastar::abort_source& abort_source; seastar::timer beacon_timer; // talk with osd diff --git a/src/crimson/osd/osd_operation.cc b/src/crimson/osd/osd_operation.cc index f2debef6297..41b630eb767 100644 --- a/src/crimson/osd/osd_operation.cc +++ b/src/crimson/osd/osd_operation.cc @@ -7,6 +7,7 @@ #include "crimson/osd/osd_operations/client_request.h" using namespace std::string_literals; +SET_SUBSYS(osd); namespace { seastar::logger& logger() { @@ -14,6 +15,7 @@ namespace { } } +using namespace crimson::osd::scheduler; namespace crimson::osd { void OSDOperationRegistry::do_stop() @@ -149,27 +151,85 @@ void OSDOperationRegistry::visit_ops_in_flight(std::functionempty()) { - auto item = scheduler->dequeue(); - item.wake.set_value(); - ++in_progress; - --pending; + scheduler = crimson::osd::scheduler::make_scheduler(cct, conf, whoami, seastar::smp::count, + seastar::this_shard_id(), is_rotational, true); + update_from_config(conf); +} + +seastar::future<> OperationThrottler::background_task() { + LOG_PREFIX(OperationThrottler::background_task); + while (!stopped) { + co_await cv.wait([this] { + return (available() && !scheduler->empty()) || stopped; + }); + + // It might be possible as mclock scheduler can return a timestamp in double means + // the work item is scheduled in the future, so in that case wait until + // the returned timestamp in the dequeue response before retrying. + while (available() && !scheduler->empty() && !stopped) { + WorkItem work_item = scheduler->dequeue(); + if (auto when_ready = std::get_if(&work_item)) { + ceph::real_clock::time_point future_time = ceph::real_clock::from_double(*when_ready); + auto now = ceph::real_clock::now(); + ceph_assert(future_time > now); + auto wait_duration = std::chrono::duration_cast(future_time - now); + INFO("No items ready. Retrying in {} ms", wait_duration.count()); + co_await seastar::sleep(wait_duration); + continue; + } + if (auto *item = std::get_if(&work_item)) { + DEBUG("Waking up a work item"); + item->wake.set_value(); + ++in_progress; + --pending; + DEBUG("Updated counters during background_task: in_progress={}, pending={}", in_progress, pending); + } else { + DEBUG("Unexpected variant in WorkItem — neither time nor item"); + } + } } + + DEBUG("Background task exiting cleanly"); + co_return; +} + +void OperationThrottler::wake() { + // Attempt to wakeup pending operation if resources are available. + // The mclock scheduler might return delay if item is not ready + // to process + cv.signal(); } void OperationThrottler::release_throttle() { + LOG_PREFIX(OperationThrottler::release_throttle); ceph_assert(in_progress > 0); --in_progress; + DEBUG("Updated counters during release_throttle: in_progress={}, pending={}", + in_progress, pending); wake(); } @@ -179,14 +239,37 @@ seastar::future<> OperationThrottler::acquire_throttle( crimson::osd::scheduler::item_t item{params, seastar::promise<>()}; auto fut = item.wake.get_future(); scheduler->enqueue(std::move(item)); + ++pending; wake(); return fut; } +seastar::future<> OperationThrottler::stop() +{ + if (!started) + co_return; + + stopped = true; + cv.broadcast(); + + if (bg_future && !bg_future->available()) { + co_await std::move(*bg_future); + } + + bg_future.reset(); + started = false; + + co_return; +} + void OperationThrottler::dump_detail(Formatter *f) const { f->dump_unsigned("max_in_progress", max_in_progress); f->dump_unsigned("in_progress", in_progress); + f->dump_unsigned("pending", pending); + f->dump_unsigned("background_task started", started); + f->dump_unsigned("background_task stopeed", stopped); + f->open_object_section("scheduler"); { scheduler->dump(*f); diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h index 0e38868a279..a5b0d7815f9 100644 --- a/src/crimson/osd/osd_operation.h +++ b/src/crimson/osd/osd_operation.h @@ -360,6 +360,8 @@ class OperationThrottler : public BlockerT, public: OperationThrottler(ConfigProxy &conf); + void start(); + seastar::future<> stop(); std::vector get_tracked_keys() const noexcept final; void handle_conf_change(const ConfigProxy& conf, @@ -394,6 +396,7 @@ public: }); } + void initialize_scheduler(CephContext* cct, ConfigProxy &conf, bool is_rotational, int whoami); private: void dump_detail(Formatter *f) const final; @@ -403,13 +406,21 @@ private: uint64_t in_progress = 0; uint64_t pending = 0; + bool started = false; + bool stopped = false; + + seastar::condition_variable cv; + std::optional> bg_future; void wake(); + seastar::future<> wake_set(); + seastar::future<> acquire_throttle( crimson::osd::scheduler::params_t params); void release_throttle(); + seastar::future<> background_task(); }; } diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h index 42146427e9d..8689a7da49a 100644 --- a/src/crimson/osd/shard_services.h +++ b/src/crimson/osd/shard_services.h @@ -113,9 +113,15 @@ class PerShardState { bool stopping = false; seastar::future<> stop_registry() { assert_core(); - crimson::get_logger(ceph_subsys_osd).info("PerShardState::{}", __func__); + auto& logger = crimson::get_logger(ceph_subsys_osd); + logger.info("PerShardState::{}", __func__); stopping = true; - return registry.stop(); + + // First throttler stop and then call registry stop + return throttler.stop().then([this, &logger] { + logger.debug("Throttler stopped for shard {}", seastar::this_shard_id()); + return registry.stop(); + }); } // PGMap state @@ -209,6 +215,12 @@ public: PerfCounters *recoverystate_perf, crimson::os::FuturizedStore &store, OSDState& osd_state); + + void initialize_scheduler(CephContext* cct, bool is_rotational) { + throttler.initialize_scheduler(cct, crimson::common::local_conf(), is_rotational, whoami); + throttler.start(); + } + }; /**