]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: common/WeightedPriorityQueue.h Re-add Round Robin between classes
authorRobert LeBlanc <robert.leblanc@endurance.com>
Wed, 2 Mar 2016 22:05:02 +0000 (22:05 +0000)
committerRobert LeBlanc <robert.leblanc@endurance.com>
Thu, 3 Mar 2016 00:36:12 +0000 (00:36 +0000)
Fixes corner case where a single client can starve out other clients.
This enables round robin between all clients so that they all get a
fair share.

Signed-off-by: Robert LeBlanc <robert.leblanc@endurance.com>
src/common/WeightedPriorityQueue.h

index b1c48f38b2f9ec498233566b0d130a2debceba01..54cb4b0595b6aaef7e7a53367d6c43174736ff0b 100644 (file)
 
 namespace bi = boost::intrusive;
 
-template <typename T>
+template <typename T, typename S>
 class MapKey
 {
   public:
-  bool operator()(const unsigned i, const T &k) const
+  bool operator()(const S i, const T &k) const
   {
     return i < k.key;
   }
-  bool operator()(const T &k, const unsigned i) const
+  bool operator()(const T &k, const S i) const
   {
     return k.key < i;
   }
@@ -52,41 +52,122 @@ class WeightedPriorityQueue :  public OpQueue <T, K>
     class ListPair : public bi::list_base_hook<>
     {
       public:
-       K klass;
         unsigned cost;
         T item;
-        ListPair(K& k, unsigned c, T& i) :
-         klass(k),
+        ListPair(unsigned c, T& i) :
           cost(c),
           item(i)
           {}
     };
+    class Klass : public bi::set_base_hook<>
+    {
+      typedef bi::list<ListPair> ListPairs;
+      typedef typename ListPairs::iterator Lit;
+      public:
+        K key;         // klass
+        ListPairs lp;
+        Klass(K& k) :
+          key(k)
+          {}
+      void insert(unsigned cost, T& item, bool front) {
+        if (front) {
+          lp.push_front(*new ListPair(cost, item));
+        } else {
+          lp.push_back(*new ListPair(cost, item));
+        }
+      }
+      //Get the cost of the next item to dequeue
+      unsigned get_cost() const {
+       return lp.begin()->cost;
+      }
+      T pop() {
+       assert(!lp.empty());
+       T ret = lp.begin()->item;
+        lp.erase_and_dispose(lp.begin(), DelItem<ListPair>());
+        return ret;
+      }
+      bool empty() const {
+        return lp.empty();
+      }
+      unsigned get_size() const {
+       return lp.size();
+      }
+      unsigned filter_list_pairs(std::function<bool (T)>& f,
+        std::list<T>* out) {
+        unsigned count = 0;
+        // intrusive containers can't erase with a reverse_iterator
+        // so we have to walk backwards on our own. Since there is
+        // no iterator before begin, we have to test at the end.
+        for (Lit i = --lp.end();; --i) {
+          if (f(i->item)) {
+            if (out) {
+             out->push_front(i->item);
+            }
+            i = lp.erase_and_dispose(i, DelItem<ListPair>());
+            ++count;
+          }
+          if (i == lp.begin()) {
+            break;
+          }
+        }
+        return count;
+      }
+      unsigned filter_class(std::list<T>* out) {
+        unsigned count = 0;
+        for (Lit i = --lp.end();; --i) {
+          if (out) {
+            out->push_front(i->item);
+          }
+          i = lp.erase_and_dispose(i, DelItem<ListPair>());
+          ++count;
+          if (i == lp.begin()) {
+            break;
+          }
+        }
+        return count;
+      }
+    };
     class SubQueue : public bi::set_base_hook<>
     {
-      typedef bi::list<ListPair> QueueItems;
-      typedef typename QueueItems::iterator QI;
+      typedef bi::rbtree<Klass> Klasses;
+      typedef typename Klasses::iterator Kit;
+      void check_end() {
+        if (next == klasses.end()) {
+          next = klasses.begin();
+        }
+      }
       public:
        unsigned key;   // priority
-       QueueItems qitems;
+        Klasses klasses;
+       Kit next;
        SubQueue(unsigned& p) :
-         key(p)
+         key(p),
+         next(klasses.begin())
          {}
       bool empty() const {
-        return qitems.empty();
+        return klasses.empty();
       }
-      void insert(K& cl, unsigned cost, T& item, bool front = false) {
-       if (front) {
-         qitems.push_front(*new ListPair(cl, cost, item));
-       } else {
-         qitems.push_back(*new ListPair(cl, cost, item));
+      void insert(K cl, unsigned cost, T& item, bool front = false) {
+        typename Klasses::insert_commit_data insert_data;
+       std::pair<Kit, bool> ret =
+          klasses.insert_unique_check(cl, MapKey<Klass, K>(), insert_data);
+       if (ret.second) {
+         ret.first = klasses.insert_unique_commit(*new Klass(cl), insert_data);
+          check_end();
        }
+       ret.first->insert(cost, item, front);
       }
       unsigned get_cost() const {
-       return qitems.begin()->cost;
+       return next->get_cost();
       }
       T pop() {
-       T ret = qitems.begin()->item;
-       qitems.erase_and_dispose(qitems.begin(), DelItem<ListPair>());
+        T ret = next->pop();
+        if (next->empty()) {
+          next = klasses.erase_and_dispose(next, DelItem<Klass>());
+        } else {
+         ++next;
+       }
+        check_end();
        return ret;
       }
       unsigned filter_list_pairs(std::function<bool (T)>& f, std::list<T>* out) {
@@ -94,39 +175,36 @@ class WeightedPriorityQueue :  public OpQueue <T, K>
         // intrusive containers can't erase with a reverse_iterator
         // so we have to walk backwards on our own. Since there is
         // no iterator before begin, we have to test at the end.
-        for (QI i = --qitems.end();; --i) {
-          if (f(i->item)) {
-            if (out) {
-              out->push_front(i->item);
-            }
-            i = qitems.erase_and_dispose(i, DelItem<ListPair>());
-            ++count;
-          }
-          if (i == qitems.begin()) {
-            break;
+        for (Kit i = klasses.begin(); i != klasses.end();) {
+          count += i->filter_list_pairs(f, out);
+          if (i->empty()) {
+           if (next == i) {
+             ++next;
+           }
+            i = klasses.erase_and_dispose(i, DelItem<Klass>());
+          } else {
+            ++i;
           }
         }
+        check_end();
        return count;
       }
       unsigned filter_class(K& cl, std::list<T>* out) {
        unsigned count = 0;
-        for (QI i = --qitems.end();; --i) {
-         if (i->klass == cl) {
-           if (out) {
-             out->push_front(i->item);
-           }
-           i = qitems.erase_and_dispose(i, DelItem<ListPair>());
-           ++count;
-         }
-         if (i == qitems.begin()) {
-           break;
-         }
+        Kit i = klasses.find(cl, MapKey<Klass, K>());
+        if (i != klasses.end()) {
+          count = i->filter_class(out);
+         Kit tmp = klasses.erase_and_dispose(i, DelItem<Klass>());
+         if (next == i) {
+            next = tmp;
+          }
+          check_end();
         }
        return count;
       }
       void dump(ceph::Formatter *f) const {
-       f->dump_int("num_keys", qitems.size());
-       f->dump_int("first_item_cost", qitems.begin()->cost);
+       f->dump_int("num_keys", next->get_size());
+       f->dump_int("first_item_cost", next->get_cost());
       }
     };
     class Queue {
@@ -145,10 +223,10 @@ class WeightedPriorityQueue :  public OpQueue <T, K>
        bool empty() const {
          return !size;
        }
-       void insert(unsigned p, K& cl, unsigned cost, T& item, bool front = false) {
+       void insert(unsigned p, K cl, unsigned cost, T& item, bool front = false) {
          typename SubQueues::insert_commit_data insert_data;
          std::pair<typename SubQueues::iterator, bool> ret =
-           queues.insert_unique_check(p, MapKey<SubQueue>(), insert_data);
+           queues.insert_unique_check(p, MapKey<SubQueue, unsigned>(), insert_data);
          if (ret.second) {
            ret.first = queues.insert_unique_commit(*new SubQueue(p), insert_data);
            total_prio += p;