Option("crimson_osd_obc_lru_size", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(10)
- .set_description("Number of obcs to cache")
+ .set_description("Number of obcs to cache"),
+
+ Option("crimson_osd_scheduler_concurrency", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(0)
+ .set_description("The maximum number concurrent IO operations, 0 for unlimited")
});
}
osd_operations/peering_event.cc
osd_operations/pg_advance_map.cc
osd_operations/replicated_request.cc
+ osd_operations/background_recovery.cc
+ scheduler/scheduler.cc
+ scheduler/mclock_scheduler.cc
osdmap_gate.cc
pg_map.cc
objclass.cc
watch.cc
)
target_link_libraries(crimson-osd
- crimson-common crimson-os crimson fmt::fmt)
+ crimson-common
+ crimson-os
+ crimson
+ fmt::fmt
+ dmclock::dmclock)
set_target_properties(crimson-osd PROPERTIES
POSITION_INDEPENDENT_CODE ${EXE_LINKER_USE_PIE})
install(TARGETS crimson-osd DESTINATION bin)
f->close_section();
}
+
+OperationThrottler::OperationThrottler(ConfigProxy &conf)
+ : scheduler(crimson::osd::scheduler::make_scheduler(conf))
+{
+ conf.add_observer(this);
+ update_from_config(conf);
+}
+
+void OperationThrottler::wake()
+{
+ while ((!max_in_progress || in_progress < max_in_progress) &&
+ !scheduler->empty()) {
+ auto item = scheduler->dequeue();
+ item.wake.set_value();
+ ++in_progress;
+ --pending;
+ }
+}
+
+void OperationThrottler::release_throttle()
+{
+ ceph_assert(in_progress > 0);
+ --in_progress;
+ wake();
+}
+
+blocking_future<> OperationThrottler::acquire_throttle(
+ crimson::osd::scheduler::params_t params)
+{
+ crimson::osd::scheduler::item_t item{params, seastar::promise<>()};
+ auto fut = item.wake.get_future();
+ scheduler->enqueue(std::move(item));
+ return make_blocking_future(std::move(fut));
+}
+
+void OperationThrottler::dump_detail(Formatter *f) const
+{
+ f->dump_unsigned("max_in_progress", max_in_progress);
+ f->dump_unsigned("in_progress", in_progress);
+ f->open_object_section("scheduler");
+ {
+ scheduler->dump(*f);
+ }
+ f->close_section();
+}
+
+void OperationThrottler::update_from_config(const ConfigProxy &conf)
+{
+ max_in_progress = conf.get_val<uint64_t>("crimson_osd_scheduler_concurrency");
+ wake();
+}
+
+const char** OperationThrottler::get_tracked_conf_keys() const
+{
+ static const char* KEYS[] = {
+ "crimson_osd_scheduler_concurrency",
+ NULL
+ };
+ return KEYS;
+}
+
+void OperationThrottler::handle_conf_change(
+ const ConfigProxy& conf,
+ const std::set<std::string> &changed)
+{
+ update_from_config(conf);
+}
+
+
void OrderedPipelinePhase::Handle::exit()
{
if (phase) {
#include <seastar/core/future.hh>
#include "include/ceph_assert.h"
+#include "crimson/osd/scheduler/scheduler.h"
namespace ceph {
class Formatter;
enum class OperationTypeCode {
client_request = 0,
- peering_event = 1,
- compound_peering_request = 2,
- pg_advance_map = 3,
- pg_creation = 4,
- replicated_request = 5,
- last_op = 6
+ peering_event,
+ compound_peering_request,
+ pg_advance_map,
+ pg_creation,
+ replicated_request,
+ background_recovery,
+ last_op
};
static constexpr const char* const OP_NAMES[] = {
"pg_advance_map",
"pg_creation",
"replicated_request",
+ "background_recovery",
};
// prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:
}
};
+/**
+ * Throttles set of currently running operations
+ *
+ * Very primitive currently, assumes all ops are equally
+ * expensive and simply limits the number that can be
+ * concurrently active.
+ */
+class OperationThrottler : public Blocker, md_config_obs_t {
+public:
+ OperationThrottler(ConfigProxy &conf);
+
+ const char** get_tracked_conf_keys() const final;
+ void handle_conf_change(const ConfigProxy& conf,
+ const std::set<std::string> &changed) final;
+ void update_from_config(const ConfigProxy &conf);
+
+ template <typename F>
+ auto with_throttle(
+ OperationRef op,
+ crimson::osd::scheduler::params_t params,
+ F &&f) {
+ if (!max_in_progress) return f();
+ auto fut = acquire_throttle(params);
+ return op->with_blocking_future(std::move(fut))
+ .then(std::forward<F>(f))
+ .then([this](auto x) {
+ release_throttle();
+ return x;
+ });
+ }
+
+ template <typename F>
+ seastar::future<> with_throttle_while(
+ OperationRef op,
+ crimson::osd::scheduler::params_t params,
+ F &&f) {
+ return with_throttle(op, params, f).then([this, params, op, f](bool cont) {
+ if (cont)
+ return with_throttle_while(op, params, f);
+ else
+ return seastar::now();
+ });
+ }
+protected:
+ void dump_detail(Formatter *f) const final;
+ const char *get_type_name() const final {
+ return "OperationThrottler";
+ }
+private:
+ crimson::osd::scheduler::SchedulerRef scheduler;
+
+ uint64_t max_in_progress = 0;
+ uint64_t in_progress = 0;
+
+ uint64_t pending = 0;
+
+ void wake();
+
+ blocking_future<> acquire_throttle(
+ crimson::osd::scheduler::params_t params);
+
+ void release_throttle();
+};
+
/**
* Ensures that at most one op may consider itself in the phase at a time.
* Ops will see enter() unblock in the order in which they tried to enter
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <seastar/core/future.hh>
+
+#include "messages/MOSDOp.h"
+
+#include "crimson/osd/pg.h"
+#include "crimson/osd/shard_services.h"
+#include "common/Formatter.h"
+#include "crimson/osd/osd_operations/background_recovery.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+BackgroundRecovery::BackgroundRecovery(
+ ShardServices &ss,
+ PG &pg,
+ : ss(ss), pg(pg), scheduler_class(scheduler_class)
+ crimson::osd::scheduler::scheduler_class_t scheduler_class)
+{}
+
+seastar::future<bool> BackgroundRecovery::do_recovery()
+{
+ return seastar::make_ready_future<bool>(false);
+}
+
+void BackgroundRecovery::print(std::ostream &lhs) const
+{
+ lhs << "BackgroundRecovery(" << pg.get_pgid() << ")";
+}
+
+void BackgroundRecovery::dump_detail(Formatter *f) const
+{
+ f->dump_stream("pgid") << pg.get_pgid();
+}
+
+seastar::future<> BackgroundRecovery::start()
+{
+ logger().debug("{}: start", *this);
+
+ IRef ref = this;
+ return ss.throttler.with_throttle_while(
+ this, get_scheduler_params(), [ref, this] {
+ return do_recovery();
+ });
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/common/type_helpers.h"
+
+class MOSDOp;
+
+namespace crimson::osd {
+class PG;
+class ShardServices;
+
+class BackgroundRecovery final : public OperationT<BackgroundRecovery> {
+public:
+ static constexpr OperationTypeCode type = OperationTypeCode::background_recovery;
+
+ BackgroundRecovery(
+ ShardServices &ss,
+ PG &pg,
+ crimson::osd::scheduler::scheduler_class_t scheduler_class);
+
+ void print(std::ostream &) const final;
+ void dump_detail(Formatter *f) const final;
+ seastar::future<> start();
+private:
+ ShardServices &ss;
+ PG &pg;
+ crimson::osd::scheduler::scheduler_class_t scheduler_class;
+
+ auto get_scheduler_params() const {
+ return ceph::osd::scheduler::params_t{
+ 1, // cost
+ 0, // owner
+ scheduler_class
+ };
+ }
+
+ seastar::future<bool> do_recovery();
+};
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#include <memory>
+#include <functional>
+
+#include "crimson/osd/scheduler/mclock_scheduler.h"
+#include "common/dout.h"
+
+namespace dmc = crimson::dmclock;
+using namespace std::placeholders;
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_osd
+#undef dout_prefix
+#define dout_prefix *_dout
+
+
+namespace crimson::osd::scheduler {
+
+mClockScheduler::mClockScheduler(ConfigProxy &conf) :
+ scheduler(
+ std::bind(&mClockScheduler::ClientRegistry::get_info,
+ &client_registry,
+ _1),
+ dmc::AtLimit::Allow,
+ conf.get_val<double>("osd_mclock_scheduler_anticipation_timeout"))
+{
+ conf.add_observer(this);
+ client_registry.update_from_config(conf);
+}
+
+void mClockScheduler::ClientRegistry::update_from_config(const ConfigProxy &conf)
+{
+ default_external_client_info.update(
+ conf.get_val<uint64_t>("osd_mclock_scheduler_client_res"),
+ conf.get_val<uint64_t>("osd_mclock_scheduler_client_wgt"),
+ conf.get_val<uint64_t>("osd_mclock_scheduler_client_lim"));
+
+ internal_client_infos[
+ static_cast<size_t>(scheduler_class_t::background_recovery)].update(
+ conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_res"),
+ conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_wgt"),
+ conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_lim"));
+
+ internal_client_infos[
+ static_cast<size_t>(scheduler_class_t::background_best_effort)].update(
+ conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_res"),
+ conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_wgt"),
+ conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_lim"));
+}
+
+const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_external_client(
+ const client_profile_id_t &client) const
+{
+ auto ret = external_client_infos.find(client);
+ if (ret == external_client_infos.end())
+ return &default_external_client_info;
+ else
+ return &(ret->second);
+}
+
+const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_info(
+ const scheduler_id_t &id) const {
+ switch (id.class_id) {
+ case scheduler_class_t::immediate:
+ ceph_assert(0 == "Cannot schedule immediate");
+ return (dmc::ClientInfo*)nullptr;
+ case scheduler_class_t::repop:
+ case scheduler_class_t::client:
+ return get_external_client(id.client_profile_id);
+ default:
+ ceph_assert(static_cast<size_t>(id.class_id) < internal_client_infos.size());
+ return &internal_client_infos[static_cast<size_t>(id.class_id)];
+ }
+}
+
+void mClockScheduler::dump(ceph::Formatter &f) const
+{
+}
+
+void mClockScheduler::enqueue(item_t&& item)
+{
+ auto id = get_scheduler_id(item);
+ auto cost = item.params.cost;
+
+ if (scheduler_class_t::immediate == item.params.klass) {
+ immediate.push_front(std::move(item));
+ } else {
+ scheduler.add_request(
+ std::move(item),
+ id,
+ cost);
+ }
+}
+
+void mClockScheduler::enqueue_front(item_t&& item)
+{
+ immediate.push_back(std::move(item));
+ // TODO: item may not be immediate, update mclock machinery to permit
+ // putting the item back in the queue
+}
+
+item_t mClockScheduler::dequeue()
+{
+ if (!immediate.empty()) {
+ auto ret = std::move(immediate.back());
+ immediate.pop_back();
+ return ret;
+ } else {
+ mclock_queue_t::PullReq result = scheduler.pull_request();
+ if (result.is_future()) {
+ ceph_assert(
+ 0 == "Not implemented, user would have to be able to be woken up");
+ return std::move(*(item_t*)nullptr);
+ } else if (result.is_none()) {
+ ceph_assert(
+ 0 == "Impossible, must have checked empty() first");
+ return std::move(*(item_t*)nullptr);
+ } else {
+ ceph_assert(result.is_retn());
+
+ auto &retn = result.get_retn();
+ return std::move(*retn.request);
+ }
+ }
+}
+
+const char** mClockScheduler::get_tracked_conf_keys() const
+{
+ static const char* KEYS[] = {
+ "osd_mclock_scheduler_client_res",
+ "osd_mclock_scheduler_client_wgt",
+ "osd_mclock_scheduler_client_lim",
+ "osd_mclock_scheduler_background_recovery_res",
+ "osd_mclock_scheduler_background_recovery_wgt",
+ "osd_mclock_scheduler_background_recovery_lim",
+ "osd_mclock_scheduler_background_best_effort_res",
+ "osd_mclock_scheduler_background_best_effort_wgt",
+ "osd_mclock_scheduler_background_best_effort_lim",
+ NULL
+ };
+ return KEYS;
+}
+
+void mClockScheduler::handle_conf_change(
+ const ConfigProxy& conf,
+ const std::set<std::string> &changed)
+{
+ client_registry.update_from_config(conf);
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#pragma once
+
+#include <ostream>
+#include <map>
+#include <vector>
+
+#include "boost/variant.hpp"
+
+#include "dmclock/src/dmclock_server.h"
+
+#include "crimson/osd/scheduler/scheduler.h"
+#include "common/config.h"
+#include "include/cmp.h"
+#include "common/ceph_context.h"
+
+
+namespace crimson::osd::scheduler {
+
+using client_id_t = uint64_t;
+using profile_id_t = uint64_t;
+
+struct client_profile_id_t {
+ client_id_t client_id;
+ profile_id_t profile_id;
+};
+
+WRITE_EQ_OPERATORS_2(client_profile_id_t, client_id, profile_id)
+WRITE_CMP_OPERATORS_2(client_profile_id_t, client_id, profile_id)
+
+
+struct scheduler_id_t {
+ scheduler_class_t class_id;
+ client_profile_id_t client_profile_id;
+};
+
+WRITE_EQ_OPERATORS_2(scheduler_id_t, class_id, client_profile_id)
+WRITE_CMP_OPERATORS_2(scheduler_id_t, class_id, client_profile_id)
+
+/**
+ * Scheduler implementation based on mclock.
+ *
+ * TODO: explain configs
+ */
+class mClockScheduler : public Scheduler, md_config_obs_t {
+
+ class ClientRegistry {
+ std::array<
+ crimson::dmclock::ClientInfo,
+ static_cast<size_t>(scheduler_class_t::client)
+ > internal_client_infos = {
+ // Placeholder, gets replaced with configured values
+ crimson::dmclock::ClientInfo(1, 1, 1),
+ crimson::dmclock::ClientInfo(1, 1, 1)
+ };
+
+ crimson::dmclock::ClientInfo default_external_client_info = {1, 1, 1};
+ std::map<client_profile_id_t,
+ crimson::dmclock::ClientInfo> external_client_infos;
+ const crimson::dmclock::ClientInfo *get_external_client(
+ const client_profile_id_t &client) const;
+ public:
+ void update_from_config(const ConfigProxy &conf);
+ const crimson::dmclock::ClientInfo *get_info(
+ const scheduler_id_t &id) const;
+ } client_registry;
+
+ using mclock_queue_t = crimson::dmclock::PullPriorityQueue<
+ scheduler_id_t,
+ item_t,
+ true,
+ true,
+ 2>;
+ mclock_queue_t scheduler;
+ std::list<item_t> immediate;
+
+ static scheduler_id_t get_scheduler_id(const item_t &item) {
+ return scheduler_id_t{
+ item.params.klass,
+ client_profile_id_t{
+ item.params.owner,
+ 0
+ }
+ };
+ }
+
+public:
+ mClockScheduler(ConfigProxy &conf);
+
+ // Enqueue op in the back of the regular queue
+ void enqueue(item_t &&item) final;
+
+ // Enqueue the op in the front of the regular queue
+ void enqueue_front(item_t &&item) final;
+
+ // Return an op to be dispatch
+ item_t dequeue() final;
+
+ // Returns if the queue is empty
+ bool empty() const final {
+ return immediate.empty() && scheduler.empty();
+ }
+
+ // Formatted output of the queue
+ void dump(ceph::Formatter &f) const final;
+
+ void print(std::ostream &ostream) const final {
+ ostream << "mClockScheduler";
+ }
+
+ const char** get_tracked_conf_keys() const final;
+ void handle_conf_change(const ConfigProxy& conf,
+ const std::set<std::string> &changed) final;
+};
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <ostream>
+
+#include <seastar/core/print.hh>
+
+#include "crimson/osd/scheduler/scheduler.h"
+#include "crimson/osd/scheduler/mclock_scheduler.h"
+#include "common/WeightedPriorityQueue.h"
+
+namespace crimson::osd::scheduler {
+
+std::ostream &operator<<(std::ostream &lhs, const scheduler_class_t &c)
+{
+ switch (c) {
+ case scheduler_class_t::background_best_effort:
+ return lhs << "background_best_effort";
+ case scheduler_class_t::background_recovery:
+ return lhs << "background_recovery";
+ case scheduler_class_t::client:
+ return lhs << "client";
+ case scheduler_class_t::repop:
+ return lhs << "repop";
+ case scheduler_class_t::immediate:
+ return lhs << "immediate";
+ default:
+ return lhs;
+ }
+}
+
+/**
+ * Implements Scheduler in terms of OpQueue
+ *
+ * Templated on queue type to avoid dynamic dispatch, T should implement
+ * OpQueue<Scheduleritem_t, client_t>. This adapter is mainly responsible for
+ * the boilerplate priority cutoff/strict concept which is needed for
+ * OpQueue based implementations.
+ */
+template <typename T>
+class ClassedOpQueueScheduler : public Scheduler {
+ const scheduler_class_t cutoff;
+ T queue;
+
+ using priority_t = uint64_t;
+ std::array<
+ priority_t,
+ static_cast<size_t>(scheduler_class_t::immediate)
+ > priority_map = {
+ // Placeholder, gets replaced with configured values
+ 0, 0, 0
+ };
+
+ static scheduler_class_t get_io_prio_cut(ConfigProxy &conf) {
+ if (conf.get_val<std::string>("osd_op_queue_cut_off") == "debug_random") {
+ srand(time(NULL));
+ return (rand() % 2 < 1) ?
+ scheduler_class_t::repop : scheduler_class_t::immediate;
+ } else if (conf.get_val<std::string>("osd_op_queue_cut_off") == "high") {
+ return scheduler_class_t::immediate;
+ } else {
+ return scheduler_class_t::repop;
+ }
+ }
+
+ bool use_strict(scheduler_class_t kl) const {
+ return static_cast<uint8_t>(kl) >= static_cast<uint8_t>(cutoff);
+ }
+
+ priority_t get_priority(scheduler_class_t kl) const {
+ ceph_assert(static_cast<size_t>(kl) <
+ static_cast<size_t>(scheduler_class_t::immediate));
+ return priority_map[static_cast<size_t>(kl)];
+ }
+
+public:
+ template <typename... Args>
+ ClassedOpQueueScheduler(ConfigProxy &conf, Args&&... args) :
+ cutoff(get_io_prio_cut(conf)),
+ queue(std::forward<Args>(args)...)
+ {
+ priority_map[
+ static_cast<size_t>(scheduler_class_t::background_best_effort)
+ ] = conf.get_val<uint64_t>("osd_scrub_priority");
+ priority_map[
+ static_cast<size_t>(scheduler_class_t::background_recovery)
+ ] = conf.get_val<uint64_t>("osd_recovery_op_priority");
+ priority_map[
+ static_cast<size_t>(scheduler_class_t::client)
+ ] = conf.get_val<uint64_t>("osd_client_op_priority");
+ priority_map[
+ static_cast<size_t>(scheduler_class_t::repop)
+ ] = conf.get_val<uint64_t>("osd_client_op_priority");
+ }
+
+ void enqueue(item_t &&item) final {
+ if (use_strict(item.params.klass))
+ queue.enqueue_strict(
+ item.params.owner, get_priority(item.params.klass), std::move(item));
+ else
+ queue.enqueue(
+ item.params.owner, get_priority(item.params.klass),
+ item.params.cost, std::move(item));
+ }
+
+ void enqueue_front(item_t &&item) final {
+ if (use_strict(item.params.klass))
+ queue.enqueue_strict_front(
+ item.params.owner, get_priority(item.params.klass), std::move(item));
+ else
+ queue.enqueue_front(
+ item.params.owner, get_priority(item.params.klass),
+ item.params.cost, std::move(item));
+ }
+
+ bool empty() const final {
+ return queue.empty();
+ }
+
+ item_t dequeue() final {
+ return queue.dequeue();
+ }
+
+ void dump(ceph::Formatter &f) const final {
+ return queue.dump(&f);
+ }
+
+ void print(std::ostream &out) const final {
+ out << "ClassedOpQueueScheduler(queue=";
+ queue.print(out);
+ out << ", cutoff=" << cutoff << ")";
+ }
+
+ ~ClassedOpQueueScheduler() final {};
+};
+
+SchedulerRef make_scheduler(ConfigProxy &conf)
+{
+ const std::string _type = conf.get_val<std::string>("osd_op_queue");
+ const std::string *type = &_type;
+ if (*type == "debug_random") {
+ static const std::string index_lookup[] = { "mclock_scheduler",
+ "wpq" };
+ srand(time(NULL));
+ unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
+ type = &index_lookup[which];
+ }
+
+ if (*type == "wpq" ) {
+ // default is 'wpq'
+ return std::make_unique<
+ ClassedOpQueueScheduler<WeightedPriorityQueue<item_t, client_t>>>(
+ conf,
+ conf.get_val<uint64_t>("osd_op_pq_max_tokens_per_priority"),
+ conf->osd_op_pq_min_cost
+ );
+ } else if (*type == "mclock_scheduler") {
+ return std::make_unique<mClockScheduler>(conf);
+ } else {
+ ceph_assert("Invalid choice of wq" == 0);
+ return std::unique_ptr<mClockScheduler>();
+ }
+}
+
+std::ostream &operator<<(std::ostream &lhs, const Scheduler &rhs) {
+ rhs.print(lhs);
+ return lhs;
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <seastar/core/future.hh>
+#include <ostream>
+
+#include "crimson/common/config_proxy.h"
+
+namespace crimson::osd::scheduler {
+
+enum class scheduler_class_t : uint8_t {
+ background_best_effort = 0,
+ background_recovery,
+ client,
+ repop,
+ immediate,
+};
+
+std::ostream &operator<<(std::ostream &, const scheduler_class_t &);
+
+using client_t = uint64_t;
+using cost_t = uint64_t;
+
+struct params_t {
+ cost_t cost = 1;
+ client_t owner;
+ scheduler_class_t klass;
+};
+
+struct item_t {
+ params_t params;
+ seastar::promise<> wake;
+};
+
+/**
+ * Base interface for classes responsible for choosing
+ * op processing order in the OSD.
+ */
+class Scheduler {
+public:
+ // Enqueue op for scheduling
+ virtual void enqueue(item_t &&item) = 0;
+
+ // Enqueue op for processing as though it were enqueued prior
+ // to other items already scheduled.
+ virtual void enqueue_front(item_t &&item) = 0;
+
+ // Returns true iff there are no ops scheduled
+ virtual bool empty() const = 0;
+
+ // Return next op to be processed
+ virtual item_t dequeue() = 0;
+
+ // Dump formatted representation for the queue
+ virtual void dump(ceph::Formatter &f) const = 0;
+
+ // Print human readable brief description with relevant parameters
+ virtual void print(std::ostream &out) const = 0;
+
+ // Destructor
+ virtual ~Scheduler() {};
+};
+
+std::ostream &operator<<(std::ostream &lhs, const Scheduler &);
+using SchedulerRef = std::unique_ptr<Scheduler>;
+
+SchedulerRef make_scheduler(ConfigProxy &);
+
+}
monc(monc),
mgrc(mgrc),
store(store),
+ throttler(crimson::common::local_conf()),
obc_registry(crimson::common::local_conf()),
local_reserver(
&cct,
return osdmap_service;
}
- // Op Tracking
+ // Op Management
OperationRegistry registry;
+ OperationThrottler throttler;
template <typename T, typename... Args>
auto start_operation(Args&&... args) {