Option("osd_op_queue", Option::TYPE_STR, Option::LEVEL_ADVANCED)
.set_default("wpq")
- .set_enum_allowed( { "wpq", "prioritized", "mclock_opclass", "mclock_client", "debug_random" } )
- .set_description("which operation queue algorithm to use")
- .set_long_description("which operation queue algorithm to use; mclock_opclass and mclock_client are currently experimental")
- .set_flag(Option::FLAG_STARTUP)
+ .set_enum_allowed( { "wpq", "prioritized",
+                      "mclock_opclass", "mclock_client", "mclock_scheduler",
+                      "debug_random" } )
+ .set_description("which operation priority queue algorithm to use")
+ // NOTE(review): FLAG_STARTUP was removed by this change -- confirm the
+ // option is really meant to become tunable without an OSD restart.
+ .set_long_description("which operation priority queue algorithm to use; "
+                       "mclock_opclass, mclock_client, and "
+                       "mclock_scheduler are currently experimental")
.add_see_also("osd_op_queue_cut_off"),
Option("osd_op_queue_cut_off", Option::TYPE_STR, Option::LEVEL_ADVANCED)
.set_long_description("the threshold between high priority ops that use strict priority ordering and low priority ops that use a fairness algorithm that may or may not incorporate priority")
.add_see_also("osd_op_queue"),
+ Option("osd_mclock_scheduler_client_res", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(1)
+ .set_description("IO proportion reserved for each client (default)")
+ .set_long_description("Only considered for osd_op_queue = mclock_scheduler")
+ .add_see_also("osd_op_queue"),
+
+ Option("osd_mclock_scheduler_client_wgt", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(1)
+ .set_description("IO share for each client (default) over reservation")
+ .set_long_description("Only considered for osd_op_queue = mclock_scheduler")
+ .add_see_also("osd_op_queue"),
+
+ Option("osd_mclock_scheduler_client_lim", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(999999)
+ .set_description("IO limit for each client (default) over reservation")
+ .set_long_description("Only considered for osd_op_queue = mclock_scheduler")
+ .add_see_also("osd_op_queue"),
+
+ Option("osd_mclock_scheduler_background_recovery_res", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(1)
+ .set_description("IO proportion reserved for background recovery (default)")
+ .set_long_description("Only considered for osd_op_queue = mclock_scheduler")
+ .add_see_also("osd_op_queue"),
+
+ Option("osd_mclock_scheduler_background_recovery_wgt", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(1)
+ .set_description("IO share for each background recovery over reservation")
+ .set_long_description("Only considered for osd_op_queue = mclock_scheduler")
+ .add_see_also("osd_op_queue"),
+
+ Option("osd_mclock_scheduler_background_recovery_lim", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(999999)
+ .set_description("IO limit for background recovery over reservation")
+ .set_long_description("Only considered for osd_op_queue = mclock_scheduler")
+ .add_see_also("osd_op_queue"),
+
+ Option("osd_mclock_scheduler_background_best_effort_res", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(1)
+ .set_description("IO proportion reserved for background best_effort (default)")
+ .set_long_description("Only considered for osd_op_queue = mclock_scheduler")
+ .add_see_also("osd_op_queue"),
+
+ Option("osd_mclock_scheduler_background_best_effort_wgt", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(1)
+ .set_description("IO share for each background best_effort over reservation")
+ .set_long_description("Only considered for osd_op_queue = mclock_scheduler")
+ .add_see_also("osd_op_queue"),
+
+ Option("osd_mclock_scheduler_background_best_effort_lim", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(999999)
+ .set_description("IO limit for background best_effort over reservation")
+ .set_long_description("Only considered for osd_op_queue = mclock_scheduler")
+ .add_see_also("osd_op_queue"),
+
+
Option("osd_op_queue_mclock_client_op_res", Option::TYPE_FLOAT, Option::LEVEL_ADVANCED)
.set_default(1000.0)
.set_description("mclock reservation of client operator requests")
-Subproject commit 1c2a7d5491268c19ed5b4b38d9ae075420b4a0d5
+Subproject commit 47703948cb73d3c858cdf0701b741bb82978020a
mClockClientQueue.cc
scheduler/OpScheduler.cc
scheduler/OpSchedulerItem.cc
+ scheduler/mClockScheduler.cc
PeeringState.cc
PGStateUtils.cc
MissingLoc.cc
static const std::string index_lookup[] = { "prioritized",
"mclock_opclass",
"mclock_client",
+ "mclock_scheduler",
"wpq" };
srand(time(NULL));
unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
cct,
cct
);
+ } else if (*type == "mclock_scheduler") {
+ return std::make_unique<mClockScheduler>(cct);
} else if (*type == "wpq" ) {
// default is 'wpq'
return std::make_unique<
#include "osd/OpRequest.h"
#include "osd/PG.h"
#include "osd/PGPeeringEvent.h"
-#include "common/mClockCommon.h"
#include "messages/MOSDOp.h"
virtual void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) = 0;
virtual op_scheduler_class get_scheduler_class() const = 0;
- virtual std::optional<ceph::qos::mclock_profile_params_t>
- get_mclock_profile_params() const {
- return std::nullopt;
- }
-
- virtual std::optional<ceph::qos::dmclock_request_t>
- get_dmclock_request_state() const {
- return std::nullopt;
- }
-
virtual ~OpQueueable() {}
friend ostream& operator<<(ostream& out, const OpQueueable& q) {
return q.print(out);
uint64_t get_owner() const { return owner; }
epoch_t get_map_epoch() const { return map_epoch; }
- auto get_mclock_profile_params() const {
- return qitem->get_mclock_profile_params();
- }
- auto get_dmclock_request_state() const {
- return qitem->get_dmclock_request_state();
- }
-
bool is_peering() const {
return qitem->is_peering();
}
}
}
- std::optional<ceph::qos::mclock_profile_params_t>
- get_mclock_profile_params() const final {
- auto op = maybe_get_mosd_op();
- if (!op)
- return std::nullopt;
-
- return op->get_mclock_profile_params();
- }
-
- std::optional<ceph::qos::dmclock_request_t>
- get_dmclock_request_state() const final {
- auto op = maybe_get_mosd_op();
- if (!op)
- return std::nullopt;
-
- return op->get_dmclock_request_state();
- }
-
void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
};
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#include <memory>
+#include <functional>
+
+#include "osd/scheduler/mClockScheduler.h"
+#include "common/dout.h"
+
+namespace dmc = crimson::dmclock;
+using namespace std::placeholders;
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_osd
+#undef dout_prefix
+#define dout_prefix *_dout
+
+
+namespace ceph::osd::scheduler {
+
+// Construct the scheduler: wire the dmclock pull-queue's info callback to
+// the client registry, register this object as a config observer, and seed
+// the registry from the current configuration.
+mClockScheduler::mClockScheduler(CephContext *cct) :
+  scheduler(
+    std::bind(&mClockScheduler::ClientRegistry::get_info,
+      &client_registry,
+      _1),
+    dmc::AtLimit::Allow,
+    cct->_conf->osd_op_queue_mclock_anticipation_timeout)
+{
+  cct->_conf.add_observer(this);
+  client_registry.update_from_config(cct->_conf);
+}
+
+// Re-read QoS parameters (reservation/weight/limit) from the config: the
+// default profile for external clients plus the two background internal
+// classes.  Per-client entries in external_client_infos, if any, are left
+// untouched.
+void mClockScheduler::ClientRegistry::update_from_config(const ConfigProxy &conf)
+{
+  default_external_client_info.update(
+    conf.get_val<uint64_t>("osd_mclock_scheduler_client_res"),
+    conf.get_val<uint64_t>("osd_mclock_scheduler_client_wgt"),
+    conf.get_val<uint64_t>("osd_mclock_scheduler_client_lim"));
+
+  internal_client_infos[
+    static_cast<size_t>(op_scheduler_class::background_recovery)].update(
+    conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_res"),
+    conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_wgt"),
+    conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_lim"));
+
+  internal_client_infos[
+    static_cast<size_t>(op_scheduler_class::background_best_effort)].update(
+    conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_res"),
+    conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_wgt"),
+    conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_lim"));
+}
+
+// Look up the QoS profile for an external client, falling back to the
+// configured default when no per-client entry exists.
+const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_external_client(
+  const client_profile_id_t &client) const
+{
+  auto found = external_client_infos.find(client);
+  return found == external_client_infos.end()
+    ? &default_external_client_info
+    : &found->second;
+}
+
+// Map a scheduler id to its dmclock ClientInfo.  Immediate-class items
+// bypass the mclock queue entirely, so requesting their info is a
+// programming error.
+const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_info(
+  const scheduler_id_t &id) const {
+  switch (id.class_id) {
+  case op_scheduler_class::immediate:
+    ceph_assert(0 == "Cannot schedule immediate");
+    return nullptr; // unreachable: the assert above aborts
+  case op_scheduler_class::client:
+    return get_external_client(id.client_profile_id);
+  default:
+    ceph_assert(static_cast<size_t>(id.class_id) < internal_client_infos.size());
+    return &internal_client_infos[static_cast<size_t>(id.class_id)];
+  }
+}
+
+// Formatter output is not implemented yet; intentionally emits nothing.
+void mClockScheduler::dump(ceph::Formatter &f) const
+{
+}
+
+// Queue an item for dispatch.  Immediate-class items skip mclock and go on
+// the dedicated immediate list; everything else enters the dmclock queue
+// keyed by (scheduler class, owner) with a fixed unit cost for now.
+void mClockScheduler::enqueue(OpSchedulerItem&& item)
+{
+  auto id = get_scheduler_id(item);
+  // TODO: express cost, mclock params in terms of per-node capacity?
+  auto cost = 1; //std::max(item.get_cost(), 1);
+
+  // TODO: move this check into OpSchedulerItem, handle backwards compat
+  if (op_scheduler_class::immediate == item.get_scheduler_class()) {
+    immediate.push_front(std::move(item));
+  } else {
+    scheduler.add_request(
+      std::move(item),
+      id,
+      cost);
+  }
+}
+
+// Requeue an item at the front of the dispatch order.  Because dequeue()
+// drains the immediate list first, appending here achieves that.
+// TODO: item may not be immediate; update mclock machinery to permit
+// putting the item back in the queue
+void mClockScheduler::enqueue_front(OpSchedulerItem&& item)
+{
+  immediate.push_back(std::move(item));
+}
+
+// Dispatch the next item: drain the immediate list first (back first, which
+// is where enqueue_front() appends), then pull from the mclock queue.
+// Callers must check empty() before calling.
+OpSchedulerItem mClockScheduler::dequeue()
+{
+  if (!immediate.empty()) {
+    auto ret = std::move(immediate.back());
+    immediate.pop_back();
+    return ret;
+  } else {
+    mclock_queue_t::PullReq result = scheduler.pull_request();
+    if (result.is_future()) {
+      // Should not occur with AtLimit::Allow; there is no wake-up machinery.
+      ceph_assert(
+	0 == "Not implemented, user would have to be able to be woken up");
+      return std::move(*(OpSchedulerItem*)nullptr); // unreachable
+    } else if (result.is_none()) {
+      ceph_assert(
+	0 == "Impossible, must have checked empty() first");
+      return std::move(*(OpSchedulerItem*)nullptr); // unreachable
+    } else {
+      ceph_assert(result.is_retn());
+
+      // Use the bound reference instead of calling get_retn() twice.
+      auto &retn = result.get_retn();
+      return std::move(*retn.request);
+    }
+  }
+}
+
+// Config keys this observer tracks; must stay in sync with the options read
+// by ClientRegistry::update_from_config().
+const char** mClockScheduler::get_tracked_conf_keys() const
+{
+  static const char* KEYS[] = {
+    "osd_mclock_scheduler_client_res",
+    "osd_mclock_scheduler_client_wgt",
+    "osd_mclock_scheduler_client_lim",
+    "osd_mclock_scheduler_background_recovery_res",
+    "osd_mclock_scheduler_background_recovery_wgt",
+    "osd_mclock_scheduler_background_recovery_lim",
+    "osd_mclock_scheduler_background_best_effort_res",
+    "osd_mclock_scheduler_background_best_effort_wgt",
+    "osd_mclock_scheduler_background_best_effort_lim",
+    NULL
+  };
+  return KEYS;
+}
+
+// Observer callback: some tracked key changed, so reload all QoS
+// parameters from the current configuration (the changed-set is ignored;
+// a full refresh is cheap).
+void mClockScheduler::handle_conf_change(
+  const ConfigProxy& conf,
+  const std::set<std::string> &changed)
+{
+  client_registry.update_from_config(conf);
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#pragma once
+
+#include <ostream>
+#include <map>
+#include <vector>
+
+#include "boost/variant.hpp"
+
+#include "dmclock/src/dmclock_server.h"
+
+#include "osd/scheduler/OpScheduler.h"
+#include "common/config.h"
+#include "include/cmp.h"
+#include "common/ceph_context.h"
+#include "common/mClockPriorityQueue.h"
+#include "osd/scheduler/OpSchedulerItem.h"
+#include "osd/mClockOpClassSupport.h"
+
+
+namespace ceph::osd::scheduler {
+
+using client_id_t = uint64_t;
+using profile_id_t = uint64_t;
+
+// Identity of an external mclock client: the owning entity plus a profile
+// selector (profile_id is currently always 0 -- see get_scheduler_id()).
+struct client_profile_id_t {
+  client_id_t client_id;
+  profile_id_t profile_id;
+};
+
+// Generate ==/!= and ordering operators so the id can key a std::map.
+WRITE_EQ_OPERATORS_2(client_profile_id_t, client_id, profile_id)
+WRITE_CMP_OPERATORS_2(client_profile_id_t, client_id, profile_id)
+
+
+// Key used by the dmclock queue: the op's scheduler class plus, for
+// external clients, the per-client profile identity.
+struct scheduler_id_t {
+  op_scheduler_class class_id;
+  client_profile_id_t client_profile_id;
+};
+
+// Comparison operators so the id can serve as the dmclock client key.
+WRITE_EQ_OPERATORS_2(scheduler_id_t, class_id, client_profile_id)
+WRITE_CMP_OPERATORS_2(scheduler_id_t, class_id, client_profile_id)
+
+/**
+ * Scheduler implementation based on mclock.
+ *
+ * TODO: explain configs
+ */
+class mClockScheduler : public OpScheduler, md_config_obs_t {
+
+  // Maps scheduler ids to dmclock ClientInfo (reservation/weight/limit).
+  class ClientRegistry {
+    // One slot per internal op class; op_scheduler_class::immediate is used
+    // as the array bound (immediate ops never consult the registry).
+    std::array<
+      crimson::dmclock::ClientInfo,
+      static_cast<size_t>(op_scheduler_class::immediate)
+    > internal_client_infos = {
+      // Placeholder, gets replaced with configured values
+      crimson::dmclock::ClientInfo(1, 1, 1),
+      crimson::dmclock::ClientInfo(1, 1, 1)
+    };
+
+    // Fallback profile for external clients with no specific entry.
+    crimson::dmclock::ClientInfo default_external_client_info = {1, 1, 1};
+    std::map<client_profile_id_t,
+	     crimson::dmclock::ClientInfo> external_client_infos;
+    const crimson::dmclock::ClientInfo *get_external_client(
+      const client_profile_id_t &client) const;
+  public:
+    // Re-read all tracked osd_mclock_scheduler_* options.
+    void update_from_config(const ConfigProxy &conf);
+    const crimson::dmclock::ClientInfo *get_info(
+      const scheduler_id_t &id) const;
+  } client_registry;
+
+  using mclock_queue_t = crimson::dmclock::PullPriorityQueue<
+    scheduler_id_t,
+    OpSchedulerItem,
+    true,
+    true,
+    2>;
+  mclock_queue_t scheduler;
+  // Items of class `immediate` bypass mclock; dequeue() drains this first.
+  std::list<OpSchedulerItem> immediate;
+
+  // Derive the mclock client key from an item; profile_id is currently
+  // always 0 (per-owner scheduling only).
+  static scheduler_id_t get_scheduler_id(const OpSchedulerItem &item) {
+    return scheduler_id_t{
+      item.get_scheduler_class(),
+	client_profile_id_t{
+	item.get_owner(),
+	  0
+	}
+    };
+  }
+
+public:
+  mClockScheduler(CephContext *cct);
+
+  // Enqueue op in the back of the regular queue
+  void enqueue(OpSchedulerItem &&item) final;
+
+  // Enqueue the op in the front of the regular queue
+  void enqueue_front(OpSchedulerItem &&item) final;
+
+  // Return an op to be dispatch
+  OpSchedulerItem dequeue() final;
+
+  // Returns if the queue is empty
+  bool empty() const final {
+    return immediate.empty() && scheduler.empty();
+  }
+
+  // Formatted output of the queue
+  void dump(ceph::Formatter &f) const final;
+
+  void print(std::ostream &ostream) const final {
+    ostream << "mClockScheduler";
+  }
+
+  // md_config_obs_t interface: react to runtime changes of the
+  // osd_mclock_scheduler_* options.
+  const char** get_tracked_conf_keys() const final;
+  void handle_conf_change(const ConfigProxy& conf,
+			  const std::set<std::string> &changed) final;
+};
+
+}
target_link_libraries(unittest_mclock_client_queue
global osd dmclock os
)
+
+# unittest_mclock_scheduler
+# Mirrors the unittest_mclock_client_queue target above: links osd for the
+# scheduler sources and dmclock for the queue implementation.
+add_executable(unittest_mclock_scheduler
+  TestMClockScheduler.cc
+)
+add_ceph_unittest(unittest_mclock_scheduler)
+target_link_libraries(unittest_mclock_scheduler
+  global osd dmclock os
+)
{}
struct MockDmclockItem : public PGOpQueueable {
-  ceph::qos::dmclock_request_t request;
-  MockDmclockItem(decltype(request) _request) :
-    PGOpQueueable(spg_t()), request(_request) {}
+  MockDmclockItem() :
+    PGOpQueueable(spg_t()) {}
public:
  op_type_t get_op_type() const final {
+    // NOTE(review): returns an op_scheduler_class value from a function
+    // declared to return op_type_t -- confirm the intended enum here.
    return op_scheduler_class::client;
  }
-  std::optional<ceph::qos::dmclock_request_t>
-  get_dmclock_request_state() const final {
-    return request;
-  }
-
  void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final {}
};
Request r1 = create_snaptrim(100, client1);
Request r2 = create_snaptrim(101, client2);
Request r3 = create_snaptrim(102, client3);
- Request r4 = create_dmclock(103, client1, dmc::ReqParams(50,1));
- Request r5 = create_dmclock(104, client2, dmc::ReqParams(30,1));
- Request r6 = create_dmclock(105, client3, dmc::ReqParams(10,1));
+ Request r4 = create_dmclock(103, client1);
+ Request r5 = create_dmclock(104, client2);
+ Request r6 = create_dmclock(105, client3);
q.enqueue(client1, 12, 0, std::move(r1));
q.enqueue(client2, 12, 0, std::move(r2));
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include "gtest/gtest.h"
+
+#include "global/global_context.h"
+#include "global/global_init.h"
+#include "common/common_init.h"
+
+#include "osd/scheduler/mClockScheduler.h"
+#include "osd/scheduler/OpSchedulerItem.h"
+
+using namespace ceph::osd::scheduler;
+
+// Stand up a minimal CephContext so mClockScheduler (whose constructor reads
+// config options and registers a config observer) can be instantiated by the
+// fixtures below, then hand control to googletest.
+int main(int argc, char **argv) {
+  std::vector<const char*> args(argv, argv+argc);
+  auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD,
+			 CODE_ENVIRONMENT_UTILITY,
+			 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+  common_init_finish(g_ceph_context);
+
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
+
+
+// Fixture: a scheduler backed by the global CephContext plus three distinct
+// owner ids acting as separate mclock clients.
+class mClockSchedulerTest : public testing::Test {
+public:
+  mClockScheduler q;
+
+  uint64_t client1;
+  uint64_t client2;
+  uint64_t client3;
+
+  mClockSchedulerTest() :
+    q(g_ceph_context),
+    client1(1001),
+    client2(9999),
+    client3(100000001)
+  {}
+
+  // Minimal PGOpQueueable: carries only a scheduler class, which is what
+  // mClockScheduler consults when routing an item.
+  struct MockDmclockItem : public PGOpQueueable {
+    op_scheduler_class scheduler_class;
+
+    MockDmclockItem(op_scheduler_class _scheduler_class) :
+      PGOpQueueable(spg_t()),
+      scheduler_class(_scheduler_class) {}
+
+    MockDmclockItem()
+      : MockDmclockItem(op_scheduler_class::background_best_effort) {}
+
+    op_type_t get_op_type() const final {
+      return op_type_t::client_op; // not used
+    }
+
+    ostream &print(ostream &rhs) const final { return rhs; }
+
+    std::optional<OpRequestRef> maybe_get_op() const final {
+      return std::nullopt;
+    }
+
+    op_scheduler_class get_scheduler_class() const final {
+      return scheduler_class;
+    }
+
+    void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final {}
+  };
+};
+
+// Wrap a MockDmclockItem in an OpSchedulerItem.  The epoch doubles as a
+// sequence number the tests use to verify dispatch order; owner selects the
+// mclock client.  Extra args are forwarded to the MockDmclockItem ctor.
+template <typename... Args>
+OpSchedulerItem create_item(
+  epoch_t e, uint64_t owner, Args&&... args)
+{
+  return OpSchedulerItem(
+    std::make_unique<mClockSchedulerTest::MockDmclockItem>(
+      std::forward<Args>(args)...),
+    12, 12,
+    utime_t(), owner, e);
+}
+
+// empty() must track enqueue/dequeue/enqueue_front through a full cycle:
+// three items in, two out, both requeued at the front, then all drained.
+TEST_F(mClockSchedulerTest, TestEmpty) {
+  ASSERT_TRUE(q.empty());
+
+  q.enqueue(create_item(100, client1, op_scheduler_class::client));
+  q.enqueue(create_item(102, client1, op_scheduler_class::client));
+  q.enqueue(create_item(104, client1, op_scheduler_class::client));
+
+  ASSERT_FALSE(q.empty());
+
+  std::list<OpSchedulerItem> reqs;
+
+  reqs.push_back(q.dequeue());
+  reqs.push_back(q.dequeue());
+
+  // One item still queued after two dequeues.
+  ASSERT_FALSE(q.empty());
+
+  for (auto &&i : reqs) {
+    q.enqueue_front(std::move(i));
+  }
+  reqs.clear();
+
+  ASSERT_FALSE(q.empty());
+
+  for (int i = 0; i < 3; ++i) {
+    ASSERT_FALSE(q.empty());
+    q.dequeue();
+  }
+
+  ASSERT_TRUE(q.empty());
+}
+
+// Five ops from a single client must be dispatched in submission order;
+// the map epoch serves as the sequence number.
+TEST_F(mClockSchedulerTest, TestSingleClientOrderedEnqueueDequeue) {
+  for (unsigned e = 100; e < 105; ++e) {
+    q.enqueue(create_item(e, client1));
+  }
+
+  for (unsigned e = 100; e < 105; ++e) {
+    auto r = q.dequeue();
+    ASSERT_EQ(e, r.get_map_epoch());
+  }
+}
+
+// With three interleaved clients, each client's own ops must still come out
+// in per-client submission order (epochs 0..NUM-1), whatever the global
+// interleaving the scheduler picks.
+TEST_F(mClockSchedulerTest, TestMultiClientOrderedEnqueueDequeue) {
+  const unsigned NUM = 1000;
+  for (unsigned i = 0; i < NUM; ++i) {
+    for (auto &&c: {client1, client2, client3}) {
+      q.enqueue(create_item(i, c));
+    }
+  }
+
+  // next[c] tracks the epoch expected from client c's next dispatched op.
+  std::map<uint64_t, epoch_t> next;
+  for (auto &&c: {client1, client2, client3}) {
+    next[c] = 0;
+  }
+  for (unsigned i = 0; i < NUM * 3; ++i) {
+    ASSERT_FALSE(q.empty());
+    auto r = q.dequeue();
+    auto owner = r.get_owner();
+    auto niter = next.find(owner);
+    ASSERT_FALSE(niter == next.end());
+    ASSERT_EQ(niter->second, r.get_map_epoch());
+    niter->second++;
+  }
+  ASSERT_TRUE(q.empty());
+}