]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: Cover qlen with a Spinlock for faster access and to limit blocking
authorGreg Farnum <gregf@hq.newdream.net>
Thu, 10 Dec 2009 19:16:40 +0000 (11:16 -0800)
committerGreg Farnum <gregf@hq.newdream.net>
Thu, 10 Dec 2009 22:33:32 +0000 (14:33 -0800)
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 2482790f9260022d1099c80ba09ae63582ab1d9e..bab1af7374fbfbb44ff79a48fce42b2cf0b8e860 100644 (file)
@@ -283,8 +283,10 @@ void SimpleMessenger::Endpoint::dispatch_entry()
       }
       if (pipe_list.empty())
        queued_pipes.erase(priority);
-      --qlen;
       endpoint_lock.Unlock(); //done with the pipe queue for a while
+      qlen_lock.lock();
+      --qlen;
+      qlen_lock.unlock();
 
       //get message from pipe
       Message *m = m_queue.front();
@@ -1187,6 +1189,7 @@ void SimpleMessenger::Pipe::discard_queue()
         i != queue_items.end();
         ++i)
       i->second->remove_myself();
+    rank->local_endpoint->endpoint_lock.Unlock();
     endpoint = true;
     pipe_lock.Lock();
   }
@@ -1200,13 +1203,13 @@ void SimpleMessenger::Pipe::discard_queue()
   for (map<int,list<Message*> >::iterator p = in_q.begin(); p != in_q.end(); p++) {
     if (endpoint) {
       int size = in_q.size();
+      rank->local_endpoint->qlen_lock.lock();
       rank->local_endpoint->qlen -= size;
+      rank->local_endpoint->qlen_lock.unlock();
     }
     for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); r++)
       delete *r;
   }
-  if (endpoint)
-    rank->local_endpoint->endpoint_lock.Unlock();
   in_q.clear();
 }
 
index 051e62af3a0fc964e52c876683323a378135c261..154b45260d335af2090d3a021c9a6b0eda227e53 100644 (file)
@@ -26,6 +26,7 @@ using namespace std;
 using namespace __gnu_cxx;
 
 #include "common/Mutex.h"
+#include "common/Spinlock.h"
 #include "common/Cond.h"
 #include "common/Thread.h"
 
@@ -232,10 +233,10 @@ private:
       pipe_lock.Unlock();
 
       //increment queue length counter
-      rank->local_endpoint->endpoint_lock.Lock();
+      rank->local_endpoint->qlen_lock.lock();
       ++rank->local_endpoint->qlen;
+      rank->local_endpoint->qlen_lock.unlock();
       rank->local_endpoint->cond.Signal();
-      rank->local_endpoint->endpoint_lock.Unlock();
       dout(0) << "finished queuing received message " << m << "in msgr " << rank << dendl;
     }
     
@@ -321,6 +322,7 @@ private:
     map<int, xlist<Pipe *>::iterator> queued_pipe_iters;
     bool stop;
     int qlen;
+    Spinlock qlen_lock;
     int my_rank;
 
     class DispatchThread : public Thread {
@@ -376,6 +378,7 @@ private:
       endpoint_lock("SimpleMessenger::Endpoint::endpoint_lock"),
       stop(false),
       qlen(0),
+      qlen_lock("SimpleMessenger::Endpoint::qlen_lock"),
       my_rank(rn),
       dispatch_thread(this) {
       local_pipe = new Pipe(r, Pipe::STATE_OPEN);