#ifndef WP_QUEUE_H
#define WP_QUEUE_H
-#include "common/Formatter.h"
-#include "common/OpQueue.h"
+#include "OpQueue.h"
-#include <map>
-#include <list>
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive/rbtree.hpp>
+#include <boost/intrusive/avl_set.hpp>
-/**
- * Weighted Priority queue with strict priority queue
- *
- * This queue attempts to be fair to all classes of
- * operations but is also weighted so that higher classes
- * get more share of the operations.
- */
-
-template <typename T, typename K>
-class WeightedPriorityQueue : public OpQueue <T, K> {
- int64_t total_priority;
+namespace bi = boost::intrusive;
- typedef std::list<std::pair<unsigned, T>> ListPairs;
- static unsigned filter_list_pairs(
- ListPairs *l, std::function<bool (T)> f,
- std::list<T> *out) {
- unsigned ret = 0;
- if (out) {
- for (typename ListPairs::reverse_iterator i = l->rbegin();
- i != l->rend();
- ++i) {
- if (f(i->second)) {
- out->push_front(i->second);
- }
- }
- }
- for (typename ListPairs::iterator i = l->begin();
- i != l->end(); ) {
- if (f(i->second)) {
- l->erase(i++);
- ++ret;
- } else {
- ++i;
- }
- }
- return ret;
+template <typename T>
+class MapKey
+{
+ public:
+ bool operator()(const unsigned i, const T &k) const
+ {
+ return i < k.key;
}
+ bool operator()(const T &k, const unsigned i) const
+ {
+ return k.key < i;
+ }
+};
- struct SubQueue {
- private:
- typedef std::map<K, ListPairs> Classes;
- Classes q;
- typename Classes::iterator cur;
- unsigned q_size;
+template <typename T>
+class DelItem
+{
public:
- SubQueue(const SubQueue &other)
- : q(other.q),
- cur(q.begin()),
- q_size(0) {}
- SubQueue()
- : cur(q.begin()),
- q_size(0) {}
- void enqueue_front(K cl, unsigned cost, T item) {
- q[cl].push_front(std::make_pair(cost, item));
- if (cur == q.end()) {
- cur = q.begin();
- }
- ++q_size;
- }
- void enqueue(K cl, unsigned cost, T item) {
- q[cl].push_back(std::make_pair(cost, item));
- if (cur == q.end()) {
- cur = q.begin();
- }
- ++q_size;
- }
- std::pair<unsigned, T> front() const {
- assert(!q.empty());
- assert(cur != q.end());
- assert(!cur->second.empty());
- return cur->second.front();
- }
- void pop_front() {
- assert(!q.empty());
- assert(cur != q.end());
- assert(!cur->second.empty());
- cur->second.pop_front();
- if (cur->second.empty()) {
- cur = q.erase(cur);
- } else {
- ++cur;
- }
- if (cur == q.end()) {
- cur = q.begin();
+ void operator()(T* delete_this)
+ { delete delete_this; }
+};
+
+template <typename T, typename K>
+class WeightedPriorityQueue : public OpQueue <T, K>
+{
+ private:
+ class ListPair : public bi::list_base_hook<>
+ {
+ public:
+ K klass;
+ unsigned cost;
+ T item;
+ ListPair(K& k, unsigned c, T& i) :
+ klass(k),
+ cost(c),
+ item(i)
+ {}
+ };
+ class SubQueue : public bi::set_base_hook<>
+ {
+ typedef bi::list<ListPair> QueueItems;
+ typedef typename QueueItems::iterator QI;
+ public:
+ unsigned key; // priority
+ QueueItems qitems;
+ SubQueue(unsigned& p) :
+ key(p)
+ {}
+ bool empty() const {
+ return qitems.empty();
}
- --q_size;
- }
- unsigned size() const {
- return q_size;
- }
- bool empty() const {
- return (q_size == 0);
- }
- unsigned remove_by_filter(std::function<bool (T)> f, std::list<T> *out) {
- unsigned count = 0;
- for (typename Classes::iterator i = q.begin();
- i != q.end(); ) {
- count += filter_list_pairs(&(i->second), f, out);
- if (i->second.empty()) {
- if (cur == i) {
- ++cur;
- }
- q.erase(i++);
+ void insert(K& cl, unsigned cost, T& item, bool front = false) {
+ if (front) {
+ qitems.push_front(*new ListPair(cl, cost, item));
} else {
- ++i;
+ qitems.push_back(*new ListPair(cl, cost, item));
}
}
- if (cur == q.end()) {
- cur = q.begin();
+ unsigned get_cost() const {
+ return qitems.begin()->cost;
}
- q_size -= count;
- return count;
- }
- unsigned remove_by_class(K k, std::list<T> *out) {
- typename Classes::iterator i = q.find(k);
- if (i == q.end()) {
- return 0;
+ T pop() {
+ T ret = qitems.begin()->item;
+ qitems.erase_and_dispose(qitems.begin(), DelItem<ListPair>());
+ return ret;
}
- unsigned count = i->second.size();
- q_size -= count;
- if (out) {
- for (typename ListPairs::reverse_iterator j =
- i->second.rbegin();
- j != i->second.rend();
- ++j) {
- out->push_front(j->second);
- }
+ unsigned filter_list_pairs(std::function<bool (T)>& f, std::list<T>* out) {
+ unsigned count = 0;
+ // intrusive containers can't erase with a reverse_iterator
+ // so we have to walk backwards on our own. Since there is
+ // no iterator before begin, we have to test at the end.
+ for (QI i = --qitems.end();; --i) {
+ if (f(i->item)) {
+ if (out) {
+ out->push_front(i->item);
+ }
+ i = qitems.erase_and_dispose(i, DelItem<ListPair>());
+ ++count;
+ }
+ if (i == qitems.begin()) {
+ break;
+ }
+ }
+ return count;
}
- if (i == cur) {
- ++cur;
+ unsigned filter_class(K& cl, std::list<T>* out) {
+ unsigned count = 0;
+ for (QI i = --qitems.end();; --i) {
+ if (i->klass == cl) {
+ if (out) {
+ out->push_front(i->item);
+ }
+ i = qitems.erase_and_dispose(i, DelItem<ListPair>());
+ ++count;
+ }
+ if (i == qitems.begin()) {
+ break;
+ }
+ }
+ return count;
}
- q.erase(i);
- if (cur == q.end()) {
- cur = q.begin();
+ void dump(ceph::Formatter *f) const {
+ f->dump_int("num_keys", qitems.size());
+ f->dump_int("first_item_cost", qitems.begin()->cost);
}
- return count;
- }
+ };
+ class Queue {
+ typedef bi::rbtree<SubQueue> SubQueues;
+ typedef typename SubQueues::iterator Sit;
+ SubQueues queues;
+ unsigned total_prio;
+ unsigned max_cost;
+ public:
+ unsigned size;
+ Queue() :
+ total_prio(0),
+ max_cost(0),
+ size(0)
+ {}
+ bool empty() const {
+ return !size;
+ }
+ void insert(unsigned p, K& cl, unsigned cost, T& item, bool front = false) {
+ typename SubQueues::insert_commit_data insert_data;
+ std::pair<typename SubQueues::iterator, bool> ret =
+ queues.insert_unique_check(p, MapKey<SubQueue>(), insert_data);
+ if (ret.second) {
+ ret.first = queues.insert_unique_commit(*new SubQueue(p), insert_data);
+ total_prio += p;
+ }
+ ret.first->insert(cl, cost, item, front);
+ if (cost > max_cost) {
+ max_cost = cost;
+ }
+ ++size;
+ }
+ T pop(bool strict = false) {
+ --size;
+ Sit i = --queues.end();
+ if (strict) {
+ T ret = i->pop();
+ if (i->empty()) {
+ queues.erase_and_dispose(i, DelItem<SubQueue>());
+ }
+ return ret;
+ }
+ if (queues.size() > 1) {
+ while (true) {
+ // Pick a new priority out of the total priority.
+ unsigned prio = rand() % total_prio + 1;
+ unsigned tp = total_prio - i->key;
+ // Find the priority coresponding to the picked number.
+ // Subtract high priorities to low priorities until the picked number
+ // is more than the total and try to dequeue that priority.
+ // Reverse the direction from previous implementation because there is a higher
+ // chance of dequeuing a high priority op so spend less time spinning.
+ while (prio <= tp) {
+ --i;
+ tp -= i->key;
+ }
+ // Flip a coin to see if this priority gets to run based on cost.
+ // The next op's cost is multiplied by .9 and subtracted from the
+ // max cost seen. Ops with lower costs will have a larger value
+ // and allow them to be selected easier than ops with high costs.
+ if (max_cost == 0 || rand() % max_cost <=
+ (max_cost - ((i->get_cost() * 9) / 10))) {
+ break;
+ }
+ i = --queues.end();
+ }
+ }
+ T ret = i->pop();
+ if (i->empty()) {
+ total_prio -= i->key;
+ queues.erase_and_dispose(i, DelItem<SubQueue>());
+ }
+ return ret;
+ }
+ void filter_list_pairs(std::function<bool (T)>& f, std::list<T>* out) {
+ for (Sit i = queues.begin(); i != queues.end();) {
+ size -= i->filter_list_pairs(f, out);
+ if (i->empty()) {
+ total_prio -= i->key;
+ i = queues.erase_and_dispose(i, DelItem<SubQueue>());
+ } else {
+ ++i;
+ }
+ }
+ }
+ void filter_class(K& cl, std::list<T>* out) {
+ for (Sit i = queues.begin(); i != queues.end();) {
+ size -= i->filter_class(cl, out);
+ if (i->empty()) {
+ total_prio -= i->key;
+ i = queues.erase_and_dispose(i, DelItem<SubQueue>());
+ } else {
+ ++i;
+ }
+ }
+ }
+ void dump(ceph::Formatter *f) const {
+ for (typename SubQueues::const_iterator i = queues.begin();
+ i != queues.end(); ++i) {
+ f->dump_int("total_priority", total_prio);
+ f->dump_int("max_cost", max_cost);
+ f->open_object_section("subqueue");
+ f->dump_int("priority", i->key);
+ i->dump(f);
+ f->close_section();
+ }
+ }
+ };
- void dump(ceph::Formatter *f) const {
- f->dump_int("num_keys", q.size());
- if (!empty()) {
- f->dump_int("first_item_cost", front().first);
+ Queue strict;
+ Queue normal;
+ public:
+ WeightedPriorityQueue(unsigned max_per, unsigned min_c) :
+ strict(),
+ normal()
+ {
+ std::srand(time(0));
}
+ unsigned length() const override final {
+ return strict.size + normal.size;
}
- };
-
- unsigned high_size, wrr_size;
- unsigned max_cost;
-
- typedef std::map<unsigned, SubQueue> SubQueues;
- SubQueues high_queue;
- SubQueues queue;
- typename SubQueues::reverse_iterator dq;
-
- SubQueue *create_queue(unsigned priority) {
- typename SubQueues::iterator p = queue.find(priority);
- if (p != queue.end()) {
- return &p->second;
+ void remove_by_filter(std::function<bool (T)> f, std::list<T>* removed = 0) override final {
+ strict.filter_list_pairs(f, removed);
+ normal.filter_list_pairs(f, removed);
}
- total_priority += priority;
- SubQueue *sq = &queue[priority];
- return sq;
- }
-
- void remove_queue(unsigned priority) {
- assert(queue.count(priority));
- dq = (typename SubQueues::reverse_iterator) queue.erase(queue.find(priority));
- if (dq == queue.rend()) {
- dq = queue.rbegin();
+ void remove_by_class(K cl, std::list<T>* removed = 0) override final {
+ strict.filter_class(cl, removed);
+ normal.filter_class(cl, removed);
}
- total_priority -= priority;
- assert(total_priority >= 0);
- }
-
-public:
- WeightedPriorityQueue(unsigned max_per, unsigned min_c)
- : total_priority(0),
- high_size(0),
- wrr_size(0),
- max_cost(0),
- dq(queue.rbegin())
- {
- srand(time(0));
- }
-
- unsigned length() const override final {
- return high_size + wrr_size;
- }
-
- void remove_by_filter(
- std::function<bool (T)> f, std::list<T> *removed = 0) override final {
- for (typename SubQueues::iterator i = queue.begin();
- i != queue.end(); ++i) {
- wrr_size -= i->second.remove_by_filter(f, removed);
- unsigned priority = i->first;
- if (i->second.empty()) {
- remove_queue(priority);
- }
+ bool empty() const override final {
+ return !(strict.size + normal.size);
}
- for (typename SubQueues::iterator i = high_queue.begin();
- i != high_queue.end();
- ) {
- high_size -= i->second.remove_by_filter(f, removed);
- if (i->second.empty()) {
- high_queue.erase(i++);
- } else {
- ++i;
- }
+ void enqueue_strict(K cl, unsigned p, T item) override final {
+ strict.insert(p, cl, 0, item);
}
- }
-
- void remove_by_class(K k, std::list<T> *out = 0) override final {
- for (typename SubQueues::iterator i = queue.begin();
- i != queue.end(); ++i) {
- wrr_size -= i->second.remove_by_class(k, out);
- unsigned priority = i->first;
- if (i->second.empty()) {
- remove_queue(priority);
- }
+ void enqueue_strict_front(K cl, unsigned p, T item) override final {
+ strict.insert(p, cl, 0, item, true);
}
- for (typename SubQueues::iterator i = high_queue.begin();
- i != high_queue.end();
- ) {
- high_size -= i->second.remove_by_class(k, out);
- if (i->second.empty()) {
- high_queue.erase(i++);
- } else {
- ++i;
- }
+ void enqueue(K cl, unsigned p, unsigned cost, T item) override final {
+ normal.insert(p, cl, cost, item);
}
- }
-
- void enqueue_strict(K cl, unsigned priority, T item) override final {
- high_queue[priority].enqueue(cl, 0, item);
- ++high_size;
- }
-
- void enqueue_strict_front(K cl, unsigned priority, T item) override final {
- high_queue[priority].enqueue_front(cl, 0, item);
- ++high_size;
- }
-
- void enqueue(K cl, unsigned priority, unsigned cost, T item) override final {
- if (cost > max_cost) {
- max_cost = cost;
- }
- create_queue(priority)->enqueue(cl, cost, item);
- ++wrr_size;
- }
-
- void enqueue_front(K cl, unsigned priority, unsigned cost, T item) override final {
- if (cost > max_cost) {
- max_cost = cost;
- }
- create_queue(priority)->enqueue_front(cl, cost, item);
- ++wrr_size;
- }
-
- bool empty() const override final {
- assert(total_priority >= 0);
- assert((total_priority == 0) || !queue.empty());
- return (high_size + wrr_size == 0) ? true : false;
- }
-
- T dequeue() override final {
- assert(!empty());
-
- if (!high_queue.empty()) {
- T ret = high_queue.rbegin()->second.front().second;
- high_queue.rbegin()->second.pop_front();
- if (high_queue.rbegin()->second.empty()) {
- high_queue.erase(high_queue.rbegin()->first);
- }
- --high_size;
- return ret;
+ void enqueue_front(K cl, unsigned p, unsigned cost, T item) override final {
+ normal.insert(p, cl, cost, item, true);
}
- // If there is more than one priority, choose one to run.
- if (dq->second.size() != wrr_size) {
- while (true) {
- // Pick a new priority out of the total priority.
- unsigned prio = rand() % total_priority;
- typename SubQueues::iterator i = queue.begin();
- unsigned tp = i->first;
- // Find the priority coresponding to the picked number.
- // Add low priorities to high priorities until the picked number
- // is less than the total and try to dequeue that priority.
- while (prio > tp) {
- ++i;
- tp += i->first;
- }
- dq = (typename SubQueues::reverse_iterator) ++i;
- // Flip a coin to see if this priority gets to run based on cost.
- // The next op's cost is multiplied by .9 and subtracted from the
- // max cost seen. Ops with lower costs will have a larger value
- // and allow them to be selected easier than ops with high costs.
- if (max_cost == 0 || rand() % max_cost <=
- (max_cost - ((dq->second.front().first * 9) / 10))){
- break;
- }
+ T dequeue() {
+ assert(strict.size + normal.size > 0);
+ if (!strict.empty()) {
+ return strict.pop(true);
}
+ return normal.pop();
}
- T ret = dq->second.front().second;
- dq->second.pop_front();
- if (dq->second.empty()) {
- remove_queue(dq->first);
- }
- --wrr_size;
- return ret;
- }
-
- void dump(ceph::Formatter *f) const {
- f->dump_int("total_priority", total_priority);
- f->open_array_section("high_queues");
- for (typename SubQueues::const_iterator p = high_queue.begin();
- p != high_queue.end();
- ++p) {
- f->open_object_section("subqueue");
- f->dump_int("priority", p->first);
- p->second.dump(f);
+ void dump(ceph::Formatter *f) const {
+ f->open_array_section("high_queues");
+ strict.dump(f);
f->close_section();
- }
- f->close_section();
- f->open_array_section("queues");
- for (typename SubQueues::const_iterator p = queue.begin();
- p != queue.end();
- ++p) {
- f->open_object_section("subqueue");
- f->dump_int("priority", p->first);
- p->second.dump(f);
+ f->open_array_section("queues");
+ normal.dump(f);
f->close_section();
}
- f->close_section();
- }
};
#endif