heartbeat_dispatcher(this),
stat_lock("OSD::stat_lock"),
finished_lock("OSD::finished_lock"),
+ op_queue_len(0),
op_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
map_lock("OSD::map_lock"),
peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
map_in_progress_cond = new Cond();
- pending_ops = 0;
- waiting_for_no_ops = false;
}
OSD::~OSD()
command_tp.stop();
// finish ops
- wait_for_no_ops();
+ op_wq.drain();
dout(10) << "no ops" << dendl;
recovery_tp.stop();
op_wq.lock();
list<Message*> rq;
- while (!op_queue.empty()) {
- PG *pg = op_queue.back();
+ while (true) {
+ PG *pg = op_wq._dequeue();
+ if (!pg)
+ break;
pg->lock();
- op_queue.pop_back();
- pending_ops--;
- Message *mess = pg->op_queue.back();
- pg->op_queue.pop_back();
+ Message *mess = pg->op_queue.front();
+ pg->op_queue.pop_front();
pg->unlock();
pg->put();
dout(15) << " will requeue " << *mess << dendl;
- rq.push_front(mess);
+ rq.push_back(mess);
}
- assert(pending_ops == 0); // we paused the wq, and just emptied out the queue
- logger->set(l_osd_opq, pending_ops);
push_waiters(rq); // requeue under osd_lock!
op_wq.unlock();
// add to pg's op_queue
pg->op_queue.push_back(op);
- pending_ops++;
- logger->set(l_osd_opq, pending_ops);
op_wq.queue(pg);
}
+bool OSD::OpWQ::_enqueue(PG *pg)
+{
+ pg->get();
+ osd->op_queue.push_back(pg);
+ osd->op_queue_len++;
+ osd->logger->set(l_osd_opq, osd->op_queue_len);
+ return true;
+}
+
+PG *OSD::OpWQ::_dequeue()
+{
+ if (osd->op_queue.empty())
+ return NULL;
+ PG *pg = osd->op_queue.front();
+ osd->op_queue.pop_front();
+ osd->op_queue_len--;
+ osd->logger->set(l_osd_opq, osd->op_queue_len);
+ return pg;
+}
+
/*
* requeue ops at _front_ of queue. these are previously queued
* operations that need to get requeued ahead of anything the dispatch
dout(15) << *pg << " requeue_ops " << ls << dendl;
assert(pg->is_locked());
+ // you can't call this on pg->op_queue!
+ assert(&ls != &pg->op_queue);
+
// set current queue contents aside..
list<Message*> orig_queue;
orig_queue.swap(pg->op_queue);
op = pg->op_queue.front();
pg->op_queue.pop_front();
- dout(10) << "dequeue_op " << *op << " pg " << *pg
- << ", " << (pending_ops-1) << " more pending"
- << dendl;
+ dout(10) << "dequeue_op " << *op << " pg " << *pg << dendl;
// share map?
// do this preemptively while we hold osd_lock and pg->lock
//scrub_wq.queue(pg);
// finish
- osd_lock.Lock();
- {
- dout(10) << "dequeue_op " << op << " finish" << dendl;
- assert(pending_ops > 0);
-
- pending_ops--;
- logger->set(l_osd_opq, pending_ops);
- if (pending_ops == 0 && waiting_for_no_ops)
- no_pending_ops.Signal();
- }
- osd_lock.Unlock();
-}
-
-void OSD::wait_for_no_ops()
-{
- if (pending_ops > 0) {
- dout(7) << "wait_for_no_ops - waiting for " << pending_ops << dendl;
- waiting_for_no_ops = true;
- while (pending_ops > 0)
- no_pending_ops.Wait(osd_lock);
- waiting_for_no_ops = false;
- assert(pending_ops == 0);
- }
- dout(7) << "wait_for_no_ops - none" << dendl;
+ dout(10) << "dequeue_op " << op << " finish" << dendl;
}
// -- op queue --
deque<PG*> op_queue;
-
+ int op_queue_len;
+
struct OpWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
OpWQ(OSD *o, time_t ti, ThreadPool *tp)
: ThreadPool::WorkQueue<PG>("OSD::OpWQ", ti, ti*10, tp), osd(o) {}
- bool _enqueue(PG *pg) {
- pg->get();
- osd->op_queue.push_back(pg);
- return true;
- }
+ bool _enqueue(PG *pg);
void _dequeue(PG *pg) {
assert(0);
}
bool _empty() {
return osd->op_queue.empty();
}
- PG *_dequeue() {
- if (osd->op_queue.empty())
- return NULL;
- PG *pg = osd->op_queue.front();
- osd->op_queue.pop_front();
- return pg;
- }
+ PG *_dequeue();
void _process(PG *pg) {
osd->dequeue_op(pg);
}
}
} op_wq;
- int pending_ops;
- bool waiting_for_no_ops;
- Cond no_pending_ops;
-
- void wait_for_no_ops();
void enqueue_op(PG *pg, Message *op);
void requeue_ops(PG *pg, list<Message*>& ls);
void dequeue_op(PG *pg);