]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: rename locks for clarity; move some around to prevent blocking.
authorGreg Farnum <gregf@hq.newdream.net>
Wed, 9 Dec 2009 23:12:00 +0000 (15:12 -0800)
committerGreg Farnum <gregf@hq.newdream.net>
Thu, 10 Dec 2009 22:33:31 +0000 (14:33 -0800)
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index e22e9cab8e497dd5d3e3ac275210ba4353823a1a..17841c9ac8738473dea12683f6d1cf0c0aea11f4 100644 (file)
@@ -261,7 +261,7 @@ void SimpleMessenger::Endpoint::dispatch_entry()
 {
   dout(0) << "entered SimpleMessenger::Endpoint::dispatch_entry" << dendl;
   map<int, xlist<Pipe *> >::reverse_iterator high_iter;
-  lock.Lock();
+  endpoint_lock.Lock();
   while (!stop) {
     dout(0) << "in outer !stop loop of SimpleMessenger::Endpoint::dispatch_entry" << dendl;
     while (!queued_pipes.empty()) {
@@ -273,7 +273,7 @@ void SimpleMessenger::Endpoint::dispatch_entry()
       Pipe *pipe = pipe_list.front();
       dout(0) << "high priority: " << priority << " taking pipe " << pipe << dendl;
       //move pipe to back of line -- or just take off if no more messages
-      pipe->lock.Lock();
+      pipe->pipe_lock.Lock();
       list<Message *>& m_queue = pipe->in_q[priority];
       pipe_list.pop_front();
       if (m_queue.size() > 1) {
@@ -283,32 +283,32 @@ void SimpleMessenger::Endpoint::dispatch_entry()
       if (pipe_list.empty())
        queued_pipes.erase(priority);
       --qlen;
-      lock.Unlock(); //done with the pipe queue for a while
+      endpoint_lock.Unlock(); //done with the pipe queue for a while
 
       //get message from pipe
       Message *m = m_queue.front();
       m_queue.pop_front();
-      pipe->lock.Unlock(); // done with the pipe's message queue now
+      pipe->pipe_lock.Unlock(); // done with the pipe's message queue now
       {
        if ((long)m == D_BAD_REMOTE_RESET) {
-         lock.Lock();
+         endpoint_lock.Lock();
          Connection *con = remote_reset_q.front();
          remote_reset_q.pop_front();
-         lock.Unlock();
+         endpoint_lock.Unlock();
          ms_deliver_handle_remote_reset(con);
          con->put();
        } else if ((long)m == D_CONNECT) {
-         lock.Lock();
+         endpoint_lock.Lock();
          Connection *con = connect_q.front();
          connect_q.pop_front();
-         lock.Unlock();
+         endpoint_lock.Unlock();
          ms_deliver_handle_connect(con);
          con->put();
        } else if ((long)m == D_BAD_RESET) {
-         lock.Lock();
+         endpoint_lock.Lock();
          Connection *con = reset_q.front();
          reset_q.pop_front();
-         lock.Unlock();
+         endpoint_lock.Unlock();
          ms_deliver_handle_reset(con);
          con->put();
        } else {
@@ -325,11 +325,11 @@ void SimpleMessenger::Endpoint::dispatch_entry()
          dout(20) << "done calling dispatch on " << m << dendl;
        }
       }
-      lock.Lock();
+      endpoint_lock.Lock();
     }
-    cond.Wait(lock); //wait for something to get put on queue
+    cond.Wait(endpoint_lock); //wait for something to get put on queue
   }
-  lock.Unlock();
+  endpoint_lock.Unlock();
   dout(15) << "dispatch: ending loop " << dendl;
   
   // deregister
@@ -356,10 +356,10 @@ int SimpleMessenger::Endpoint::shutdown()
     stop = true;
   } else {
     dout(10) << "shutdown i am not dispatch, setting stop flag and joining thread." << dendl;
-    lock.Lock();
+    endpoint_lock.Lock();
     stop = true;
     cond.Signal();
-    lock.Unlock();
+    endpoint_lock.Unlock();
   }
   return 0;
 }
