]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: close enqueue/discard race
authorSage Weil <sage@newdream.net>
Fri, 12 Nov 2010 22:41:53 +0000 (14:41 -0800)
committerSage Weil <sage@newdream.net>
Fri, 12 Nov 2010 22:55:12 +0000 (14:55 -0800)
We need to re-check halt_delivery after dropping and retaking pipe_lock.

Signed-off-by: Sage Weil <sage@newdream.net>
src/msg/SimpleMessenger.h

index 052e88c9f3d11f4ba2fe6d6d49c1578ce17dc316..4929c0563271a386370dfa8997a9134be1d78ba2 100644 (file)
@@ -263,22 +263,6 @@ private:
     static const Pipe& Server(int s);
     static const Pipe& Client(const entity_addr_t& pi);
 
-    //callers make sure it's not already enqueued or you'll just
-    //push it to the back of the line!
-    //Also, call with pipe_lock held or bad things happen
-    void enqueue_me(int priority) {
-      assert(pipe_lock.is_locked());
-      pipe_lock.Unlock();
-      messenger->dispatch_queue.lock.Lock();
-      pipe_lock.Lock();
-      if (!queue_items.count(priority)) 
-       queue_items[priority] = new xlist<Pipe *>::item(this);
-      if (messenger->dispatch_queue.queued_pipes.empty())
-       messenger->dispatch_queue.cond.Signal();
-      messenger->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);
-      messenger->dispatch_queue.lock.Unlock();
-    }
-
     //we have two queue_received's to allow local signal delivery
     // via Message * (that doesn't actually point to a Message)
     void queue_received(Message *m, int priority) {
@@ -287,13 +271,9 @@ private:
       list<Message *>& queue = in_q[priority];
       bool was_empty;
 
-      if (halt_delivery) {
-        if (m>(void *)5) // don't want to put local-delivery signals
-                         // this magic number should be larger than
-                         // the size of the D_CONNECT et al enum
-          m->put();
-        return;
-      }
+      if (halt_delivery)
+       goto halt;
+
       was_empty = queue.empty();
       queue.push_back(m);
 
@@ -303,8 +283,29 @@ private:
       ++messenger->dispatch_queue.qlen;
       messenger->dispatch_queue.qlen_lock.unlock();
 
-      if (was_empty) //this pipe isn't on the endpoint queue
-       enqueue_me(priority);
+      if (was_empty) { //this pipe isn't on the endpoint queue
+       pipe_lock.Unlock();
+       messenger->dispatch_queue.lock.Lock();
+       pipe_lock.Lock();
+       
+       if (halt_delivery)
+         goto halt;
+       
+       if (!queue_items.count(priority)) 
+         queue_items[priority] = new xlist<Pipe *>::item(this);
+       if (messenger->dispatch_queue.queued_pipes.empty())
+         messenger->dispatch_queue.cond.Signal();
+       messenger->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);
+       messenger->dispatch_queue.lock.Unlock();
+      }
+      return;
+
+    halt:
+      // don't want to put local-delivery signals
+      // this magic number should be larger than
+      // the size of the D_CONNECT et al enum
+      if (m>(void *)5)
+       m->put();
     }
     
     void queue_received(Message *m) {