]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/DispatchQueue: fix locking in dispatch thread
authorSage Weil <sage@inktank.com>
Tue, 17 Jul 2012 22:27:27 +0000 (15:27 -0700)
committerSage Weil <sage@inktank.com>
Sat, 21 Jul 2012 01:36:32 +0000 (18:36 -0700)
The locking was awkward with locally delivered messages.. we dropped dq
lock, inq lock, re-took dq lock, etc.   We would also take + drop + retake
+ drop the dq lock when queuing events.  Blech!

Instead:

 * simplify the queueing of cons for the local_queue
 * dequeue the con under the original dq lock
 * queue events under a single dq lock interval, by telling
   local_queue.queue() we already have it.

Signed-off-by: Sage Weil <sage@inktank.com>
src/msg/DispatchQueue.cc
src/msg/DispatchQueue.h

index f0e8ef0be07abd8d8c794ed4302e1d0c1b5fcd54..437f767429ea02d6441abbfc5d61de9774ec5a98 100644 (file)
 #undef dout_prefix
 #define dout_prefix *_dout << "incomingqueue(" << this << " " << parent << ")."
 
-void IncomingQueue::queue(Message *m, int priority)
+void IncomingQueue::queue(Message *m, int priority, bool hold_dq_lock)
 {
   Mutex::Locker l(lock);
   ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
   if (in_q.count(priority) == 0) {
     // queue inq AND message under inq AND dispatch_queue locks.
-    lock.Unlock();
-    dq->lock.Lock();
-    lock.Lock();
+    if (!hold_dq_lock) {
+      lock.Unlock();
+      dq->lock.Lock();
+      lock.Lock();
+    } else {
+      assert(dq->lock.is_locked());
+    }
 
     if (halt) {
-      dq->lock.Unlock();
+      if (!hold_dq_lock) {
+       dq->lock.Unlock();
+      } else {
+       assert(dq->lock.is_locked());
+      }
       goto halt;
     }
 
@@ -65,7 +73,11 @@ void IncomingQueue::queue(Message *m, int priority)
 
     queue.push_back(m);
 
-    dq->lock.Unlock();
+    if (!hold_dq_lock) {
+      dq->lock.Unlock();
+    } else {
+      assert(dq->lock.is_locked());
+    }
   } else {
     ldout(cct,20) << "queue " << m << " under existing queue" << dendl;
     // just queue message under our lock.
@@ -139,6 +151,7 @@ void IncomingQueue::restart_queue()
 }
 
 
+
 /*******************
  * DispatchQueue
  */
@@ -149,8 +162,7 @@ void IncomingQueue::restart_queue()
 
 void DispatchQueue::local_delivery(Message *m, int priority)
 {
-  if ((unsigned long)m > 10)
-    m->set_connection(msgr->local_connection->get());
+  m->set_connection(msgr->local_connection->get());
   local_queue.queue(m, priority);
 }
 
@@ -197,65 +209,55 @@ void DispatchQueue::entry()
                      << ", moved to end of list" << dendl;
        qlist->push_back(inq->queue_items[priority]);  // move to end of list
       }
-      lock.Unlock(); //done with the pipe queue for a while
+
+      Connection *con = NULL;
+      if ((long)m < DispatchQueue::D_NUM_CODES) {
+       assert(inq == &local_queue);
+       con = con_q.front();
+       con_q.pop_front();
+      }
+
+      lock.Unlock();
 
       inq->in_qlen--;
       qlen.dec();
 
-      inq->lock.Unlock(); // done with the pipe's message queue now
-
+      inq->lock.Unlock();
       if (dequeued)
        inq->put();
 