@@ -657,14 +657,14 @@ int SimpleMessenger::Pipe::accept()
     // existing?
     if (rank->rank_pipe.count(peer_addr)) {
       existing = rank->rank_pipe[peer_addr];
-      existing->lock.Lock();
+      existing->pipe_lock.Lock();
 
       if (connect.global_seq < existing->peer_global_seq) {
        dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
                 << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl;
        reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
        reply.global_seq = existing->peer_global_seq;  // so we can send it below..
-       existing->lock.Unlock();
+       existing->pipe_lock.Unlock();
        rank->lock.Unlock();
        goto reply;
       } else {
@@ -707,7 +707,7 @@ int SimpleMessenger::Pipe::accept()
                   << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl;
          reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
          reply.connect_seq = existing->connect_seq;  // so we can send it below..
-         existing->lock.Unlock();
+         existing->pipe_lock.Unlock();
          rank->lock.Unlock();
          goto reply;
        }
@@ -731,7 +731,7 @@ int SimpleMessenger::Pipe::accept()
          assert(peer_addr > rank->rank_addr);
          assert(existing->state == STATE_CONNECTING); // this will win
          reply.tag = CEPH_MSGR_TAG_WAIT;
-         existing->lock.Unlock();
+         existing->pipe_lock.Unlock();
          rank->lock.Unlock();
          goto reply;
        }
@@ -745,7 +745,7 @@ int SimpleMessenger::Pipe::accept()
                 << "), sending RESETSESSION" << dendl;
        reply.tag = CEPH_MSGR_TAG_RESETSESSION;
        rank->lock.Unlock();
-       existing->lock.Unlock();
+       existing->pipe_lock.Unlock();
        goto reply;
       }
 
@@ -796,7 +796,7 @@ int SimpleMessenger::Pipe::accept()
        p++)
     out_q[p->first].splice(out_q[p->first].begin(), p->second);
   
-  existing->lock.Unlock();
+  existing->pipe_lock.Unlock();
 
  open:
   // open
@@ -827,21 +827,21 @@ int SimpleMessenger::Pipe::accept()
       goto fail_unlocked;
   }
 
-  lock.Lock();
+  pipe_lock.Lock();
   if (state != STATE_CLOSED) {
     dout(10) << "accept starting writer, " << "state=" << state << dendl;
     start_writer();
   }
   dout(20) << "accept done" << dendl;
-  lock.Unlock();
+  pipe_lock.Unlock();
   return 0;   // success.
 
 
  fail_unlocked:
-  lock.Lock();
+  pipe_lock.Lock();
   state = STATE_CLOSED;
   fault();
-  lock.Unlock();
+  pipe_lock.Unlock();
   return -1;
 }
 
@@ -850,7 +850,7 @@ int SimpleMessenger::Pipe::connect()
   bool got_bad_auth = false;
 
   dout(10) << "connect " << connect_seq << dendl;
-  assert(lock.is_locked());
+  assert(pipe_lock.is_locked());
 
   if (sd >= 0) {
     ::close(sd);
@@ -863,7 +863,7 @@ int SimpleMessenger::Pipe::connect()
   // stop reader thrad
   join_reader();
 
-  lock.Unlock();
+  pipe_lock.Unlock();
   
   char tag = -1;
   int rc;
@@ -1049,7 +1049,7 @@ int SimpleMessenger::Pipe::connect()
       }
     }
 
-    lock.Lock();
+    pipe_lock.Lock();
     if (state != STATE_CONNECTING) {
       dout(0) << "connect got RESETSESSION but no longer connecting" << dendl;
       goto stop_locked;
@@ -1066,7 +1066,7 @@ int SimpleMessenger::Pipe::connect()
       if (got_bad_auth)
         goto stop_locked;
       got_bad_auth = true;
-      lock.Unlock();
+      pipe_lock.Unlock();
       authorizer = rank->get_authorizer(peer_type, true);  // try harder
       continue;
     }
@@ -1074,14 +1074,14 @@ int SimpleMessenger::Pipe::connect()
       dout(0) << "connect got RESETSESSION" << dendl;
       was_session_reset();
       cseq = 0;
-      lock.Unlock();
+      pipe_lock.Unlock();
       continue;
     }
     if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
       gseq = rank->get_global_seq(reply.global_seq);
       dout(10) << "connect got RETRY_GLOBAL " << reply.global_seq
               << " chose new " << gseq << dendl;
