]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: protect handle_osd_map requeueing with queue lock
author Sage Weil <sage.weil@dreamhost.com>
Mon, 21 Nov 2011 21:23:59 +0000 (13:23 -0800)
committer Sage Weil <sage.weil@dreamhost.com>
Mon, 21 Nov 2011 21:23:59 +0000 (13:23 -0800)
pending_ops was protected by osd_lock, but it tracks something in the
queue, which has its own lock.  Messy.  Also useless, since
wait_for_no_ops had a single caller, in shutdown(), and op_wq.drain() can
do that job for us.

Rip it out, and track queue size under the queue lock.

Fixes: #1727
Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
src/osd/OSD.cc
src/osd/OSD.h

index 492f7881626451a59e829c14a71962605dc10d0e..aeba6b8a51378565df97b65aceeed04131d27ff5 100644 (file)
@@ -527,6 +527,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
   heartbeat_dispatcher(this),
   stat_lock("OSD::stat_lock"),
   finished_lock("OSD::finished_lock"),
+  op_queue_len(0),
   op_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
   map_lock("OSD::map_lock"),
   peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
@@ -558,8 +559,6 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
 
   map_in_progress_cond = new Cond();
   
-  pending_ops = 0;
-  waiting_for_no_ops = false;
 }
 
 OSD::~OSD()
@@ -817,7 +816,7 @@ int OSD::shutdown()
   command_tp.stop();
 
   // finish ops
-  wait_for_no_ops();
+  op_wq.drain();
   dout(10) << "no ops" << dendl;
 
   recovery_tp.stop();
@@ -3173,20 +3172,18 @@ void OSD::handle_osd_map(MOSDMap *m)
   op_wq.lock();
 
   list<Message*> rq;
-  while (!op_queue.empty()) {
-    PG *pg = op_queue.back();
+  while (true) {
+    PG *pg = op_wq._dequeue();
+    if (!pg)
+      break;
     pg->lock();
-    op_queue.pop_back();
-    pending_ops--;
-    Message *mess = pg->op_queue.back();
-    pg->op_queue.pop_back();
+    Message *mess = pg->op_queue.front();
+    pg->op_queue.pop_front();
     pg->unlock();
     pg->put();
     dout(15) << " will requeue " << *mess << dendl;
-    rq.push_front(mess);
+    rq.push_back(mess);
   }
-  assert(pending_ops == 0);  // we paused the wq, and just emptied out the queue
-  logger->set(l_osd_opq, pending_ops);
   push_waiters(rq);  // requeue under osd_lock!
   op_wq.unlock();
 
@@ -5423,12 +5420,30 @@ void OSD::enqueue_op(PG *pg, Message *op)
 
   // add to pg's op_queue
   pg->op_queue.push_back(op);
-  pending_ops++;
-  logger->set(l_osd_opq, pending_ops);
   
   op_wq.queue(pg);
 }
 
