]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
common/OpQueue: Add base class for Op Queues. Introduce the new Weighted Priority...
authorRobert LeBlanc <robert.leblanc@endurance.com>
Thu, 17 Dec 2015 23:07:51 +0000 (23:07 +0000)
committerRobert LeBlanc <robert.leblanc@endurance.com>
Sat, 19 Dec 2015 05:15:21 +0000 (05:15 +0000)
Signed-Off-By: Robert LeBlanc <robert.leblanc@endurance.com>
src/common/Makefile.am
src/common/OpQueue.h [new file with mode: 0644]
src/common/PrioritizedQueue.h
src/common/WeightedPriorityQueue.h [new file with mode: 0644]

index 182295f9e6d629f970620c2ef109438618a3f3c8..cc5a4101a11681f97e7cf41b9f1782ac362b245c 100644 (file)
@@ -177,7 +177,9 @@ noinst_HEADERS += \
        common/Preforker.h \
        common/SloppyCRCMap.h \
        common/WorkQueue.h \
+       common/OpQueue.h \
        common/PrioritizedQueue.h \
+       common/WeightedPriorityQueue.h \
        common/ceph_argparse.h \
        common/ceph_context.h \
        common/xattr.h \
diff --git a/src/common/OpQueue.h b/src/common/OpQueue.h
new file mode 100644 (file)
index 0000000..34adc02
--- /dev/null
@@ -0,0 +1,63 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef OP_QUEUE_H
+#define OP_QUEUE_H
+
+#include "include/msgr.h"
+
+#include <list>
+#include <functional>
+
+namespace ceph {
+  class Formatter;
+}
+
+/**
+ * Abstract class for all Op Queues
+ *
+ * In order to provide optimized code, be sure to declare all
+ * virutal functions as final in the derived class.
+ */
+
+template <typename T, typename K>
+class OpQueue {
+
+  public:
+    // How many Ops are in the queue
+    virtual unsigned length() const = 0;
+    // Ops will be removed and placed in *removed if f is true
+    virtual void remove_by_filter(
+       std::function<bool (T)> f, std::list<T> *removed) = 0;
+    // Ops of this priority should be deleted immediately
+    virtual void remove_by_class(K k, std::list<T> *out) = 0;
+    // Enqueue op in the back of the strict queue
+    virtual void enqueue_strict(K cl, unsigned priority, T item) = 0;
+    // Enqueue op in the front of the strict queue
+    virtual void enqueue_strict_front(K cl, unsigned priority, T item) = 0;
+    // Enqueue op in the back of the regular queue
+    virtual void enqueue(K cl, unsigned priority, unsigned cost, T item) = 0;
+    // Enqueue the op in the front of the regular queue
+    virtual void enqueue_front(K cl, unsigned priority, unsigned cost, T item) = 0;
+    // Returns if the queue is empty
+    virtual bool empty() const = 0;
+    // Return an op to be dispatch
+    virtual T dequeue() = 0;
+    // Formatted output of the queue
+    virtual void dump(ceph::Formatter *f) const = 0;
+    // Don't leak resources on destruction
+    virtual ~OpQueue() {}; 
+};
+
+#endif
index 010f919ce006e2d805104b655836c7c0aa8cdf0b..8be6ca893acfbce414993b2284a3d6cc781c7282 100644 (file)
@@ -16,6 +16,7 @@
 #define PRIORITY_QUEUE_H
 
 #include "common/Formatter.h"
+#include "common/OpQueue.h"
 
 #include <map>
 #include <list>
  * to provide fairness for different clients.
  */
 template <typename T, typename K>