-      {
-       if ((long)m == DispatchQueue::D_BAD_REMOTE_RESET) {
-         lock.Lock();
-         Connection *con = remote_reset_q.front();
-         remote_reset_q.pop_front();
-         lock.Unlock();
-         msgr->ms_deliver_handle_remote_reset(con);
-         con->put();
-       } else if ((long)m == DispatchQueue::D_CONNECT) {
-         lock.Lock();
-         Connection *con = connect_q.front();
-         connect_q.pop_front();
-         lock.Unlock();
-         msgr->ms_deliver_handle_connect(con);
-         con->put();
-       } else if ((long)m == DispatchQueue::D_ACCEPT) {
-         lock.Lock();
-         Connection *con = accept_q.front();
-         accept_q.pop_front();
-         lock.Unlock();
-         msgr->ms_deliver_handle_accept(con);
-         con->put();
-       } else if ((long)m == DispatchQueue::D_BAD_RESET) {
-         lock.Lock();
-         Connection *con = reset_q.front();
-         reset_q.pop_front();
-         lock.Unlock();
-         msgr->ms_deliver_handle_reset(con);
-         con->put();
-       } else {
-         uint64_t msize = m->get_dispatch_throttle_size();
-         m->set_dispatch_throttle_size(0);  // clear it out, in case we requeue this message.
-
-         ldout(cct,1) << "<== " << m->get_source_inst()
-                 << " " << m->get_seq()
-                 << " ==== " << *m
-                 << " ==== " << m->get_payload().length() << "+" << m->get_middle().length()
-                 << "+" << m->get_data().length()
-                 << " (" << m->get_footer().front_crc << " " << m->get_footer().middle_crc
-                 << " " << m->get_footer().data_crc << ")"
-                 << " " << m << " con " << m->get_connection()
-                 << dendl;
-         msgr->ms_deliver_dispatch(m);
-
-         msgr->dispatch_throttle_release(msize);
-
-         ldout(cct,20) << "done calling dispatch on " << m << dendl;
-       }
+      if ((long)m == DispatchQueue::D_BAD_REMOTE_RESET) {
+       msgr->ms_deliver_handle_remote_reset(con);
+       con->put();
+      } else if ((long)m == DispatchQueue::D_CONNECT) {
+       msgr->ms_deliver_handle_connect(con);
+       con->put();
+      } else if ((long)m == DispatchQueue::D_ACCEPT) {
+       msgr->ms_deliver_handle_accept(con);
+       con->put();
+      } else if ((long)m == DispatchQueue::D_BAD_RESET) {
+       msgr->ms_deliver_handle_reset(con);
+       con->put();
+      } else {
+       uint64_t msize = m->get_dispatch_throttle_size();
+       m->set_dispatch_throttle_size(0);  // clear it out, in case we requeue this message.
+
+       ldout(cct,1) << "<== " << m->get_source_inst()
+                    << " " << m->get_seq()
+                    << " ==== " << *m
+                    << " ==== " << m->get_payload().length() << "+" << m->get_middle().length()
+                    << "+" << m->get_data().length()
+                    << " (" << m->get_footer().front_crc << " " << m->get_footer().middle_crc
+                    << " " << m->get_footer().data_crc << ")"
+                    << " " << m << " con " << m->get_connection()
+                    << dendl;
+       msgr->ms_deliver_dispatch(m);
+
+       msgr->dispatch_throttle_release(msize);
+
+       ldout(cct,20) << "done calling dispatch on " << m << dendl;
       }
+
       lock.Lock();
     }
     if (!stop)
index 37853a7d45adab14fe87f75368efdba66c4588c7..a0a25940f5f8502e5a87213014e3f960113bc895 100644 (file)
@@ -41,7 +41,7 @@ struct IncomingQueue : public RefCountedObject {
   map<int, xlist<IncomingQueue *>::item* > queue_items; // protected by pipe_lock AND q.lock
   bool halt;
 
-  void queue(Message *m, int priority);
+  void queue(Message *m, int priority, bool hold_dq_lock=false);
   void discard_queue();
   void restart_queue();
 
@@ -86,9 +86,7 @@ struct DispatchQueue {
   atomic_t qlen;
     
   enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_NUM_CODES };
-  list<Connection*> connect_q, accept_q;
-  list<Connection*> remote_reset_q;
-  list<Connection*> reset_q;
+  list<Connection*> con_q;
 
   IncomingQueue local_queue;
 
@@ -121,9 +119,9 @@ struct DispatchQueue {
       lock.Unlock();
       return;
     }
-    connect_q.push_back(con->get());
+    con_q.push_back(con->get());
+    local_queue.queue((Message*)D_CONNECT, CEPH_MSG_PRIO_HIGHEST, true);
     lock.Unlock();
-    local_delivery((Message*)D_CONNECT, CEPH_MSG_PRIO_HIGHEST);
   }
   void queue_accept(Connection *con) {
     lock.Lock();
@@ -131,9 +129,9 @@ struct DispatchQueue {
       lock.Unlock();
       return;
     }
-    accept_q.push_back(con->get());
+    con_q.push_back(con->get());
+    local_queue.queue((Message*)D_ACCEPT, CEPH_MSG_PRIO_HIGHEST, true);
     lock.Unlock();
-    local_delivery((Message*)D_ACCEPT, CEPH_MSG_PRIO_HIGHEST);
   }
   void queue_remote_reset(Connection *con) {
     lock.Lock();
@@ -141,9 +139,9 @@ struct DispatchQueue {
       lock.Unlock();
       return;
     }
-    remote_reset_q.push_back(con->get());
+    con_q.push_back(con->get());
+    local_queue.queue((Message*)D_BAD_REMOTE_RESET, CEPH_MSG_PRIO_HIGHEST, true);
     lock.Unlock();
-    local_delivery((Message*)D_BAD_REMOTE_RESET, CEPH_MSG_PRIO_HIGHEST);
   }
   void queue_reset(Connection *con) {
     lock.Lock();
@@ -151,9 +149,9 @@ struct DispatchQueue {
       lock.Unlock();
       return;
     }
-    reset_q.push_back(con->get());
+    con_q.push_back(con->get());
+    local_queue.queue((Message*)D_BAD_RESET, CEPH_MSG_PRIO_HIGHEST, true);
     lock.Unlock();
-    local_delivery((Message*)D_BAD_RESET, CEPH_MSG_PRIO_HIGHEST);
   }
 
   void start();