+bool OSD::OpWQ::_enqueue(PG *pg)  // Append pg to the OSD op queue and refresh the queue-length counter and its logged value.
+{
+  pg->get();  // presumably takes a PG reference held while queued; handle_osd_map calls pg->put() after dequeuing -- confirm
+  osd->op_queue.push_back(pg);  // new work goes to the back; _dequeue() pops from the front
+  osd->op_queue_len++;  // counter is kept in step with the deque on every push
+  osd->logger->set(l_osd_opq, osd->op_queue_len);  // publish the new length under l_osd_opq
+  return true;  // always returns true
+}
+
+PG *OSD::OpWQ::_dequeue()  // Pop and return the PG at the front of the OSD op queue; returns NULL when the queue is empty.
+{
+  if (osd->op_queue.empty())
+    return NULL;  // nothing queued; counter and logger are left untouched
+  PG *pg = osd->op_queue.front();  // oldest entry, since _enqueue() pushes at the back
+  osd->op_queue.pop_front();
+  osd->op_queue_len--;  // counter is kept in step with the deque on every pop
+  osd->logger->set(l_osd_opq, osd->op_queue_len);  // publish the new length under l_osd_opq
+  return pg;  // no put() here; handle_osd_map calls pg->put() itself after dequeuing
+}
+
 /*
  * requeue ops at _front_ of queue.  these are previously queued
  * operations that need to get requeued ahead of anything the dispatch
@@ -5440,6 +5455,9 @@ void OSD::requeue_ops(PG *pg, list<Message*>& ls)
   dout(15) << *pg << " requeue_ops " << ls << dendl;
   assert(pg->is_locked());
 
+  // you can't call this on pg->op_queue!
+  assert(&ls != &pg->op_queue);
+
   // set current queue contents aside..
   list<Message*> orig_queue;
   orig_queue.swap(pg->op_queue);
@@ -5476,9 +5494,7 @@ void OSD::dequeue_op(PG *pg)
     op = pg->op_queue.front();
     pg->op_queue.pop_front();
     
-    dout(10) << "dequeue_op " << *op << " pg " << *pg
-            << ", " << (pending_ops-1) << " more pending"
-            << dendl;
+    dout(10) << "dequeue_op " << *op << " pg " << *pg << dendl;
 
     // share map?
     //  do this preemptively while we hold osd_lock and pg->lock
@@ -5509,30 +5525,7 @@ void OSD::dequeue_op(PG *pg)
   //scrub_wq.queue(pg);
 
   // finish
-  osd_lock.Lock();
-  {
-    dout(10) << "dequeue_op " << op << " finish" << dendl;
-    assert(pending_ops > 0);
-    
-    pending_ops--;
-    logger->set(l_osd_opq, pending_ops);
-    if (pending_ops == 0 && waiting_for_no_ops)
-      no_pending_ops.Signal();
-  }
-  osd_lock.Unlock();
-}
-
-void OSD::wait_for_no_ops()
-{
-  if (pending_ops > 0) {
-    dout(7) << "wait_for_no_ops - waiting for " << pending_ops << dendl;
-    waiting_for_no_ops = true;
-    while (pending_ops > 0)
-      no_pending_ops.Wait(osd_lock);
-    waiting_for_no_ops = false;
-    assert(pending_ops == 0);
-  } 
-  dout(7) << "wait_for_no_ops - none" << dendl;
+  dout(10) << "dequeue_op " << op << " finish" << dendl;
 }
 
 
index 7d94035a7176708217e145bc55ecb623a99fe9dc..8a5bffaaaf580b3e6c301707edcecfae11af2aef 100644 (file)
@@ -324,30 +324,21 @@ private:
   
   // -- op queue --
   deque<PG*> op_queue;
-  
+  int op_queue_len;
+
   struct OpWQ : public ThreadPool::WorkQueue<PG> {
     OSD *osd;
     OpWQ(OSD *o, time_t ti, ThreadPool *tp)
       : ThreadPool::WorkQueue<PG>("OSD::OpWQ", ti, ti*10, tp), osd(o) {}
 
-    bool _enqueue(PG *pg) {
-      pg->get();
-      osd->op_queue.push_back(pg);
-      return true;
-    }
+    bool _enqueue(PG *pg);
     void _dequeue(PG *pg) {
       assert(0);
     }
     bool _empty() {
       return osd->op_queue.empty();
     }
-    PG *_dequeue() {
-      if (osd->op_queue.empty())
-       return NULL;
-      PG *pg = osd->op_queue.front();
-      osd->op_queue.pop_front();
-      return pg;
-    }
+    PG *_dequeue();
     void _process(PG *pg) {
       osd->dequeue_op(pg);
     }
@@ -356,11 +347,6 @@ private:
     }
   } op_wq;
 
-  int   pending_ops;
-  bool  waiting_for_no_ops;
-  Cond  no_pending_ops;
-  
-  void wait_for_no_ops();
   void enqueue_op(PG *pg, Message *op);
   void requeue_ops(PG *pg, list<Message*>& ls);
   void dequeue_op(PG *pg);