From: Samuel Just Date: Tue, 26 Apr 2016 01:39:23 +0000 (-0700) Subject: PrioritizedQueue: let filter_by_predicate use f to accumulate X-Git-Tag: v11.0.0~465^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=6f9de9291050647b31d249fa738239f63ebf0371;p=ceph.git PrioritizedQueue: let filter_by_predicate use f to accumulate Signed-off-by: Samuel Just --- diff --git a/src/common/OpQueue.h b/src/common/OpQueue.h index 34adc026810f..208d762b37cf 100644 --- a/src/common/OpQueue.h +++ b/src/common/OpQueue.h @@ -37,9 +37,9 @@ class OpQueue { public: // How many Ops are in the queue virtual unsigned length() const = 0; - // Ops will be removed and placed in *removed if f is true + // Ops will be removed f evaluates to true, f may have sideeffects virtual void remove_by_filter( - std::function f, std::list *removed) = 0; + std::function f) = 0; // Ops of this priority should be deleted immediately virtual void remove_by_class(K k, std::list *out) = 0; // Enqueue op in the back of the strict queue diff --git a/src/common/PrioritizedQueue.h b/src/common/PrioritizedQueue.h index 8be6ca893acf..a831be52fdb4 100644 --- a/src/common/PrioritizedQueue.h +++ b/src/common/PrioritizedQueue.h @@ -50,26 +50,18 @@ class PrioritizedQueue : public OpQueue { typedef std::list > ListPairs; static unsigned filter_list_pairs( ListPairs *l, - std::function f, - std::list *out) { + std::function f) { unsigned ret = 0; - if (out) { - for (typename ListPairs::reverse_iterator i = l->rbegin(); - i != l->rend(); - ++i) { - if (f(i->second)) { - out->push_front(i->second); - } - } - } - for (typename ListPairs::iterator i = l->begin(); - i != l->end(); + for (typename ListPairs::iterator i = l->end(); + i != l->begin(); ) { - if (f(i->second)) { - l->erase(i++); + auto next = i; + --next; + if (f(next->second)) { ++ret; + l->erase(next); } else { - ++i; + i = next; } } return ret; @@ -154,12 +146,11 @@ class PrioritizedQueue : public OpQueue { return q.empty(); } void remove_by_filter( - std::function f, - std::list *out) { + std::function f) { for (typename Classes::iterator i = q.begin(); i != q.end(); ) { - size -= filter_list_pairs(&(i->second), f, out); + size -= filter_list_pairs(&(i->second), f); if (i->second.empty()) { if (cur == i) { ++cur; @@ -264,14 +255,13 @@ public: } void remove_by_filter( - std::function f, - std::list *removed = 0) override final { + std::function f) override final { for (typename SubQueues::iterator i = queue.begin(); i != queue.end(); ) { unsigned priority = i->first; - i->second.remove_by_filter(f, removed); + i->second.remove_by_filter(f); if (i->second.empty()) { ++i; remove_queue(priority); @@ -282,7 +272,7 @@ public: for (typename SubQueues::iterator i = high_queue.begin(); i != high_queue.end(); ) { - i->second.remove_by_filter(f, removed); + i->second.remove_by_filter(f); if (i->second.empty()) { high_queue.erase(i++); } else { diff --git a/src/common/WeightedPriorityQueue.h b/src/common/WeightedPriorityQueue.h index 82b1fa005d50..39d5eb79b672 100644 --- a/src/common/WeightedPriorityQueue.h +++ b/src/common/WeightedPriorityQueue.h @@ -98,17 +98,13 @@ class WeightedPriorityQueue : public OpQueue unsigned get_size() const { return lp.size(); } - unsigned filter_list_pairs(std::function& f, - std::list* out) { + unsigned filter_list_pairs(std::function& f) { unsigned count = 0; // intrusive containers can't erase with a reverse_iterator // so we have to walk backwards on our own. Since there is // no iterator before begin, we have to test at the end. for (Lit i = --lp.end();; --i) { if (f(i->item)) { - if (out) { - out->push_front(i->item); - } i = lp.erase_and_dispose(i, DelItem()); ++count; } @@ -182,13 +178,13 @@ class WeightedPriorityQueue : public OpQueue check_end(); return ret; } - unsigned filter_list_pairs(std::function& f, std::list* out) { + unsigned filter_list_pairs(std::function& f) { unsigned count = 0; // intrusive containers can't erase with a reverse_iterator // so we have to walk backwards on our own. Since there is // no iterator before begin, we have to test at the end. for (Kit i = klasses.begin(); i != klasses.end();) { - count += i->filter_list_pairs(f, out); + count += i->filter_list_pairs(f); if (i->empty()) { if (next == i) { ++next; @@ -291,9 +287,9 @@ class WeightedPriorityQueue : public OpQueue } return ret; } - void filter_list_pairs(std::function& f, std::list* out) { + void filter_list_pairs(std::function& f) { for (Sit i = queues.begin(); i != queues.end();) { - size -= i->filter_list_pairs(f, out); + size -= i->filter_list_pairs(f); if (i->empty()) { total_prio -= i->key; i = queues.erase_and_dispose(i, DelItem()); @@ -338,9 +334,9 @@ class WeightedPriorityQueue : public OpQueue unsigned length() const override final { return strict.size + normal.size; } - void remove_by_filter(std::function f, std::list* removed = 0) override final { - strict.filter_list_pairs(f, removed); - normal.filter_list_pairs(f, removed); + void remove_by_filter(std::function f) override final { + strict.filter_list_pairs(f); + normal.filter_list_pairs(f); } void remove_by_class(K cl, std::list* removed = 0) override final { strict.filter_class(cl, removed); diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 87dc5522ea6a..7b5d025382d0 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -1768,24 +1768,29 @@ private: } } + /// Must be called on ops queued back to front struct Pred { PG *pg; - explicit Pred(PG *pg) : pg(pg) {} + list *out_ops; + void accumulate(PGQueueable &op) { + if (out_ops) { + boost::optional mop = op.maybe_get_op(); + if (mop) + out_ops->push_front(*mop); + } + } bool operator()(const pair &op) { - return op.first == pg; + if (op.first == pg) { + accumulate(op.second); + return true; + } else { + return false; + } } }; void dequeue(PG *pg) { - ShardData* sdata = NULL; - assert(pg != NULL); - uint32_t shard_index = pg->get_pgid().ps()% shard_list.size(); - sdata = shard_list[shard_index]; - assert(sdata != NULL); - sdata->sdata_op_ordering_lock.Lock(); - sdata->pqueue->remove_by_filter(Pred(pg), 0); - sdata->pg_for_processing.erase(pg); - sdata->sdata_op_ordering_lock.Unlock(); + return dequeue_and_get_ops(pg, nullptr); } void dequeue_and_get_ops(PG *pg, list *dequeued) { @@ -1794,28 +1799,24 @@ private: uint32_t shard_index = pg->get_pgid().ps()% shard_list.size(); sdata = shard_list[shard_index]; assert(sdata != NULL); - assert(dequeued); - list > _dequeued; sdata->sdata_op_ordering_lock.Lock(); - sdata->pqueue->remove_by_filter(Pred(pg), &_dequeued); - for (list >::iterator i = _dequeued.begin(); - i != _dequeued.end(); ++i) { - boost::optional mop = i->second.maybe_get_op(); - if (mop) - dequeued->push_back(*mop); - } + + Pred f(pg, dequeued); + + // items in pqueue are behind items in pg_for_processing + sdata->pqueue->remove_by_filter(f); + map >::iterator iter = sdata->pg_for_processing.find(pg); if (iter != sdata->pg_for_processing.end()) { - for (list::reverse_iterator i = iter->second.rbegin(); + for (auto i = iter->second.rbegin(); i != iter->second.rend(); ++i) { - boost::optional mop = i->maybe_get_op(); - if (mop) - dequeued->push_front(*mop); + f.accumulate(*i); } sdata->pg_for_processing.erase(iter); } + sdata->sdata_op_ordering_lock.Unlock(); }