}
});
}
+
int get_val(std::string_view key, std::string *val) const {
return get_config().get_val(*values, key, val);
}
+
+ void set_val_default_sync(const std::string& key, const std::string& val) {
+ get_config().set_val_default(*values, obs_mgr, key, val);
+ }
+
template<typename T>
const T get_val(std::string_view key) const {
return get_config().template get_val<T>(*values, key);
do {
if (!backfill_listener().budget_available()) {
- DEBUGDPP("throttle failed, turning to Waiting", pg());
post_event(RequestWaiting{});
return;
} else if (should_rescan_replicas(backfill_state().peer_backfill_info,
struct SuspendBackfill : sc::event<SuspendBackfill> {
};
- struct ThrottleAcquired : sc::event<ThrottleAcquired> {
- };
private:
// internal events
struct RequestPrimaryScanning : sc::event<RequestPrimaryScanning> {
sc::transition<RequestDone, Done>,
sc::custom_reaction<SuspendBackfill>,
sc::custom_reaction<Triggered>,
- sc::transition<ThrottleAcquired, Enqueuing>,
sc::transition<sc::event_base, Crashed>>;
explicit Waiting(my_context);
sc::result react(ObjectPushed);
return _add_me_to_crush();
}).then([this] {
return _add_device_class();
+ }).then([this] {
+ return _get_device_type();
+ }).then([this](auto rotational) {
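+    // assumption noted: the per-shard mClock schedulers need to know whether
+    // the backing device is rotational, so scheduler initialization waits for
+    // _get_device_type() to resolve before fanning out to all shards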
+    return shard_services.invoke_on_all(
+      [this, rotational](auto &local_service) {
+        local_service.local_state.initialize_scheduler(
+          local_service.get_cct(), rotational);
+    });
}).then([this] {
monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0);
monc->sub_want("mgrmap", 0, 0);
m->metadata["osd_type"] = "crimson";
return monc->send_message(std::move(m));
}
+seastar::future<bool> OSD::_get_device_type()
+{
+ LOG_PREFIX(OSD::_get_device_type);
+ if (!local_conf().get_val<bool>("osd_class_update_on_start")) {
+ co_return false;
+ }
+
+ std::string device_class = co_await store.get_default_device_class();
+ if (device_class.empty()) {
+ INFO("Device class is empty; ");
+ co_return false;
+ }
+
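+  // any device class other than "ssd" (e.g. "hdd") is treated as rotational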
+ bool rotational = (device_class != "ssd");
+
+ INFO("device_class is {} rotational: {}", device_class, rotational);
+ co_return rotational;
+}
seastar::future<> OSD::_add_device_class()
{
seastar::future<> _send_boot();
seastar::future<> _add_me_to_crush();
seastar::future<> _add_device_class();
+ seastar::future<bool> _get_device_type();
seastar::future<> osdmap_subscribe(version_t epoch, bool force_request);
}
}
+using namespace crimson::osd::scheduler;
namespace crimson::osd {
void OSDOperationRegistry::do_stop()
}
}
+
OperationThrottler::OperationThrottler(ConfigProxy &conf)
- : scheduler(crimson::osd::scheduler::make_scheduler(conf))
{
conf.add_observer(this);
+}
+
+void OperationThrottler::initialize_scheduler(
+  CephContext *cct, ConfigProxy &conf, bool is_rotational, int whoami)
+{
+  scheduler = crimson::osd::scheduler::make_scheduler(
+    cct, conf, whoami, seastar::smp::count,
+    seastar::this_shard_id(), is_rotational, true);
update_from_config(conf);
}
void OperationThrottler::wake()
{
+  WorkItem work_item;
while ((!max_in_progress || in_progress < max_in_progress) &&
!scheduler->empty()) {
- auto item = scheduler->dequeue();
- item.wake.set_value();
- ++in_progress;
- --pending;
+ work_item = scheduler->dequeue();
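+    // dequeue() yields a WorkItem variant: either a ready item_t, or a double
+    // holding the time at which the next request becomes eligible; in the
+    // latter case wake() is re-armed for that point in time instead of
+    // busy-waiting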
+ if (auto when_ready = std::get_if<double>(&work_item)) {
+ ceph::real_clock::time_point future_time = ceph::real_clock::from_double(*when_ready);
+ auto now = ceph::real_clock::now();
+ auto wait_duration = std::chrono::duration_cast<std::chrono::milliseconds>(future_time - now);
+ if (wait_duration.count() > 0) {
+ logger().info("No items ready. Retrying wake() in {} ms", wait_duration.count());
+ seastar::sleep(wait_duration).then([this] {
+ wake();
+ });
+ return;
+ }
+ }
+ if (auto *item = std::get_if<crimson::osd::scheduler::item_t>(&work_item)) {
+ item->wake.set_value();
+ ++in_progress;
+ --pending;
+    }
}
}
const std::set<std::string> &changed) final;
void update_from_config(const ConfigProxy &conf);
+ bool available() const {
+ return !max_in_progress || in_progress < max_in_progress;
+ }
+
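+  // with_throttle: acquire a scheduler slot described by `params`, run f(),
+  // and release the slot in finally() even if f()'s future fails; when
+  // max_in_progress is 0 (unlimited) the throttle is bypassed entirely.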
+ template <typename F>
+ auto with_throttle(
+ crimson::osd::scheduler::params_t params,
+ F &&f) {
+ if (!max_in_progress) return f();
+ return acquire_throttle(params)
+ .then(std::forward<F>(f))
+ .finally([this] {
+ release_throttle();
+ });
+ }
+
template <class OpT, class... Args>
seastar::future<> with_throttle_while(
BlockingEvent::Trigger<OpT>&& trigger,
with_throttle_while(std::forward<Args>(args)...), *this);
}
- // Returns std::nullopt if the throttle is acquired immediately,
- // returns the future for the acquiring otherwise
- std::optional<seastar::future<>>
- try_acquire_throttle_now(crimson::osd::scheduler::params_t params) {
- if (!max_in_progress || in_progress < max_in_progress) {
- ++in_progress;
- --pending;
- return std::nullopt;
- }
- return acquire_throttle(params);
- }
-
+ void initialize_scheduler(CephContext* cct, ConfigProxy &conf, bool is_rotational, int whoami);
private:
void dump_detail(Formatter *f) const final;
std::chrono::milliseconds(std::lround(delay * 1000)));
}
return maybe_delay.then([ref, this] {
- return this->template with_blocking_event<OperationThrottler::BlockingEvent>(
- [ref, this] (auto&& trigger) {
- return ss.with_throttle_while(
- std::move(trigger),
- this, get_scheduler_params(), [this] {
- return interruptor::with_interruption([this] {
- return do_recovery();
- }, [](std::exception_ptr) {
- return seastar::make_ready_future<bool>(false);
- }, pg, epoch_started);
- });
+ return seastar::repeat([ref, this] {
+ return interruptor::with_interruption([this] {
+ return do_recovery();
+ }, [](std::exception_ptr) {
+ return seastar::make_ready_future<bool>(false);
+ }, pg, epoch_started).then([](bool goon) {
+ if (goon) {
+ return seastar::stop_iteration::no;
+ } else {
+ return seastar::stop_iteration::yes;
+ }
});
+ });
});
}
).then_interruptible([this] {
return with_blocking_event<RecoveryBackend::RecoveryBlockingEvent,
interruptor>([this] (auto&& trigger) {
- return pg->get_recovery_handler()->recover_missing(trigger, soid, need);
+ return pg->get_recovery_handler()->recover_missing(
+ trigger, soid, need, false);
}).then_interruptible([] {
return seastar::make_ready_future<bool>(false);
});
private:
virtual void dump_detail(Formatter *f) const;
crimson::osd::scheduler::params_t get_scheduler_params() const {
+    // approximate per-op cost as the PG's average object size in bytes,
+    // guarding against PGs that report zero objects
+    const auto &sum = pg->get_info().stats.stats.sum;
+    const int cost = sum.num_objects > 0
+      ? std::max<int64_t>(1, sum.num_bytes / sum.num_objects)
+      : 1;
+    unsigned priority = pg->get_recovery_op_priority();
+
return {
- 1, // cost
+ cost, // cost
+ priority, // priority
0, // owner
scheduler_class
};
void print(std::ostream&) const final;
std::tuple<
- OperationThrottler::BlockingEvent,
RecoveryBackend::RecoveryBlockingEvent
> tracking_events;
float delay = 0);
std::tuple<
- OperationThrottler::BlockingEvent,
RecoveryBackend::RecoveryBlockingEvent
> tracking_events;
int *return_code,
std::vector<pg_log_op_return_item_t> *op_returns) const;
int get_recovery_op_priority() const {
- int64_t pri = 0;
- get_pgpool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
- return pri > 0 ? pri : crimson::common::local_conf()->osd_recovery_op_priority;
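+    // with mclock, recovery ops carry class-based message priorities
+    // (forced / undersized / degraded / best-effort); other queues keep the
+    // pool's RECOVERY_OP_PRIORITY override or osd_recovery_op_priority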
+    const auto op_queue =
+      this->shard_services.get_cct()->_conf.get_val<std::string>("osd_op_queue");
+    if (op_queue == "mclock_scheduler") {
+ if (peering_state.is_forced_recovery_or_backfill()) {
+ return peering_state.recovery_msg_priority_t::FORCED;
+ } else if (peering_state.is_undersized()) {
+ return peering_state.recovery_msg_priority_t::UNDERSIZED;
+ } else if (peering_state.is_degraded()) {
+ return peering_state.recovery_msg_priority_t::DEGRADED;
+ } else {
+ return peering_state.recovery_msg_priority_t::BEST_EFFORT;
+ }
+ } else {
+ int64_t pri = 0;
+ get_pgpool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
+ return pri > 0 ? pri : crimson::common::local_conf()->osd_recovery_op_priority;
+ }
}
seastar::future<> mark_unfound_lost(int) {
// TODO: see PrimaryLogPG::mark_all_unfound_lost()
if (max_to_start > 0) {
max_to_start -= start_replica_recovery_ops(trigger, max_to_start, &started);
}
+ using interruptor =
+ crimson::interruptible::interruptor<crimson::osd::IOInterruptCondition>;
return interruptor::parallel_for_each(started,
[] (auto&& ifut) {
return std::move(ifut);
auto it = missing.get_items().find(head);
assert(it != missing.get_items().end());
auto head_need = it->second.need;
- out->emplace_back(recover_missing(trigger, head, head_need));
+ out->emplace_back(recover_missing(trigger, head, head_need, true));
++skipped;
} else {
- out->emplace_back(recover_missing(trigger, soid, item.need));
+ out->emplace_back(recover_missing(trigger, soid, item.need, true));
}
++started;
}
PGRecovery::interruptible_future<>
PGRecovery::recover_missing(
RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
- const hobject_t &soid, eversion_t need)
+ const hobject_t &soid,
+ eversion_t need,
+ bool with_throttle)
{
logger().info("{} {} v {}", __func__, soid, need);
auto [recovering, added] = pg->get_recovery_backend()->add_recovering(soid);
} else {
return recovering.wait_track_blocking(
trigger,
- pg->get_recovery_backend()->recover_object(soid, need)
+      (with_throttle
+       ? recover_object_with_throttle(soid, need)
+       : recover_object(soid, need))
.handle_exception_interruptible(
[=, this, soid = std::move(soid)] (auto e) {
on_failed_recover({ pg->get_pg_whoami() }, soid, need);
logger().info("{} {} v {}, new recovery", __func__, soid, need);
return recovering.wait_track_blocking(
trigger,
- pg->get_recovery_backend()->recover_object(soid, need)
+ recover_object_with_throttle(soid, need)
.handle_exception_interruptible(
[=, this, soid = std::move(soid)] (auto e) {
on_failed_recover({ pg->get_pg_whoami() }, soid, need);
});
}
+PGRecovery::interruptible_future<>
+PGRecovery::recover_object_with_throttle(
+ const hobject_t &soid,
+ eversion_t need)
+{
+ crimson::osd::scheduler::params_t params =
+ {1, 0, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort};
+ auto &ss = pg->get_shard_services();
+ return ss.with_throttle(
+ std::move(params),
+ [this, soid, need] {
+ auto backend = pg->get_recovery_backend();
+ assert(backend);
+ return backend->recover_object(soid, need);
+ });
+}
+
void PGRecovery::enqueue_push(
const hobject_t& obj,
const eversion_t& v,
if (!added)
return;
peering_state.prepare_backfill_for_missing(obj, v, peers);
- std::ignore = pg->get_recovery_backend()->recover_object(obj, v).\
+  std::ignore = recover_object_with_throttle(obj, v).
handle_exception_interruptible([] (auto) {
ceph_abort_msg("got exception on backfill's push");
return seastar::make_ready_future<>();
bool PGRecovery::budget_available() const
{
- crimson::osd::scheduler::params_t params =
- {1, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort};
auto &ss = pg->get_shard_services();
- auto futopt = ss.try_acquire_throttle_now(std::move(params));
- if (!futopt) {
- return true;
- }
- std::ignore = interruptor::make_interruptible(std::move(*futopt)
- ).then_interruptible([this] {
- assert(!backfill_state->is_triggered());
- using BackfillState = crimson::osd::BackfillState;
- backfill_state->process_event(
- BackfillState::ThrottleAcquired{}.intrusive_from_this());
- });
- return false;
+ return ss.throttle_available();
}
void PGRecovery::on_pg_clean()
class PGRecovery : public crimson::osd::BackfillState::BackfillListener {
public:
- using interruptor =
- crimson::interruptible::interruptor<crimson::osd::IOInterruptCondition>;
template <typename T = void>
using interruptible_future = RecoveryBackend::interruptible_future<T>;
PGRecovery(PGRecoveryListener* pg) : pg(pg) {}
}
RecoveryBackend::interruptible_future<> recover_missing(
RecoveryBackend::RecoveryBlockingEvent::TriggerI&,
- const hobject_t &soid, eversion_t need);
+ const hobject_t &soid,
+ eversion_t need,
+ bool with_throttle);
RecoveryBackend::interruptible_future<> prep_object_replica_deletes(
RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
const hobject_t& soid,
friend class ReplicatedRecoveryBackend;
friend class crimson::osd::UrgentRecovery;
+ interruptible_future<> recover_object_with_throttle(
+ const hobject_t &soid,
+ eversion_t need);
+
+ interruptible_future<> recover_object(
+ const hobject_t &soid,
+ eversion_t need) {
+ auto backend = pg->get_recovery_backend();
+ assert(backend);
+ return backend->recover_object(soid, need);
+ }
+
// backfill begin
std::unique_ptr<crimson::osd::BackfillState> backfill_state;
std::map<pg_shard_t,
namespace dmc = crimson::dmclock;
using namespace std::placeholders;
-using namespace std::string_literals;
+using namespace std::literals;
#define dout_context cct
-#define dout_subsys ceph_subsys_osd
+#define dout_subsys ceph_subsys_mclock
#undef dout_prefix
-#define dout_prefix *_dout
+#define dout_prefix *_dout << "mClockScheduler: "
namespace crimson::osd::scheduler {
-mClockScheduler::mClockScheduler(ConfigProxy &conf) :
- scheduler(
- std::bind(&mClockScheduler::ClientRegistry::get_info,
- &client_registry,
- _1),
- dmc::AtLimit::Allow,
- conf.get_val<double>("osd_mclock_scheduler_anticipation_timeout"))
+mClockScheduler::mClockScheduler(CephContext *cct,
+ int whoami,
+ uint32_t num_shards,
+ int shard_id,
+ bool is_rotational,
+ bool init_perfcounter)
+ : cct(cct),
+ whoami(whoami),
+ num_shards(num_shards),
+ shard_id(shard_id),
+ is_rotational(is_rotational),
+ logger(nullptr),
+ scheduler(
+ std::bind(&mClockScheduler::ClientRegistry::get_info,
+ &client_registry,
+ _1),
+ dmc::AtLimit::Wait,
+ cct->_conf.get_val<double>("osd_mclock_scheduler_anticipation_timeout"))
{
- conf.add_observer(this);
- client_registry.update_from_config(conf);
+ cct->_conf.add_observer(this);
+ ceph_assert(num_shards > 0);
+ auto get_op_queue_cut_off = [&conf = cct->_conf]() {
+ if (conf.get_val<std::string>("osd_op_queue_cut_off") == "debug_random") {
+ std::random_device rd;
+ std::mt19937 random_gen(rd());
+ return (random_gen() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
+ } else if (conf.get_val<std::string>("osd_op_queue_cut_off") == "high") {
+ return CEPH_MSG_PRIO_HIGH;
+ } else {
+ // default / catch-all is 'low'
+ return CEPH_MSG_PRIO_LOW;
+ }
+ };
+ cutoff_priority = get_op_queue_cut_off();
+ set_osd_capacity_params_from_config();
+ set_config_defaults_from_profile();
+ client_registry.update_from_config(
+ cct->_conf, osd_bandwidth_capacity_per_shard);
+
}
-void mClockScheduler::ClientRegistry::update_from_config(const ConfigProxy &conf)
+/* ClientRegistry holds the dmclock::ClientInfo configuration parameters
+ * (reservation (bytes/second), weight (unitless), limit (bytes/second))
+ * for each IO class in the OSD (client, background_recovery,
+ * background_best_effort).
+ *
+ * mclock expects limit and reservation to have units of <cost>/second
+ * (bytes/second), but osd_mclock_scheduler_client_(lim|res) are provided
+ * as ratios of the OSD's capacity. We convert from the one to the other
+ * using the capacity_per_shard parameter.
+ *
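+ * For example (illustrative numbers only): with a capacity_per_shard of
+ * 100 MiB/s, a configured client reservation ratio of 0.5 becomes a
+ * reservation of 50 MiB/s; a ratio of 0 maps to default_min (no
+ * reservation) or default_max (no limit), respectively.
+ *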
+ * Note, mclock profile information will already have been set as a default
+ * for the osd_mclock_scheduler_client_* parameters prior to calling
+ * update_from_config -- see set_config_defaults_from_profile().
+ */
+void mClockScheduler::ClientRegistry::update_from_config(
+ const ConfigProxy &conf,
+ const double capacity_per_shard)
{
+
+ auto get_res = [&](double res) {
+ if (res) {
+ return res * capacity_per_shard;
+ } else {
+ return default_min; // min reservation
+ }
+ };
+
+ auto get_lim = [&](double lim) {
+ if (lim) {
+ return lim * capacity_per_shard;
+ } else {
+ return default_max; // high limit
+ }
+ };
+
+ // Set external client infos
+ double res = conf.get_val<double>(
+ "osd_mclock_scheduler_client_res");
+ double lim = conf.get_val<double>(
+ "osd_mclock_scheduler_client_lim");
+ uint64_t wgt = conf.get_val<uint64_t>(
+ "osd_mclock_scheduler_client_wgt");
default_external_client_info.update(
- conf.get_val<double>("osd_mclock_scheduler_client_res"),
- conf.get_val<uint64_t>("osd_mclock_scheduler_client_wgt"),
- conf.get_val<double>("osd_mclock_scheduler_client_lim"));
+ get_res(res),
+ wgt,
+ get_lim(lim));
+ // Set background recovery client infos
+ res = conf.get_val<double>(
+ "osd_mclock_scheduler_background_recovery_res");
+ lim = conf.get_val<double>(
+ "osd_mclock_scheduler_background_recovery_lim");
+ wgt = conf.get_val<uint64_t>(
+ "osd_mclock_scheduler_background_recovery_wgt");
internal_client_infos[
static_cast<size_t>(scheduler_class_t::background_recovery)].update(
- conf.get_val<double>("osd_mclock_scheduler_background_recovery_res"),
- conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_wgt"),
- conf.get_val<double>("osd_mclock_scheduler_background_recovery_lim"));
+ get_res(res),
+ wgt,
+ get_lim(lim));
+ // Set background best effort client infos
+ res = conf.get_val<double>(
+ "osd_mclock_scheduler_background_best_effort_res");
+ lim = conf.get_val<double>(
+ "osd_mclock_scheduler_background_best_effort_lim");
+ wgt = conf.get_val<uint64_t>(
+ "osd_mclock_scheduler_background_best_effort_wgt");
internal_client_infos[
static_cast<size_t>(scheduler_class_t::background_best_effort)].update(
- conf.get_val<double>("osd_mclock_scheduler_background_best_effort_res"),
- conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_wgt"),
- conf.get_val<double>("osd_mclock_scheduler_background_best_effort_lim"));
+ get_res(res),
+ wgt,
+ get_lim(lim));
}
const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_external_client(
case scheduler_class_t::immediate:
ceph_assert(0 == "Cannot schedule immediate");
return (dmc::ClientInfo*)nullptr;
- case scheduler_class_t::repop:
case scheduler_class_t::client:
return get_external_client(id.client_profile_id);
default:
}
}
+void mClockScheduler::set_osd_capacity_params_from_config()
+{
+ uint64_t osd_bandwidth_capacity;
+ double osd_iop_capacity;
+
+ std::tie(osd_bandwidth_capacity, osd_iop_capacity) = [&, this] {
+ if (is_rotational) {
+ return std::make_tuple(
+ cct->_conf.get_val<Option::size_t>(
+ "osd_mclock_max_sequential_bandwidth_hdd"),
+ cct->_conf.get_val<double>("osd_mclock_max_capacity_iops_hdd"));
+ } else {
+ return std::make_tuple(
+ cct->_conf.get_val<Option::size_t>(
+ "osd_mclock_max_sequential_bandwidth_ssd"),
+ cct->_conf.get_val<double>("osd_mclock_max_capacity_iops_ssd"));
+ }
+ }();
+
+ osd_bandwidth_capacity = std::max<uint64_t>(1, osd_bandwidth_capacity);
+ osd_iop_capacity = std::max<double>(1.0, osd_iop_capacity);
+
+ osd_bandwidth_cost_per_io =
+ static_cast<double>(osd_bandwidth_capacity) / osd_iop_capacity;
+ osd_bandwidth_capacity_per_shard = static_cast<double>(osd_bandwidth_capacity)
+ / static_cast<double>(num_shards);
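+  // e.g. (illustrative numbers) 150 MiB/s of sequential bandwidth and
+  // 315 IOPS yield a cost_per_io of roughly 500 KiB; the bandwidth is then
+  // divided evenly across num_shards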
+
+ dout(1) << __func__ << ": osd_bandwidth_cost_per_io: "
+ << std::fixed << std::setprecision(2)
+ << osd_bandwidth_cost_per_io << " bytes/io"
+ << ", osd_bandwidth_capacity_per_shard "
+ << osd_bandwidth_capacity_per_shard << " bytes/second"
+ << dendl;
+}
+
+/**
+ * profile_t
+ *
+ * mclock profile -- 3 params for each of 3 client classes
+ * 0 (min): specifies no minimum reservation
+ * 0 (max): specifies no upper limit
+ */
+struct profile_t {
+ struct client_config_t {
+ double reservation;
+ uint64_t weight;
+ double limit;
+ };
+ client_config_t client;
+ client_config_t background_recovery;
+ client_config_t background_best_effort;
+};
+
+static std::ostream &operator<<(
+ std::ostream &lhs, const profile_t::client_config_t &rhs)
+{
+ return lhs << "{res: " << rhs.reservation
+ << ", wgt: " << rhs.weight
+ << ", lim: " << rhs.limit
+ << "}";
+}
+
+static std::ostream &operator<<(std::ostream &lhs, const profile_t &rhs)
+{
+ return lhs << "[client: " << rhs.client
+ << ", background_recovery: " << rhs.background_recovery
+ << ", background_best_effort: " << rhs.background_best_effort
+ << "]";
+}
+
+void mClockScheduler::set_config_defaults_from_profile()
+{
+ // Let only a single osd shard (id:0) set the profile configs
+ if (shard_id > 0) {
+ return;
+ }
+
+ /**
+ * high_client_ops
+ *
+ * Client Allocation:
+ * reservation: 60% | weight: 2 | limit: 0 (max) |
+ * Background Recovery Allocation:
+ * reservation: 40% | weight: 1 | limit: 0 (max) |
+ * Background Best Effort Allocation:
+ * reservation: 0 (min) | weight: 1 | limit: 70% |
+ */
+ static constexpr profile_t high_client_ops_profile{
+ { .6, 2, 0 },
+ { .4, 1, 0 },
+ { 0, 1, .7 }
+ };
+
+ /**
+ * high_recovery_ops
+ *
+ * Client Allocation:
+ * reservation: 30% | weight: 1 | limit: 0 (max) |
+ * Background Recovery Allocation:
+ * reservation: 70% | weight: 2 | limit: 0 (max) |
+ * Background Best Effort Allocation:
+ * reservation: 0 (min) | weight: 1 | limit: 0 (max) |
+ */
+ static constexpr profile_t high_recovery_ops_profile{
+ { .3, 1, 0 },
+ { .7, 2, 0 },
+ { 0, 1, 0 }
+ };
+
+ /**
+ * balanced
+ *
+ * Client Allocation:
+ * reservation: 50% | weight: 1 | limit: 0 (max) |
+ * Background Recovery Allocation:
+ * reservation: 50% | weight: 1 | limit: 0 (max) |
+ * Background Best Effort Allocation:
+ * reservation: 0 (min) | weight: 1 | limit: 90% |
+ */
+ static constexpr profile_t balanced_profile{
+ { .5, 1, 0 },
+ { .5, 1, 0 },
+ { 0, 1, .9 }
+ };
+
+ const profile_t *profile = nullptr;
+ auto mclock_profile = cct->_conf.get_val<std::string>("osd_mclock_profile");
+ if (mclock_profile == "high_client_ops") {
+ profile = &high_client_ops_profile;
+ dout(10) << "Setting high_client_ops profile " << *profile << dendl;
+ } else if (mclock_profile == "high_recovery_ops") {
+ profile = &high_recovery_ops_profile;
+ dout(10) << "Setting high_recovery_ops profile " << *profile << dendl;
+ } else if (mclock_profile == "balanced") {
+ profile = &balanced_profile;
+ dout(10) << "Setting balanced profile " << *profile << dendl;
+ } else if (mclock_profile == "custom") {
+ dout(10) << "Profile set to custom, not setting defaults" << dendl;
+ return;
+ } else {
+ derr << "Invalid mclock profile: " << mclock_profile << dendl;
+ ceph_assert("Invalid choice of mclock profile" == 0);
+ return;
+ }
+ ceph_assert(nullptr != profile);
+
+ auto set_config = [&conf = cct->_conf](const char *key, auto val) {
+ return conf.set_val_default_sync(key, std::to_string(val));
+ };
+
+ set_config("osd_mclock_scheduler_client_res", profile->client.reservation);
+ set_config("osd_mclock_scheduler_client_res", profile->client.reservation);
+ set_config("osd_mclock_scheduler_client_wgt", profile->client.weight);
+ set_config("osd_mclock_scheduler_client_lim", profile->client.limit);
+
+
+ set_config(
+ "osd_mclock_scheduler_background_recovery_res",
+ profile->background_recovery.reservation);
+ set_config(
+ "osd_mclock_scheduler_background_recovery_wgt",
+ profile->background_recovery.weight);
+ set_config(
+ "osd_mclock_scheduler_background_recovery_lim",
+ profile->background_recovery.limit);
+
+ set_config(
+ "osd_mclock_scheduler_background_best_effort_res",
+ profile->background_best_effort.reservation);
+ set_config(
+ "osd_mclock_scheduler_background_best_effort_wgt",
+ profile->background_best_effort.weight);
+ set_config(
+ "osd_mclock_scheduler_background_best_effort_lim",
+ profile->background_best_effort.limit);
+
+}
+
+uint32_t mClockScheduler::calc_scaled_cost(int item_cost)
+{
+ auto cost = static_cast<uint32_t>(
+ std::max<int>(
+ 1, // ensure cost is non-zero and positive
+ item_cost));
+ auto cost_per_io = static_cast<uint32_t>(osd_bandwidth_cost_per_io);
+
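+  // each item is charged at least one "average IO" worth of bytes; e.g.
+  // (illustrative numbers) with a cost_per_io of 64 KiB, a 4 KiB request is
+  // charged max(4096, 65536) = 65536 bytes, while a 1 MiB request is charged
+  // its full 1048576 bytes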
+ return std::max<uint32_t>(cost, cost_per_io);
+}
+
void mClockScheduler::dump(ceph::Formatter &f) const
{
+ // Display queue sizes
+ f.open_object_section("queue_sizes");
+ f.dump_int("high_priority_queue", high_priority.size());
+ f.dump_int("scheduler", scheduler.request_count());
+ f.close_section();
+
+ // client map and queue tops (res, wgt, lim)
+ std::ostringstream out;
+ f.open_object_section("mClockClients");
+ f.dump_int("client_count", scheduler.client_count());
+ //out << scheduler;
+ f.dump_string("clients", out.str());
+ f.close_section();
+
+ // Display sorted queues (res, wgt, lim)
+ f.open_object_section("mClockQueues");
+ f.dump_string("queues", display_queues());
+ f.close_section();
+
+ f.open_object_section("HighPriorityQueue");
+ for (auto it = high_priority.begin();
+ it != high_priority.end(); it++) {
+ f.dump_int("priority", it->first);
+ f.dump_int("queue_size", it->second.size());
+ }
+ f.close_section();
}
void mClockScheduler::enqueue(item_t&& item)
{
auto id = get_scheduler_id(item);
- auto cost = item.params.cost;
+ unsigned priority = item.get_priority();
+ // TODO: move this check into item, handle backwards compat
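+  // ops of the immediate class, or whose priority is at or above
+  // cutoff_priority, bypass mClock and go through the strictly ordered
+  // high-priority queue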
if (scheduler_class_t::immediate == item.params.klass) {
- immediate.push_front(std::move(item));
+ enqueue_high(immediate_class_priority, std::move(item));
+ } else if (priority >= cutoff_priority) {
+ enqueue_high(priority, std::move(item));
} else {
+ auto cost = calc_scaled_cost(item.get_cost());
+ dout(20) << __func__ << " " << id
+ << " item_cost: " << item.get_cost()
+ << " scaled_cost: " << cost
+ << dendl;
+
+ // Add item to scheduler queue
scheduler.add_request(
std::move(item),
id,
cost);
+ //_get_mclock_counter(id);
}
+
+ dout(10) << __func__ << " client_count: " << scheduler.client_count()
+ << " queue_sizes: [ "
+ << " high_priority_queue: " << high_priority.size()
+ << " sched: " << scheduler.request_count() << " ]"
+ << dendl;
+ dout(30) << __func__ << " mClockClients: "
+ << dendl;
+ dout(10) << __func__ << " mClockQueues: { "
+ << display_queues() << " }"
+ << dendl;
}
void mClockScheduler::enqueue_front(item_t&& item)
{
- immediate.push_back(std::move(item));
- // TODO: item may not be immediate, update mclock machinery to permit
- // putting the item back in the queue
+ unsigned priority = item.get_priority();
+
+ if (scheduler_class_t::immediate == item.params.klass) {
+ enqueue_high(immediate_class_priority, std::move(item), true);
+ } else if (priority >= cutoff_priority) {
+ enqueue_high(priority, std::move(item), true);
+ } else {
+ // mClock does not support enqueue at front, so we use
+ // the high queue with priority 0
+ enqueue_high(0, std::move(item), true);
+ }
+}
+
+void mClockScheduler::enqueue_high(unsigned priority,
+ item_t&& item,
+ bool front)
+{
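+  // dequeue() services each priority bucket from the back of its list, so
+  // push_back here (front == true) makes the item the next one dispatched
+  // for that priority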
+ if (front) {
+ high_priority[priority].push_back(std::move(item));
+ } else {
+ high_priority[priority].push_front(std::move(item));
+ }
}
-item_t mClockScheduler::dequeue()
+WorkItem mClockScheduler::dequeue()
{
- if (!immediate.empty()) {
- auto ret = std::move(immediate.back());
- immediate.pop_back();
+ if (!high_priority.empty()) {
+ auto iter = high_priority.begin();
+ // invariant: high_priority entries are never empty
+ assert(!iter->second.empty());
+ WorkItem ret{std::move(iter->second.back())};
+ iter->second.pop_back();
+ if (iter->second.empty()) {
+ // maintain invariant, high priority entries are never empty
+ high_priority.erase(iter);
+ }
+
return ret;
} else {
mclock_queue_t::PullReq result = scheduler.pull_request();
if (result.is_future()) {
- ceph_assert(
- 0 == "Not implemented, user would have to be able to be woken up");
- return std::move(*(item_t*)nullptr);
+ return result.getTime();
} else if (result.is_none()) {
ceph_assert(
0 == "Impossible, must have checked empty() first");
}
}
+std::string mClockScheduler::display_queues() const
+{
+ std::ostringstream out;
+ scheduler.display_queues(out);
+ return out.str();
+}
+
std::vector<std::string> mClockScheduler::get_tracked_keys() const noexcept
{
return {
- "osd_mclock_scheduler_client_res"s,
- "osd_mclock_scheduler_client_wgt"s,
- "osd_mclock_scheduler_client_lim"s,
- "osd_mclock_scheduler_background_recovery_res"s,
- "osd_mclock_scheduler_background_recovery_wgt"s,
- "osd_mclock_scheduler_background_recovery_lim"s,
- "osd_mclock_scheduler_background_best_effort_res"s,
- "osd_mclock_scheduler_background_best_effort_wgt"s,
- "osd_mclock_scheduler_background_best_effort_lim"s
- };
+ "osd_mclock_scheduler_client_res",
+ "osd_mclock_scheduler_client_wgt",
+ "osd_mclock_scheduler_client_lim",
+ "osd_mclock_scheduler_background_recovery_res",
+ "osd_mclock_scheduler_background_recovery_wgt",
+ "osd_mclock_scheduler_background_recovery_lim",
+ "osd_mclock_scheduler_background_best_effort_res",
+ "osd_mclock_scheduler_background_best_effort_wgt",
+ "osd_mclock_scheduler_background_best_effort_lim",
+ "osd_mclock_max_capacity_iops_hdd",
+ "osd_mclock_max_capacity_iops_ssd",
+ "osd_mclock_max_sequential_bandwidth_hdd",
+ "osd_mclock_max_sequential_bandwidth_ssd",
+ "osd_mclock_profile"
+ };
}
-
void mClockScheduler::handle_conf_change(
const ConfigProxy& conf,
const std::set<std::string> &changed)
{
- client_registry.update_from_config(conf);
+ if (changed.count("osd_mclock_max_capacity_iops_hdd") ||
+ changed.count("osd_mclock_max_capacity_iops_ssd")) {
+ set_osd_capacity_params_from_config();
+ client_registry.update_from_config(
+ conf, osd_bandwidth_capacity_per_shard);
+ }
+ if (changed.count("osd_mclock_max_sequential_bandwidth_hdd") ||
+ changed.count("osd_mclock_max_sequential_bandwidth_ssd")) {
+ set_osd_capacity_params_from_config();
+ client_registry.update_from_config(
+ conf, osd_bandwidth_capacity_per_shard);
+ }
+ if (changed.count("osd_mclock_profile")) {
+ set_config_defaults_from_profile();
+ client_registry.update_from_config(
+ conf, osd_bandwidth_capacity_per_shard);
+ }
+
+ auto get_changed_key = [&changed]() -> std::optional<std::string> {
+ static const std::vector<std::string> qos_params = {
+ "osd_mclock_scheduler_client_res",
+ "osd_mclock_scheduler_client_wgt",
+ "osd_mclock_scheduler_client_lim",
+ "osd_mclock_scheduler_background_recovery_res",
+ "osd_mclock_scheduler_background_recovery_wgt",
+ "osd_mclock_scheduler_background_recovery_lim",
+ "osd_mclock_scheduler_background_best_effort_res",
+ "osd_mclock_scheduler_background_best_effort_wgt",
+ "osd_mclock_scheduler_background_best_effort_lim"
+ };
+
+ for (auto &qp : qos_params) {
+ if (changed.count(qp)) {
+ return qp;
+ }
+ }
+ return std::nullopt;
+ };
+
+ if (auto key = get_changed_key(); key.has_value()) {
+ auto mclock_profile = cct->_conf.get_val<std::string>("osd_mclock_profile");
+ if (mclock_profile == "custom") {
+ client_registry.update_from_config(
+ conf, osd_bandwidth_capacity_per_shard);
+ }
+ }
+}
+
+mClockScheduler::~mClockScheduler()
+{
+ cct->_conf.remove_observer(this);
}
}
#pragma once
+#include <functional>
#include <ostream>
#include <map>
#include <vector>
#include "boost/variant.hpp"
#include "dmclock/src/dmclock_server.h"
-
#include "crimson/osd/scheduler/scheduler.h"
+#include "crimson/mon/MonClient.h"
+
#include "common/config.h"
#include "common/ceph_context.h"
namespace crimson::osd::scheduler {
-using client_id_t = uint64_t;
-using profile_id_t = uint64_t;
+constexpr double default_min = 0.0;
+constexpr double default_max = std::numeric_limits<double>::is_iec559 ?
+ std::numeric_limits<double>::infinity() :
+ std::numeric_limits<double>::max();
+/**
+ * client_profile_id_t
+ *
+ * client_id - global id (client.####) for client QoS
+ * profile_id - id generated by client's QoS profile
+ *
+ * Currently (Reef and below), both members are set to
+ * 0 which ensures that all external clients share the
+ * mClock profile allocated reservation and limit
+ * bandwidth.
+ *
+ * Note: Post Reef, both members will be set to non-zero
+ * values when the distributed feature of the mClock
+ * algorithm is utilized.
+ */
struct client_profile_id_t {
- client_id_t client_id;
- profile_id_t profile_id;
+ uint64_t client_id = 0;
+ uint64_t profile_id = 0;
+
+ client_profile_id_t(uint64_t _client_id, uint64_t _profile_id) :
+ client_id(_client_id),
+ profile_id(_profile_id) {}
+
+ client_profile_id_t() = default;
+
auto operator<=>(const client_profile_id_t&) const = default;
+ friend std::ostream& operator<<(std::ostream& out,
+ const client_profile_id_t& client_profile) {
+ out << " client_id: " << client_profile.client_id
+ << " profile_id: " << client_profile.profile_id;
+ return out;
+ }
};
-
struct scheduler_id_t {
scheduler_class_t class_id;
client_profile_id_t client_profile_id;
+
auto operator<=>(const scheduler_id_t&) const = default;
+ friend std::ostream& operator<<(std::ostream& out,
+ const scheduler_id_t& sched_id) {
+ out << "{ class_id: " << sched_id.class_id
+ << sched_id.client_profile_id;
+ return out << " }";
+ }
};
/**
*/
class mClockScheduler : public Scheduler, md_config_obs_t {
+ crimson::common::CephContext *cct;
+ const int whoami;
+ const uint32_t num_shards;
+ const int shard_id;
+ const bool is_rotational;
+ unsigned cutoff_priority;
+ PerfCounters *logger;
+
+ /**
+ * osd_bandwidth_cost_per_io
+ *
+ * mClock expects all queued items to have a uniform expression of
+ * "cost". However, IO devices generally have quite different capacity
+ * for sequential IO vs small random IO. This implementation handles this
+   * by expressing all costs as a number of sequential bytes written, adding
+   * an additional cost for each random IO equal to osd_bandwidth_cost_per_io.
+ *
+ * Thus, an IO operation requiring a total of <size> bytes to be written
+   * across <iops> different locations will have a cost of
+ * <size> + (osd_bandwidth_cost_per_io * <iops>) bytes.
+ *
+ * Set in set_osd_capacity_params_from_config in the constructor and upon
+ * config change.
+ *
+ * Has units bytes/io.
+ */
+ double osd_bandwidth_cost_per_io;
+
+ /**
+ * osd_bandwidth_capacity_per_shard
+ *
+   * mClock expects reservation and limit parameters to be expressed in units
+ * of cost/second -- which means bytes/second for this implementation.
+ *
+ * Rather than expecting users to compute appropriate limit and reservation
+ * values for each class of OSDs in their cluster, we instead express
+   * reservation and limit parameters as ratios of the OSD's maximum capacity.
+ * osd_bandwidth_capacity_per_shard is that capacity divided by the number
+ * of shards.
+ *
+ * Set in set_osd_capacity_params_from_config in the constructor and upon
+ * config change.
+ *
+ * This value gets passed to ClientRegistry::update_from_config in order
+ * to resolve the full reservaiton and limit parameters for mclock from
+ * the configured ratios.
+ *
+ * Has units bytes/second.
+ */
+ double osd_bandwidth_capacity_per_shard;
class ClientRegistry {
std::array<
crimson::dmclock::ClientInfo,
const crimson::dmclock::ClientInfo *get_external_client(
const client_profile_id_t &client) const;
public:
- void update_from_config(const ConfigProxy &conf);
+ /**
+ * update_from_config
+ *
+   * Sets the mclock parameters (reservation, weight, and limit)
+ * for each class of IO (background_recovery, background_best_effort,
+ * and client).
+ */
+ void update_from_config(
+ const ConfigProxy &conf,
+ double capacity_per_shard);
const crimson::dmclock::ClientInfo *get_info(
const scheduler_id_t &id) const;
} client_registry;
true,
true,
2>;
+ using priority_t = unsigned;
+ using SubQueue = std::map<priority_t,
+ std::list<item_t>,
+ std::greater<priority_t>>;
mclock_queue_t scheduler;
- std::list<item_t> immediate;
+ /**
+ * high_priority
+ *
+ * Holds entries to be dequeued in strict order ahead of mClock
+ * Invariant: entries are never empty
+ */
+ SubQueue high_priority;
+ priority_t immediate_class_priority = std::numeric_limits<priority_t>::max();
static scheduler_id_t get_scheduler_id(const item_t &item) {
return scheduler_id_t{
item.params.klass,
- client_profile_id_t{
- item.params.owner,
- 0
- }
+ client_profile_id_t{
+ item.params.owner,
+ 0
+ }
};
}
+ /**
+ * set_osd_capacity_params_from_config
+ *
+ * mClockScheduler uses two parameters, osd_bandwidth_cost_per_io
+ * and osd_bandwidth_capacity_per_shard, internally. These two
+ * parameters are derived from config parameters
+ * osd_mclock_max_capacity_iops_(hdd|ssd) and
+ * osd_mclock_max_sequential_bandwidth_(hdd|ssd) as well as num_shards.
+ * Invoking set_osd_capacity_params_from_config() resets those derived
+ * params based on the current config and should be invoked any time they
+ * are modified as well as in the constructor. See handle_conf_change().
+ */
+ void set_osd_capacity_params_from_config();
+
+ // Set the mclock related config params based on the profile
+ void set_config_defaults_from_profile();
+
public:
- mClockScheduler(ConfigProxy &conf);
+ mClockScheduler(CephContext *cct, int whoami, uint32_t num_shards,
+ int shard_id, bool is_rotational, bool init_perfcounter=true);
+ ~mClockScheduler() override;
+
+ /// Calculate scaled cost per item
+ uint32_t calc_scaled_cost(int cost);
+
+ // Helper method to display mclock queues
+ std::string display_queues() const;
// Enqueue op in the back of the regular queue
void enqueue(item_t &&item) final;
- // Enqueue the op in the front of the regular queue
+ // Enqueue the op in the front of the high priority queue
void enqueue_front(item_t &&item) final;
// Return an op to be dispatch
- item_t dequeue() final;
+ WorkItem dequeue() final;
// Returns if the queue is empty
bool empty() const final {
- return immediate.empty() && scheduler.empty();
+ return scheduler.empty() && high_priority.empty();
}
// Formatted output of the queue
void dump(ceph::Formatter &f) const final;
void print(std::ostream &ostream) const final {
- ostream << "mClockScheduler";
+ ostream << "mClockScheduer ";
+ ostream << ", cutoff=" << cutoff_priority;
}
std::vector<std::string> get_tracked_keys() const noexcept final;
void handle_conf_change(const ConfigProxy& conf,
const std::set<std::string> &changed) final;
+
+ double get_cost_per_io() const {
+ return osd_bandwidth_cost_per_io;
+ }
+private:
+ // Enqueue the op to the high priority queue
+ void enqueue_high(unsigned prio, item_t &&item, bool front = false);
};
}
if (conf.get_val<std::string>("osd_op_queue_cut_off") == "debug_random") {
srand(time(NULL));
return (rand() % 2 < 1) ?
- scheduler_class_t::repop : scheduler_class_t::immediate;
+ scheduler_class_t::repop : scheduler_class_t::immediate;
} else if (conf.get_val<std::string>("osd_op_queue_cut_off") == "high") {
return scheduler_class_t::immediate;
} else {
return queue.empty();
}
- item_t dequeue() final {
+ WorkItem dequeue() final {
return queue.dequeue();
}
~ClassedOpQueueScheduler() final {};
};
-SchedulerRef make_scheduler(ConfigProxy &conf)
+SchedulerRef make_scheduler(
+  CephContext *cct, ConfigProxy &conf, int whoami, uint32_t nshards,
+  int sid, bool is_rotational, bool perf_cnt)
{
const std::string _type = conf.get_val<std::string>("osd_op_queue");
const std::string *type = &_type;
conf->osd_op_pq_min_cost
);
} else if (*type == "mclock_scheduler") {
- return std::make_unique<mClockScheduler>(conf);
+    return std::make_unique<mClockScheduler>(
+      cct, whoami, nshards, sid, is_rotational, perf_cnt);
} else {
ceph_assert("Invalid choice of wq" == 0);
return std::unique_ptr<mClockScheduler>();
namespace crimson::osd::scheduler {
enum class scheduler_class_t : uint8_t {
- background_best_effort = 0,
- background_recovery,
+ background_recovery = 0,
+ background_best_effort,
client,
repop,
immediate,
std::ostream &operator<<(std::ostream &, const scheduler_class_t &);
using client_t = uint64_t;
-using cost_t = uint64_t;
struct params_t {
- cost_t cost = 1;
+ int cost = 1;
+ unsigned priority = 0;
client_t owner;
scheduler_class_t klass;
};
struct item_t {
params_t params;
seastar::promise<> wake;
+ int get_cost() const { return params.cost; }
+ unsigned get_priority() const { return params.priority; }
};
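+// A dequeue yields either a ready item_t or, when mClock has nothing eligible
+// yet, a double carrying the time at which the next item becomes ready;
+// std::monostate is only the default-constructed state.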
+using WorkItem = std::variant<std::monostate, item_t, double>;
/**
* Base interface for classes responsible for choosing
* op processing order in the OSD.
virtual bool empty() const = 0;
// Return next op to be processed
- virtual item_t dequeue() = 0;
+ virtual WorkItem dequeue() = 0;
// Dump formatted representation for the queue
virtual void dump(ceph::Formatter &f) const = 0;
std::ostream &operator<<(std::ostream &lhs, const Scheduler &);
using SchedulerRef = std::unique_ptr<Scheduler>;
-SchedulerRef make_scheduler(ConfigProxy &);
+SchedulerRef make_scheduler(CephContext *cct, ConfigProxy &, int whoami, uint32_t num_shards,
+ int shard_id, bool is_rotational, bool perf_cnt);
}
PerfCounters *recoverystate_perf,
crimson::os::FuturizedStore &store,
OSDState& osd_state);
+
+ void initialize_scheduler(CephContext* cct, bool is_rotational) {
+ throttler.initialize_scheduler(cct, crimson::common::local_conf(), is_rotational, whoami);
+ }
};
/**
}
FORWARD_TO_OSD_SINGLETON(get_pool_info)
- FORWARD(with_throttle_while, with_throttle_while, local_state.throttler)
- FORWARD(try_acquire_throttle_now, try_acquire_throttle_now, local_state.throttler)
+ FORWARD(with_throttle, with_throttle, local_state.throttler)
FORWARD_TO_OSD_SINGLETON(build_incremental_map_msg)
FORWARD_TO_OSD_SINGLETON(send_incremental_map)
snap_dump_reservations,
snap_reserver.dump)
+ bool throttle_available() const {
+ return local_state.throttler.available();
+ }
auto local_update_priority(
singleton_orderer_t &orderer,