From 4ac70ae196a7aa05ba98bab46ddc7b516a686577 Mon Sep 17 00:00:00 2001
From: Aishwarya Mathuria
Date: Wed, 9 Nov 2022 18:38:29 +0530
Subject: [PATCH] mClock: Add ability to handle high priority operations

In some cases mClock must handle high priority operations before other
items in the queue. To make this possible, introduce a "High Queue" that
holds high priority operations and is dequeued before the mClock queue.
The high queue is implemented as a priority-ordered map of sub-queues:
operations with a higher priority are dequeued first.

Trello: https://trello.com/c/Kelm8z0x/775-qos-add-ability-to-handle-high-priority-operations

Signed-off-by: Aishwarya Mathuria
---
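Before the diff itself, a minimal standalone sketch of the ordering the new
high_priority queue is meant to provide may help; it is not part of the
change. Here int stands in for OpSchedulerItem, the two free functions only
mirror the logic of the enqueue_high()/dequeue() members added below, and the
priorities and values are made up:

  #include <cassert>
  #include <functional>
  #include <list>
  #include <map>

  // Highest priority bucket first, courtesy of std::greater.
  using SubQueue = std::map<unsigned, std::list<int>, std::greater<unsigned>>;

  void enqueue_high(SubQueue &q, unsigned prio, int item, bool front = false) {
    if (front) {
      q[prio].push_back(item);   // enqueue_front() path: dequeued next
    } else {
      q[prio].push_front(item);  // normal path: FIFO within a priority
    }
  }

  int dequeue_high(SubQueue &q) {
    auto it = q.begin();             // highest priority bucket
    int item = it->second.back();    // oldest entry in that bucket
    it->second.pop_back();
    if (it->second.empty()) {
      q.erase(it);                   // keep the "buckets are never empty" invariant
    }
    return item;
  }

  int main() {
    SubQueue q;
    enqueue_high(q, 200, 1);
    enqueue_high(q, 200, 2);
    enqueue_high(q, 230, 3);
    assert(dequeue_high(q) == 3);   // higher priority bucket wins
    assert(dequeue_high(q) == 1);   // FIFO within priority 200
    enqueue_high(q, 200, 4, true);  // re-queued item goes to the back ...
    assert(dequeue_high(q) == 4);   // ... and is therefore dequeued next
    assert(dequeue_high(q) == 2);
    assert(q.empty());
    return 0;
  }

Keeping the comparator in the map type means dequeue only ever inspects
begin(), and erasing a bucket as soon as it drains preserves the invariant
that every entry in the map is non-empty.
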
 src/osd/scheduler/mClockScheduler.cc | 55 ++++++++++++++++--
 src/osd/scheduler/mClockScheduler.h  | 33 ++++++++++-
 src/test/osd/TestMClockScheduler.cc  | 83 ++++++++++++++++++++++++++++
 3 files changed, 165 insertions(+), 6 deletions(-)

diff --git a/src/osd/scheduler/mClockScheduler.cc b/src/osd/scheduler/mClockScheduler.cc
index be6e00444bc51..5aa4f71530e35 100644
--- a/src/osd/scheduler/mClockScheduler.cc
+++ b/src/osd/scheduler/mClockScheduler.cc
@@ -401,6 +401,7 @@ void mClockScheduler::dump(ceph::Formatter &f) const
   // Display queue sizes
   f.open_object_section("queue_sizes");
   f.dump_int("immediate", immediate.size());
+  f.dump_int("high_priority_queue", high_priority.size());
   f.dump_int("scheduler", scheduler.request_count());
   f.close_section();
 
@@ -416,15 +417,27 @@ void mClockScheduler::dump(ceph::Formatter &f) const
   f.open_object_section("mClockQueues");
   f.dump_string("queues", display_queues());
   f.close_section();
+
+  f.open_object_section("HighPriorityQueue");
+  for (auto it = high_priority.begin();
+       it != high_priority.end(); it++) {
+    f.dump_int("priority", it->first);
+    f.dump_int("queue_size", it->second.size());
+  }
+  f.close_section();
 }
 
 void mClockScheduler::enqueue(OpSchedulerItem&& item)
 {
   auto id = get_scheduler_id(item);
-
+  unsigned priority = item.get_priority();
+  unsigned cutoff = get_io_prio_cut(cct);
+
   // TODO: move this check into OpSchedulerItem, handle backwards compat
   if (op_scheduler_class::immediate == id.class_id) {
     immediate.push_front(std::move(item));
+  } else if (priority >= cutoff) {
+    enqueue_high(priority, std::move(item));
   } else {
     int cost = calc_scaled_cost(item.get_cost());
     item.set_qos_cost(cost);
@@ -442,6 +455,7 @@ void mClockScheduler::enqueue(OpSchedulerItem&& item)
 
   dout(20) << __func__ << " client_count: " << scheduler.client_count()
            << " queue_sizes: [ imm: " << immediate.size()
+           << " high_priority_queue: " << high_priority.size()
            << " sched: " << scheduler.request_count() << " ]"
            << dendl;
   dout(30) << __func__ << " mClockClients: "
@@ -454,9 +468,30 @@ void mClockScheduler::enqueue(OpSchedulerItem&& item)
 
 void mClockScheduler::enqueue_front(OpSchedulerItem&& item)
 {
-  immediate.push_back(std::move(item));
-  // TODO: item may not be immediate, update mclock machinery to permit
-  // putting the item back in the queue
+  unsigned priority = item.get_priority();
+  unsigned cutoff = get_io_prio_cut(cct);
+  auto id = get_scheduler_id(item);
+
+  if (op_scheduler_class::immediate == id.class_id) {
+    immediate.push_back(std::move(item));
+  } else if (priority >= cutoff) {
+    enqueue_high(priority, std::move(item), true);
+  } else {
+    // mClock does not support enqueue at front, so we use
+    // the high queue with priority 0
+    enqueue_high(0, std::move(item), true);
+  }
+}
+
+void mClockScheduler::enqueue_high(unsigned priority,
+                                   OpSchedulerItem&& item,
+                                   bool front)
+{
+  if (front) {
+    high_priority[priority].push_back(std::move(item));
+  } else {
+    high_priority[priority].push_front(std::move(item));
+  }
 }
 
 WorkItem mClockScheduler::dequeue()
@@ -465,6 +500,18 @@ WorkItem mClockScheduler::dequeue()
     WorkItem work_item{std::move(immediate.back())};
     immediate.pop_back();
     return work_item;
+  } else if (!high_priority.empty()) {
+    auto iter = high_priority.begin();
+    // invariant: high_priority entries are never empty
+    assert(!iter->second.empty());
+    WorkItem ret{std::move(iter->second.back())};
+    iter->second.pop_back();
+    if (iter->second.empty()) {
+      // maintain invariant, high priority entries are never empty
+      high_priority.erase(iter);
+    }
+    ceph_assert(std::get_if<OpSchedulerItem>(&ret));
+    return ret;
   } else {
     mclock_queue_t::PullReq result = scheduler.pull_request();
     if (result.is_future()) {
diff --git a/src/osd/scheduler/mClockScheduler.h b/src/osd/scheduler/mClockScheduler.h
index c3b79dba44bcc..65e9c3e38a6f1 100644
--- a/src/osd/scheduler/mClockScheduler.h
+++ b/src/osd/scheduler/mClockScheduler.h
@@ -15,6 +15,7 @@
 
 #pragma once
 
+#include <random>
 #include <functional>
 #include <ostream>
 #include <map>
@@ -130,7 +131,19 @@ class mClockScheduler : public OpScheduler, md_config_obs_t {
       true,
       true,
       2>;
+  using priority_t = unsigned;
+  using SubQueue = std::map<priority_t, std::list<OpSchedulerItem>,
+                            std::greater<priority_t>>;
+  using SubQueueIter = SubQueue::iterator;
+
   mclock_queue_t scheduler;
+  /**
+   * high_priority
+   *
+   * Holds entries to be dequeued in strict order ahead of mClock
+   * Invariant: entries are never empty
+   */
+  SubQueue high_priority;
   std::list<OpSchedulerItem> immediate;
 
   static scheduler_id_t get_scheduler_id(const OpSchedulerItem &item) {
@@ -143,6 +156,19 @@ class mClockScheduler : public OpScheduler, md_config_obs_t {
     };
   }
 
+  static unsigned int get_io_prio_cut(CephContext *cct) {
+    if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
+      std::random_device rd;
+      std::mt19937 random_gen(rd());
+      return (random_gen() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
+    } else if (cct->_conf->osd_op_queue_cut_off == "high") {
+      return CEPH_MSG_PRIO_HIGH;
+    } else {
+      // default / catch-all is 'low'
+      return CEPH_MSG_PRIO_LOW;
+    }
+  }
+
 public:
   mClockScheduler(CephContext *cct, uint32_t num_shards, bool is_rotational);
   ~mClockScheduler() override;
@@ -186,7 +212,7 @@ public:
   // Enqueue op in the back of the regular queue
   void enqueue(OpSchedulerItem &&item) final;
 
-  // Enqueue the op in the front of the regular queue
+  // Enqueue the op in the front of the high priority queue or the immediate queue (based on priority)
   void enqueue_front(OpSchedulerItem &&item) final;
 
   // Return an op to be dispatch
@@ -194,7 +220,7 @@ public:
 
   // Returns if the queue is empty
   bool empty() const final {
-    return immediate.empty() && scheduler.empty();
+    return immediate.empty() && scheduler.empty() && high_priority.empty();
   }
 
   // Formatted output of the queue
@@ -210,6 +236,9 @@ public:
   const char** get_tracked_conf_keys() const final;
   void handle_conf_change(const ConfigProxy& conf,
                           const std::set<std::string> &changed) final;
+private:
+  // Enqueue the op to the high priority queue
+  void enqueue_high(unsigned prio, OpSchedulerItem &&item, bool front = false);
 };
 
 }
diff --git a/src/test/osd/TestMClockScheduler.cc b/src/test/osd/TestMClockScheduler.cc
index 0feb427ec1016..40a830000aaf0 100644
--- a/src/test/osd/TestMClockScheduler.cc
+++ b/src/test/osd/TestMClockScheduler.cc
@@ -83,6 +83,18 @@ OpSchedulerItem create_item(
     utime_t(), owner, e);
 }
 
+template <typename... Args>
+OpSchedulerItem create_high_prio_item(
+  unsigned priority, epoch_t e, uint64_t owner, Args&&... args)
+{
+  // Create high priority item for testing high prio queue
+  return OpSchedulerItem(
+    std::make_unique<MockDmclockItem>(
+      std::forward<Args>(args)...),
+    12, priority,
+    utime_t(), owner, e);
+}
+
 OpSchedulerItem get_item(WorkItem item)
 {
   return std::move(std::get<OpSchedulerItem>(item));
@@ -169,3 +181,74 @@ TEST_F(mClockSchedulerTest, TestMultiClientOrderedEnqueueDequeue) {
   }
   ASSERT_TRUE(q.empty());
 }
+
+TEST_F(mClockSchedulerTest, TestHighPriorityQueueEnqueueDequeue) {
+  ASSERT_TRUE(q.empty());
+  for (unsigned i = 200; i < 205; ++i) {
+    q.enqueue(create_high_prio_item(i, i, client1, op_scheduler_class::client));
+    std::this_thread::sleep_for(std::chrono::milliseconds(1));
+  }
+
+  ASSERT_FALSE(q.empty());
+  // Higher priority ops should be dequeued first
+  auto r = get_item(q.dequeue());
+  ASSERT_EQ(204u, r.get_map_epoch());
+
+  r = get_item(q.dequeue());
+  ASSERT_EQ(203u, r.get_map_epoch());
+
+  r = get_item(q.dequeue());
+  ASSERT_EQ(202u, r.get_map_epoch());
+
+  r = get_item(q.dequeue());
+  ASSERT_EQ(201u, r.get_map_epoch());
+
+  r = get_item(q.dequeue());
+  ASSERT_EQ(200u, r.get_map_epoch());
+
+  ASSERT_TRUE(q.empty());
+}
+
+TEST_F(mClockSchedulerTest, TestAllQueuesEnqueueDequeue) {
+  ASSERT_TRUE(q.empty());
+
+  // Insert ops into the mClock queue
+  for (unsigned i = 100; i < 102; ++i) {
+    q.enqueue(create_item(i, client1, op_scheduler_class::client));
+    std::this_thread::sleep_for(std::chrono::microseconds(1));
+  }
+
+  // Insert ops into the immediate queue
+  for (unsigned i = 103; i < 105; ++i) {
+    q.enqueue(create_item(i, client1, op_scheduler_class::immediate));
+    std::this_thread::sleep_for(std::chrono::microseconds(1));
+  }
+
+  // Insert ops into the high priority queue
+  for (unsigned i = 200; i < 202; ++i) {
+    q.enqueue(create_high_prio_item(i, i, client1, op_scheduler_class::client));
+    std::this_thread::sleep_for(std::chrono::milliseconds(1));
+  }
+
+  ASSERT_FALSE(q.empty());
+  auto r = get_item(q.dequeue());
+  // Ops from the immediate queue should be dequeued first
+  ASSERT_EQ(103u, r.get_map_epoch());
+  r = get_item(q.dequeue());
+  ASSERT_EQ(104u, r.get_map_epoch());
+
+  // High priority queue should be dequeued second,
+  // higher priority operations first
+  r = get_item(q.dequeue());
+  ASSERT_EQ(201u, r.get_map_epoch());
+  r = get_item(q.dequeue());
+  ASSERT_EQ(200u, r.get_map_epoch());
+
+  // mClock queue is dequeued last
+  r = get_item(q.dequeue());
+  ASSERT_EQ(100u, r.get_map_epoch());
+  r = get_item(q.dequeue());
+  ASSERT_EQ(101u, r.get_map_epoch());
+
+  ASSERT_TRUE(q.empty());
+}
-- 
2.39.5
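
A second standalone sketch, again not part of the patch, shows how enqueue()
is expected to route an operation after this change. The enums and the two
constants are illustrative stand-ins for op_scheduler_class and the
CEPH_MSG_PRIO_LOW/CEPH_MSG_PRIO_HIGH values; only the decision logic mirrors
the patch:

  #include <cassert>

  enum class Class { immediate, client, background };
  enum class Queue { immediate, high_priority, mclock };

  // Stand-ins for the CEPH_MSG_PRIO_* values used by get_io_prio_cut().
  constexpr unsigned PRIO_LOW  = 64;
  constexpr unsigned PRIO_HIGH = 196;

  // cutoff is what get_io_prio_cut() returns, e.g. PRIO_HIGH when
  // osd_op_queue_cut_off is "high" and PRIO_LOW when it is "low".
  Queue route(Class c, unsigned priority, unsigned cutoff) {
    if (c == Class::immediate) {
      return Queue::immediate;      // bypasses both the high queue and mClock
    } else if (priority >= cutoff) {
      return Queue::high_priority;  // strict priority, ahead of mClock
    } else {
      return Queue::mclock;         // QoS-scheduled by dmclock
    }
  }

  int main() {
    // With a "low" cutoff an op at priority 127 already lands in the high
    // queue; with a "high" cutoff it stays under mClock control.
    assert(route(Class::client, 127, PRIO_LOW)  == Queue::high_priority);
    assert(route(Class::client, 127, PRIO_HIGH) == Queue::mclock);
    assert(route(Class::immediate, 0, PRIO_HIGH) == Queue::immediate);
    return 0;
  }

Note that enqueue_front() uses the same cutoff but falls back to the high
queue with priority 0 for ops that would otherwise belong to mClock, since
mClock itself has no enqueue-at-front operation.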