From: Robert LeBlanc Date: Mon, 15 Feb 2016 17:48:49 +0000 (+0000) Subject: common/WeightedPriorityQueue Rewrote the queue to use intrusive contianers. Microbenc... X-Git-Tag: v10.1.0~286^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f03de8e2d01b5a576d05815298402beca1399cbf;p=ceph.git common/WeightedPriorityQueue Rewrote the queue to use intrusive contianers. Microbenchmarks show 60-70% of execution time compared to before. Signed-off-by: Robert LeBlanc --- diff --git a/src/common/WeightedPriorityQueue.h b/src/common/WeightedPriorityQueue.h index 6a77a6cd4c9a..b1c48f38b2f9 100644 --- a/src/common/WeightedPriorityQueue.h +++ b/src/common/WeightedPriorityQueue.h @@ -15,345 +15,277 @@ #ifndef WP_QUEUE_H #define WP_QUEUE_H -#include "common/Formatter.h" -#include "common/OpQueue.h" +#include "OpQueue.h" -#include -#include +#include +#include +#include -/** - * Weighted Priority queue with strict priority queue - * - * This queue attempts to be fair to all classes of - * operations but is also weighted so that higher classes - * get more share of the operations. - */ - -template -class WeightedPriorityQueue : public OpQueue { - int64_t total_priority; +namespace bi = boost::intrusive; - typedef std::list> ListPairs; - static unsigned filter_list_pairs( - ListPairs *l, std::function f, - std::list *out) { - unsigned ret = 0; - if (out) { - for (typename ListPairs::reverse_iterator i = l->rbegin(); - i != l->rend(); - ++i) { - if (f(i->second)) { - out->push_front(i->second); - } - } - } - for (typename ListPairs::iterator i = l->begin(); - i != l->end(); ) { - if (f(i->second)) { - l->erase(i++); - ++ret; - } else { - ++i; - } - } - return ret; +template +class MapKey +{ + public: + bool operator()(const unsigned i, const T &k) const + { + return i < k.key; } + bool operator()(const T &k, const unsigned i) const + { + return k.key < i; + } +}; - struct SubQueue { - private: - typedef std::map Classes; - Classes q; - typename Classes::iterator cur; - unsigned q_size; +template +class DelItem +{ public: - SubQueue(const SubQueue &other) - : q(other.q), - cur(q.begin()), - q_size(0) {} - SubQueue() - : cur(q.begin()), - q_size(0) {} - void enqueue_front(K cl, unsigned cost, T item) { - q[cl].push_front(std::make_pair(cost, item)); - if (cur == q.end()) { - cur = q.begin(); - } - ++q_size; - } - void enqueue(K cl, unsigned cost, T item) { - q[cl].push_back(std::make_pair(cost, item)); - if (cur == q.end()) { - cur = q.begin(); - } - ++q_size; - } - std::pair front() const { - assert(!q.empty()); - assert(cur != q.end()); - assert(!cur->second.empty()); - return cur->second.front(); - } - void pop_front() { - assert(!q.empty()); - assert(cur != q.end()); - assert(!cur->second.empty()); - cur->second.pop_front(); - if (cur->second.empty()) { - cur = q.erase(cur); - } else { - ++cur; - } - if (cur == q.end()) { - cur = q.begin(); + void operator()(T* delete_this) + { delete delete_this; } +}; + +template +class WeightedPriorityQueue : public OpQueue +{ + private: + class ListPair : public bi::list_base_hook<> + { + public: + K klass; + unsigned cost; + T item; + ListPair(K& k, unsigned c, T& i) : + klass(k), + cost(c), + item(i) + {} + }; + class SubQueue : public bi::set_base_hook<> + { + typedef bi::list QueueItems; + typedef typename QueueItems::iterator QI; + public: + unsigned key; // priority + QueueItems qitems; + SubQueue(unsigned& p) : + key(p) + {} + bool empty() const { + return qitems.empty(); } - --q_size; - } - unsigned size() const { - return q_size; - } - bool empty() const { - return (q_size == 0); - } - unsigned remove_by_filter(std::function f, std::list *out) { - unsigned count = 0; - for (typename Classes::iterator i = q.begin(); - i != q.end(); ) { - count += filter_list_pairs(&(i->second), f, out); - if (i->second.empty()) { - if (cur == i) { - ++cur; - } - q.erase(i++); + void insert(K& cl, unsigned cost, T& item, bool front = false) { + if (front) { + qitems.push_front(*new ListPair(cl, cost, item)); } else { - ++i; + qitems.push_back(*new ListPair(cl, cost, item)); } } - if (cur == q.end()) { - cur = q.begin(); + unsigned get_cost() const { + return qitems.begin()->cost; } - q_size -= count; - return count; - } - unsigned remove_by_class(K k, std::list *out) { - typename Classes::iterator i = q.find(k); - if (i == q.end()) { - return 0; + T pop() { + T ret = qitems.begin()->item; + qitems.erase_and_dispose(qitems.begin(), DelItem()); + return ret; } - unsigned count = i->second.size(); - q_size -= count; - if (out) { - for (typename ListPairs::reverse_iterator j = - i->second.rbegin(); - j != i->second.rend(); - ++j) { - out->push_front(j->second); - } + unsigned filter_list_pairs(std::function& f, std::list* out) { + unsigned count = 0; + // intrusive containers can't erase with a reverse_iterator + // so we have to walk backwards on our own. Since there is + // no iterator before begin, we have to test at the end. + for (QI i = --qitems.end();; --i) { + if (f(i->item)) { + if (out) { + out->push_front(i->item); + } + i = qitems.erase_and_dispose(i, DelItem()); + ++count; + } + if (i == qitems.begin()) { + break; + } + } + return count; } - if (i == cur) { - ++cur; + unsigned filter_class(K& cl, std::list* out) { + unsigned count = 0; + for (QI i = --qitems.end();; --i) { + if (i->klass == cl) { + if (out) { + out->push_front(i->item); + } + i = qitems.erase_and_dispose(i, DelItem()); + ++count; + } + if (i == qitems.begin()) { + break; + } + } + return count; } - q.erase(i); - if (cur == q.end()) { - cur = q.begin(); + void dump(ceph::Formatter *f) const { + f->dump_int("num_keys", qitems.size()); + f->dump_int("first_item_cost", qitems.begin()->cost); } - return count; - } + }; + class Queue { + typedef bi::rbtree SubQueues; + typedef typename SubQueues::iterator Sit; + SubQueues queues; + unsigned total_prio; + unsigned max_cost; + public: + unsigned size; + Queue() : + total_prio(0), + max_cost(0), + size(0) + {} + bool empty() const { + return !size; + } + void insert(unsigned p, K& cl, unsigned cost, T& item, bool front = false) { + typename SubQueues::insert_commit_data insert_data; + std::pair ret = + queues.insert_unique_check(p, MapKey(), insert_data); + if (ret.second) { + ret.first = queues.insert_unique_commit(*new SubQueue(p), insert_data); + total_prio += p; + } + ret.first->insert(cl, cost, item, front); + if (cost > max_cost) { + max_cost = cost; + } + ++size; + } + T pop(bool strict = false) { + --size; + Sit i = --queues.end(); + if (strict) { + T ret = i->pop(); + if (i->empty()) { + queues.erase_and_dispose(i, DelItem()); + } + return ret; + } + if (queues.size() > 1) { + while (true) { + // Pick a new priority out of the total priority. + unsigned prio = rand() % total_prio + 1; + unsigned tp = total_prio - i->key; + // Find the priority coresponding to the picked number. + // Subtract high priorities to low priorities until the picked number + // is more than the total and try to dequeue that priority. + // Reverse the direction from previous implementation because there is a higher + // chance of dequeuing a high priority op so spend less time spinning. + while (prio <= tp) { + --i; + tp -= i->key; + } + // Flip a coin to see if this priority gets to run based on cost. + // The next op's cost is multiplied by .9 and subtracted from the + // max cost seen. Ops with lower costs will have a larger value + // and allow them to be selected easier than ops with high costs. + if (max_cost == 0 || rand() % max_cost <= + (max_cost - ((i->get_cost() * 9) / 10))) { + break; + } + i = --queues.end(); + } + } + T ret = i->pop(); + if (i->empty()) { + total_prio -= i->key; + queues.erase_and_dispose(i, DelItem()); + } + return ret; + } + void filter_list_pairs(std::function& f, std::list* out) { + for (Sit i = queues.begin(); i != queues.end();) { + size -= i->filter_list_pairs(f, out); + if (i->empty()) { + total_prio -= i->key; + i = queues.erase_and_dispose(i, DelItem()); + } else { + ++i; + } + } + } + void filter_class(K& cl, std::list* out) { + for (Sit i = queues.begin(); i != queues.end();) { + size -= i->filter_class(cl, out); + if (i->empty()) { + total_prio -= i->key; + i = queues.erase_and_dispose(i, DelItem()); + } else { + ++i; + } + } + } + void dump(ceph::Formatter *f) const { + for (typename SubQueues::const_iterator i = queues.begin(); + i != queues.end(); ++i) { + f->dump_int("total_priority", total_prio); + f->dump_int("max_cost", max_cost); + f->open_object_section("subqueue"); + f->dump_int("priority", i->key); + i->dump(f); + f->close_section(); + } + } + }; - void dump(ceph::Formatter *f) const { - f->dump_int("num_keys", q.size()); - if (!empty()) { - f->dump_int("first_item_cost", front().first); + Queue strict; + Queue normal; + public: + WeightedPriorityQueue(unsigned max_per, unsigned min_c) : + strict(), + normal() + { + std::srand(time(0)); } + unsigned length() const override final { + return strict.size + normal.size; } - }; - - unsigned high_size, wrr_size; - unsigned max_cost; - - typedef std::map SubQueues; - SubQueues high_queue; - SubQueues queue; - typename SubQueues::reverse_iterator dq; - - SubQueue *create_queue(unsigned priority) { - typename SubQueues::iterator p = queue.find(priority); - if (p != queue.end()) { - return &p->second; + void remove_by_filter(std::function f, std::list* removed = 0) override final { + strict.filter_list_pairs(f, removed); + normal.filter_list_pairs(f, removed); } - total_priority += priority; - SubQueue *sq = &queue[priority]; - return sq; - } - - void remove_queue(unsigned priority) { - assert(queue.count(priority)); - dq = (typename SubQueues::reverse_iterator) queue.erase(queue.find(priority)); - if (dq == queue.rend()) { - dq = queue.rbegin(); + void remove_by_class(K cl, std::list* removed = 0) override final { + strict.filter_class(cl, removed); + normal.filter_class(cl, removed); } - total_priority -= priority; - assert(total_priority >= 0); - } - -public: - WeightedPriorityQueue(unsigned max_per, unsigned min_c) - : total_priority(0), - high_size(0), - wrr_size(0), - max_cost(0), - dq(queue.rbegin()) - { - srand(time(0)); - } - - unsigned length() const override final { - return high_size + wrr_size; - } - - void remove_by_filter( - std::function f, std::list *removed = 0) override final { - for (typename SubQueues::iterator i = queue.begin(); - i != queue.end(); ++i) { - wrr_size -= i->second.remove_by_filter(f, removed); - unsigned priority = i->first; - if (i->second.empty()) { - remove_queue(priority); - } + bool empty() const override final { + return !(strict.size + normal.size); } - for (typename SubQueues::iterator i = high_queue.begin(); - i != high_queue.end(); - ) { - high_size -= i->second.remove_by_filter(f, removed); - if (i->second.empty()) { - high_queue.erase(i++); - } else { - ++i; - } + void enqueue_strict(K cl, unsigned p, T item) override final { + strict.insert(p, cl, 0, item); } - } - - void remove_by_class(K k, std::list *out = 0) override final { - for (typename SubQueues::iterator i = queue.begin(); - i != queue.end(); ++i) { - wrr_size -= i->second.remove_by_class(k, out); - unsigned priority = i->first; - if (i->second.empty()) { - remove_queue(priority); - } + void enqueue_strict_front(K cl, unsigned p, T item) override final { + strict.insert(p, cl, 0, item, true); } - for (typename SubQueues::iterator i = high_queue.begin(); - i != high_queue.end(); - ) { - high_size -= i->second.remove_by_class(k, out); - if (i->second.empty()) { - high_queue.erase(i++); - } else { - ++i; - } + void enqueue(K cl, unsigned p, unsigned cost, T item) override final { + normal.insert(p, cl, cost, item); } - } - - void enqueue_strict(K cl, unsigned priority, T item) override final { - high_queue[priority].enqueue(cl, 0, item); - ++high_size; - } - - void enqueue_strict_front(K cl, unsigned priority, T item) override final { - high_queue[priority].enqueue_front(cl, 0, item); - ++high_size; - } - - void enqueue(K cl, unsigned priority, unsigned cost, T item) override final { - if (cost > max_cost) { - max_cost = cost; - } - create_queue(priority)->enqueue(cl, cost, item); - ++wrr_size; - } - - void enqueue_front(K cl, unsigned priority, unsigned cost, T item) override final { - if (cost > max_cost) { - max_cost = cost; - } - create_queue(priority)->enqueue_front(cl, cost, item); - ++wrr_size; - } - - bool empty() const override final { - assert(total_priority >= 0); - assert((total_priority == 0) || !queue.empty()); - return (high_size + wrr_size == 0) ? true : false; - } - - T dequeue() override final { - assert(!empty()); - - if (!high_queue.empty()) { - T ret = high_queue.rbegin()->second.front().second; - high_queue.rbegin()->second.pop_front(); - if (high_queue.rbegin()->second.empty()) { - high_queue.erase(high_queue.rbegin()->first); - } - --high_size; - return ret; + void enqueue_front(K cl, unsigned p, unsigned cost, T item) override final { + normal.insert(p, cl, cost, item, true); } - // If there is more than one priority, choose one to run. - if (dq->second.size() != wrr_size) { - while (true) { - // Pick a new priority out of the total priority. - unsigned prio = rand() % total_priority; - typename SubQueues::iterator i = queue.begin(); - unsigned tp = i->first; - // Find the priority coresponding to the picked number. - // Add low priorities to high priorities until the picked number - // is less than the total and try to dequeue that priority. - while (prio > tp) { - ++i; - tp += i->first; - } - dq = (typename SubQueues::reverse_iterator) ++i; - // Flip a coin to see if this priority gets to run based on cost. - // The next op's cost is multiplied by .9 and subtracted from the - // max cost seen. Ops with lower costs will have a larger value - // and allow them to be selected easier than ops with high costs. - if (max_cost == 0 || rand() % max_cost <= - (max_cost - ((dq->second.front().first * 9) / 10))){ - break; - } + T dequeue() { + assert(strict.size + normal.size > 0); + if (!strict.empty()) { + return strict.pop(true); } + return normal.pop(); } - T ret = dq->second.front().second; - dq->second.pop_front(); - if (dq->second.empty()) { - remove_queue(dq->first); - } - --wrr_size; - return ret; - } - - void dump(ceph::Formatter *f) const { - f->dump_int("total_priority", total_priority); - f->open_array_section("high_queues"); - for (typename SubQueues::const_iterator p = high_queue.begin(); - p != high_queue.end(); - ++p) { - f->open_object_section("subqueue"); - f->dump_int("priority", p->first); - p->second.dump(f); + void dump(ceph::Formatter *f) const { + f->open_array_section("high_queues"); + strict.dump(f); f->close_section(); - } - f->close_section(); - f->open_array_section("queues"); - for (typename SubQueues::const_iterator p = queue.begin(); - p != queue.end(); - ++p) { - f->open_object_section("subqueue"); - f->dump_int("priority", p->first); - p->second.dump(f); + f->open_array_section("queues"); + normal.dump(f); f->close_section(); } - f->close_section(); - } }; #endif