There are some cases that may require mClock to handle high priority operations before other items in the queue.
In order to make this possible, we are introducing a "High Queue" that will hold high priority operations.
The high queue will be dequeued before the mClock queue.
The high queue has been implemented as a priority queue; operations with higher priority will get preference at the time of dequeue.
Trello: https://trello.com/c/Kelm8z0x/775-qos-add-ability-to-handle-high-priority-operations
Signed-off-by: Aishwarya Mathuria <amathuri@redhat.com>
// Display queue sizes
f.open_object_section("queue_sizes");
f.dump_int("immediate", immediate.size());
+ // NOTE(review): high_priority.size() is the number of distinct priority
+ // buckets, not the total number of queued high-priority ops — confirm
+ // that is the intended metric here.
+ f.dump_int("high_priority_queue", high_priority.size());
f.dump_int("scheduler", scheduler.request_count());
f.close_section();
f.open_object_section("mClockQueues");
f.dump_string("queues", display_queues());
f.close_section();
+
+ // Per-priority breakdown of the high queue: one (priority, queue_size)
+ // pair per non-empty bucket, highest priority first (SubQueue ordering).
+ f.open_object_section("HighPriorityQueue");
+ for (auto it = high_priority.begin();
+ it != high_priority.end(); it++) {
+ f.dump_int("priority", it->first);
+ f.dump_int("queue_size", it->second.size());
+ }
+ f.close_section();
}
void mClockScheduler::enqueue(OpSchedulerItem&& item)
{
auto id = get_scheduler_id(item);
-
+ unsigned priority = item.get_priority();
+ unsigned cutoff = get_io_prio_cut(cct);
+
// TODO: move this check into OpSchedulerItem, handle backwards compat
if (op_scheduler_class::immediate == id.class_id) {
immediate.push_front(std::move(item));
+ // Ops at or above the configured cutoff bypass mClock entirely and go
+ // to the strict high-priority queue, which is drained ahead of the
+ // scheduler (see dequeue()).
+ } else if (priority >= cutoff) {
+ enqueue_high(priority, std::move(item));
} else {
int cost = calc_scaled_cost(item.get_cost());
item.set_qos_cost(cost);
dout(20) << __func__ << " client_count: " << scheduler.client_count()
<< " queue_sizes: [ imm: " << immediate.size()
+ << " high_priority_queue: " << high_priority.size()
<< " sched: " << scheduler.request_count() << " ]"
<< dendl;
dout(30) << __func__ << " mClockClients: "
+// enqueue_front: place an op ahead of everything already queued at its
+// level. Both `immediate` and the high-queue buckets are consumed from
+// the back, so "logical front" translates to push_back / front=true here.
void mClockScheduler::enqueue_front(OpSchedulerItem&& item)
{
- immediate.push_back(std::move(item));
- // TODO: item may not be immediate, update mclock machinery to permit
- // putting the item back in the queue
+ unsigned priority = item.get_priority();
+ unsigned cutoff = get_io_prio_cut(cct);
+ auto id = get_scheduler_id(item);
+
+ if (op_scheduler_class::immediate == id.class_id) {
+ immediate.push_back(std::move(item));
+ } else if (priority >= cutoff) {
+ enqueue_high(priority, std::move(item), true);
+ } else {
+ // mClock does not support enqueue at front, so we use
+ // the high queue with priority 0
+ // NOTE(review): priority 0 is the LOWEST bucket in SubQueue, yet such
+ // an op still jumps ahead of all mClock-queued work — confirm intended.
+ enqueue_high(0, std::move(item), true);
+ }
+}
+
+// Insert an op into its per-priority bucket of the high queue.
+// The buckets are consumed from the back (see dequeue()), so push_back
+// places the item at the logical FRONT of its bucket (front == true) and
+// push_front places it at the logical BACK (normal enqueue).
+void mClockScheduler::enqueue_high(unsigned priority,
+ OpSchedulerItem&& item,
+ bool front)
+{
+ if (front) {
+ high_priority[priority].push_back(std::move(item));
+ } else {
+ high_priority[priority].push_front(std::move(item));
+ }
}
WorkItem mClockScheduler::dequeue()
WorkItem work_item{std::move(immediate.back())};
immediate.pop_back();
return work_item;
+ } else if (!high_priority.empty()) {
+ // begin() is the highest-priority bucket because SubQueue is ordered
+ // by std::greater; each bucket is FIFO, consumed from the back.
+ auto iter = high_priority.begin();
+ // invariant: high_priority entries are never empty
+ assert(!iter->second.empty());
+ WorkItem ret{std::move(iter->second.back())};
+ iter->second.pop_back();
+ if (iter->second.empty()) {
+ // maintain invariant, high priority entries are never empty
+ high_priority.erase(iter);
+ }
+ // NOTE(review): plain assert above vs ceph_assert here — consider using
+ // ceph_assert for both, for consistency.
+ ceph_assert(std::get_if<OpSchedulerItem>(&ret));
+ return ret;
} else {
mclock_queue_t::PullReq result = scheduler.pull_request();
if (result.is_future()) {
#pragma once
+#include <functional>
#include <ostream>
#include <map>
#include <vector>
true,
true,
2>;
+ using priority_t = unsigned;
+ // Ordered by std::greater so begin() is the HIGHEST priority bucket.
+ // Each bucket is a FIFO list of ops at that priority, consumed from the
+ // back (see dequeue() / enqueue_high()).
+ using SubQueue = std::map<priority_t,
+ std::list<OpSchedulerItem>,
+ std::greater<priority_t>>;
+ using SubQueueIter = SubQueue::iterator;
mclock_queue_t scheduler;
+ /**
+ * high_priority
+ *
+ * Holds entries to be dequeued in strict order ahead of mClock
+ * Invariant: entries are never empty
+ */
+ SubQueue high_priority;
std::list<OpSchedulerItem> immediate;
static scheduler_id_t get_scheduler_id(const OpSchedulerItem &item) {
};
}
+ /**
+  * get_io_prio_cut
+  *
+  * Maps the osd_op_queue_cut_off config option to the priority cutoff
+  * at/above which an op bypasses mClock and enters the high-priority
+  * queue. "debug_random" flips a coin per call between HIGH and LOW.
+  */
+ static unsigned int get_io_prio_cut(CephContext *cct) {
+ if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
+ // Seed once per thread: constructing a std::random_device and a
+ // std::mt19937 on every call is needlessly expensive, even on this
+ // debug-only path. The coin-flip distribution is unchanged.
+ static thread_local std::mt19937 random_gen{std::random_device{}()};
+ return (random_gen() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
+ } else if (cct->_conf->osd_op_queue_cut_off == "high") {
+ return CEPH_MSG_PRIO_HIGH;
+ } else {
+ // default / catch-all is 'low'
+ return CEPH_MSG_PRIO_LOW;
+ }
+ }
+
public:
mClockScheduler(CephContext *cct, uint32_t num_shards, bool is_rotational);
~mClockScheduler() override;
// Enqueue op in the back of the regular queue
void enqueue(OpSchedulerItem &&item) final;
- // Enqueue the op in the front of the regular queue
+ // Enqueue the op in the front of the high priority queue or the immediate queue (based on priority)
void enqueue_front(OpSchedulerItem &&item) final;
// Return an op to be dispatched
// Returns if the queue is empty
bool empty() const final {
- return immediate.empty() && scheduler.empty();
+ return immediate.empty() && scheduler.empty() && high_priority.empty();
}
// Formatted output of the queue
const char** get_tracked_conf_keys() const final;
void handle_conf_change(const ConfigProxy& conf,
const std::set<std::string> &changed) final;
+private:
+ // Enqueue the op to the high priority queue; front == true places it at
+ // the logical front of its priority bucket.
+ void enqueue_high(unsigned prio, OpSchedulerItem &&item, bool front = false);
};
}
utime_t(), owner, e);
}
+// Build an OpSchedulerItem carrying an explicit priority, for exercising
+// the high-priority queue in tests. The epoch `e` doubles as a marker that
+// lets assertions identify dequeue order.
+template <typename... Args>
+OpSchedulerItem create_high_prio_item(
+ unsigned priority, epoch_t e, uint64_t owner, Args&&... args)
+{
+ // Create high priority item for testing high prio queue
+ // NOTE(review): 12 appears to be a fixed op cost preceding the priority
+ // argument — confirm against the OpSchedulerItem ctor parameter order.
+ return OpSchedulerItem(
+ std::make_unique<mClockSchedulerTest::MockDmclockItem>(
+ std::forward<Args>(args)...),
+ 12, priority,
+ utime_t(), owner, e);
+}
+
+// Extract the OpSchedulerItem from a dequeued WorkItem variant; throws
+// std::bad_variant_access if the WorkItem holds the "future time" branch.
OpSchedulerItem get_item(WorkItem item)
{
return std::move(std::get<OpSchedulerItem>(item));
}
ASSERT_TRUE(q.empty());
}
+
+// Verifies that high-priority ops are dequeued strictly by descending
+// priority, using the item's epoch (== its priority here) as the marker.
+TEST_F(mClockSchedulerTest, TestHighPriorityQueueEnqueueDequeue) {
+ ASSERT_TRUE(q.empty());
+ for (unsigned i = 200; i < 205; ++i) {
+ q.enqueue(create_high_prio_item(i, i, client1, op_scheduler_class::client));
+ // NOTE(review): 1ms sleep per op makes this test slower than the
+ // microsecond sleeps used in the other test — confirm it is needed.
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+
+ ASSERT_FALSE(q.empty());
+ // Higher priority ops should be dequeued first
+ auto r = get_item(q.dequeue());
+ ASSERT_EQ(204u, r.get_map_epoch());
+
+ r = get_item(q.dequeue());
+ ASSERT_EQ(203u, r.get_map_epoch());
+
+ r = get_item(q.dequeue());
+ ASSERT_EQ(202u, r.get_map_epoch());
+
+ r = get_item(q.dequeue());
+ ASSERT_EQ(201u, r.get_map_epoch());
+
+ r = get_item(q.dequeue());
+ ASSERT_EQ(200u, r.get_map_epoch());
+
+ ASSERT_TRUE(q.empty());
+}
+
+// Verifies the overall dequeue ordering across all three queues:
+// immediate first, then the high-priority queue (by descending priority),
+// and finally the mClock scheduler. Epochs are used as order markers.
+TEST_F(mClockSchedulerTest, TestAllQueuesEnqueueDequeue) {
+ ASSERT_TRUE(q.empty());
+
+ // Insert ops into the mClock queue
+ for (unsigned i = 100; i < 102; ++i) {
+ q.enqueue(create_item(i, client1, op_scheduler_class::client));
+ std::this_thread::sleep_for(std::chrono::microseconds(1));
+ }
+
+ // Insert ops into the immediate queue
+ for (unsigned i = 103; i < 105; ++i) {
+ q.enqueue(create_item(i, client1, op_scheduler_class::immediate));
+ std::this_thread::sleep_for(std::chrono::microseconds(1));
+ }
+
+ // Insert ops into the high queue
+ for (unsigned i = 200; i < 202; ++i) {
+ q.enqueue(create_high_prio_item(i, i, client1, op_scheduler_class::client));
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+
+ ASSERT_FALSE(q.empty());
+ auto r = get_item(q.dequeue());
+ // Immediate queue should be dequeued first
+ ASSERT_EQ(103u, r.get_map_epoch());
+ r = get_item(q.dequeue());
+ ASSERT_EQ(104u, r.get_map_epoch());
+
+ // High priority queue should be dequeued second
+ // higher priority operation first
+ r = get_item(q.dequeue());
+ ASSERT_EQ(201u, r.get_map_epoch());
+ r = get_item(q.dequeue());
+ ASSERT_EQ(200u, r.get_map_epoch());
+
+ // mClock queue will be dequeued last
+ r = get_item(q.dequeue());
+ ASSERT_EQ(100u, r.get_map_epoch());
+ r = get_item(q.dequeue());
+ ASSERT_EQ(101u, r.get_map_epoch());
+
+ ASSERT_TRUE(q.empty());
+}