]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/WeightedPriorityQueue Rewrote the queue to use intrusive contianers. Microbenc...
authorRobert LeBlanc <robert.leblanc@endurance.com>
Mon, 15 Feb 2016 17:48:49 +0000 (17:48 +0000)
committerRobert LeBlanc <robert.leblanc@endurance.com>
Tue, 23 Feb 2016 20:42:43 +0000 (20:42 +0000)
show 60-70% of execution time compared to before.

Signed-off-by: Robert LeBlanc <robert.leblanc@endurance.com>
src/common/WeightedPriorityQueue.h

index 6a77a6cd4c9a6275fa959fe1ef36dcea6f4a6ce6..b1c48f38b2f9ec498233566b0d130a2debceba01 100644 (file)
 #ifndef WP_QUEUE_H
 #define WP_QUEUE_H
 
-#include "common/Formatter.h"
-#include "common/OpQueue.h"
+#include "OpQueue.h"
 
-#include <map>
-#include <list>
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive/rbtree.hpp>
+#include <boost/intrusive/avl_set.hpp>
 
-/**
- * Weighted Priority queue with strict priority queue
- *
- * This queue attempts to be fair to all classes of
- * operations but is also weighted so that higher classes
- * get more share of the operations.
- */
-
-template <typename T, typename K>
-class WeightedPriorityQueue : public OpQueue <T, K> {
-  int64_t total_priority;
+namespace bi = boost::intrusive;
 
-  typedef std::list<std::pair<unsigned, T>> ListPairs;
-  static unsigned filter_list_pairs(
-    ListPairs *l, std::function<bool (T)> f,
-    std::list<T> *out) {
-    unsigned ret = 0;
-    if (out) {
-      for (typename ListPairs::reverse_iterator i = l->rbegin();
-          i != l->rend();
-          ++i) {
-       if (f(i->second)) {
-         out->push_front(i->second);
-       }
-      }
-    }
-    for (typename ListPairs::iterator i = l->begin();
-        i != l->end(); ) {
-      if (f(i->second)) {
-       l->erase(i++);
-       ++ret;
-      } else {
-       ++i;
-      }
-    }
-    return ret;
+template <typename T>
+class MapKey
+{
+  public:
+  bool operator()(const unsigned i, const T &k) const
+  {
+    return i < k.key;
   }
+  bool operator()(const T &k, const unsigned i) const
+  {
+    return k.key < i;
+  }
+};
 
-  struct SubQueue {
-  private:
-    typedef std::map<K, ListPairs> Classes;
-    Classes q;
-    typename Classes::iterator cur;
-    unsigned q_size;
+template <typename T>
+class DelItem
+{
   public:
-    SubQueue(const SubQueue &other)
-      : q(other.q),
-       cur(q.begin()),
-       q_size(0) {}
-    SubQueue()
-      :        cur(q.begin()),
-       q_size(0) {}
-    void enqueue_front(K cl, unsigned cost, T item) {
-      q[cl].push_front(std::make_pair(cost, item));
-      if (cur == q.end()) {
-       cur = q.begin();
-      }
-      ++q_size;
-    }
-    void enqueue(K cl, unsigned cost, T item) {
-      q[cl].push_back(std::make_pair(cost, item));
-      if (cur == q.end()) {
-       cur = q.begin();
-      }
-      ++q_size;
-    }
-    std::pair<unsigned, T> front() const {
-      assert(!q.empty());
-      assert(cur != q.end());
-      assert(!cur->second.empty());
-      return cur->second.front();
-    }
-    void pop_front() {
-      assert(!q.empty());
-      assert(cur != q.end());
-      assert(!cur->second.empty());
-      cur->second.pop_front();
-      if (cur->second.empty()) {
-       cur = q.erase(cur);
-      } else {
-       ++cur;
-      }
-      if (cur == q.end()) {
-       cur = q.begin();
+  void operator()(T* delete_this)
+    { delete delete_this; }
+};
+
+template <typename T, typename K>
+class WeightedPriorityQueue :  public OpQueue <T, K>
+{
+  private:
+    class ListPair : public bi::list_base_hook<>
+    {
+      public:
+       K klass;
+        unsigned cost;
+        T item;
+        ListPair(K& k, unsigned c, T& i) :
+         klass(k),
+          cost(c),
+          item(i)
+          {}
+    };
+    class SubQueue : public bi::set_base_hook<>
+    {
+      typedef bi::list<ListPair> QueueItems;
+      typedef typename QueueItems::iterator QI;
+      public:
+       unsigned key;   // priority
+       QueueItems qitems;
+       SubQueue(unsigned& p) :
+         key(p)
+         {}
+      bool empty() const {
+        return qitems.empty();
       }
-      --q_size;
-    }
-    unsigned size() const {
-      return q_size;
-    }
-    bool empty() const {
-      return (q_size == 0);
-    }
-    unsigned remove_by_filter(std::function<bool (T)> f, std::list<T> *out) {
-      unsigned count = 0;
-      for (typename Classes::iterator i = q.begin();
-          i != q.end(); ) {
-       count += filter_list_pairs(&(i->second), f, out);
-       if (i->second.empty()) {
-         if (cur == i) {
-           ++cur;
-         }
-         q.erase(i++);
+      void insert(K& cl, unsigned cost, T& item, bool front = false) {
+       if (front) {
+         qitems.push_front(*new ListPair(cl, cost, item));
        } else {
-         ++i;
+         qitems.push_back(*new ListPair(cl, cost, item));
        }
       }
-      if (cur == q.end()) {
-       cur = q.begin();
+      unsigned get_cost() const {
+       return qitems.begin()->cost;
       }
-      q_size -= count;
-      return count;
-    }
-    unsigned remove_by_class(K k, std::list<T> *out) {
-      typename Classes::iterator i = q.find(k);
-      if (i == q.end()) {
-       return 0;
+      T pop() {
+       T ret = qitems.begin()->item;
+       qitems.erase_and_dispose(qitems.begin(), DelItem<ListPair>());
+       return ret;
       }
-      unsigned count = i->second.size();
-      q_size -= count;
-      if (out) {
-       for (typename ListPairs::reverse_iterator j =
-              i->second.rbegin();
-            j != i->second.rend();
-            ++j) {
-         out->push_front(j->second);
-       }
+      unsigned filter_list_pairs(std::function<bool (T)>& f, std::list<T>* out) {
+       unsigned count = 0;
+        // intrusive containers can't erase with a reverse_iterator
+        // so we have to walk backwards on our own. Since there is
+        // no iterator before begin, we have to test at the end.
+        for (QI i = --qitems.end();; --i) {
+          if (f(i->item)) {
+            if (out) {
+              out->push_front(i->item);
+            }
+            i = qitems.erase_and_dispose(i, DelItem<ListPair>());
+            ++count;
+          }
+          if (i == qitems.begin()) {
+            break;
+          }
+        }
+       return count;
       }
-      if (i == cur) {
-       ++cur;
+      unsigned filter_class(K& cl, std::list<T>* out) {
+       unsigned count = 0;
+        for (QI i = --qitems.end();; --i) {
+         if (i->klass == cl) {
+           if (out) {
+             out->push_front(i->item);
+           }
+           i = qitems.erase_and_dispose(i, DelItem<ListPair>());
+           ++count;
+         }
+         if (i == qitems.begin()) {
+           break;
+         }
+        }
+       return count;
       }
-      q.erase(i);
-      if (cur == q.end()) {
-       cur = q.begin();
+      void dump(ceph::Formatter *f) const {
+       f->dump_int("num_keys", qitems.size());
+       f->dump_int("first_item_cost", qitems.begin()->cost);
       }
-      return count;
-    }
+    };
+    class Queue {
+      typedef bi::rbtree<SubQueue> SubQueues;
+      typedef typename SubQueues::iterator Sit;
+      SubQueues queues;
+      unsigned total_prio;
+      unsigned max_cost;
+      public:
+       unsigned size;
+       Queue() :
+         total_prio(0),
+         max_cost(0),
+         size(0)
+         {}
+       bool empty() const {
+         return !size;
+       }
+       void insert(unsigned p, K& cl, unsigned cost, T& item, bool front = false) {
+         typename SubQueues::insert_commit_data insert_data;
+         std::pair<typename SubQueues::iterator, bool> ret =
+           queues.insert_unique_check(p, MapKey<SubQueue>(), insert_data);
+         if (ret.second) {
+           ret.first = queues.insert_unique_commit(*new SubQueue(p), insert_data);
+           total_prio += p;
+         }
+         ret.first->insert(cl, cost, item, front);
+         if (cost > max_cost) {
+           max_cost = cost;
+         }
+         ++size;
+       }
+       T pop(bool strict = false) {
+         --size;
+         Sit i = --queues.end();
+         if (strict) {
+           T ret = i->pop();
+           if (i->empty()) {
+             queues.erase_and_dispose(i, DelItem<SubQueue>());
+           }
+           return ret;
+         }
+         if (queues.size() > 1) {
+           while (true) {
+             // Pick a new priority out of the total priority.
+             unsigned prio = rand() % total_prio + 1;
+             unsigned tp = total_prio - i->key;
+             // Find the priority coresponding to the picked number.
+             // Subtract high priorities to low priorities until the picked number
+             // is more than the total and try to dequeue that priority.
+             // Reverse the direction from previous implementation because there is a higher
+             // chance of dequeuing a high priority op so spend less time spinning.
+             while (prio <= tp) {
+               --i;
+               tp -= i->key;
+             }
+             // Flip a coin to see if this priority gets to run based on cost.
+             // The next op's cost is multiplied by .9 and subtracted from the
+             // max cost seen. Ops with lower costs will have a larger value
+             // and allow them to be selected easier than ops with high costs.
+             if (max_cost == 0 || rand() % max_cost <=
+                 (max_cost - ((i->get_cost() * 9) / 10))) {
+               break;
+             }
+             i = --queues.end();
+           }
+         }
+         T ret = i->pop();
+         if (i->empty()) {
+           total_prio -= i->key;
+           queues.erase_and_dispose(i, DelItem<SubQueue>());
+         }
+         return ret;
+       }
+       void filter_list_pairs(std::function<bool (T)>& f, std::list<T>* out) {
+         for (Sit i = queues.begin(); i != queues.end();) {
+           size -= i->filter_list_pairs(f, out);
+           if (i->empty()) {
+             total_prio -= i->key;
+             i = queues.erase_and_dispose(i, DelItem<SubQueue>());
+           } else {
+             ++i;
+           }
+         }
+       }
+       void filter_class(K& cl, std::list<T>* out) {
+         for (Sit i = queues.begin(); i != queues.end();) {
+           size -= i->filter_class(cl, out);
+           if (i->empty()) {
+             total_prio -= i->key;
+             i = queues.erase_and_dispose(i, DelItem<SubQueue>());
+           } else {
+             ++i;
+           }
+         }
+       }
+       void dump(ceph::Formatter *f) const {
+         for (typename SubQueues::const_iterator i = queues.begin();
+               i != queues.end(); ++i) {
+           f->dump_int("total_priority", total_prio);
+           f->dump_int("max_cost", max_cost);
+           f->open_object_section("subqueue");
+           f->dump_int("priority", i->key);
+           i->dump(f);
+           f->close_section();
+         }
+       }
+    };
 
-    void dump(ceph::Formatter *f) const {
-      f->dump_int("num_keys", q.size());
-      if (!empty()) {
-       f->dump_int("first_item_cost", front().first);
+    Queue strict;
+    Queue normal;
+  public:
+    WeightedPriorityQueue(unsigned max_per, unsigned min_c) :
+      strict(),
+      normal()
+      {
+       std::srand(time(0));
       }
+    unsigned length() const override final {
+      return strict.size + normal.size;
     }
-  };
-
-  unsigned high_size, wrr_size;
-  unsigned max_cost;
-
-  typedef std::map<unsigned, SubQueue> SubQueues;
-  SubQueues high_queue;
-  SubQueues queue;
-  typename SubQueues::reverse_iterator dq;
-
-  SubQueue *create_queue(unsigned priority) {
-    typename SubQueues::iterator p = queue.find(priority);
-    if (p != queue.end()) {
-      return &p->second;
+    void remove_by_filter(std::function<bool (T)> f, std::list<T>* removed = 0) override final {
+      strict.filter_list_pairs(f, removed);
+      normal.filter_list_pairs(f, removed);
     }
-    total_priority += priority;
-    SubQueue *sq = &queue[priority];
-    return sq;
-  }
-
-  void remove_queue(unsigned priority) {
-    assert(queue.count(priority));
-    dq = (typename SubQueues::reverse_iterator) queue.erase(queue.find(priority));
-    if (dq == queue.rend()) {
-      dq = queue.rbegin();
+    void remove_by_class(K cl, std::list<T>* removed = 0) override final {
+      strict.filter_class(cl, removed);
+      normal.filter_class(cl, removed);
     }
-    total_priority -= priority;
-    assert(total_priority >= 0);
-  }
-
-public:
-  WeightedPriorityQueue(unsigned max_per, unsigned min_c)
-    : total_priority(0),
-      high_size(0),
-      wrr_size(0),
-      max_cost(0),
-      dq(queue.rbegin())
-  {
-    srand(time(0));
-  }
-
-  unsigned length() const override final {
-    return high_size + wrr_size;
-  }
-
-  void remove_by_filter(
-      std::function<bool (T)> f, std::list<T> *removed = 0) override final {
-    for (typename SubQueues::iterator i = queue.begin();
-        i != queue.end(); ++i) {
-      wrr_size -= i->second.remove_by_filter(f, removed);
-      unsigned priority = i->first;
-      if (i->second.empty()) {
-       remove_queue(priority);
-      }
+    bool empty() const override final {
+      return !(strict.size + normal.size);
     }
-    for (typename SubQueues::iterator i = high_queue.begin();
-        i != high_queue.end();
-        ) {
-      high_size -= i->second.remove_by_filter(f, removed);
-      if (i->second.empty()) {
-       high_queue.erase(i++);
-      } else {
-       ++i;
-      }
+    void enqueue_strict(K cl, unsigned p, T item) override final {
+      strict.insert(p, cl, 0, item);
     }
-  }
-
-  void remove_by_class(K k, std::list<T> *out = 0) override final {
-    for (typename SubQueues::iterator i = queue.begin();
-        i != queue.end(); ++i) {
-      wrr_size -= i->second.remove_by_class(k, out);
-      unsigned priority = i->first;
-      if (i->second.empty()) {
-       remove_queue(priority);
-      }
+    void enqueue_strict_front(K cl, unsigned p, T item) override final {
+      strict.insert(p, cl, 0, item, true);
     }
-    for (typename SubQueues::iterator i = high_queue.begin();
-        i != high_queue.end();
-        ) {
-      high_size -= i->second.remove_by_class(k, out);
-      if (i->second.empty()) {
-       high_queue.erase(i++);
-      } else {
-       ++i;
-      }
+    void enqueue(K cl, unsigned p, unsigned cost, T item) override final {
+      normal.insert(p, cl, cost, item);
     }
-  }
-
-  void enqueue_strict(K cl, unsigned priority, T item) override final {
-    high_queue[priority].enqueue(cl, 0, item);
-    ++high_size;
-  }
-
-  void enqueue_strict_front(K cl, unsigned priority, T item) override final {
-    high_queue[priority].enqueue_front(cl, 0, item);
-    ++high_size;
-  }
-
-  void enqueue(K cl, unsigned priority, unsigned cost, T item) override final {
-    if (cost > max_cost) {
-      max_cost = cost;
-    }
-    create_queue(priority)->enqueue(cl, cost, item);
-    ++wrr_size;
-  }
-
-  void enqueue_front(K cl, unsigned priority, unsigned cost, T item) override final {
-    if (cost > max_cost) {
-      max_cost = cost;
-    }
-    create_queue(priority)->enqueue_front(cl, cost, item);
-    ++wrr_size;
-  }
-
-  bool empty() const override final {
-    assert(total_priority >= 0);
-    assert((total_priority == 0) || !queue.empty());
-    return (high_size + wrr_size  == 0) ? true : false;
-  }
-
-  T dequeue() override final {
-    assert(!empty());
-
-    if (!high_queue.empty()) {
-      T ret = high_queue.rbegin()->second.front().second;
-      high_queue.rbegin()->second.pop_front();
-      if (high_queue.rbegin()->second.empty()) {
-       high_queue.erase(high_queue.rbegin()->first);
-      }
-      --high_size;
-      return ret;
+    void enqueue_front(K cl, unsigned p, unsigned cost, T item) override final {
+      normal.insert(p, cl, cost, item, true);
     }
-    // If there is more than one priority, choose one to run.
-    if (dq->second.size() != wrr_size) {
-      while (true) {
-       // Pick a new priority out of the total priority.
-       unsigned prio = rand() % total_priority;
-       typename SubQueues::iterator i = queue.begin();
-       unsigned tp = i->first;
-       // Find the priority coresponding to the picked number.
-       // Add low priorities to high priorities until the picked number
-       // is less than the total and try to dequeue that priority.
-       while (prio > tp) {
-         ++i;
-         tp += i->first;
-       }
-       dq = (typename SubQueues::reverse_iterator) ++i;
-       // Flip a coin to see if this priority gets to run based on cost.
-       // The next op's cost is multiplied by .9 and subtracted from the
-       // max cost seen. Ops with lower costs will have a larger value
-       // and allow them to be selected easier than ops with high costs.
-       if (max_cost == 0 || rand() % max_cost <=
-           (max_cost - ((dq->second.front().first * 9) / 10))){
-         break;
-       }
+    T dequeue() {
+      assert(strict.size + normal.size > 0);
+      if (!strict.empty()) {
+       return strict.pop(true);
       }
+      return normal.pop();
     }
-    T ret = dq->second.front().second;
-    dq->second.pop_front();
-    if (dq->second.empty()) {
-      remove_queue(dq->first);
-    }
-    --wrr_size;
-    return ret;
-  }
-
-  void dump(ceph::Formatter *f) const {
-    f->dump_int("total_priority", total_priority);
-    f->open_array_section("high_queues");
-    for (typename SubQueues::const_iterator p = high_queue.begin();
-        p != high_queue.end();
-        ++p) {
-      f->open_object_section("subqueue");
-      f->dump_int("priority", p->first);
-      p->second.dump(f);
+    void dump(ceph::Formatter *f) const {
+      f->open_array_section("high_queues");
+      strict.dump(f);
       f->close_section();
-    }
-    f->close_section();
-    f->open_array_section("queues");
-    for (typename SubQueues::const_iterator p = queue.begin();
-        p != queue.end();
-        ++p) {
-      f->open_object_section("subqueue");
-      f->dump_int("priority", p->first);
-      p->second.dump(f);
+      f->open_array_section("queues");
+      normal.dump(f);
       f->close_section();
     }
-    f->close_section();
-  }
 };
 
 #endif