]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: protect pipe queue_item map with pipe_lock AND dispatch_queue lock
authorSage Weil <sage@newdream.net>
Fri, 12 Nov 2010 22:05:56 +0000 (14:05 -0800)
committerSage Weil <sage@newdream.net>
Fri, 12 Nov 2010 22:55:12 +0000 (14:55 -0800)
Close a few different races here.

Also, assert that queue_items are not queued in ~Pipe().

Signed-off-by: Sage Weil <sage@newdream.net>
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 2da27e166cb50d85ecc4e67d68eabb5c13d3ff41..97bcbc878c2e515222ed3fc0c8f473e3c8cf9975 100644 (file)
@@ -1267,22 +1267,24 @@ void SimpleMessenger::Pipe::requeue_sent(uint64_t max_acked)
 void SimpleMessenger::Pipe::discard_queue()
 {
   dout(10) << "discard_queue" << dendl;
-  DispatchQueue& q = messenger->dispatch_queue;
+
   halt_delivery = true;
+
+  // dequeue pipe
+  DispatchQueue& q = messenger->dispatch_queue;
   pipe_lock.Unlock();
-  xlist<Pipe *>* list_on;
-  q.lock.Lock();//to remove from round-robin
+  q.lock.Lock();
   pipe_lock.Lock();
   for (map<int, xlist<Pipe *>::item* >::iterator i = queue_items.begin();
        i != queue_items.end();
        ++i) {
+    xlist<Pipe *>* list_on;
     if ((list_on = i->second->get_list())) { //if in round-robin
       i->second->remove_myself(); //take off
       if (list_on->empty()) //if round-robin queue is empty
        q.queued_pipes.erase(i->first); //remove from map
     }
   }
-  q.lock.Unlock();
 
   // clear queue_items
   while (!queue_items.empty()) {
@@ -1290,6 +1292,8 @@ void SimpleMessenger::Pipe::discard_queue()
     queue_items.erase(queue_items.begin());
   }
 
+  q.lock.Unlock();
+
   // adjust qlen
   q.qlen_lock.lock();
   q.qlen -= in_qlen;
index 88aefefe62166ed79467ecfb9ab9df43c453d391..052e88c9f3d11f4ba2fe6d6d49c1578ce17dc316 100644 (file)
@@ -144,7 +144,7 @@ private:
     map<int, list<Message*> > out_q;  // priority queue for outbound msgs
     map<int, list<Message*> > in_q; // and inbound ones
     int in_qlen;
-    map<int, xlist<Pipe *>::item* > queue_items; // _map_ protected by pipe_lock, *item protected by q.lock
+    map<int, xlist<Pipe *>::item* > queue_items; // protected by pipe_lock AND q.lock
     list<Message*> sent;
     Cond cond;
     bool keepalive;
@@ -224,8 +224,7 @@ private:
       for (map<int, xlist<Pipe *>::item* >::iterator i = queue_items.begin();
           i != queue_items.end();
           ++i) {
-       if (i->second->is_on_list())
-         i->second->remove_myself();
+       assert(!i->second->is_on_list());
        delete i->second;
       }
       assert(out_q.empty());
@@ -269,11 +268,11 @@ private:
     //Also, call with pipe_lock held or bad things happen
     void enqueue_me(int priority) {
       assert(pipe_lock.is_locked());
-      if (!queue_items.count(priority))
-       queue_items[priority] = new xlist<Pipe *>::item(this);
       pipe_lock.Unlock();
       messenger->dispatch_queue.lock.Lock();
       pipe_lock.Lock();
+      if (!queue_items.count(priority)) 
+       queue_items[priority] = new xlist<Pipe *>::item(this);
       if (messenger->dispatch_queue.queued_pipes.empty())
        messenger->dispatch_queue.cond.Signal();
       messenger->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);