]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mClock: Add ability to handle high priority operations
authorAishwarya Mathuria <amathuri@redhat.com>
Wed, 9 Nov 2022 13:08:29 +0000 (18:38 +0530)
committerAishwarya Mathuria <amathuri@redhat.com>
Wed, 8 Feb 2023 06:37:47 +0000 (12:07 +0530)
There are some cases that may require mClock to handle high priority operations before other items in the queue.
In order to make this possible, we are introducing a "High Queue" that will hold high priority operations.
The high queue will be dequeued before the mClock queue.
High queue has been implemented as a priority queue, operations with higher priority will get preference at time of dequeue.

Trello: https://trello.com/c/Kelm8z0x/775-qos-add-ability-to-handle-high-priority-operations

Signed-off-by: Aishwarya Mathuria <amathuri@redhat.com>
src/osd/scheduler/mClockScheduler.cc
src/osd/scheduler/mClockScheduler.h
src/test/osd/TestMClockScheduler.cc

index be6e00444bc519640bab7bab9b4afb45b3afd8ea..5aa4f71530e354d9648bed817406fd869601f348 100644 (file)
@@ -401,6 +401,7 @@ void mClockScheduler::dump(ceph::Formatter &f) const
   // Display queue sizes
   f.open_object_section("queue_sizes");
   f.dump_int("immediate", immediate.size());
+  f.dump_int("high_priority_queue", high_priority.size());
   f.dump_int("scheduler", scheduler.request_count());
   f.close_section();
 
@@ -416,15 +417,27 @@ void mClockScheduler::dump(ceph::Formatter &f) const
   f.open_object_section("mClockQueues");
   f.dump_string("queues", display_queues());
   f.close_section();