-      lock.Unlock();
+      pipe_lock.Unlock();
       continue;
     }
     if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
@@ -1089,7 +1089,7 @@ int SimpleMessenger::Pipe::connect()
       dout(10) << "connect got RETRY_SESSION " << connect_seq
               << " -> " << reply.connect_seq << dendl;
       cseq = connect_seq = reply.connect_seq;
-      lock.Unlock();
+      pipe_lock.Unlock();
       continue;
     }
 
@@ -1125,7 +1125,7 @@ int SimpleMessenger::Pipe::connect()
   }
 
  fail:
-  lock.Lock();
+  pipe_lock.Lock();
  fail_locked:
   if (state == STATE_CONNECTING)
     fault();
@@ -1189,7 +1189,7 @@ void SimpleMessenger::Pipe::discard_queue()
 
 void SimpleMessenger::Pipe::fault(bool onconnect, bool onread)
 {
-  assert(lock.is_locked());
+  assert(pipe_lock.is_locked());
   cond.Signal();
 
   if (onread && state == STATE_CONNECTING) {
@@ -1248,7 +1248,7 @@ void SimpleMessenger::Pipe::fault(bool onconnect, bool onread)
     backoff.set_from_double(g_conf.ms_initial_backoff);
   } else {
     dout(10) << "fault waiting " << backoff << dendl;
-    cond.WaitInterval(lock, backoff);
+    cond.WaitInterval(pipe_lock, backoff);
     backoff += backoff;
     if (backoff > g_conf.ms_max_backoff)
       backoff.set_from_double(g_conf.ms_max_backoff);
@@ -1259,7 +1259,7 @@ void SimpleMessenger::Pipe::fault(bool onconnect, bool onread)
 void SimpleMessenger::Pipe::fail()
 {
   derr(10) << "fail" << dendl;
-  assert(lock.is_locked());
+  assert(pipe_lock.is_locked());
 
   stop();
 
@@ -1271,7 +1271,7 @@ void SimpleMessenger::Pipe::fail()
 
 void SimpleMessenger::Pipe::was_session_reset()
 {
-  assert(lock.is_locked());
+  assert(pipe_lock.is_locked());
 
   dout(10) << "was_session_reset" << dendl;
   discard_queue();
@@ -1287,7 +1287,7 @@ void SimpleMessenger::Pipe::was_session_reset()
 void SimpleMessenger::Pipe::stop()
 {
   dout(10) << "stop" << dendl;
-  assert(lock.is_locked());
+  assert(pipe_lock.is_locked());
   state = STATE_CLOSED;
   cond.Signal();
   if (sd >= 0) {
@@ -1309,28 +1309,28 @@ void SimpleMessenger::Pipe::reader()
   if (state == STATE_ACCEPTING) 
     accept();
 
-  lock.Lock();
+  pipe_lock.Lock();
 
   // loop.
   while (state != STATE_CLOSED &&
         state != STATE_CONNECTING) {
-    assert(lock.is_locked());
+    assert(pipe_lock.is_locked());
 
     // sleep if (re)connecting
     if (state == STATE_STANDBY) {
       dout(20) << "reader sleeping during reconnect|standby" << dendl;
-      cond.Wait(lock);
+      cond.Wait(pipe_lock);
       continue;
     }
 
-    lock.Unlock();
+    pipe_lock.Unlock();
 
     char buf[80];
     char tag = -1;
     dout(20) << "reader reading tag..." << dendl;
     int rc = tcp_read(sd, (char*)&tag, 1);
     if (rc < 0) {
-      lock.Lock();
+      pipe_lock.Lock();
       dout(2) << "reader couldn't read tag, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
       fault(false, true);
       continue;
@@ -1338,7 +1338,7 @@ void SimpleMessenger::Pipe::reader()
 
     if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
       dout(20) << "reader got KEEPALIVE" << dendl;
-      lock.Lock();
+      pipe_lock.Lock();
       continue;
     }
 
@@ -1347,7 +1347,7 @@ void SimpleMessenger::Pipe::reader()
       dout(20) << "reader got ACK" << dendl;
       __le64 seq;
       int rc = tcp_read( sd, (char*)&seq, sizeof(seq));
-      lock.Lock();
+      pipe_lock.Lock();
       if (rc < 0) {
        dout(2) << "reader couldn't read ack seq, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
        fault(false, true);
@@ -1370,7 +1370,7 @@ void SimpleMessenger::Pipe::reader()
       dout(20) << "reader got MSG" << dendl;
       Message *m = read_message();
 
-      lock.Lock();
+      pipe_lock.Lock();
       
       if (!m) {
        derr(2) << "reader read null message, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
@@ -1406,7 +1406,7 @@ void SimpleMessenger::Pipe::reader()
       }
 
       cond.Signal();  // wake up writer, to ack this
-      lock.Unlock();
+      pipe_lock.Unlock();
       
       dout(10) << "reader got message "
               << m->get_seq() << " " << m << " " << *m
@@ -1418,12 +1418,12 @@ void SimpleMessenger::Pipe::reader()
       else derr(0) << "reader got message " << *m
                   << "but there is no endpoint!" << dendl;
 
-      lock.Lock();
+      pipe_lock.Lock();
     } 
     
     else if (tag == CEPH_MSGR_TAG_CLOSE) {
       dout(20) << "reader got CLOSE" << dendl;
-      lock.Lock();
+      pipe_lock.Lock();
       if (state == STATE_CLOSING)
        state = STATE_CLOSED;
       else
@@ -1433,7 +1433,7 @@ void SimpleMessenger::Pipe::reader()
     }
     else {
       dout(0) << "reader bad tag " << (int)tag << dendl;
-      lock.Lock();
+      pipe_lock.Lock();
       fault(false, true);
     }
   }
@@ -1452,7 +1452,7 @@ void SimpleMessenger::Pipe::writer()
 {
   char buf[80];
 
-  lock.Lock();
+  pipe_lock.Lock();
 
   while (state != STATE_CLOSED) {// && state != STATE_WAIT) {
     dout(10) << "writer: state = " << state << " policy.server=" << policy.server << dendl;
@@ -1476,9 +1476,9 @@ void SimpleMessenger::Pipe::writer()
       dout(20) << "writer writing CLOSE tag" << dendl;
       char tag = CEPH_MSGR_TAG_CLOSE;
       state = STATE_CLOSED;
-      lock.Unlock();
+      pipe_lock.Unlock();
       if (sd) ::write(sd, &tag, 1);
-      lock.Lock();
+      pipe_lock.Lock();
       continue;
     }
 
@@ -1487,9 +1487,9 @@ void SimpleMessenger::Pipe::writer()
 
       // keepalive?
       if (keepalive) {
-       lock.Unlock();
+       pipe_lock.Unlock();
        int rc = write_keepalive();
-       lock.Lock();
+       pipe_lock.Lock();
        if (rc < 0) {
          dout(2) << "writer couldn't write keepalive, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
          fault();
@@ -1501,9 +1501,9 @@ void SimpleMessenger::Pipe::writer()
       // send ack?
       if (in_seq > in_seq_acked) {
        int send_seq = in_seq;
-       lock.Unlock();
+       pipe_lock.Unlock();
        int rc = write_ack(send_seq);
-       lock.Lock();
+       pipe_lock.Lock();
        if (rc < 0) {
          dout(2) << "writer couldn't write ack, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
          fault();
@@ -1518,7 +1518,7 @@ void SimpleMessenger::Pipe::writer()
        m->set_seq(++out_seq);
        sent.push_back(m); // move to sent list
        m->get();
-       lock.Unlock();
+       pipe_lock.Unlock();
 
         dout(20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl;
 
@@ -1528,7 +1528,7 @@ void SimpleMessenger::Pipe::writer()
         dout(20) << "writer sending " << m->get_seq() << " " << m << dendl;
        int rc = write_message(m);
 
-       lock.Lock();
+       pipe_lock.Lock();
        if (rc < 0) {
           derr(1) << "writer error sending " << m << ", "
                  << errno << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl;
@@ -1541,7 +1541,7 @@ void SimpleMessenger::Pipe::writer()
     
     // wait
     dout(20) << "writer sleeping" << dendl;
-    cond.Wait(lock);
+    cond.Wait(pipe_lock);
   }
   
   dout(20) << "writer finishing" << dendl;
@@ -1562,7 +1562,7 @@ void SimpleMessenger::Pipe::unlock_maybe_reap()
       closed_socket();
     }
 
-    lock.Unlock();
+    pipe_lock.Unlock();
 
     // queue for reap
     dout(10) << "unlock_maybe_reap queueing for reap" << dendl;
@@ -1573,7 +1573,7 @@ void SimpleMessenger::Pipe::unlock_maybe_reap()
     }
     rank->lock.Unlock();
   } else {
-    lock.Unlock();
+    pipe_lock.Unlock();
   }
 }
 
@@ -1938,8 +1938,8 @@ void SimpleMessenger::reaper()
     Pipe *p = pipe_reap_queue.front();
     pipe_reap_queue.pop_front();
     dout(10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl;
-    p->lock.Lock();
-    p->lock.Unlock();
+    p->pipe_lock.Lock();
+    p->pipe_lock.Unlock();
     p->unregister_pipe();
     assert(pipes.count(p));
     pipes.erase(p);
@@ -2215,17 +2215,17 @@ void SimpleMessenger::submit_message(Message *m, const entity_inst_t& dest, bool
       if (rank_pipe.count( dest_proc_addr )) {
         // connected?
         pipe = rank_pipe[ dest_proc_addr ];
-       pipe->lock.Lock();
+       pipe->pipe_lock.Lock();
        if (pipe->state == Pipe::STATE_CLOSED) {
          dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl;
          pipe->unregister_pipe();
-         pipe->lock.Unlock();
+         pipe->pipe_lock.Unlock();
          pipe = 0;
        } else {
          dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", have pipe." << dendl;
 
          pipe->_send(m);
-         pipe->lock.Unlock();
+         pipe->pipe_lock.Unlock();
        }
       }
       if (!pipe) {
@@ -2259,16 +2259,16 @@ void SimpleMessenger::send_keepalive(const entity_inst_t& dest)
       if (rank_pipe.count( dest_proc_addr )) {
         // connected?
         pipe = rank_pipe[ dest_proc_addr ];
-       pipe->lock.Lock();
+       pipe->pipe_lock.Lock();
        if (pipe->state == Pipe::STATE_CLOSED) {
          dout(20) << "send_keepalive remote, " << dest_addr << ", ignoring old closed pipe." << dendl;
          pipe->unregister_pipe();
-         pipe->lock.Unlock();
+         pipe->pipe_lock.Unlock();
          pipe = 0;
        } else {
          dout(20) << "send_keepalive remote, " << dest_addr << ", have pipe." << dendl;
          pipe->_send_keepalive();
-         pipe->lock.Unlock();
+         pipe->pipe_lock.Unlock();
        }
       }
       if (!pipe)
@@ -2314,9 +2314,9 @@ void SimpleMessenger::wait()
     while (!rank_pipe.empty()) {
       Pipe *p = rank_pipe.begin()->second;
       p->unregister_pipe();
-      p->lock.Lock();
+      p->pipe_lock.Lock();
       p->stop();
-      p->lock.Unlock();
+      p->pipe_lock.Unlock();
     }
 
     reaper();
@@ -2346,9 +2346,9 @@ void SimpleMessenger::mark_down(entity_addr_t addr)
     Pipe *p = rank_pipe[addr];
     dout(1) << "mark_down " << addr << " -- " << p << dendl;
     p->unregister_pipe();
-    p->lock.Lock();
+    p->pipe_lock.Lock();
     p->stop();
-    p->lock.Unlock();
+    p->pipe_lock.Unlock();
   } else {
     dout(1) << "mark_down " << addr << " -- pipe dne" << dendl;
   }
index c023984fde68605e19a35d490d6405b18ab7d66e..c41972cdaf999b15ff3d52931b041ad2cdd80bc0 100644 (file)
@@ -98,7 +98,7 @@ private:
     entity_addr_t peer_addr;
     Policy policy;
     
-    Mutex lock;
+    Mutex pipe_lock;
     int state;
 
   protected:
@@ -159,7 +159,7 @@ private:
     Pipe(SimpleMessenger *r, int st) : 
       rank(r),
       sd(-1), peer_type(-1),
-      lock("SimpleMessenger::Pipe::lock"),
+      pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
       state(st), 
       connection_state(new Connection),
       reader_running(false), writer_running(false),
@@ -191,9 +191,9 @@ private:
        return;
       cond.Signal();
       reader_thread.kill(SIGUSR2);
-      lock.Unlock();
+      pipe_lock.Unlock();
       reader_thread.join();
-      lock.Lock();
+      pipe_lock.Lock();
     }
 
     // public constructors
@@ -206,7 +206,7 @@ private:
       dout(0) << "queuing received message " << m << "in msgr " << rank << dendl;
       list<Message *>& queue = in_q[priority];
 
-      lock.Lock();
+      pipe_lock.Lock();
       queue.push_back(m);
       if ( 1 == queue.size()) { //this pipe isn't on the endpoint queue
        if (!queue_items.count(priority)) { //create an item for that priority
@@ -214,18 +214,20 @@ private:
            pair_item(priority, new xlist<Pipe *>::item(this));
          queue_items.insert(pair_item);
        }
-       rank->local_endpoint->queue_lock.Lock();
+       pipe_lock.Unlock();
+       rank->local_endpoint->endpoint_lock.Lock();
        rank->local_endpoint->
          queued_pipes[priority].push_back(queue_items[priority]);
-       rank->local_endpoint->queue_lock.Unlock();
+       rank->local_endpoint->endpoint_lock.Unlock();
+       pipe_lock.Lock();
       }
-      lock.Unlock();
+      pipe_lock.Unlock();
 
       //increment queue length counter
-      rank->local_endpoint->lock.Lock();
+      rank->local_endpoint->endpoint_lock.Lock();
       ++rank->local_endpoint->qlen;
       rank->local_endpoint->cond.Signal();
-      rank->local_endpoint->lock.Unlock();
+      rank->local_endpoint->endpoint_lock.Unlock();
     }
     
     void queue_received(Message *m) {
@@ -259,9 +261,9 @@ private:
     void stop();
 
     void send(Message *m) {
-      lock.Lock();
+      pipe_lock.Lock();
       _send(m);
-      lock.Unlock();
+      pipe_lock.Unlock();
     }    
     void _send(Message *m) {
       m->get();
@@ -269,9 +271,9 @@ private:
       cond.Signal();
     }
     void send_keepalive() {
-      lock.Lock();
+      pipe_lock.Lock();
       _send_keepalive();
-      lock.Unlock();
+      pipe_lock.Unlock();
     }    
     void _send_keepalive() {
       keepalive = true;
@@ -304,9 +306,8 @@ private:
   class Endpoint : public Messenger {
     SimpleMessenger *rank;
     Pipe *local_pipe;
-    Mutex lock;
+    Mutex endpoint_lock;
     Cond cond;
-    Mutex queue_lock;
     map<int, xlist<Pipe *> > queued_pipes;
     map<int, xlist<Pipe *>::iterator> queued_pipe_iters;
     bool stop;
@@ -341,33 +342,32 @@ private:
     }
 
     void queue_connect(Connection *con) {
-      lock.Lock();
+      endpoint_lock.Lock();
       connect_q.push_back(con);
       local_delivery((Message*)D_CONNECT, CEPH_MSG_PRIO_HIGHEST);
       cond.Signal();
-      lock.Unlock();
+      endpoint_lock.Unlock();
     }
     void queue_remote_reset(Connection *con) {
-      lock.Lock();
+      endpoint_lock.Lock();
       remote_reset_q.push_back(con);
       local_delivery((Message*)D_BAD_REMOTE_RESET, CEPH_MSG_PRIO_HIGHEST);
       cond.Signal();
-      lock.Unlock();
+      endpoint_lock.Unlock();
     }
     void queue_reset(Connection *con) {
-      lock.Lock();
+      endpoint_lock.Lock();
       reset_q.push_back(con);
       local_delivery((Message*)D_BAD_RESET, CEPH_MSG_PRIO_HIGHEST);
       cond.Signal();
-      lock.Unlock();
+      endpoint_lock.Unlock();
     }
 
   public:
     Endpoint(SimpleMessenger *r, entity_name_t name, int rn) : 
       Messenger(name),
       rank(r),
-      lock("SimpleMessenger::Endpoint::lock"),
-      queue_lock("SimpleMessenger::Endpoint:queue_lock"),
+      endpoint_lock("SimpleMessenger::Endpoint::endpoint_lock"),
       stop(false),
       qlen(0),
       my_rank(rn),