#include "crimson/osd/scheduler/mclock_scheduler.h"
#include "common/dout.h"
-namespace dmc = crimson::dmclock;
-using namespace std::placeholders;
-using namespace std::string_literals;
-
#define dout_context cct
-#define dout_subsys ceph_subsys_osd
+#define dout_subsys ceph_subsys_mclock
#undef dout_prefix
-#define dout_prefix *_dout
+#define dout_prefix *_dout << "mClockScheduler: "
namespace crimson::osd::scheduler {
-mClockScheduler::mClockScheduler(ConfigProxy &conf) :
- scheduler(
- std::bind(&mClockScheduler::ClientRegistry::get_info,
- &client_registry,
- _1),
- dmc::AtLimit::Allow,
- conf.get_val<double>("osd_mclock_scheduler_anticipation_timeout"))
-{
- conf.add_observer(this);
- client_registry.update_from_config(conf);
-}
-
-void mClockScheduler::ClientRegistry::update_from_config(const ConfigProxy &conf)
+/// Scale a raw item cost. The scaling itself is done by mclock_conf; this
+/// member only forwards to it.
+uint32_t mClockScheduler::calc_scaled_cost(int item_cost)
{
- default_external_client_info.update(
- conf.get_val<double>("osd_mclock_scheduler_client_res"),
- conf.get_val<uint64_t>("osd_mclock_scheduler_client_wgt"),
- conf.get_val<double>("osd_mclock_scheduler_client_lim"));
-
- internal_client_infos[
- static_cast<size_t>(scheduler_class_t::background_recovery)].update(
- conf.get_val<double>("osd_mclock_scheduler_background_recovery_res"),
- conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_wgt"),
- conf.get_val<double>("osd_mclock_scheduler_background_recovery_lim"));
-
- internal_client_infos[
- static_cast<size_t>(scheduler_class_t::background_best_effort)].update(
- conf.get_val<double>("osd_mclock_scheduler_background_best_effort_res"),
- conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_wgt"),
- conf.get_val<double>("osd_mclock_scheduler_background_best_effort_lim"));
-}
-
-const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_external_client(
- const client_profile_id_t &client) const
-{
- auto ret = external_client_infos.find(client);
- if (ret == external_client_infos.end())
- return &default_external_client_info;
- else
- return &(ret->second);
-}
-
-const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_info(
- const scheduler_id_t &id) const {
- switch (id.class_id) {
- case scheduler_class_t::immediate:
- ceph_assert(0 == "Cannot schedule immediate");
- return (dmc::ClientInfo*)nullptr;
- case scheduler_class_t::repop:
- case scheduler_class_t::client:
- return get_external_client(id.client_profile_id);
- default:
- ceph_assert(static_cast<size_t>(id.class_id) < internal_client_infos.size());
- return &internal_client_infos[static_cast<size_t>(id.class_id)];
- }
+  const auto scaled_cost = mclock_conf.calc_scaled_cost(item_cost);
+  return scaled_cost;
}
void mClockScheduler::dump(ceph::Formatter &f) const
{
+  // Queue sizes. Note that "high_priority_queue" is the number of distinct
+  // priority levels currently present (the map's size), not the number of
+  // queued items; "scheduler" is the mClock queue's request count.
+  f.open_object_section("queue_sizes");
+  f.dump_int("high_priority_queue", high_priority.size());
+  f.dump_int("scheduler", scheduler.request_count());
+  f.close_section();
+
+  // Client map summary. Only the client count is available here; "clients"
+  // is still emitted, as an empty string, so the output keeps its shape.
+  // TODO: stream the scheduler's client map into "clients" once the
+  // scheduler can be written to an ostream from here.
+  f.open_object_section("mClockClients");
+  f.dump_int("client_count", scheduler.client_count());
+  f.dump_string("clients", std::string{});
+  f.close_section();
+
+  // Sorted mClock queues (res, wgt, lim) as rendered by display_queues().
+  f.open_object_section("mClockQueues");
+  f.dump_string("queues", display_queues());
+  f.close_section();
+
+  // One (priority, queue_size) pair per strict-priority sub-queue, in the
+  // map's iteration order.
+  f.open_object_section("HighPriorityQueue");
+  for (const auto& [priority, items] : high_priority) {
+    f.dump_int("priority", priority);
+    f.dump_int("queue_size", items.size());
+  }
+  f.close_section();
}
void mClockScheduler::enqueue(item_t&& item)
{
auto id = get_scheduler_id(item);
- auto cost = item.params.cost;
+  // Read the priority before item is moved from on any of the paths below.
+  const unsigned priority = item.get_priority();
- if (scheduler_class_t::immediate == item.params.klass) {
- immediate.push_front(std::move(item));
+  // TODO: move this check into item, handle backwards compat
+  if (SchedulerClass::immediate == item.params.klass) {
+    // Immediate-class items go to the strict queue at immediate_class_priority.
+    enqueue_high(immediate_class_priority, std::move(item));
+  } else if (priority >= cutoff_priority) {
+    // At or above the cutoff: strict-priority queue instead of mClock.
+    enqueue_high(priority, std::move(item));
} else {
+    const auto cost = calc_scaled_cost(item.get_cost());
+    dout(20) << __func__ << " " << id
+             << " item_cost: " << item.get_cost()
+             << " scaled_cost: " << cost
+             << dendl;
+
+    // Everything else is handed to the mClock queue with its scaled cost.
scheduler.add_request(
std::move(item),
id,
cost);
}
+
+  dout(20) << __func__ << ": sched client_count: " << scheduler.client_count()
+           << " sched queue size: " << scheduler.request_count()
+           << dendl;
+
+  // Per-priority depth of the strict queue.
+  for (const auto& entry : high_priority) {
+    dout(20) << __func__ << " high_priority[" << entry.first
+             << "]: " << entry.second.size()
+             << dendl;
+  }
+
+  dout(30) << __func__ << " mClockClients: "
+           << dendl;
+  dout(30) << __func__ << " mClockQueues: { "
+           << display_queues() << " }"
+           << dendl;
}
void mClockScheduler::enqueue_front(item_t&& item)
{
- immediate.push_back(std::move(item));
- // TODO: item may not be immediate, update mclock machinery to permit
- // putting the item back in the queue
+  const unsigned priority = item.get_priority();
+
+  // mClock does not support enqueue at front, so an item that would
+  // otherwise go to mClock is placed in the high queue with priority 0.
+  unsigned target_priority = 0;
+  if (SchedulerClass::immediate == item.params.klass) {
+    target_priority = immediate_class_priority;
+  } else if (priority >= cutoff_priority) {
+    target_priority = priority;
+  }
+
+  // Every path re-queues at the front of the chosen strict sub-queue.
+  enqueue_high(target_priority, std::move(item), true);
+}
+
+/**
+ * Insert item into the strict-priority sub-queue for the given priority.
+ *
+ * dequeue() takes items from the back() of a sub-queue, so the back of the
+ * list is the end that is served next. That is why front == true appends
+ * with push_back (served before what is already queued at this priority)
+ * while front == false uses push_front (served after it).
+ * The sub-queue is created on first use by the map's operator[].
+ */
+void mClockScheduler::enqueue_high(unsigned priority,
+                                   item_t&& item,
+                                   bool front)
+{
+  auto& sub_queue = high_priority[priority];
+  if (!front) {
+    sub_queue.push_front(std::move(item));
+    return;
+  }
+  sub_queue.push_back(std::move(item));
}
-item_t mClockScheduler::dequeue()
+// Strict-priority items are served first, taken from the back of the first
+// sub-queue in map order. Otherwise the mClock queue is pulled; when it
+// reports a future request, the time it supplies is returned instead of an
+// item.
+WorkItem mClockScheduler::dequeue()
{
- if (!immediate.empty()) {
- auto ret = std::move(immediate.back());
- immediate.pop_back();
+  if (!high_priority.empty()) {
+    auto iter = high_priority.begin();
+    // invariant: high_priority entries are never empty.
+    // ceph_assert rather than assert: the check must also hold in builds
+    // that define NDEBUG, since back() on an empty list is undefined.
+    ceph_assert(!iter->second.empty());
+    WorkItem ret{std::move(iter->second.back())};
+    iter->second.pop_back();
+    if (iter->second.empty()) {
+      // maintain invariant, high priority entries are never empty
+      high_priority.erase(iter);
+    }
return ret;
} else {
mclock_queue_t::PullReq result = scheduler.pull_request();
if (result.is_future()) {
- ceph_abort_msg(
- "Not implemented, user would have to be able to be woken up");
- return std::move(*(item_t*)nullptr);
+      return result.getTime();
} else if (result.is_none()) {
ceph_abort_msg(
"Impossible, must have checked empty() first");
}
}
-std::vector<std::string> mClockScheduler::get_tracked_keys() const noexcept
+/// Render the mClock queues to a string via scheduler.display_queues().
+std::string mClockScheduler::display_queues() const
{
- return {
- "osd_mclock_scheduler_client_res"s,
- "osd_mclock_scheduler_client_wgt"s,
- "osd_mclock_scheduler_client_lim"s,
- "osd_mclock_scheduler_background_recovery_res"s,
- "osd_mclock_scheduler_background_recovery_wgt"s,
- "osd_mclock_scheduler_background_recovery_lim"s,
- "osd_mclock_scheduler_background_best_effort_res"s,
- "osd_mclock_scheduler_background_best_effort_wgt"s,
- "osd_mclock_scheduler_background_best_effort_lim"s
- };
+  std::ostringstream rendered;
+  scheduler.display_queues(rendered);
+  return rendered.str();
}
+/// Config keys to observe; the list is supplied by mclock_conf.
+std::vector<std::string> mClockScheduler::get_tracked_keys() const noexcept
+{
+  return mclock_conf.get_tracked_keys();
+}
+
void mClockScheduler::handle_conf_change(
const ConfigProxy& conf,
const std::set<std::string> &changed)
{
- client_registry.update_from_config(conf);
+  // Config handling lives in mclock_conf: pass on the proxy and the set of
+  // changed keys unmodified.
+  mclock_conf.handle_conf_change(conf, changed);
+}
+
+/// Unregister from the config proxy; the constructor registered this object
+/// with add_observer().
+mClockScheduler::~mClockScheduler()
+{
+  cct->_conf.remove_observer(this);
}
}
#include "boost/variant.hpp"
#include "dmclock/src/dmclock_server.h"
-
#include "crimson/osd/scheduler/scheduler.h"
+#include "crimson/mon/MonClient.h"
+
#include "common/config.h"
#include "common/ceph_context.h"
+#include "common/mclock_common.h"
+namespace dmc = crimson::dmclock;
+using namespace std::placeholders;
+using namespace std::literals;
namespace crimson::osd::scheduler {
-using client_id_t = uint64_t;
-using profile_id_t = uint64_t;
-
-struct client_profile_id_t {
- client_id_t client_id;
- profile_id_t profile_id;
- auto operator<=>(const client_profile_id_t&) const = default;
-};
-
-
-struct scheduler_id_t {
- scheduler_class_t class_id;
- client_profile_id_t client_profile_id;
- auto operator<=>(const scheduler_id_t&) const = default;
-};
-
/**
* Scheduler implementation based on mclock.
*
*/
class mClockScheduler : public Scheduler, md_config_obs_t {
- class ClientRegistry {
- std::array<
- crimson::dmclock::ClientInfo,
- static_cast<size_t>(scheduler_class_t::client)
- > internal_client_infos = {
- // Placeholder, gets replaced with configured values
- crimson::dmclock::ClientInfo(1, 1, 1),
- crimson::dmclock::ClientInfo(1, 1, 1)
- };
+ crimson::common::CephContext *cct;
+ unsigned cutoff_priority;
+ PerfCounters *logger;
- crimson::dmclock::ClientInfo default_external_client_info = {1, 1, 1};
- std::map<client_profile_id_t,
- crimson::dmclock::ClientInfo> external_client_infos;
- const crimson::dmclock::ClientInfo *get_external_client(
- const client_profile_id_t &client) const;
- public:
- void update_from_config(const ConfigProxy &conf);
- const crimson::dmclock::ClientInfo *get_info(
- const scheduler_id_t &id) const;
- } client_registry;
+ ClientRegistry client_registry;
+ MclockConfig mclock_conf;
class crimson_mclock_cleaning_job_t {
struct job_control_t {
true,
2,
crimson_mclock_cleaning_job_t>;
+ using priority_t = unsigned;
+ using SubQueue = std::map<priority_t,
+ std::list<item_t>,
+ std::greater<priority_t>>;
mclock_queue_t scheduler;
- std::list<item_t> immediate;
+ /**
+ * high_priority
+ *
+ * Holds entries to be dequeued in strict order ahead of mClock
+ * Invariant: entries are never empty
+ */
+ SubQueue high_priority;
+ priority_t immediate_class_priority = std::numeric_limits<priority_t>::max();
static scheduler_id_t get_scheduler_id(const item_t &item) {
return scheduler_id_t{
item.params.klass,
- client_profile_id_t{
- item.params.owner,
- 0
- }
+      // The owner is no longer part of the id: every item gets a
+      // default-constructed client profile, so ids differ only by class.
+      client_profile_id_t()
};
}
public:
- mClockScheduler(ConfigProxy &conf);
+  /**
+   * Build the scheduler with explicit dmclock ages.
+   *
+   * idle_age, erase_age and check_time are forwarded to the mClock queue
+   * together with AtLimit::Wait and the configured
+   * osd_mclock_scheduler_anticipation_timeout. The object registers itself
+   * as a config observer and derives cutoff_priority from
+   * osd_op_queue_cut_off ("debug_random" picks high or low at random,
+   * "high" selects CEPH_MSG_PRIO_HIGH, anything else CEPH_MSG_PRIO_LOW).
+   *
+   * init_perfcounter and monc are accepted but not used by this body, and
+   * logger is left null.
+   */
+  template<typename Rep, typename Per>
+  mClockScheduler(CephContext *cct, int whoami, uint32_t num_shards,
+                  int shard_id, bool is_rotational,
+                  std::chrono::duration<Rep,Per> idle_age,
+                  std::chrono::duration<Rep,Per> erase_age,
+                  std::chrono::duration<Rep,Per> check_time,
+                  bool init_perfcounter=true, MonClient *monc=nullptr)
+    : cct(cct),
+      logger(nullptr),
+      mclock_conf(cct, client_registry, num_shards, is_rotational, shard_id, whoami),
+      scheduler(
+        std::bind(&ClientRegistry::get_info,
+                  &client_registry,
+                  _1),
+        idle_age, erase_age, check_time,
+        dmc::AtLimit::Wait,
+        cct->_conf.get_val<double>("osd_mclock_scheduler_anticipation_timeout"))
+  {
+    cct->_conf.add_observer(this);
+    ceph_assert(num_shards > 0);
+
+    // Look the option up once and branch on the cached value.
+    const auto cut_off =
+      cct->_conf.get_val<std::string>("osd_op_queue_cut_off");
+    if (cut_off == "debug_random") {
+      std::random_device rd;
+      std::mt19937 random_gen(rd());
+      cutoff_priority =
+        (random_gen() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
+    } else if (cut_off == "high") {
+      cutoff_priority = CEPH_MSG_PRIO_HIGH;
+    } else {
+      // default / catch-all is 'low'
+      cutoff_priority = CEPH_MSG_PRIO_LOW;
+    }
+  }
+  /// Convenience constructor: delegates to the templated constructor with
+  /// dmclock's standard idle age, erase age and check time.
+  mClockScheduler(CephContext *cct, int whoami, uint32_t num_shards,
+                  int shard_id, bool is_rotational,
+                  bool init_perfcounter=true, MonClient *monc=nullptr)
+    : mClockScheduler(
+        cct, whoami, num_shards, shard_id, is_rotational,
+        dmc::standard_idle_age,
+        dmc::standard_erase_age,
+        dmc::standard_check_time,
+        init_perfcounter, monc) {}
+ ~mClockScheduler() override;
+
+ /// Calculate scaled cost per item
+ uint32_t calc_scaled_cost(int cost);
+
+ // Helper method to display mclock queues
+ std::string display_queues() const;
// Enqueue op in the back of the regular queue
void enqueue(item_t &&item) final;
- // Enqueue the op in the front of the regular queue
+ // Enqueue the op in the front of the high priority queue
void enqueue_front(item_t &&item) final;
// Return an op to be dispatch
- item_t dequeue() final;
+ WorkItem dequeue() final;
// Returns if the queue is empty
bool empty() const final {
- return immediate.empty() && scheduler.empty();
+    // Empty only when neither the mClock queue nor the strict-priority map
+    // holds anything.
+    const bool mclock_drained = scheduler.empty();
+    return mclock_drained && high_priority.empty();
}
// Formatted output of the queue
void dump(ceph::Formatter &f) const final;
void print(std::ostream &ostream) const final {
- ostream << "mClockScheduler";
+    // Scheduler name followed by the strict-priority cutoff in effect.
+    // The name is spelled out in full and joined to the cutoff without a
+    // stray space before the comma.
+    ostream << "mClockScheduler, cutoff=" << cutoff_priority;
}
std::vector<std::string> get_tracked_keys() const noexcept final;
void handle_conf_change(const ConfigProxy& conf,
const std::set<std::string> &changed) final;
+
+  /// Cost-per-IO value as reported by mclock_conf.
+  double get_cost_per_io() const {
+    const double cost_per_io = mclock_conf.get_cost_per_io();
+    return cost_per_io;
+  }
+private:
+ // Enqueue the op to the high priority queue
+ void enqueue_high(unsigned prio, item_t &&item, bool front = false);
};
}