+
+  f.open_object_section("HighPriorityQueue");
+  for (auto it = high_priority.begin();
+       it != high_priority.end(); it++) {
+    f.dump_int("priority", it->first);
+    f.dump_int("queue_size", it->second.size());
+  }
+  f.close_section();
 }
 
 void mClockScheduler::enqueue(OpSchedulerItem&& item)
 {
   auto id = get_scheduler_id(item);
-
+  unsigned priority = item.get_priority();
+  unsigned cutoff = get_io_prio_cut(cct);
+  
   // TODO: move this check into OpSchedulerItem, handle backwards compat
   if (op_scheduler_class::immediate == id.class_id) {
     immediate.push_front(std::move(item));
+  } else if (priority >= cutoff) {
+    enqueue_high(priority, std::move(item));
   } else {
     int cost = calc_scaled_cost(item.get_cost());
     item.set_qos_cost(cost);
@@ -442,6 +455,7 @@ void mClockScheduler::enqueue(OpSchedulerItem&& item)
 
  dout(20) << __func__ << " client_count: " << scheduler.client_count()
           << " queue_sizes: [ imm: " << immediate.size()
+         << " high_priority_queue: " << high_priority.size()
           << " sched: " << scheduler.request_count() << " ]"
           << dendl;
  dout(30) << __func__ << " mClockClients: "
@@ -454,9 +468,30 @@ void mClockScheduler::enqueue(OpSchedulerItem&& item)
 
 void mClockScheduler::enqueue_front(OpSchedulerItem&& item)
 {
-  immediate.push_back(std::move(item));
-  // TODO: item may not be immediate, update mclock machinery to permit
-  // putting the item back in the queue
+  unsigned priority = item.get_priority();
+  unsigned cutoff = get_io_prio_cut(cct);
+  auto id = get_scheduler_id(item);
+
+  if (op_scheduler_class::immediate == id.class_id) {
+    immediate.push_back(std::move(item));
+  } else if (priority >= cutoff) {
+    enqueue_high(priority, std::move(item), true);
+  } else {
+    // mClock does not support enqueue at front, so we use
+    // the high queue with priority 0
+    enqueue_high(0, std::move(item), true);
+  }
+}
+
+void mClockScheduler::enqueue_high(unsigned priority,
+                                   OpSchedulerItem&& item,
+                                  bool front)
+{
+  if (front) {
+    high_priority[priority].push_back(std::move(item));
+  } else {
+    high_priority[priority].push_front(std::move(item));
+  }
 }
 
 WorkItem mClockScheduler::dequeue()
@@ -465,6 +500,18 @@ WorkItem mClockScheduler::dequeue()
     WorkItem work_item{std::move(immediate.back())};
     immediate.pop_back();
     return work_item;
+  } else if (!high_priority.empty()) {
+    auto iter = high_priority.begin();
+    // invariant: high_priority entries are never empty
+    assert(!iter->second.empty());
+    WorkItem ret{std::move(iter->second.back())};
+    iter->second.pop_back();
+    if (iter->second.empty()) {
+      // maintain invariant, high priority entries are never empty
+      high_priority.erase(iter);
+    }
+    ceph_assert(std::get_if<OpSchedulerItem>(&ret));
+    return ret;
   } else {
     mclock_queue_t::PullReq result = scheduler.pull_request();
     if (result.is_future()) {
index c3b79dba44bcc3cb918300697499dbd96432bd48..65e9c3e38a6f12be7b40ff5efa2e4e12a4942473 100644 (file)
@@ -15,6 +15,7 @@
 
 #pragma once
 
+#include <functional>
 #include <ostream>
 #include <map>
 #include <vector>
@@ -130,7 +131,19 @@ class mClockScheduler : public OpScheduler, md_config_obs_t {
     true,
     true,
     2>;
+  using priority_t = unsigned;
+  using SubQueue = std::map<priority_t,
+       std::list<OpSchedulerItem>,
+       std::greater<priority_t>>;
+  using SubQueueIter = SubQueue::iterator;
   mclock_queue_t scheduler;
+  /**
+   * high_priority
+   *
+   * Holds entries to be dequeued in strict order ahead of mClock
+   * Invariant: entries are never empty
+   */
+  SubQueue high_priority;
   std::list<OpSchedulerItem> immediate;
 
   static scheduler_id_t get_scheduler_id(const OpSchedulerItem &item) {
@@ -143,6 +156,19 @@ class mClockScheduler : public OpScheduler, md_config_obs_t {
     };
   }
 
+  static unsigned int get_io_prio_cut(CephContext *cct) {
+    if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
+      std::random_device rd;
+      std::mt19937 random_gen(rd());
+      return (random_gen() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
+    } else if (cct->_conf->osd_op_queue_cut_off == "high") {
+      return CEPH_MSG_PRIO_HIGH;
+    } else {
+      // default / catch-all is 'low'
+      return CEPH_MSG_PRIO_LOW;
+    }
+  }
+
 public:
   mClockScheduler(CephContext *cct, uint32_t num_shards, bool is_rotational);
   ~mClockScheduler() override;
@@ -186,7 +212,7 @@ public:
   // Enqueue op in the back of the regular queue
   void enqueue(OpSchedulerItem &&item) final;
 
-  // Enqueue the op in the front of the regular queue
+  // Enqueue the op in the front of the high priority queue or the immediate queue (based on priority)
   void enqueue_front(OpSchedulerItem &&item) final;
 
   // Return an op to be dispatch
@@ -194,7 +220,7 @@ public:
 
   // Returns if the queue is empty
   bool empty() const final {
-    return immediate.empty() && scheduler.empty();
+    return immediate.empty() && scheduler.empty() && high_priority.empty();
   }
 
   // Formatted output of the queue
@@ -210,6 +236,9 @@ public:
   const char** get_tracked_conf_keys() const final;
   void handle_conf_change(const ConfigProxy& conf,
                          const std::set<std::string> &changed) final;
+private:
+  // Enqueue the op to the high priority queue
+  void enqueue_high(unsigned prio, OpSchedulerItem &&item, bool front = false);
 };
 
 }
index 0feb427ec1016fb6211ad9a6b5578b2e013e0d10..40a830000aaf05f191156be84a10a309aceb87d4 100644 (file)
@@ -83,6 +83,18 @@ OpSchedulerItem create_item(
     utime_t(), owner, e);
 }
 
+template <typename... Args>
+OpSchedulerItem create_high_prio_item(
+  unsigned priority, epoch_t e, uint64_t owner, Args&&... args)
+{
+  // Create high priority item for testing high prio queue
+  return OpSchedulerItem(
+    std::make_unique<mClockSchedulerTest::MockDmclockItem>(
+      std::forward<Args>(args)...),
+    12, priority,
+    utime_t(), owner, e);
+}
+
 OpSchedulerItem get_item(WorkItem item)
 {
   return std::move(std::get<OpSchedulerItem>(item));
@@ -169,3 +181,74 @@ TEST_F(mClockSchedulerTest, TestMultiClientOrderedEnqueueDequeue) {
   }
   ASSERT_TRUE(q.empty());
 }
+
+TEST_F(mClockSchedulerTest, TestHighPriorityQueueEnqueueDequeue) {
+  ASSERT_TRUE(q.empty());
+  for (unsigned i = 200; i < 205; ++i) {
+    q.enqueue(create_high_prio_item(i, i, client1, op_scheduler_class::client));
+    std::this_thread::sleep_for(std::chrono::milliseconds(1));
+  }
+
+  ASSERT_FALSE(q.empty());
+  // Higher priority ops should be dequeued first
+  auto r = get_item(q.dequeue());
+  ASSERT_EQ(204u, r.get_map_epoch());
+
+  r = get_item(q.dequeue());
+  ASSERT_EQ(203u, r.get_map_epoch());
+
+  r = get_item(q.dequeue());
+  ASSERT_EQ(202u, r.get_map_epoch());
+
+  r = get_item(q.dequeue());
+  ASSERT_EQ(201u, r.get_map_epoch());
+
+  r = get_item(q.dequeue());
+  ASSERT_EQ(200u, r.get_map_epoch());
+
+  ASSERT_TRUE(q.empty());
+}
+
+TEST_F(mClockSchedulerTest, TestAllQueuesEnqueueDequeue) {
+  ASSERT_TRUE(q.empty());
+
+  // Insert ops into the mClock queue
+  for (unsigned i = 100; i < 102; ++i) {
+    q.enqueue(create_item(i, client1, op_scheduler_class::client));
+    std::this_thread::sleep_for(std::chrono::microseconds(1));
+  }
+
+  // Insert ops into the immediate queue
+  for (unsigned i = 103; i < 105; ++i) {
+    q.enqueue(create_item(i, client1, op_scheduler_class::immediate));
+    std::this_thread::sleep_for(std::chrono::microseconds(1));
+  }
+
+  // Insert ops into the high queue
+  for (unsigned i = 200; i < 202; ++i) {
+    q.enqueue(create_high_prio_item(i, i, client1, op_scheduler_class::client));
+    std::this_thread::sleep_for(std::chrono::milliseconds(1));
+  }
+
+  ASSERT_FALSE(q.empty());
+  auto r = get_item(q.dequeue());
+  // Immediate queue should be dequeued first
+  ASSERT_EQ(103u, r.get_map_epoch());
+  r = get_item(q.dequeue());
+  ASSERT_EQ(104u, r.get_map_epoch());
+
+  // High priority queue should be dequeued second
+  // higher priority operation first
+  r = get_item(q.dequeue());
+  ASSERT_EQ(201u, r.get_map_epoch());
+  r = get_item(q.dequeue());
+  ASSERT_EQ(200u, r.get_map_epoch());
+
+  // mClock queue will be dequeued last
+  r = get_item(q.dequeue());
+  ASSERT_EQ(100u, r.get_map_epoch());
+  r = get_item(q.dequeue());
+  ASSERT_EQ(101u, r.get_map_epoch());
+
+  ASSERT_TRUE(q.empty());
+}