return _add_me_to_crush();
}).then([this] {
return _add_device_class();
+ }).then([this] {
+ if (is_rotational.has_value()) {
+ return shard_services.invoke_on_all([this](auto &local_service) {
+ local_service.local_state.initialize_scheduler(local_service.get_cct(), *is_rotational);
+ });
+ } else {
+ throw std::runtime_error("No device class is set");
+ }
}).then([this] {
monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0);
monc->sub_want("mgrmap", 0, 0);
INFO("device_class is {} ", device_class);
+ is_rotational = (device_class != "ssd");
std::string cmd = fmt::format(
R"({{"prefix": "osd crush set-device-class", "class": "{}", "ids": ["{}"]}})",
device_class, stringify(whoami)
#include "crimson/osd/osd_operations/client_request.h"
using namespace std::string_literals;
+SET_SUBSYS(osd);
namespace {
seastar::logger& logger() {
}
}
+using namespace crimson::osd::scheduler;
namespace crimson::osd {
void OSDOperationRegistry::do_stop()
}
}
+void OperationThrottler::start()
+{
+ LOG_PREFIX(OperationThrottler::start);
+ if (started) {
+ DEBUG("OperationThrottler background task is already started, skipping.");
+ return;
+ }
+
+ started=true;
+ stopped=false;
+
+ INFO("Starting OperationThrottler background task");
+ bg_future.emplace(background_task());
+ return;
+}
+
OperationThrottler::OperationThrottler(ConfigProxy &conf)
- : scheduler(crimson::osd::scheduler::make_scheduler(conf))
{
conf.add_observer(this);
- update_from_config(conf);
}
-void OperationThrottler::wake()
+void OperationThrottler::initialize_scheduler(CephContext *cct, ConfigProxy &conf, bool is_rotational, int whoami)
{
- while (available() && !scheduler->empty()) {
- auto item = scheduler->dequeue();
- item.wake.set_value();
- ++in_progress;
- --pending;
+ scheduler = crimson::osd::scheduler::make_scheduler(cct, conf, whoami, seastar::smp::count,
+ seastar::this_shard_id(), is_rotational, true);
+ update_from_config(conf);
+}
+
+seastar::future<> OperationThrottler::background_task() {
+ LOG_PREFIX(OperationThrottler::background_task);
+ while (!stopped) {
+ co_await cv.wait([this] {
+ return (available() && !scheduler->empty()) || stopped;
+ });
+
+ // It might be possible as mclock scheduler can return a timestamp in double means
+ // the work item is scheduled in the future, so in that case wait until
+ // the returned timestamp in the dequeue response before retrying.
+ while (available() && !scheduler->empty() && !stopped) {
+ WorkItem work_item = scheduler->dequeue();
+ if (auto when_ready = std::get_if<double>(&work_item)) {
+ ceph::real_clock::time_point future_time = ceph::real_clock::from_double(*when_ready);
+ auto now = ceph::real_clock::now();
+ ceph_assert(future_time > now);
+ auto wait_duration = std::chrono::duration_cast<std::chrono::milliseconds>(future_time - now);
+ INFO("No items ready. Retrying in {} ms", wait_duration.count());
+ co_await seastar::sleep(wait_duration);
+ continue;
+ }
+ if (auto *item = std::get_if<crimson::osd::scheduler::item_t>(&work_item)) {
+ DEBUG("Waking up a work item");
+ item->wake.set_value();
+ ++in_progress;
+ --pending;
+ DEBUG("Updated counters during background_task: in_progress={}, pending={}", in_progress, pending);
+ } else {
+ DEBUG("Unexpected variant in WorkItem — neither time nor item");
+ }
+ }
}
+
+ DEBUG("Background task exiting cleanly");
+ co_return;
+}
+
+void OperationThrottler::wake() {
+ // Attempt to wakeup pending operation if resources are available.
+ // The mclock scheduler might return delay if item is not ready
+ // to process
+ cv.signal();
}
void OperationThrottler::release_throttle()
{
+ LOG_PREFIX(OperationThrottler::release_throttle);
ceph_assert(in_progress > 0);
--in_progress;
+ DEBUG("Updated counters during release_throttle: in_progress={}, pending={}",
+ in_progress, pending);
wake();
}
crimson::osd::scheduler::item_t item{params, seastar::promise<>()};
auto fut = item.wake.get_future();
scheduler->enqueue(std::move(item));
+ ++pending;
wake();
return fut;
}
+seastar::future<> OperationThrottler::stop()
+{
+ if (!started)
+ co_return;
+
+ stopped = true;
+ cv.broadcast();
+
+ if (bg_future && !bg_future->available()) {
+ co_await std::move(*bg_future);
+ }
+
+ bg_future.reset();
+ started = false;
+
+ co_return;
+}
+
void OperationThrottler::dump_detail(Formatter *f) const
{
f->dump_unsigned("max_in_progress", max_in_progress);
f->dump_unsigned("in_progress", in_progress);
+ f->dump_unsigned("pending", pending);
+ f->dump_unsigned("background_task started", started);
+ f->dump_unsigned("background_task stopeed", stopped);
+
f->open_object_section("scheduler");
{
scheduler->dump(*f);
public:
OperationThrottler(ConfigProxy &conf);
+ void start();
+ seastar::future<> stop();
std::vector<std::string> get_tracked_keys() const noexcept final;
void handle_conf_change(const ConfigProxy& conf,
});
}
+ void initialize_scheduler(CephContext* cct, ConfigProxy &conf, bool is_rotational, int whoami);
private:
void dump_detail(Formatter *f) const final;
uint64_t in_progress = 0;
uint64_t pending = 0;
+ bool started = false;
+ bool stopped = false;
+
+ seastar::condition_variable cv;
+ std::optional<seastar::future<>> bg_future;
void wake();
+ seastar::future<> wake_set();
+
seastar::future<> acquire_throttle(
crimson::osd::scheduler::params_t params);
void release_throttle();
+ seastar::future<> background_task();
};
}
bool stopping = false;
seastar::future<> stop_registry() {
assert_core();
- crimson::get_logger(ceph_subsys_osd).info("PerShardState::{}", __func__);
+ auto& logger = crimson::get_logger(ceph_subsys_osd);
+ logger.info("PerShardState::{}", __func__);
stopping = true;
- return registry.stop();
+
+ // First throttler stop and then call registry stop
+ return throttler.stop().then([this, &logger] {
+ logger.debug("Throttler stopped for shard {}", seastar::this_shard_id());
+ return registry.stop();
+ });
}
// PGMap state
PerfCounters *recoverystate_perf,
crimson::os::FuturizedStore &store,
OSDState& osd_state);
+
+ void initialize_scheduler(CephContext* cct, bool is_rotational) {
+ throttler.initialize_scheduler(cct, crimson::common::local_conf(), is_rotational, whoami);
+ throttler.start();
+ }
+
};
/**