-class PrioritizedQueue {
+class PrioritizedQueue : public OpQueue <T, K> {
   int64_t total_priority;
   int64_t max_tokens_per_subqueue;
   int64_t min_cost;
 
   typedef std::list<std::pair<unsigned, T> > ListPairs;
-  template <class F>
   static unsigned filter_list_pairs(
-    ListPairs *l, F f,
+    ListPairs *l,
+    std::function<bool (T)> f,
     std::list<T> *out) {
     unsigned ret = 0;
     if (out) {
@@ -152,8 +153,9 @@ class PrioritizedQueue {
     bool empty() const {
       return q.empty();
     }
-    template <class F>
-    void remove_by_filter(F f, std::list<T> *out) {
+    void remove_by_filter(
+       std::function<bool (T)> f,
+               std::list<T> *out) {
       for (typename Classes::iterator i = q.begin();
           i != q.end();
           ) {
@@ -193,7 +195,7 @@ class PrioritizedQueue {
       }
     }
 
-    void dump(Formatter *f) const {
+    void dump(ceph::Formatter *f) const {
       f->dump_int("tokens", tokens);
       f->dump_int("max_tokens", max_tokens);
       f->dump_int("size", size);
@@ -244,7 +246,7 @@ public:
       min_cost(min_c)
   {}
 
-  unsigned length() const {
+  unsigned length() const override final {
     unsigned total = 0;
     for (typename SubQueues::const_iterator i = queue.begin();
         i != queue.end();
@@ -261,8 +263,9 @@ public:
     return total;
   }
 
-  template <class F>
-  void remove_by_filter(F f, std::list<T> *removed = 0) {
+  void remove_by_filter(
+      std::function<bool (T)> f,
+      std::list<T> *removed = 0) override final {
     for (typename SubQueues::iterator i = queue.begin();
         i != queue.end();
         ) {
@@ -288,7 +291,7 @@ public:
     }
   }
 
-  void remove_by_class(K k, std::list<T> *out = 0) {
+  void remove_by_class(K k, std::list<T> *out = 0) override final {
     for (typename SubQueues::iterator i = queue.begin();
         i != queue.end();
         ) {
@@ -313,15 +316,15 @@ public:
     }
   }
 
-  void enqueue_strict(K cl, unsigned priority, T item) {
+  void enqueue_strict(K cl, unsigned priority, T item) override final {
     high_queue[priority].enqueue(cl, 0, item);
   }
 
-  void enqueue_strict_front(K cl, unsigned priority, T item) {
+  void enqueue_strict_front(K cl, unsigned priority, T item) override final {
     high_queue[priority].enqueue_front(cl, 0, item);
   }
 
-  void enqueue(K cl, unsigned priority, unsigned cost, T item) {
+  void enqueue(K cl, unsigned priority, unsigned cost, T item) override final {
     if (cost < min_cost)
       cost = min_cost;
     if (cost > max_tokens_per_subqueue)
@@ -329,7 +332,7 @@ public:
     create_queue(priority)->enqueue(cl, cost, item);
   }
 
-  void enqueue_front(K cl, unsigned priority, unsigned cost, T item) {
+  void enqueue_front(K cl, unsigned priority, unsigned cost, T item) override final {
     if (cost < min_cost)
       cost = min_cost;
     if (cost > max_tokens_per_subqueue)
@@ -337,13 +340,13 @@ public:
     create_queue(priority)->enqueue_front(cl, cost, item);
   }
 
-  bool empty() const {
+  bool empty() const override final {
     assert(total_priority >= 0);
     assert((total_priority == 0) || !(queue.empty()));
     return queue.empty() && high_queue.empty();
   }
 
-  T dequeue() {
+  T dequeue() override final {
     assert(!empty());
 
     if (!(high_queue.empty())) {
@@ -387,7 +390,7 @@ public:
     return ret;
   }
 
-  void dump(Formatter *f) const {
+  void dump(ceph::Formatter *f) const override final {
     f->dump_int("total_priority", total_priority);
     f->dump_int("max_tokens_per_subqueue", max_tokens_per_subqueue);
     f->dump_int("min_cost", min_cost);
diff --git a/src/common/WeightedPriorityQueue.h b/src/common/WeightedPriorityQueue.h
new file mode 100644 (file)
index 0000000..6a77a6c
--- /dev/null
@@ -0,0 +1,359 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef WP_QUEUE_H
+#define WP_QUEUE_H
+
+#include "common/Formatter.h"
+#include "common/OpQueue.h"
+
+#include <map>
+#include <list>
+
+/**
+ * Weighted Priority queue with strict priority queue
+ *
+ * This queue attempts to be fair to all classes of
+ * operations but is also weighted so that higher classes
+ * get more share of the operations.
+ */
+
+template <typename T, typename K>
+class WeightedPriorityQueue : public OpQueue <T, K> {
+  int64_t total_priority;
+
+  typedef std::list<std::pair<unsigned, T>> ListPairs;
+  static unsigned filter_list_pairs(
+    ListPairs *l, std::function<bool (T)> f,
+    std::list<T> *out) {
+    unsigned ret = 0;
+    if (out) {
+      for (typename ListPairs::reverse_iterator i = l->rbegin();
+          i != l->rend();
+          ++i) {
+       if (f(i->second)) {
+         out->push_front(i->second);
+       }
+      }
+    }
+    for (typename ListPairs::iterator i = l->begin();
+        i != l->end(); ) {
+      if (f(i->second)) {
+       l->erase(i++);
+       ++ret;
+      } else {
+       ++i;
+      }
+    }
+    return ret;
+  }
+
+  struct SubQueue {
+  private:
+    typedef std::map<K, ListPairs> Classes;
+    Classes q;
+    typename Classes::iterator cur;
+    unsigned q_size;
+  public:
+    SubQueue(const SubQueue &other)
+      : q(other.q),
+       cur(q.begin()),
+       q_size(0) {}
+    SubQueue()
+      :        cur(q.begin()),
+       q_size(0) {}
+    void enqueue_front(K cl, unsigned cost, T item) {
+      q[cl].push_front(std::make_pair(cost, item));
+      if (cur == q.end()) {
+       cur = q.begin();
+      }
+      ++q_size;
+    }
+    void enqueue(K cl, unsigned cost, T item) {
+      q[cl].push_back(std::make_pair(cost, item));
+      if (cur == q.end()) {
+       cur = q.begin();
+      }
+      ++q_size;
+    }
+    std::pair<unsigned, T> front() const {
+      assert(!q.empty());
+      assert(cur != q.end());
+      assert(!cur->second.empty());
+      return cur->second.front();
+    }
+    void pop_front() {
+      assert(!q.empty());
+      assert(cur != q.end());
+      assert(!cur->second.empty());
+      cur->second.pop_front();
+      if (cur->second.empty()) {
+       cur = q.erase(cur);
+      } else {
+       ++cur;
+      }
+      if (cur == q.end()) {
+       cur = q.begin();
+      }
+      --q_size;
+    }
+    unsigned size() const {
+      return q_size;
+    }
+    bool empty() const {
+      return (q_size == 0);
+    }
+    unsigned remove_by_filter(std::function<bool (T)> f, std::list<T> *out) {
+      unsigned count = 0;
+      for (typename Classes::iterator i = q.begin();
+          i != q.end(); ) {
+       count += filter_list_pairs(&(i->second), f, out);
+       if (i->second.empty()) {
+         if (cur == i) {
+           ++cur;
+         }
+         q.erase(i++);
+       } else {
+         ++i;
+       }
+      }
+      if (cur == q.end()) {
+       cur = q.begin();
+      }
+      q_size -= count;
+      return count;
+    }
+    unsigned remove_by_class(K k, std::list<T> *out) {
+      typename Classes::iterator i = q.find(k);
+      if (i == q.end()) {
+       return 0;
+      }
+      unsigned count = i->second.size();
+      q_size -= count;
+      if (out) {
+       for (typename ListPairs::reverse_iterator j =
+              i->second.rbegin();
+            j != i->second.rend();
+            ++j) {
+         out->push_front(j->second);
+       }
+      }
+      if (i == cur) {
+       ++cur;
+      }
+      q.erase(i);
+      if (cur == q.end()) {
+       cur = q.begin();
+      }
+      return count;
+    }
+
+    void dump(ceph::Formatter *f) const {
+      f->dump_int("num_keys", q.size());
+      if (!empty()) {
+       f->dump_int("first_item_cost", front().first);
+      }
+    }
+  };
+
+  unsigned high_size, wrr_size;
+  unsigned max_cost;
+
+  typedef std::map<unsigned, SubQueue> SubQueues;
+  SubQueues high_queue;
+  SubQueues queue;
+  typename SubQueues::reverse_iterator dq;
+
+  SubQueue *create_queue(unsigned priority) {
+    typename SubQueues::iterator p = queue.find(priority);
+    if (p != queue.end()) {
+      return &p->second;
+    }
+    total_priority += priority;
+    SubQueue *sq = &queue[priority];
+    return sq;
+  }
+
+  void remove_queue(unsigned priority) {
+    assert(queue.count(priority));
+    dq = (typename SubQueues::reverse_iterator) queue.erase(queue.find(priority));
+    if (dq == queue.rend()) {
+      dq = queue.rbegin();
+    }
+    total_priority -= priority;
+    assert(total_priority >= 0);
+  }
+
+public:
+  WeightedPriorityQueue(unsigned max_per, unsigned min_c)
+    : total_priority(0),
+      high_size(0),
+      wrr_size(0),
+      max_cost(0),
+      dq(queue.rbegin())
+  {
+    srand(time(0));
+  }
+
+  unsigned length() const override final {
+    return high_size + wrr_size;
+  }
+
+  void remove_by_filter(
+      std::function<bool (T)> f, std::list<T> *removed = 0) override final {
+    for (typename SubQueues::iterator i = queue.begin();
+        i != queue.end(); ++i) {
+      wrr_size -= i->second.remove_by_filter(f, removed);
+      unsigned priority = i->first;
+      if (i->second.empty()) {
+       remove_queue(priority);
+      }
+    }
+    for (typename SubQueues::iterator i = high_queue.begin();
+        i != high_queue.end();
+        ) {
+      high_size -= i->second.remove_by_filter(f, removed);
+      if (i->second.empty()) {
+       high_queue.erase(i++);
+      } else {
+       ++i;
+      }
+    }
+  }
+
+  void remove_by_class(K k, std::list<T> *out = 0) override final {
+    for (typename SubQueues::iterator i = queue.begin();
+        i != queue.end(); ++i) {
+      wrr_size -= i->second.remove_by_class(k, out);
+      unsigned priority = i->first;
+      if (i->second.empty()) {
+       remove_queue(priority);
+      }
+    }
+    for (typename SubQueues::iterator i = high_queue.begin();
+        i != high_queue.end();
+        ) {
+      high_size -= i->second.remove_by_class(k, out);
+      if (i->second.empty()) {
+       high_queue.erase(i++);
+      } else {
+       ++i;
+      }
+    }
+  }
+
+  void enqueue_strict(K cl, unsigned priority, T item) override final {
+    high_queue[priority].enqueue(cl, 0, item);
+    ++high_size;
+  }
+
+  void enqueue_strict_front(K cl, unsigned priority, T item) override final {
+    high_queue[priority].enqueue_front(cl, 0, item);
+    ++high_size;
+  }
+
+  void enqueue(K cl, unsigned priority, unsigned cost, T item) override final {
+    if (cost > max_cost) {
+      max_cost = cost;
+    }
+    create_queue(priority)->enqueue(cl, cost, item);
+    ++wrr_size;
+  }
+
+  void enqueue_front(K cl, unsigned priority, unsigned cost, T item) override final {
+    if (cost > max_cost) {
+      max_cost = cost;
+    }
+    create_queue(priority)->enqueue_front(cl, cost, item);
+    ++wrr_size;
+  }
+
+  bool empty() const override final {
+    assert(total_priority >= 0);
+    assert((total_priority == 0) || !queue.empty());
+    return (high_size + wrr_size  == 0) ? true : false;
+  }
+
+  T dequeue() override final {
+    assert(!empty());
+
+    if (!high_queue.empty()) {
+      T ret = high_queue.rbegin()->second.front().second;
+      high_queue.rbegin()->second.pop_front();
+      if (high_queue.rbegin()->second.empty()) {
+       high_queue.erase(high_queue.rbegin()->first);
+      }
+      --high_size;
+      return ret;
+    }
+    // If there is more than one priority, choose one to run.
+    if (dq->second.size() != wrr_size) {
+      while (true) {
+       // Pick a new priority out of the total priority.
+       unsigned prio = rand() % total_priority;
+       typename SubQueues::iterator i = queue.begin();
+       unsigned tp = i->first;
+       // Find the priority coresponding to the picked number.
+       // Add low priorities to high priorities until the picked number
+       // is less than the total and try to dequeue that priority.
+       while (prio > tp) {
+         ++i;
+         tp += i->first;
+       }
+       dq = (typename SubQueues::reverse_iterator) ++i;
+       // Flip a coin to see if this priority gets to run based on cost.
+       // The next op's cost is multiplied by .9 and subtracted from the
+       // max cost seen. Ops with lower costs will have a larger value
+       // and allow them to be selected easier than ops with high costs.
+       if (max_cost == 0 || rand() % max_cost <=
+           (max_cost - ((dq->second.front().first * 9) / 10))){
+         break;
+       }
+      }
+    }
+    T ret = dq->second.front().second;
+    dq->second.pop_front();
+    if (dq->second.empty()) {
+      remove_queue(dq->first);
+    }
+    --wrr_size;
+    return ret;
+  }
+
+  void dump(ceph::Formatter *f) const {
+    f->dump_int("total_priority", total_priority);
+    f->open_array_section("high_queues");
+    for (typename SubQueues::const_iterator p = high_queue.begin();
+        p != high_queue.end();
+        ++p) {
+      f->open_object_section("subqueue");
+      f->dump_int("priority", p->first);
+      p->second.dump(f);
+      f->close_section();
+    }
+    f->close_section();
+    f->open_array_section("queues");
+    for (typename SubQueues::const_iterator p = queue.begin();
+        p != queue.end();
+        ++p) {
+      f->open_object_section("subqueue");
+      f->dump_int("priority", p->first);
+      p->second.dump(f);
+      f->close_section();
+    }
+    f->close_section();
+  }
+};
+
+#endif