]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: clean up Pipe::queue_received locking
authorSage Weil <sage.weil@dreamhost.com>
Fri, 11 Feb 2011 05:09:42 +0000 (21:09 -0800)
committerSage Weil <sage.weil@dreamhost.com>
Sat, 12 Feb 2011 06:41:40 +0000 (22:41 -0800)
Ensure we maintain the invariant that a pipe has a non-empty queue IFF
the pipe is queued.

Prompted by #798.

Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
src/msg/SimpleMessenger.cc

index 88f1b8a86f28ccca0d1185e58ff1a0fac225301c..620062736a4bb5c86582f53e8bf99c64aaa3ae4a 100644 (file)
@@ -534,38 +534,44 @@ void SimpleMessenger::Pipe::queue_received(Message *m, int priority)
   assert(pipe_lock.is_locked());
   
   list<Message *>& queue = in_q[priority];
-  bool was_empty;
   
   if (halt_delivery)
     goto halt;
   
-  was_empty = queue.empty();
-  queue.push_back(m);
-  
-  //increment queue length counters
-  in_qlen++;
-  messenger->dispatch_queue.qlen_lock.lock();
-  ++messenger->dispatch_queue.qlen;
-  messenger->dispatch_queue.qlen_lock.unlock();
-  
-  if (was_empty) { //this pipe isn't on the endpoint queue
+  if (queue.empty()) {
+    // queue pipe AND message under pipe AND dispatch_queue locks.
     pipe_lock.Unlock();
     messenger->dispatch_queue.lock.Lock();
     pipe_lock.Lock();
-    
+
     if (halt_delivery) {
       messenger->dispatch_queue.lock.Unlock();
       goto halt;
     }
-    
-    dout(20) << "queue_received queuing pipe" << dendl;
-    if (!queue_items.count(priority)) 
-      queue_items[priority] = new xlist<Pipe *>::item(this);
-    if (messenger->dispatch_queue.queued_pipes.empty())
-      messenger->dispatch_queue.cond.Signal();
-    messenger->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);
+
+    if (queue.empty()) {
+      dout(20) << "queue_received queuing pipe" << dendl;
+      if (!queue_items.count(priority)) 
+       queue_items[priority] = new xlist<Pipe *>::item(this);
+      if (messenger->dispatch_queue.queued_pipes.empty())
+       messenger->dispatch_queue.cond.Signal();
+      messenger->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);
+    }
+
+    queue.push_back(m);
+
     messenger->dispatch_queue.lock.Unlock();
+  } else {
+    // just queue message under pipe lock.
+    queue.push_back(m);
   }
+  
+  // increment queue length counters
+  in_qlen++;
+  messenger->dispatch_queue.qlen_lock.lock();
+  ++messenger->dispatch_queue.qlen;
+  messenger->dispatch_queue.qlen_lock.unlock();
+  
   return;
   
  halt: