From: Robert LeBlanc Date: Wed, 2 Mar 2016 22:05:02 +0000 (+0000) Subject: osd: common/WeightedPriorityQueue.h Re-add Round Robin between classes X-Git-Tag: v10.2.0~19^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=de001bd99e0023214ea882ba0cbd6cf0e09fbdb6;p=ceph.git osd: common/WeightedPriorityQueue.h Re-add Round Robin between classes Fixes corner case where a single client can starve out other clients. This enables round robin between all clients so that they all get a fair share. Signed-off-by: Robert LeBlanc --- diff --git a/src/common/WeightedPriorityQueue.h b/src/common/WeightedPriorityQueue.h index b1c48f38b2f9..54cb4b0595b6 100644 --- a/src/common/WeightedPriorityQueue.h +++ b/src/common/WeightedPriorityQueue.h @@ -23,15 +23,15 @@ namespace bi = boost::intrusive; -template +template class MapKey { public: - bool operator()(const unsigned i, const T &k) const + bool operator()(const S i, const T &k) const { return i < k.key; } - bool operator()(const T &k, const unsigned i) const + bool operator()(const T &k, const S i) const { return k.key < i; } @@ -52,41 +52,122 @@ class WeightedPriorityQueue : public OpQueue class ListPair : public bi::list_base_hook<> { public: - K klass; unsigned cost; T item; - ListPair(K& k, unsigned c, T& i) : - klass(k), + ListPair(unsigned c, T& i) : cost(c), item(i) {} }; + class Klass : public bi::set_base_hook<> + { + typedef bi::list ListPairs; + typedef typename ListPairs::iterator Lit; + public: + K key; // klass + ListPairs lp; + Klass(K& k) : + key(k) + {} + void insert(unsigned cost, T& item, bool front) { + if (front) { + lp.push_front(*new ListPair(cost, item)); + } else { + lp.push_back(*new ListPair(cost, item)); + } + } + //Get the cost of the next item to dequeue + unsigned get_cost() const { + return lp.begin()->cost; + } + T pop() { + assert(!lp.empty()); + T ret = lp.begin()->item; + lp.erase_and_dispose(lp.begin(), DelItem()); + return ret; + } + bool empty() const { + return lp.empty(); + } + unsigned get_size() const { + return lp.size(); + } + unsigned filter_list_pairs(std::function& f, + std::list* out) { + unsigned count = 0; + // intrusive containers can't erase with a reverse_iterator + // so we have to walk backwards on our own. Since there is + // no iterator before begin, we have to test at the end. + for (Lit i = --lp.end();; --i) { + if (f(i->item)) { + if (out) { + out->push_front(i->item); + } + i = lp.erase_and_dispose(i, DelItem()); + ++count; + } + if (i == lp.begin()) { + break; + } + } + return count; + } + unsigned filter_class(std::list* out) { + unsigned count = 0; + for (Lit i = --lp.end();; --i) { + if (out) { + out->push_front(i->item); + } + i = lp.erase_and_dispose(i, DelItem()); + ++count; + if (i == lp.begin()) { + break; + } + } + return count; + } + }; class SubQueue : public bi::set_base_hook<> { - typedef bi::list QueueItems; - typedef typename QueueItems::iterator QI; + typedef bi::rbtree Klasses; + typedef typename Klasses::iterator Kit; + void check_end() { + if (next == klasses.end()) { + next = klasses.begin(); + } + } public: unsigned key; // priority - QueueItems qitems; + Klasses klasses; + Kit next; SubQueue(unsigned& p) : - key(p) + key(p), + next(klasses.begin()) {} bool empty() const { - return qitems.empty(); + return klasses.empty(); } - void insert(K& cl, unsigned cost, T& item, bool front = false) { - if (front) { - qitems.push_front(*new ListPair(cl, cost, item)); - } else { - qitems.push_back(*new ListPair(cl, cost, item)); + void insert(K cl, unsigned cost, T& item, bool front = false) { + typename Klasses::insert_commit_data insert_data; + std::pair ret = + klasses.insert_unique_check(cl, MapKey(), insert_data); + if (ret.second) { + ret.first = klasses.insert_unique_commit(*new Klass(cl), insert_data); + check_end(); } + ret.first->insert(cost, item, front); } unsigned get_cost() const { - return qitems.begin()->cost; + return next->get_cost(); } T pop() { - T ret = qitems.begin()->item; - qitems.erase_and_dispose(qitems.begin(), DelItem()); + T ret = next->pop(); + if (next->empty()) { + next = klasses.erase_and_dispose(next, DelItem()); + } else { + ++next; + } + check_end(); return ret; } unsigned filter_list_pairs(std::function& f, std::list* out) { @@ -94,39 +175,36 @@ class WeightedPriorityQueue : public OpQueue // intrusive containers can't erase with a reverse_iterator // so we have to walk backwards on our own. Since there is // no iterator before begin, we have to test at the end. - for (QI i = --qitems.end();; --i) { - if (f(i->item)) { - if (out) { - out->push_front(i->item); - } - i = qitems.erase_and_dispose(i, DelItem()); - ++count; - } - if (i == qitems.begin()) { - break; + for (Kit i = klasses.begin(); i != klasses.end();) { + count += i->filter_list_pairs(f, out); + if (i->empty()) { + if (next == i) { + ++next; + } + i = klasses.erase_and_dispose(i, DelItem()); + } else { + ++i; } } + check_end(); return count; } unsigned filter_class(K& cl, std::list* out) { unsigned count = 0; - for (QI i = --qitems.end();; --i) { - if (i->klass == cl) { - if (out) { - out->push_front(i->item); - } - i = qitems.erase_and_dispose(i, DelItem()); - ++count; - } - if (i == qitems.begin()) { - break; - } + Kit i = klasses.find(cl, MapKey()); + if (i != klasses.end()) { + count = i->filter_class(out); + Kit tmp = klasses.erase_and_dispose(i, DelItem()); + if (next == i) { + next = tmp; + } + check_end(); } return count; } void dump(ceph::Formatter *f) const { - f->dump_int("num_keys", qitems.size()); - f->dump_int("first_item_cost", qitems.begin()->cost); + f->dump_int("num_keys", next->get_size()); + f->dump_int("first_item_cost", next->get_cost()); } }; class Queue { @@ -145,10 +223,10 @@ class WeightedPriorityQueue : public OpQueue bool empty() const { return !size; } - void insert(unsigned p, K& cl, unsigned cost, T& item, bool front = false) { + void insert(unsigned p, K cl, unsigned cost, T& item, bool front = false) { typename SubQueues::insert_commit_data insert_data; std::pair ret = - queues.insert_unique_check(p, MapKey(), insert_data); + queues.insert_unique_check(p, MapKey(), insert_data); if (ret.second) { ret.first = queues.insert_unique_commit(*new SubQueue(p), insert_data); total_prio += p;