public:
// How many Ops are in the queue
virtual unsigned length() const = 0;
- // Ops will be removed and placed in *removed if f is true
+ // Ops will be removed f evaluates to true, f may have sideeffects
virtual void remove_by_filter(
- std::function<bool (T)> f, std::list<T> *removed) = 0;
+ std::function<bool (T)> f) = 0;
// Ops of this priority should be deleted immediately
virtual void remove_by_class(K k, std::list<T> *out) = 0;
// Enqueue op in the back of the strict queue
typedef std::list<std::pair<unsigned, T> > ListPairs;
static unsigned filter_list_pairs(
ListPairs *l,
- std::function<bool (T)> f,
- std::list<T> *out) {
+ std::function<bool (T)> f) {
unsigned ret = 0;
- if (out) {
- for (typename ListPairs::reverse_iterator i = l->rbegin();
- i != l->rend();
- ++i) {
- if (f(i->second)) {
- out->push_front(i->second);
- }
- }
- }
- for (typename ListPairs::iterator i = l->begin();
- i != l->end();
+ for (typename ListPairs::iterator i = l->end();
+ i != l->begin();
) {
- if (f(i->second)) {
- l->erase(i++);
+ auto next = i;
+ --next;
+ if (f(next->second)) {
++ret;
+ l->erase(next);
} else {
- ++i;
+ i = next;
}
}
return ret;
return q.empty();
}
void remove_by_filter(
- std::function<bool (T)> f,
- std::list<T> *out) {
+ std::function<bool (T)> f) {
for (typename Classes::iterator i = q.begin();
i != q.end();
) {
- size -= filter_list_pairs(&(i->second), f, out);
+ size -= filter_list_pairs(&(i->second), f);
if (i->second.empty()) {
if (cur == i) {
++cur;
}
void remove_by_filter(
- std::function<bool (T)> f,
- std::list<T> *removed = 0) override final {
+ std::function<bool (T)> f) override final {
for (typename SubQueues::iterator i = queue.begin();
i != queue.end();
) {
unsigned priority = i->first;
- i->second.remove_by_filter(f, removed);
+ i->second.remove_by_filter(f);
if (i->second.empty()) {
++i;
remove_queue(priority);
for (typename SubQueues::iterator i = high_queue.begin();
i != high_queue.end();
) {
- i->second.remove_by_filter(f, removed);
+ i->second.remove_by_filter(f);
if (i->second.empty()) {
high_queue.erase(i++);
} else {
unsigned get_size() const {
return lp.size();
}
- unsigned filter_list_pairs(std::function<bool (T)>& f,
- std::list<T>* out) {
+ unsigned filter_list_pairs(std::function<bool (T)>& f) {
unsigned count = 0;
// intrusive containers can't erase with a reverse_iterator
// so we have to walk backwards on our own. Since there is
// no iterator before begin, we have to test at the end.
for (Lit i = --lp.end();; --i) {
if (f(i->item)) {
- if (out) {
- out->push_front(i->item);
- }
i = lp.erase_and_dispose(i, DelItem<ListPair>());
++count;
}
check_end();
return ret;
}
- unsigned filter_list_pairs(std::function<bool (T)>& f, std::list<T>* out) {
+ unsigned filter_list_pairs(std::function<bool (T)>& f) {
unsigned count = 0;
// intrusive containers can't erase with a reverse_iterator
// so we have to walk backwards on our own. Since there is
// no iterator before begin, we have to test at the end.
for (Kit i = klasses.begin(); i != klasses.end();) {
- count += i->filter_list_pairs(f, out);
+ count += i->filter_list_pairs(f);
if (i->empty()) {
if (next == i) {
++next;
}
return ret;
}
- void filter_list_pairs(std::function<bool (T)>& f, std::list<T>* out) {
+ void filter_list_pairs(std::function<bool (T)>& f) {
for (Sit i = queues.begin(); i != queues.end();) {
- size -= i->filter_list_pairs(f, out);
+ size -= i->filter_list_pairs(f);
if (i->empty()) {
total_prio -= i->key;
i = queues.erase_and_dispose(i, DelItem<SubQueue>());
unsigned length() const override final {
return strict.size + normal.size;
}
- void remove_by_filter(std::function<bool (T)> f, std::list<T>* removed = 0) override final {
- strict.filter_list_pairs(f, removed);
- normal.filter_list_pairs(f, removed);
+ void remove_by_filter(std::function<bool (T)> f) override final {
+ strict.filter_list_pairs(f);
+ normal.filter_list_pairs(f);
}
void remove_by_class(K cl, std::list<T>* removed = 0) override final {
strict.filter_class(cl, removed);
}
}
+ /// Must be called on ops queued back to front
struct Pred {
PG *pg;
- explicit Pred(PG *pg) : pg(pg) {}
+ list<OpRequestRef> *out_ops;
+ void accumulate(PGQueueable &op) {
+ if (out_ops) {
+ boost::optional<OpRequestRef> mop = op.maybe_get_op();
+ if (mop)
+ out_ops->push_front(*mop);
+ }
+ }
bool operator()(const pair<PGRef, PGQueueable> &op) {
- return op.first == pg;
+ if (op.first == pg) {
+ accumulate(op.second);
+ return true;
+ } else {
+ return false;
+ }
}
};
void dequeue(PG *pg) {
- ShardData* sdata = NULL;
- assert(pg != NULL);
- uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
- sdata = shard_list[shard_index];
- assert(sdata != NULL);
- sdata->sdata_op_ordering_lock.Lock();
- sdata->pqueue->remove_by_filter(Pred(pg), 0);
- sdata->pg_for_processing.erase(pg);
- sdata->sdata_op_ordering_lock.Unlock();
+ return dequeue_and_get_ops(pg, nullptr);
}
void dequeue_and_get_ops(PG *pg, list<OpRequestRef> *dequeued) {
uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
sdata = shard_list[shard_index];
assert(sdata != NULL);
- assert(dequeued);
- list<pair<PGRef, PGQueueable> > _dequeued;
sdata->sdata_op_ordering_lock.Lock();
- sdata->pqueue->remove_by_filter(Pred(pg), &_dequeued);
- for (list<pair<PGRef, PGQueueable> >::iterator i = _dequeued.begin();
- i != _dequeued.end(); ++i) {
- boost::optional<OpRequestRef> mop = i->second.maybe_get_op();
- if (mop)
- dequeued->push_back(*mop);
- }
+
+ Pred f(pg, dequeued);
+
+ // items in pqueue are behind items in pg_for_processing
+ sdata->pqueue->remove_by_filter(f);
+
map<PG *, list<PGQueueable> >::iterator iter =
sdata->pg_for_processing.find(pg);
if (iter != sdata->pg_for_processing.end()) {
- for (list<PGQueueable>::reverse_iterator i = iter->second.rbegin();
+ for (auto i = iter->second.rbegin();
i != iter->second.rend();
++i) {
- boost::optional<OpRequestRef> mop = i->maybe_get_op();
- if (mop)
- dequeued->push_front(*mop);
+ f.accumulate(*i);
}
sdata->pg_for_processing.erase(iter);
}
+
sdata->sdata_op_ordering_lock.Unlock();
}