]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
PrioritizedQueue: let filter_by_predicate use f to accumulate
authorSamuel Just <sjust@redhat.com>
Tue, 26 Apr 2016 01:39:23 +0000 (18:39 -0700)
committerSamuel Just <sjust@redhat.com>
Mon, 16 May 2016 23:40:56 +0000 (16:40 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/common/OpQueue.h
src/common/PrioritizedQueue.h
src/common/WeightedPriorityQueue.h
src/osd/OSD.h

index 34adc026810f2d413212c7da366df22985ea2cb4..208d762b37cf96077cb7b05b3444384559dc5c57 100644 (file)
@@ -37,9 +37,9 @@ class OpQueue {
   public:
     // How many Ops are in the queue
     virtual unsigned length() const = 0;
-    // Ops will be removed and placed in *removed if f is true
+    // Ops will be removed f evaluates to true, f may have sideeffects
     virtual void remove_by_filter(
-       std::function<bool (T)> f, std::list<T> *removed) = 0;
+       std::function<bool (T)> f) = 0;
     // Ops of this priority should be deleted immediately
     virtual void remove_by_class(K k, std::list<T> *out) = 0;
     // Enqueue op in the back of the strict queue
index 8be6ca893acfbce414993b2284a3d6cc781c7282..a831be52fdb488f643e0cbaf6d6b900e71355d4c 100644 (file)
@@ -50,26 +50,18 @@ class PrioritizedQueue : public OpQueue <T, K> {
   typedef std::list<std::pair<unsigned, T> > ListPairs;
   static unsigned filter_list_pairs(
     ListPairs *l,
-    std::function<bool (T)> f,
-    std::list<T> *out) {
+    std::function<bool (T)> f) {
     unsigned ret = 0;
-    if (out) {
-      for (typename ListPairs::reverse_iterator i = l->rbegin();
-          i != l->rend();
-          ++i) {
-       if (f(i->second)) {
-         out->push_front(i->second);
-       }
-      }
-    }
-    for (typename ListPairs::iterator i = l->begin();
-        i != l->end();
+    for (typename ListPairs::iterator i = l->end();
+        i != l->begin();
       ) {
-      if (f(i->second)) {
-       l->erase(i++);
+      auto next = i;
+      --next;
+      if (f(next->second)) {
        ++ret;
+       l->erase(next);
       } else {
-       ++i;
+       i = next;
       }
     }
     return ret;
@@ -154,12 +146,11 @@ class PrioritizedQueue : public OpQueue <T, K> {
       return q.empty();
     }
     void remove_by_filter(
-       std::function<bool (T)> f,
-               std::list<T> *out) {
+      std::function<bool (T)> f) {
       for (typename Classes::iterator i = q.begin();
           i != q.end();
           ) {
-       size -= filter_list_pairs(&(i->second), f, out);
+       size -= filter_list_pairs(&(i->second), f);
        if (i->second.empty()) {
          if (cur == i) {
            ++cur;
@@ -264,14 +255,13 @@ public:
   }
 
   void remove_by_filter(
-      std::function<bool (T)> f,
-      std::list<T> *removed = 0) override final {
+      std::function<bool (T)> f) override final {
     for (typename SubQueues::iterator i = queue.begin();
         i != queue.end();
         ) {
       unsigned priority = i->first;
       
-      i->second.remove_by_filter(f, removed);
+      i->second.remove_by_filter(f);
       if (i->second.empty()) {
        ++i;
        remove_queue(priority);
@@ -282,7 +272,7 @@ public:
     for (typename SubQueues::iterator i = high_queue.begin();
         i != high_queue.end();
         ) {
-      i->second.remove_by_filter(f, removed);
+      i->second.remove_by_filter(f);
       if (i->second.empty()) {
        high_queue.erase(i++);
       } else {
index 82b1fa005d50b73ff73a05b3dfcf1534fbb42cd2..39d5eb79b672c13278d10534b229283e69efc551 100644 (file)
@@ -98,17 +98,13 @@ class WeightedPriorityQueue :  public OpQueue <T, K>
       unsigned get_size() const {
        return lp.size();
       }
-      unsigned filter_list_pairs(std::function<bool (T)>& f,
-        std::list<T>* out) {
+      unsigned filter_list_pairs(std::function<bool (T)>& f) {
         unsigned count = 0;
         // intrusive containers can't erase with a reverse_iterator
         // so we have to walk backwards on our own. Since there is
         // no iterator before begin, we have to test at the end.
         for (Lit i = --lp.end();; --i) {
           if (f(i->item)) {
-            if (out) {
-             out->push_front(i->item);
-            }
             i = lp.erase_and_dispose(i, DelItem<ListPair>());
             ++count;
           }
@@ -182,13 +178,13 @@ class WeightedPriorityQueue :  public OpQueue <T, K>
         check_end();
        return ret;
       }
-      unsigned filter_list_pairs(std::function<bool (T)>& f, std::list<T>* out) {
+      unsigned filter_list_pairs(std::function<bool (T)>& f) {
        unsigned count = 0;
         // intrusive containers can't erase with a reverse_iterator
         // so we have to walk backwards on our own. Since there is
         // no iterator before begin, we have to test at the end.
         for (Kit i = klasses.begin(); i != klasses.end();) {
-          count += i->filter_list_pairs(f, out);
+          count += i->filter_list_pairs(f);
           if (i->empty()) {
            if (next == i) {
              ++next;
@@ -291,9 +287,9 @@ class WeightedPriorityQueue :  public OpQueue <T, K>
          }
          return ret;
        }
-       void filter_list_pairs(std::function<bool (T)>& f, std::list<T>* out) {
+       void filter_list_pairs(std::function<bool (T)>& f) {
          for (Sit i = queues.begin(); i != queues.end();) {
-           size -= i->filter_list_pairs(f, out);
+            size -= i->filter_list_pairs(f);
            if (i->empty()) {
              total_prio -= i->key;
              i = queues.erase_and_dispose(i, DelItem<SubQueue>());
@@ -338,9 +334,9 @@ class WeightedPriorityQueue :  public OpQueue <T, K>
     unsigned length() const override final {
       return strict.size + normal.size;
     }
-    void remove_by_filter(std::function<bool (T)> f, std::list<T>* removed = 0) override final {
-      strict.filter_list_pairs(f, removed);
-      normal.filter_list_pairs(f, removed);
+    void remove_by_filter(std::function<bool (T)> f) override final {
+      strict.filter_list_pairs(f);
+      normal.filter_list_pairs(f);
     }
     void remove_by_class(K cl, std::list<T>* removed = 0) override final {
       strict.filter_class(cl, removed);
index 87dc5522ea6a8db828c084435d53ff57ab4533a1..7b5d025382d0ed261559e94a0ab22c47edd585aa 100644 (file)
@@ -1768,24 +1768,29 @@ private:
       }
     }
 
+    /// Must be called on ops queued back to front
     struct Pred {
       PG *pg;
-      explicit Pred(PG *pg) : pg(pg) {}
+      list<OpRequestRef> *out_ops;
+      void accumulate(PGQueueable &op) {
+       if (out_ops) {
+         boost::optional<OpRequestRef> mop = op.maybe_get_op();
+         if (mop)
+           out_ops->push_front(*mop);
+       }
+      }
       bool operator()(const pair<PGRef, PGQueueable> &op) {
-       return op.first == pg;
+       if (op.first == pg) {
+         accumulate(op.second);
+         return true;
+       } else {
+         return false;
+       }
       }
     };
 
     void dequeue(PG *pg) {
-      ShardData* sdata = NULL;
-      assert(pg != NULL);
-      uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
-      sdata = shard_list[shard_index];
-      assert(sdata != NULL);
-      sdata->sdata_op_ordering_lock.Lock();
-      sdata->pqueue->remove_by_filter(Pred(pg), 0);
-      sdata->pg_for_processing.erase(pg);
-      sdata->sdata_op_ordering_lock.Unlock();
+      return dequeue_and_get_ops(pg, nullptr);
     }
 
     void dequeue_and_get_ops(PG *pg, list<OpRequestRef> *dequeued) {
@@ -1794,28 +1799,24 @@ private:
       uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
       sdata = shard_list[shard_index];
       assert(sdata != NULL);
-      assert(dequeued);
-      list<pair<PGRef, PGQueueable> > _dequeued;
       sdata->sdata_op_ordering_lock.Lock();
-      sdata->pqueue->remove_by_filter(Pred(pg), &_dequeued);
-      for (list<pair<PGRef, PGQueueable> >::iterator i = _dequeued.begin();
-          i != _dequeued.end(); ++i) {
-       boost::optional<OpRequestRef> mop = i->second.maybe_get_op();
-       if (mop)
-         dequeued->push_back(*mop);
-      }
+
+      Pred f(pg, dequeued);
+
+      // items in pqueue are behind items in pg_for_processing
+      sdata->pqueue->remove_by_filter(f);
+
       map<PG *, list<PGQueueable> >::iterator iter =
        sdata->pg_for_processing.find(pg);
       if (iter != sdata->pg_for_processing.end()) {
-       for (list<PGQueueable>::reverse_iterator i = iter->second.rbegin();
+       for (auto i = iter->second.rbegin();
             i != iter->second.rend();
             ++i) {
-         boost::optional<OpRequestRef> mop = i->maybe_get_op();
-         if (mop)
-           dequeued->push_front(*mop);
+         f.accumulate(*i);
        }
        sdata->pg_for_processing.erase(iter);
       }
+
       sdata->sdata_op_ordering_lock.Unlock();
     }