]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: maintain per-pipe in_qlen; avoid list.size(); some cleanup
authorSage Weil <sage@newdream.net>
Fri, 11 Dec 2009 21:40:53 +0000 (13:40 -0800)
committerSage Weil <sage@newdream.net>
Fri, 11 Dec 2009 21:40:53 +0000 (13:40 -0800)
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 3f390887d360f32f4fb6452cee15b7415b5681a0..665a1e3ce90849a67e534d4be5a3f788f5ded0b2 100644 (file)
@@ -281,20 +281,23 @@ void SimpleMessenger::Endpoint::dispatch_entry()
       //move pipe to back of line -- or just take off if no more messages
       pipe->pipe_lock.Lock();
       list<Message *>& m_queue = pipe->in_q[priority];
-      pipe_list.pop_front();
-      if (m_queue.size() > 1) {
-       pipe_list.push_back(pipe->queue_items[priority]);
+      Message *m = m_queue.front();
+      m_queue.pop_front();
+
+      if (m_queue.empty()) {
+       pipe_list.pop_front();  // pipe is done
+       if (pipe_list.empty())
+         queued_pipes.erase(priority);
+      } else {
+       pipe_list.push_back(pipe->queue_items[priority]);  // move to end of list
       }
-      if (pipe_list.empty())
-       queued_pipes.erase(priority);
       endpoint_lock.Unlock(); //done with the pipe queue for a while
+
+      pipe->in_qlen--;
       qlen_lock.lock();
-      --qlen;
+      qlen--;
       qlen_lock.unlock();
 
-      //get message from pipe
-      Message *m = m_queue.front();
-      m_queue.pop_front();
       pipe->pipe_lock.Unlock(); // done with the pipe's message queue now
       {
        if ((long)m == D_BAD_REMOTE_RESET) {
@@ -1200,15 +1203,27 @@ void SimpleMessenger::Pipe::discard_queue()
     rank->local_endpoint->endpoint_lock.Lock();//to remove from round-robin
     for (map<int, xlist<Pipe *>::item* >::iterator i = queue_items.begin();
         i != queue_items.end();
-        ++i)
+        ++i) {
       if ((list_on = i->second->get_xlist())) { //if in round-robin
        i->second->remove_myself(); //take off
        if (!list_on->size()) //if round-robin queue is empty
          rank->local_endpoint->queued_pipes.erase(i->first); //remove from map
       }
+    }
     rank->local_endpoint->endpoint_lock.Unlock();
     endpoint = true;
     pipe_lock.Lock();
+
+    // clear queue_items
+    while (!queue_items.empty()) {
+      delete queue_items.begin()->second;
+      queue_items.erase(queue_items.begin());
+    }
+
+    // adjust qlen
+    rank->local_endpoint->qlen_lock.lock();
+    rank->local_endpoint->qlen -= in_qlen;
+    rank->local_endpoint->qlen_lock.unlock();
   }
   for (list<Message*>::iterator p = sent.begin(); p != sent.end(); p++)
     (*p)->put();
@@ -1217,17 +1232,11 @@ void SimpleMessenger::Pipe::discard_queue()
     for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); r++)
       (*r)->put();
   out_q.clear();
-  for (map<int,list<Message*> >::iterator p = in_q.begin(); p != in_q.end(); p++) {
-    if (endpoint) {
-      int size = in_q.size();
-      rank->local_endpoint->qlen_lock.lock();
-      rank->local_endpoint->qlen -= size;
-      rank->local_endpoint->qlen_lock.unlock();
-    }
+  for (map<int,list<Message*> >::iterator p = in_q.begin(); p != in_q.end(); p++)
     for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); r++)
       delete *r;
-  }
   in_q.clear();
+  in_qlen = 0;
 }
 
 
index 86396f2c947c4d08a925b922f9ad90e026d0b00f..c181214caaba60c611f19eac5d7214450ac3d78b 100644 (file)
@@ -124,7 +124,8 @@ private:
 
     map<int, list<Message*> > out_q;  // priority queue for outbound msgs
     map<int, list<Message*> > in_q; // and inbound ones
-    map<int, xlist<Pipe *>::item* > queue_items; //put in msgr queue
+    int in_qlen;
+    map<int, xlist<Pipe *>::item* > queue_items; // _map_ protected by pipe_lock, *item protected by endpoint_lock.
     list<Message*> sent;
     Cond cond;
     bool keepalive;
@@ -175,7 +176,7 @@ private:
       state(st), 
       connection_state(new Connection),
       reader_running(false), writer_running(false),
-      keepalive(false),
+      in_qlen(0), keepalive(false),
       connect_seq(0), peer_global_seq(0),
       out_seq(0), in_seq(0), in_seq_acked(0),
       reader_thread(this), writer_thread(this) { }
@@ -220,8 +221,9 @@ private:
        queue_items[priority] = new xlist<Pipe *>::item(this);
       pipe_lock.Unlock();
       rank->local_endpoint->endpoint_lock.Lock();
-      rank->local_endpoint->
-       queued_pipes[priority].push_back(queue_items[priority]);
+      if (rank->local_endpoint->queued_pipes.empty())
+       rank->local_endpoint->cond.Signal();
+      rank->local_endpoint->queued_pipes[priority].push_back(queue_items[priority]);
       rank->local_endpoint->endpoint_lock.Unlock();
       pipe_lock.Lock();
     }
@@ -235,25 +237,22 @@ private:
       pipe_lock.Lock();
       bool was_empty = queue.empty();
       queue.push_back(m);
-      if (was_empty) { //this pipe isn't on the endpoint queue
-       if (!queue_items.count(priority)) { //create an item for that priority
-         queue_items[priority] = new xlist<Pipe *>::item(this);
-       }
+      if (was_empty) //this pipe isn't on the endpoint queue
        enqueue_me(priority);
-      }
-      pipe_lock.Unlock();
 
-      //increment queue length counter
+      //increment queue length counters
+      in_qlen++;
       rank->local_endpoint->qlen_lock.lock();
       ++rank->local_endpoint->qlen;
       rank->local_endpoint->qlen_lock.unlock();
-      rank->local_endpoint->cond.Signal();
+
+      pipe_lock.Unlock();
     }
     
     void queue_received(Message *m) {
       m->set_recv_stamp(g_clock.now());
-      assert (m->nref.test() == 0);
-      queue_received(m, m->get_priority() );
+      assert(m->nref.test() == 0);
+      queue_received(m, m->get_priority());
     }
 
     __u32 get_out_seq() { return out_seq; }