]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd,osd_operation: initialize mClock scheduler, detect rotational devices...
authorMohit Agrawal <moagrawa@redhat.com>
Mon, 28 Jul 2025 12:48:57 +0000 (18:18 +0530)
committerMohit Agrawal <moagrawa@redhat.com>
Wed, 3 Dec 2025 12:30:17 +0000 (18:00 +0530)
Initialize the mClock scheduler on all shards when the device class
is non-rotational. If the device is rotational throw an exception
to prevent unsupported configurations.

In addition, introduce a background task in OperationThrottler that
continuously dequeues and schedules client requests from the mClock
scheduler based on available credits and throttling limits.

Signed-off-by: Mohit Agrawal <moagrawa@redhat.com>
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/osd_operation.cc
src/crimson/osd/osd_operation.h
src/crimson/osd/shard_services.h

index 87a69ab18c146458abf901e94e3b22798951b10e..b914b3fc42bcb8cdf0bb6def3be32d08f6382499 100644 (file)
@@ -581,6 +581,14 @@ seastar::future<> OSD::start()
     return _add_me_to_crush();
   }).then([this] {
     return _add_device_class();
+   }).then([this] {
+    if (is_rotational.has_value()) {
+      return shard_services.invoke_on_all([this](auto &local_service) {
+        local_service.local_state.initialize_scheduler(local_service.get_cct(), *is_rotational);
+      });
+    } else {
+      throw std::runtime_error("No device class is set");
+    }
    }).then([this] {
     monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0);
     monc->sub_want("mgrmap", 0, 0);
@@ -701,6 +709,7 @@ seastar::future<> OSD::_add_device_class()
 
   INFO("device_class is {} ", device_class);
 
+  is_rotational = (device_class != "ssd");
   std::string cmd = fmt::format(
     R"({{"prefix": "osd crush set-device-class", "class": "{}", "ids": ["{}"]}})",
     device_class, stringify(whoami)
index 401b65022c1ba602838044a100683de6af28bc0c..3ec1a3636c94873dca88553988291489087dcfa6 100644 (file)
@@ -66,6 +66,7 @@ class OSD final : public crimson::net::Dispatcher,
                  public md_config_obs_t {
   const int whoami;
   const uint32_t nonce;
+  std::optional<bool> is_rotational;
   seastar::abort_source& abort_source;
   seastar::timer<seastar::lowres_clock> beacon_timer;
   // talk with osd
index f2debef62978a2844edcf0a35e0c317df4cd4415..41b630eb76794bd0872389923bcd574be2333d4c 100644 (file)
@@ -7,6 +7,7 @@
 #include "crimson/osd/osd_operations/client_request.h"
 
 using namespace std::string_literals;
+SET_SUBSYS(osd);
 
 namespace {
   seastar::logger& logger() {
@@ -14,6 +15,7 @@ namespace {
   }
 }
 
+using namespace crimson::osd::scheduler;
 namespace crimson::osd {
 
 void OSDOperationRegistry::do_stop()
@@ -149,27 +151,85 @@ void OSDOperationRegistry::visit_ops_in_flight(std::function<void(const ClientRe
   }
 }
 
+void OperationThrottler::start()
+{
+  LOG_PREFIX(OperationThrottler::start);
+  if (started) {
+    DEBUG("OperationThrottler background task is already started, skipping.");
+    return;
+  }
+
+  started=true;
+  stopped=false;
+
+  INFO("Starting OperationThrottler background task");
+  bg_future.emplace(background_task());
+  return;
+}
+
 OperationThrottler::OperationThrottler(ConfigProxy &conf)
-  : scheduler(crimson::osd::scheduler::make_scheduler(conf))
 {
   conf.add_observer(this);
-  update_from_config(conf);
 }
 
-void OperationThrottler::wake()
+void OperationThrottler::initialize_scheduler(CephContext *cct, ConfigProxy &conf, bool is_rotational, int whoami)
 {
-  while (available() && !scheduler->empty()) {
-    auto item = scheduler->dequeue();
-    item.wake.set_value();
-    ++in_progress;
-    --pending;
+  scheduler = crimson::osd::scheduler::make_scheduler(cct, conf, whoami, seastar::smp::count,
+            seastar::this_shard_id(), is_rotational, true);
+  update_from_config(conf);
+}
+
+seastar::future<> OperationThrottler::background_task() {
+  LOG_PREFIX(OperationThrottler::background_task);
+  while (!stopped) {
+    co_await cv.wait([this] {
+      return (available() && !scheduler->empty()) || stopped;
+    });
+
+    // It might be possible as mclock scheduler can return a timestamp in double means
+    // the work item is scheduled in the future, so in that case wait until
+    // the returned timestamp in the dequeue response before retrying.
+    while (available() && !scheduler->empty() && !stopped) {
+      WorkItem work_item = scheduler->dequeue();
+      if (auto when_ready = std::get_if<double>(&work_item)) {
+        ceph::real_clock::time_point future_time = ceph::real_clock::from_double(*when_ready);
+        auto now = ceph::real_clock::now();
+        ceph_assert(future_time > now);
+        auto wait_duration = std::chrono::duration_cast<std::chrono::milliseconds>(future_time - now);
+        INFO("No items ready. Retrying in {} ms", wait_duration.count());
+        co_await seastar::sleep(wait_duration);
+        continue;
+      }
+      if (auto *item = std::get_if<crimson::osd::scheduler::item_t>(&work_item)) {
+        DEBUG("Waking up a work item");
+        item->wake.set_value();
+        ++in_progress;
+        --pending;
+        DEBUG("Updated counters during background_task: in_progress={}, pending={}", in_progress, pending);
+      } else {
+        DEBUG("Unexpected variant in WorkItem — neither time nor item");
+      }
+    }
   }
+
+  DEBUG("Background task exiting cleanly");
+  co_return;
+}
+
+void OperationThrottler::wake() {
+  // Attempt to wakeup pending operation if resources are available.
+  // The mclock scheduler might return delay if item is not ready
+  // to process
+  cv.signal();
 }
 
 void OperationThrottler::release_throttle()
 {
+  LOG_PREFIX(OperationThrottler::release_throttle);
   ceph_assert(in_progress > 0);
   --in_progress;
+  DEBUG("Updated counters during release_throttle: in_progress={}, pending={}",
+        in_progress, pending);
   wake();
 }
 
@@ -179,14 +239,37 @@ seastar::future<> OperationThrottler::acquire_throttle(
   crimson::osd::scheduler::item_t item{params, seastar::promise<>()};
   auto fut = item.wake.get_future();
   scheduler->enqueue(std::move(item));
+  ++pending;
   wake();
   return fut;
 }
 
+seastar::future<> OperationThrottler::stop()
+{
+  if (!started)
+    co_return;
+
+  stopped = true;
+  cv.broadcast();
+
+  if (bg_future && !bg_future->available()) {
+    co_await std::move(*bg_future);
+  }
+
+  bg_future.reset();
+  started = false;
+
+  co_return;
+}
+
 void OperationThrottler::dump_detail(Formatter *f) const
 {
   f->dump_unsigned("max_in_progress", max_in_progress);
   f->dump_unsigned("in_progress", in_progress);
+  f->dump_unsigned("pending", pending);
+  f->dump_unsigned("background_task started", started);
+  f->dump_unsigned("background_task stopeed", stopped);
+
   f->open_object_section("scheduler");
   {
     scheduler->dump(*f);
index 0e38868a279001ed9e19a9cb685bd2931d0cbc35..a5b0d7815f9472fc5e2b66a2d0fb49f47cc19f2e 100644 (file)
@@ -360,6 +360,8 @@ class OperationThrottler : public BlockerT<OperationThrottler>,
 
 public:
   OperationThrottler(ConfigProxy &conf);
+  void start();
+  seastar::future<> stop();
 
   std::vector<std::string> get_tracked_keys() const noexcept final;
   void handle_conf_change(const ConfigProxy& conf,
@@ -394,6 +396,7 @@ public:
     });
   }
 
+  void initialize_scheduler(CephContext* cct, ConfigProxy &conf, bool is_rotational, int whoami);
 private:
   void dump_detail(Formatter *f) const final;
 
@@ -403,13 +406,21 @@ private:
   uint64_t in_progress = 0;
 
   uint64_t pending = 0;
+  bool started = false;
+  bool stopped = false;
+
+  seastar::condition_variable cv;
+  std::optional<seastar::future<>> bg_future;
 
   void wake();
 
+  seastar::future<> wake_set();
+
   seastar::future<> acquire_throttle(
     crimson::osd::scheduler::params_t params);
 
   void release_throttle();
+  seastar::future<> background_task();
 };
 
 }
index 42146427e9db2c7977a0a6811f31c1ae029ce013..8689a7da49a58dc89fa5de56a526cd44f82a9a09 100644 (file)
@@ -113,9 +113,15 @@ class PerShardState {
   bool stopping = false;
   seastar::future<> stop_registry() {
     assert_core();
-    crimson::get_logger(ceph_subsys_osd).info("PerShardState::{}", __func__);
+    auto& logger = crimson::get_logger(ceph_subsys_osd);
+    logger.info("PerShardState::{}", __func__);
     stopping = true;
-    return registry.stop();
+
+    // First throttler stop and then call registry stop
+    return throttler.stop().then([this, &logger] {
+      logger.debug("Throttler stopped for shard {}", seastar::this_shard_id());
+      return registry.stop();
+    });
   }
 
   // PGMap state
@@ -209,6 +215,12 @@ public:
     PerfCounters *recoverystate_perf,
     crimson::os::FuturizedStore &store,
     OSDState& osd_state);
+
+  void initialize_scheduler(CephContext* cct, bool is_rotational) {
+    throttler.initialize_scheduler(cct, crimson::common::local_conf(), is_rotational, whoami);
+    throttler.start();
+ }
+
 };
 
 /**