From: Robert LeBlanc Date: Thu, 17 Dec 2015 23:07:51 +0000 (+0000) Subject: common/OpQueue: Add base class for Op Queues. Introduce the new Weighted Priority... X-Git-Tag: v10.0.4~71^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=253d45bd55cafeacc82fde472e077a28dd6bd70d;p=ceph.git common/OpQueue: Add base class for Op Queues. Introduce the new Weighted Priority Queue and update Prioritized Queue to inherit the base class. Signed-Off-By: Robert LeBlanc --- diff --git a/src/common/Makefile.am b/src/common/Makefile.am index 182295f9e6d6..cc5a4101a116 100644 --- a/src/common/Makefile.am +++ b/src/common/Makefile.am @@ -177,7 +177,9 @@ noinst_HEADERS += \ common/Preforker.h \ common/SloppyCRCMap.h \ common/WorkQueue.h \ + common/OpQueue.h \ common/PrioritizedQueue.h \ + common/WeightedPriorityQueue.h \ common/ceph_argparse.h \ common/ceph_context.h \ common/xattr.h \ diff --git a/src/common/OpQueue.h b/src/common/OpQueue.h new file mode 100644 index 000000000000..34adc026810f --- /dev/null +++ b/src/common/OpQueue.h @@ -0,0 +1,63 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef OP_QUEUE_H +#define OP_QUEUE_H + +#include "include/msgr.h" + +#include <list> +#include <functional> + +namespace ceph { + class Formatter; +} + +/** + * Abstract class for all Op Queues + * + * In order to provide optimized code, be sure to declare all + * virtual functions as final in the derived class. 
+ */ + +template <typename T, typename K> +class OpQueue { + + public: + // How many Ops are in the queue + virtual unsigned length() const = 0; + // Ops will be removed and placed in *removed if f returns true + virtual void remove_by_filter( + std::function<bool (T)> f, std::list<T> *removed) = 0; + // Ops of this class should be deleted immediately + virtual void remove_by_class(K k, std::list<T> *out) = 0; + // Enqueue op in the back of the strict queue + virtual void enqueue_strict(K cl, unsigned priority, T item) = 0; + // Enqueue op in the front of the strict queue + virtual void enqueue_strict_front(K cl, unsigned priority, T item) = 0; + // Enqueue op in the back of the regular queue + virtual void enqueue(K cl, unsigned priority, unsigned cost, T item) = 0; + // Enqueue the op in the front of the regular queue + virtual void enqueue_front(K cl, unsigned priority, unsigned cost, T item) = 0; + // Returns true if the queue is empty + virtual bool empty() const = 0; + // Return an op to be dispatched + virtual T dequeue() = 0; + // Formatted output of the queue + virtual void dump(ceph::Formatter *f) const = 0; + // Don't leak resources on destruction + virtual ~OpQueue() {}; +}; + +#endif diff --git a/src/common/PrioritizedQueue.h b/src/common/PrioritizedQueue.h index 010f919ce006..8be6ca893acf 100644 --- a/src/common/PrioritizedQueue.h +++ b/src/common/PrioritizedQueue.h @@ -16,6 +16,7 @@ #define PRIORITY_QUEUE_H #include "common/Formatter.h" +#include "common/OpQueue.h" #include #include @@ -41,15 +42,15 @@ * to provide fairness for different clients. 
*/ template <typename T, typename K> -class PrioritizedQueue { +class PrioritizedQueue : public OpQueue <T, K> { int64_t total_priority; int64_t max_tokens_per_subqueue; int64_t min_cost; typedef std::list<std::pair<unsigned, T> > ListPairs; - template <class F> static unsigned filter_list_pairs( - ListPairs *l, F f, + ListPairs *l, + std::function<bool (T)> f, std::list<T> *out) { unsigned ret = 0; if (out) { @@ -152,8 +153,9 @@ class PrioritizedQueue { bool empty() const { return q.empty(); } - template <class F> - void remove_by_filter(F f, std::list<T> *out) { + void remove_by_filter( + std::function<bool (T)> f, + std::list<T> *out) { for (typename Classes::iterator i = q.begin(); i != q.end(); ) { @@ -193,7 +195,7 @@ class PrioritizedQueue { } } - void dump(Formatter *f) const { + void dump(ceph::Formatter *f) const { f->dump_int("tokens", tokens); f->dump_int("max_tokens", max_tokens); f->dump_int("size", size); @@ -244,7 +246,7 @@ public: min_cost(min_c) {} - unsigned length() const { + unsigned length() const override final { unsigned total = 0; for (typename SubQueues::const_iterator i = queue.begin(); i != queue.end(); @@ -261,8 +263,9 @@ public: return total; } - template <class F> - void remove_by_filter(F f, std::list<T> *removed = 0) { + void remove_by_filter( + std::function<bool (T)> f, + std::list<T> *removed = 0) override final { for (typename SubQueues::iterator i = queue.begin(); i != queue.end(); ) { @@ -288,7 +291,7 @@ public: } } - void remove_by_class(K k, std::list<T> *out = 0) { + void remove_by_class(K k, std::list<T> *out = 0) override final { for (typename SubQueues::iterator i = queue.begin(); i != queue.end(); ) { @@ -313,15 +316,15 @@ public: } } - void enqueue_strict(K cl, unsigned priority, T item) { + void enqueue_strict(K cl, unsigned priority, T item) override final { high_queue[priority].enqueue(cl, 0, item); } - void enqueue_strict_front(K cl, unsigned priority, T item) { + void enqueue_strict_front(K cl, unsigned priority, T item) override final { high_queue[priority].enqueue_front(cl, 0, item); } - void enqueue(K cl, unsigned priority, unsigned cost, T 
item) { + void enqueue(K cl, unsigned priority, unsigned cost, T item) override final { if (cost < min_cost) cost = min_cost; if (cost > max_tokens_per_subqueue) @@ -329,7 +332,7 @@ public: create_queue(priority)->enqueue(cl, cost, item); } - void enqueue_front(K cl, unsigned priority, unsigned cost, T item) { + void enqueue_front(K cl, unsigned priority, unsigned cost, T item) override final { if (cost < min_cost) cost = min_cost; if (cost > max_tokens_per_subqueue) @@ -337,13 +340,13 @@ public: create_queue(priority)->enqueue_front(cl, cost, item); } - bool empty() const { + bool empty() const override final { assert(total_priority >= 0); assert((total_priority == 0) || !(queue.empty())); return queue.empty() && high_queue.empty(); } - T dequeue() { + T dequeue() override final { assert(!empty()); if (!(high_queue.empty())) { @@ -387,7 +390,7 @@ public: return ret; } - void dump(Formatter *f) const { + void dump(ceph::Formatter *f) const override final { f->dump_int("total_priority", total_priority); f->dump_int("max_tokens_per_subqueue", max_tokens_per_subqueue); f->dump_int("min_cost", min_cost); diff --git a/src/common/WeightedPriorityQueue.h b/src/common/WeightedPriorityQueue.h new file mode 100644 index 000000000000..6a77a6cd4c9a --- /dev/null +++ b/src/common/WeightedPriorityQueue.h @@ -0,0 +1,359 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#ifndef WP_QUEUE_H +#define WP_QUEUE_H + +#include "common/Formatter.h" +#include "common/OpQueue.h" + +#include <map> +#include <list> + +/** + * Weighted Priority queue with strict priority queue + * + * This queue attempts to be fair to all classes of + * operations but is also weighted so that higher classes + * get more share of the operations. + */ + +template <typename T, typename K> +class WeightedPriorityQueue : public OpQueue <T, K> { + int64_t total_priority; + + typedef std::list<std::pair<unsigned, T>> ListPairs; + static unsigned filter_list_pairs( + ListPairs *l, std::function<bool (T)> f, + std::list<T> *out) { + unsigned ret = 0; + if (out) { + for (typename ListPairs::reverse_iterator i = l->rbegin(); + i != l->rend(); + ++i) { + if (f(i->second)) { + out->push_front(i->second); + } + } + } + for (typename ListPairs::iterator i = l->begin(); + i != l->end(); ) { + if (f(i->second)) { + l->erase(i++); + ++ret; + } else { + ++i; + } + } + return ret; + } + + struct SubQueue { + private: + typedef std::map<K, ListPairs> Classes; + Classes q; + typename Classes::iterator cur; + unsigned q_size; + public: + SubQueue(const SubQueue &other) + : q(other.q), + cur(q.begin()), + q_size(0) {} + SubQueue() + : cur(q.begin()), + q_size(0) {} + void enqueue_front(K cl, unsigned cost, T item) { + q[cl].push_front(std::make_pair(cost, item)); + if (cur == q.end()) { + cur = q.begin(); + } + ++q_size; + } + void enqueue(K cl, unsigned cost, T item) { + q[cl].push_back(std::make_pair(cost, item)); + if (cur == q.end()) { + cur = q.begin(); + } + ++q_size; + } + std::pair<unsigned, T> front() const { + assert(!q.empty()); + assert(cur != q.end()); + assert(!cur->second.empty()); + return cur->second.front(); + } + void pop_front() { + assert(!q.empty()); + assert(cur != q.end()); + assert(!cur->second.empty()); + cur->second.pop_front(); + if (cur->second.empty()) { + cur = q.erase(cur); + } else { + ++cur; + } + if (cur == q.end()) { + cur = q.begin(); + } + --q_size; + } + unsigned size() const { + return q_size; + } + bool empty() const { + return (q_size 
== 0); + } + unsigned remove_by_filter(std::function<bool (T)> f, std::list<T> *out) { + unsigned count = 0; + for (typename Classes::iterator i = q.begin(); + i != q.end(); ) { + count += filter_list_pairs(&(i->second), f, out); + if (i->second.empty()) { + if (cur == i) { + ++cur; + } + q.erase(i++); + } else { + ++i; + } + } + if (cur == q.end()) { + cur = q.begin(); + } + q_size -= count; + return count; + } + unsigned remove_by_class(K k, std::list<T> *out) { + typename Classes::iterator i = q.find(k); + if (i == q.end()) { + return 0; + } + unsigned count = i->second.size(); + q_size -= count; + if (out) { + for (typename ListPairs::reverse_iterator j = + i->second.rbegin(); + j != i->second.rend(); + ++j) { + out->push_front(j->second); + } + } + if (i == cur) { + ++cur; + } + q.erase(i); + if (cur == q.end()) { + cur = q.begin(); + } + return count; + } + + void dump(ceph::Formatter *f) const { + f->dump_int("num_keys", q.size()); + if (!empty()) { + f->dump_int("first_item_cost", front().first); + } + } + }; + + unsigned high_size, wrr_size; + unsigned max_cost; + + typedef std::map<unsigned, SubQueue> SubQueues; + SubQueues high_queue; + SubQueues queue; + typename SubQueues::reverse_iterator dq; + + SubQueue *create_queue(unsigned priority) { + typename SubQueues::iterator p = queue.find(priority); + if (p != queue.end()) { + return &p->second; + } + total_priority += priority; + SubQueue *sq = &queue[priority]; + return sq; + } + + void remove_queue(unsigned priority) { + assert(queue.count(priority)); + dq = (typename SubQueues::reverse_iterator) queue.erase(queue.find(priority)); + if (dq == queue.rend()) { + dq = queue.rbegin(); + } + total_priority -= priority; + assert(total_priority >= 0); + } + +public: + WeightedPriorityQueue(unsigned max_per, unsigned min_c) + : total_priority(0), + high_size(0), + wrr_size(0), + max_cost(0), + dq(queue.rbegin()) + { + srand(time(0)); + } + + unsigned length() const override final { + return high_size + wrr_size; + } + + void remove_by_filter( + 
std::function<bool (T)> f, std::list<T> *removed = 0) override final { + for (typename SubQueues::iterator i = queue.begin(); + i != queue.end(); ++i) { + wrr_size -= i->second.remove_by_filter(f, removed); + unsigned priority = i->first; + if (i->second.empty()) { + remove_queue(priority); + } + } + for (typename SubQueues::iterator i = high_queue.begin(); + i != high_queue.end(); + ) { + high_size -= i->second.remove_by_filter(f, removed); + if (i->second.empty()) { + high_queue.erase(i++); + } else { + ++i; + } + } + } + + void remove_by_class(K k, std::list<T> *out = 0) override final { + for (typename SubQueues::iterator i = queue.begin(); + i != queue.end(); ++i) { + wrr_size -= i->second.remove_by_class(k, out); + unsigned priority = i->first; + if (i->second.empty()) { + remove_queue(priority); + } + } + for (typename SubQueues::iterator i = high_queue.begin(); + i != high_queue.end(); + ) { + high_size -= i->second.remove_by_class(k, out); + if (i->second.empty()) { + high_queue.erase(i++); + } else { + ++i; + } + } + } + + void enqueue_strict(K cl, unsigned priority, T item) override final { + high_queue[priority].enqueue(cl, 0, item); + ++high_size; + } + + void enqueue_strict_front(K cl, unsigned priority, T item) override final { + high_queue[priority].enqueue_front(cl, 0, item); + ++high_size; + } + + void enqueue(K cl, unsigned priority, unsigned cost, T item) override final { + if (cost > max_cost) { + max_cost = cost; + } + create_queue(priority)->enqueue(cl, cost, item); + ++wrr_size; + } + + void enqueue_front(K cl, unsigned priority, unsigned cost, T item) override final { + if (cost > max_cost) { + max_cost = cost; + } + create_queue(priority)->enqueue_front(cl, cost, item); + ++wrr_size; + } + + bool empty() const override final { + assert(total_priority >= 0); + assert((total_priority == 0) || !queue.empty()); + return (high_size + wrr_size == 0) ? 
true : false; + } + + T dequeue() override final { + assert(!empty()); + + if (!high_queue.empty()) { + T ret = high_queue.rbegin()->second.front().second; + high_queue.rbegin()->second.pop_front(); + if (high_queue.rbegin()->second.empty()) { + high_queue.erase(high_queue.rbegin()->first); + } + --high_size; + return ret; + } + // If there is more than one priority, choose one to run. + if (dq->second.size() != wrr_size) { + while (true) { + // Pick a new priority out of the total priority. + unsigned prio = rand() % total_priority; + typename SubQueues::iterator i = queue.begin(); + unsigned tp = i->first; + // Find the priority corresponding to the picked number. + // Add low priorities to high priorities until the picked number + // is less than the total and try to dequeue that priority. + while (prio > tp) { + ++i; + tp += i->first; + } + dq = (typename SubQueues::reverse_iterator) ++i; + // Flip a coin to see if this priority gets to run based on cost. + // The next op's cost is multiplied by .9 and subtracted from the + // max cost seen. Ops with lower costs will have a larger value + // and allow them to be selected easier than ops with high costs. 
+ if (max_cost == 0 || rand() % max_cost <= + (max_cost - ((dq->second.front().first * 9) / 10))){ + break; + } + } + } + T ret = dq->second.front().second; + dq->second.pop_front(); + if (dq->second.empty()) { + remove_queue(dq->first); + } + --wrr_size; + return ret; + } + + void dump(ceph::Formatter *f) const { + f->dump_int("total_priority", total_priority); + f->open_array_section("high_queues"); + for (typename SubQueues::const_iterator p = high_queue.begin(); + p != high_queue.end(); + ++p) { + f->open_object_section("subqueue"); + f->dump_int("priority", p->first); + p->second.dump(f); + f->close_section(); + } + f->close_section(); + f->open_array_section("queues"); + for (typename SubQueues::const_iterator p = queue.begin(); + p != queue.end(); + ++p) { + f->open_object_section("subqueue"); + f->dump_int("priority", p->first); + p->second.dump(f); + f->close_section(); + } + f->close_section(); + } +}; + +#endif