]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: Fix a number of locking and deletion bugs. Appears to work.
authorGreg Farnum <gregf@hq.newdream.net>
Thu, 10 Dec 2009 03:10:45 +0000 (19:10 -0800)
committerGreg Farnum <gregf@hq.newdream.net>
Thu, 10 Dec 2009 22:33:32 +0000 (14:33 -0800)
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index d1eb84a7c3e1538cc42b1843f811766e4ce9d60a..2482790f9260022d1099c80ba09ae63582ab1d9e 100644 (file)
@@ -1179,6 +1179,17 @@ void SimpleMessenger::Pipe::requeue_sent()
 void SimpleMessenger::Pipe::discard_queue()
 {
   dout(10) << "discard_queue" << dendl;
+  bool endpoint = false;
+  if (rank->local_endpoint) {
+    pipe_lock.Unlock();
+    rank->local_endpoint->endpoint_lock.Lock();//to remove from pipe queue
+    for (map<int, xlist<Pipe *>::item* >::iterator i = queue_items.begin();
+        i != queue_items.end();
+        ++i)
+      i->second->remove_myself();
+    endpoint = true;
+    pipe_lock.Lock();
+  }
   for (list<Message*>::iterator p = sent.begin(); p != sent.end(); p++)
     (*p)->put();
   sent.clear();
@@ -1186,6 +1197,17 @@ void SimpleMessenger::Pipe::discard_queue()
     for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); r++)
       (*r)->put();
   out_q.clear();
+  for (map<int,list<Message*> >::iterator p = in_q.begin(); p != in_q.end(); p++) {
+    if (endpoint) {
+      int size = in_q.size();
+      rank->local_endpoint->qlen -= size;
+    }
+    for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); r++)
+      delete *r;
+  }
+  if (endpoint)
+    rank->local_endpoint->endpoint_lock.Unlock();
+  in_q.clear();
 }
 
 
@@ -1941,12 +1963,12 @@ void SimpleMessenger::reaper()
     pipe_reap_queue.pop_front();
     dout(10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl;
     p->pipe_lock.Lock();
+    p->discard_queue();
     p->pipe_lock.Unlock();
     p->unregister_pipe();
     assert(pipes.count(p));
     pipes.erase(p);
     p->join();
-    p->discard_queue();
     dout(10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl;
     assert(p->sd < 0);
     delete p;
index 2f1a5aa6e6f1e6ef74d1c09322dd12e4abc994c2..59cb6e9c875f542aec092f230d11437f822a10c4 100644 (file)
@@ -200,6 +200,20 @@ private:
     static const Pipe& Server(int s);
     static const Pipe& Client(const entity_addr_t& pi);
 
+    //callers make sure it's not already enqueued or you'll just
+    //push it to the back of the line!
+    //Also, call with pipe_lock held or bad things happen
+    void enqueue_me(int priority) {
+      if (!queue_items.count(priority))
+       queue_items[priority] = new xlist<Pipe *>::item(this);
+      pipe_lock.Unlock();
+      rank->local_endpoint->endpoint_lock.Lock();
+      rank->local_endpoint->
+       queued_pipes[priority].push_back(queue_items[priority]);
+      rank->local_endpoint->endpoint_lock.Unlock();
+      pipe_lock.Lock();
+    }
+
     //we have two queue_received's to allow local signal delivery
     // via Message * (that doesn't actually point to a Message)
     void queue_received(Message *m, int priority) {
@@ -207,19 +221,13 @@ private:
       list<Message *>& queue = in_q[priority];
 
       pipe_lock.Lock();
+      bool was_empty = queue.empty();
       queue.push_back(m);
-      if ( 1 == queue.size()) { //this pipe isn't on the endpoint queue
+      if (was_empty) { //this pipe isn't on the endpoint queue
        if (!queue_items.count(priority)) { //create an item for that priority
-         pair< int, xlist<Pipe *>::item* >
-           pair_item(priority, new xlist<Pipe *>::item(this));
-         queue_items.insert(pair_item);
+         queue_items[priority] = new xlist<Pipe *>::item(this);
        }
-       pipe_lock.Unlock();
-       rank->local_endpoint->endpoint_lock.Lock();
-       rank->local_endpoint->
-         queued_pipes[priority].push_back(queue_items[priority]);
-       rank->local_endpoint->endpoint_lock.Unlock();
-       pipe_lock.Lock();
+       enqueue_me(priority);
       }
       pipe_lock.Unlock();
 
@@ -228,6 +236,7 @@ private:
       ++rank->local_endpoint->qlen;
       rank->local_endpoint->cond.Signal();
       rank->local_endpoint->endpoint_lock.Unlock();
+      dout(0) << "finished queuing received message " << m << "in msgr " << rank << dendl;
     }
     
     void queue_received(Message *m) {
@@ -371,7 +380,14 @@ private:
       dispatch_thread(this) {
       local_pipe = new Pipe(r, Pipe::STATE_OPEN);
     }
-    ~Endpoint() { delete local_pipe; }
+    ~Endpoint() {
+      for (map< int, xlist<Pipe *> >::iterator i = queued_pipes.begin();
+          i != queued_pipes.end();
+          ++i) {
+       i->second.clear();
+      }
+      delete local_pipe;
+    }
 
     void destroy() {
       // join dispatch thread