struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
list<PG*> peering_queue;
OSD *osd;
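+  // PGs currently handed to a worker; _dequeue skips these until _process_finish releases them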
+ set<PG*> in_use;
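+  // upper bound on how many PGs one worker grabs per pass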
+ const size_t batch_size;
PeeringWQ(OSD *o, time_t ti, ThreadPool *tp, size_t batch_size)
: ThreadPool::BatchWorkQueue<PG>(
- "OSD::PeeringWQ", ti, ti*10, tp, batch_size), osd(o) {}
+ "OSD::PeeringWQ", ti, ti*10, tp), osd(o), batch_size(batch_size) {}
  void _dequeue(PG *pg) {
    for (list<PG*>::iterator i = peering_queue.begin();
         i != peering_queue.end();
         ) {
      if (*i == pg) {
        peering_queue.erase(i++);
        pg->put();
      } else {
        ++i;
      }
    }
  }
bool _empty() {
return peering_queue.empty();
}
- PG *_dequeue() {
- if (peering_queue.empty())
- return 0;
- PG *retval = peering_queue.front();
- peering_queue.pop_front();
- return retval;
+ void _dequeue(list<PG*> *out) {
+ set<PG*> got;
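+    // walk the queue, collecting up to batch_size PGs that are not already in use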
+ for (list<PG*>::iterator i = peering_queue.begin();
+ i != peering_queue.end() && out->size() < batch_size;
+ ) {
+ if (in_use.count(*i)) {
+ ++i;
+ } else {
+ out->push_back(*i);
+ got.insert(*i);
+ peering_queue.erase(i++);
+ }
+ }
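+    // mark the dequeued PGs busy so no other worker picks them up concurrently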
+ in_use.insert(got.begin(), got.end());
}
void _process(const list<PG *> &pgs) {
osd->process_peering_events(pgs);
    for (list<PG *>::const_iterator i = pgs.begin();
         i != pgs.end();
         ++i) {
      (*i)->put();
}
}
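+  // runs after _process: release the in_use marks so these PGs can be dequeued again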
+ void _process_finish(const list<PG *> &pgs) {
+ for (list<PG*>::const_iterator i = pgs.begin();
+ i != pgs.end();
+ ++i) {
+ in_use.erase(*i);
+ }
+ }
void _clear() {
assert(peering_queue.empty());
}