From: Greg Farnum Date: Thu, 10 Dec 2009 19:16:40 +0000 (-0800) Subject: msgr: Cover qlen with a Spinlock for faster access and to limit blocking X-Git-Tag: v0.19~263 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=eb371aa65ef9a25966f6abd7bc46ecd01005755d;p=ceph.git msgr: Cover qlen with a Spinlock for faster access and to limit blocking --- diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 2482790f9260..bab1af7374fb 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -283,8 +283,10 @@ void SimpleMessenger::Endpoint::dispatch_entry() } if (pipe_list.empty()) queued_pipes.erase(priority); - --qlen; endpoint_lock.Unlock(); //done with the pipe queue for a while + qlen_lock.lock(); + --qlen; + qlen_lock.unlock(); //get message from pipe Message *m = m_queue.front(); @@ -1187,6 +1189,7 @@ void SimpleMessenger::Pipe::discard_queue() i != queue_items.end(); ++i) i->second->remove_myself(); + rank->local_endpoint->endpoint_lock.Unlock(); endpoint = true; pipe_lock.Lock(); } @@ -1200,13 +1203,13 @@ void SimpleMessenger::Pipe::discard_queue() for (map >::iterator p = in_q.begin(); p != in_q.end(); p++) { if (endpoint) { int size = in_q.size(); + rank->local_endpoint->qlen_lock.lock(); rank->local_endpoint->qlen -= size; + rank->local_endpoint->qlen_lock.unlock(); } for (list::iterator r = p->second.begin(); r != p->second.end(); r++) delete *r; } - if (endpoint) - rank->local_endpoint->endpoint_lock.Unlock(); in_q.clear(); } diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 051e62af3a0f..154b45260d33 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -26,6 +26,7 @@ using namespace std; using namespace __gnu_cxx; #include "common/Mutex.h" +#include "common/Spinlock.h" #include "common/Cond.h" #include "common/Thread.h" @@ -232,10 +233,10 @@ private: pipe_lock.Unlock(); //increment queue length counter - rank->local_endpoint->endpoint_lock.Lock(); + rank->local_endpoint->qlen_lock.lock(); ++rank->local_endpoint->qlen; + rank->local_endpoint->qlen_lock.unlock(); rank->local_endpoint->cond.Signal(); - rank->local_endpoint->endpoint_lock.Unlock(); dout(0) << "finished queuing received message " << m << "in msgr " << rank << dendl; } @@ -321,6 +322,7 @@ private: map::iterator> queued_pipe_iters; bool stop; int qlen; + Spinlock qlen_lock; int my_rank; class DispatchThread : public Thread { @@ -376,6 +378,7 @@ private: endpoint_lock("SimpleMessenger::Endpoint::endpoint_lock"), stop(false), qlen(0), + qlen_lock("SimpleMessenger::Endpoint::qlen_lock"), my_rank(rn), dispatch_thread(this) { local_pipe = new Pipe(r, Pipe::STATE_OPEN);