]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: protect pipe queuing with _both_ pipe and dispatch_queue locks
authorSage Weil <sage@newdream.net>
Fri, 12 Nov 2010 21:41:14 +0000 (13:41 -0800)
committerSage Weil <sage@newdream.net>
Fri, 12 Nov 2010 22:55:11 +0000 (14:55 -0800)
We want to make sure the pipe's queue item doesn't go away.

Also, make queue_received() require pipe_lock to be held.  This avoids some
useless unlocking/locking, since (in the case where the pipe is already
queued) we then don't need to drop the pipe_lock at all.

Signed-off-by: Sage Weil <sage@newdream.net>
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 51e64693cfa19311f576c50a65ac7a7781b34a9f..748373ff784201d93e56d35f18922dd798b0308f 100644 (file)
@@ -1285,6 +1285,7 @@ void SimpleMessenger::Pipe::discard_queue()
   pipe_lock.Unlock();
   xlist<Pipe *>* list_on;
   q.lock.Lock();//to remove from round-robin
+  pipe_lock.Lock();
   for (map<int, xlist<Pipe *>::item* >::iterator i = queue_items.begin();
        i != queue_items.end();
        ++i) {
@@ -1295,7 +1296,6 @@ void SimpleMessenger::Pipe::discard_queue()
     }
   }
   q.lock.Unlock();
-  pipe_lock.Lock();
 
   // clear queue_items
   while (!queue_items.empty()) {
@@ -1539,14 +1539,11 @@ void SimpleMessenger::Pipe::reader()
       in_seq = m->get_seq();
 
       cond.Signal();  // wake up writer, to ack this
-      pipe_lock.Unlock();
       
       dout(10) << "reader got message "
               << m->get_seq() << " " << m << " " << *m
               << dendl;
       queue_received(m);
-
-      pipe_lock.Lock();
     } 
     
     else if (tag == CEPH_MSGR_TAG_CLOSE) {
index d450352465cd2138555795a3a58ac485ab52acad..b1a0415aa09ebbece0150692d72a4248e6518e7a 100644 (file)
@@ -267,35 +267,35 @@ private:
     //push it to the back of the line!
     //Also, call with pipe_lock held or bad things happen
     void enqueue_me(int priority) {
+      assert(pipe_lock.is_locked());
       if (!queue_items.count(priority))
        queue_items[priority] = new xlist<Pipe *>::item(this);
       pipe_lock.Unlock();
       messenger->dispatch_queue.lock.Lock();
+      pipe_lock.Lock();
       if (messenger->dispatch_queue.queued_pipes.empty())
        messenger->dispatch_queue.cond.Signal();
       messenger->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);
       messenger->dispatch_queue.lock.Unlock();
-      pipe_lock.Lock();
     }
 
     //we have two queue_received's to allow local signal delivery
     // via Message * (that doesn't actually point to a Message)
-    //Don't call while holding pipe_lock!
     void queue_received(Message *m, int priority) {
+      assert(pipe_lock.is_locked());
+
       list<Message *>& queue = in_q[priority];
       bool was_empty;
-      pipe_lock.Lock();
+
       if (halt_delivery) {
         if (m>(void *)5) // don't want to put local-delivery signals
                          // this magic number should be larger than
                          // the size of the D_CONNECT et al enum
           m->put();
-        goto unlock_return;
+        return;
       }
       was_empty = queue.empty();
       queue.push_back(m);
-      if (was_empty) //this pipe isn't on the endpoint queue
-       enqueue_me(priority);
 
       //increment queue length counters
       in_qlen++;
@@ -303,8 +303,8 @@ private:
       ++messenger->dispatch_queue.qlen;
       messenger->dispatch_queue.qlen_lock.unlock();
 
-unlock_return:
-      pipe_lock.Unlock();
+      if (was_empty) //this pipe isn't on the endpoint queue
+       enqueue_me(priority);
     }
     
     void queue_received(Message *m) {
@@ -404,9 +404,11 @@ unlock_return:
 
     Pipe *local_pipe;
     void local_delivery(Message *m, int priority) {
+      local_pipe->pipe_lock.Lock();
       if ((unsigned long)m > 10)
        m->set_connection(local_pipe->connection_state->get());
       local_pipe->queue_received(m, priority);
+      local_pipe->pipe_lock.Unlock();
     }
 
     int get_queue_len() {
@@ -416,12 +418,6 @@ unlock_return:
       return l;
     }
     
-    void local_delivery(Message *m) {
-      if ((unsigned long)m > 10)
-       m->set_connection(local_pipe->connection_state->get());
-      local_pipe->queue_received(m);
-    }
-
     void queue_connect(Connection *con) {
       lock.Lock();
       connect_q.push_back(con);