]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: clean up refs to static 'rank'
authorSage Weil <sage@newdream.net>
Fri, 1 May 2009 13:55:54 +0000 (06:55 -0700)
committerSage Weil <sage@newdream.net>
Fri, 1 May 2009 14:12:14 +0000 (07:12 -0700)
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index b6eaadb85e631c1ecdd41538c0624a48404d2251..b967ca0d83027128d4115f5cd7ca394acfd42a1d 100644 (file)
@@ -48,7 +48,7 @@ static ostream& _prefix() {
 
 
 // help find socket resource leaks
-int sockopen = 0;
+//static int sockopen = 0;
 #define closed_socket() //dout(20) << "closed_socket " << --sockopen << dendl;
 #define opened_socket() //dout(20) << "opened_socket " << ++sockopen << dendl;
 
@@ -65,36 +65,6 @@ sighandler_t old_sigint_handler = 0;
  * Accepter
  */
 
-void simplemessenger_sigint(int r)
-{
-  rank.sigint();
-  if (old_sigint_handler)
-    old_sigint_handler(r);
-}
-
-void Rank::sigint()
-{
-  lock.Lock();
-  derr(0) << "got control-c, exiting" << dendl;
-  
-  // force close listener socket
-  if (accepter.listen_sd >= 0) {
-    ::close(accepter.listen_sd);
-    accepter.listen_sd = -1;
-    closed_socket();
-  }
-
-  // force close all pipe sockets, too
-  for (hash_map<entity_addr_t, Pipe*>::iterator p = rank_pipe.begin();
-       p != rank_pipe.end();
-       ++p) 
-    p->second->force_close();
-
-  lock.Unlock();
-}
-
-
-
 void noop_signal_handler(int s)
 {
   //dout(0) << "blah_handler got " << s << dendl;
@@ -160,24 +130,24 @@ int Rank::Accepter::bind(int64_t force_nonce)
     return -errno;
   }
   
-  rank.rank_addr = g_my_addr;
-  if (rank.rank_addr != entity_addr_t())
-    rank.need_addr = false;
+  rank->rank_addr = g_my_addr;
+  if (rank->rank_addr != entity_addr_t())
+    rank->need_addr = false;
   else 
-    rank.need_addr = true;
-  if (rank.rank_addr.get_port() == 0) {
+    rank->need_addr = true;
+  if (rank->rank_addr.get_port() == 0) {
     entity_addr_t tmp;
     tmp.ipaddr = listen_addr;
-    rank.rank_addr.set_port(tmp.get_port());
+    rank->rank_addr.set_port(tmp.get_port());
     if (force_nonce >= 0)
-      rank.rank_addr.nonce = force_nonce;
+      rank->rank_addr.nonce = force_nonce;
     else
-      rank.rank_addr.nonce = getpid(); // FIXME: pid might not be best choice here.
+      rank->rank_addr.nonce = getpid(); // FIXME: pid might not be best choice here.
   }
-  rank.rank_addr.erank = 0;
+  rank->rank_addr.erank = 0;
 
-  dout(1) << "accepter.bind rank_addr is " << rank.rank_addr 
-         << " need_addr=" << rank.need_addr
+  dout(1) << "accepter.bind rank_addr is " << rank->rank_addr 
+         << " need_addr=" << rank->need_addr
          << dendl;
   return 0;
 }
@@ -185,8 +155,6 @@ int Rank::Accepter::bind(int64_t force_nonce)
 int Rank::Accepter::start()
 {
   dout(1) << "accepter.start" << dendl;
-  // set up signal handler
-  //old_sigint_handler = signal(SIGINT, simplemessenger_sigint);
 
   // set a harmless handle for SIGUSR1 (we'll use it to stop the accepter)
   struct sigaction sa;
@@ -235,14 +203,14 @@ void *Rank::Accepter::entry()
          dout(0) << "accepter could't set TCP_NODELAY: " << strerror(errno) << dendl;
       }
       
-      rank.lock.Lock();
-      if (rank.num_local > 0) {
-       Pipe *p = new Pipe(Pipe::STATE_ACCEPTING);
+      rank->lock.Lock();
+      if (rank->num_local > 0) {
+       Pipe *p = new Pipe(rank, Pipe::STATE_ACCEPTING);
        p->sd = sd;
        p->start_reader();
-       rank.pipes.insert(p);
+       rank->pipes.insert(p);
       }
-      rank.lock.Unlock();
+      rank->lock.Unlock();
     } else {
       dout(0) << "accepter no incoming connection?  sd = " << sd << " errno " << errno << " " << strerror(errno) << dendl;
       if (++errors > 4)
@@ -366,7 +334,7 @@ static void remove_pid_file()
 int Rank::start(bool nodaemon)
 {
   // register at least one entity, first!
-  assert(rank.my_type >= 0); 
+  assert(my_type >= 0); 
 
   lock.Lock();
   if (started) {
@@ -428,13 +396,13 @@ int Rank::start(bool nodaemon)
  */
 Rank::Pipe *Rank::connect_rank(const entity_addr_t& addr, const Policy& p)
 {
-  assert(rank.lock.is_locked());
-  assert(addr != rank.rank_addr);
+  assert(lock.is_locked());
+  assert(addr != rank_addr);
   
   dout(10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl;
   
   // create pipe
-  Pipe *pipe = new Pipe(Pipe::STATE_CONNECTING);
+  Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING);
   pipe->policy = p;
   pipe->peer_addr = addr;
   pipe->start_writer();
@@ -453,14 +421,14 @@ Rank::Pipe *Rank::connect_rank(const entity_addr_t& addr, const Policy& p)
 
 /* register_entity 
  */
-Rank::EntityMessenger *Rank::register_entity(entity_name_t name)
+Rank::Endpoint *Rank::register_entity(entity_name_t name)
 {
   dout(10) << "register_entity " << name << dendl;
   lock.Lock();
   
   // create messenger
   int erank = max_local;
-  EntityMessenger *msgr = new EntityMessenger(name, erank);
+  Endpoint *msgr = new Endpoint(this, name, erank);
 
   // now i know my type.
   if (my_type >= 0)
@@ -492,7 +460,7 @@ Rank::EntityMessenger *Rank::register_entity(entity_name_t name)
 }
 
 
-void Rank::unregister_entity(EntityMessenger *msgr)
+void Rank::unregister_entity(Endpoint *msgr)
 {
   lock.Lock();
   dout(10) << "unregister_entity " << msgr->get_myname() << dendl;
@@ -653,10 +621,10 @@ void Rank::wait()
 
 
 /**********************************
- * EntityMessenger
+ * Endpoint
  */
 
-void Rank::EntityMessenger::dispatch_entry()
+void Rank::Endpoint::dispatch_entry()
 {
   lock.Lock();
   while (!stop) {
@@ -727,11 +695,11 @@ void Rank::EntityMessenger::dispatch_entry()
   dout(15) << "dispatch: ending loop " << dendl;
 
   // deregister
-  rank.unregister_entity(this);
+  rank->unregister_entity(this);
   put();
 }
 
-void Rank::EntityMessenger::ready()
+void Rank::Endpoint::ready()
 {
   dout(10) << "ready " << get_myaddr() << dendl;
   assert(!dispatch_thread.is_started());
@@ -740,7 +708,7 @@ void Rank::EntityMessenger::ready()
 }
 
 
-int Rank::EntityMessenger::shutdown()
+int Rank::Endpoint::shutdown()
 {
   dout(10) << "shutdown " << get_myaddr() << dendl;
   
@@ -758,24 +726,24 @@ int Rank::EntityMessenger::shutdown()
   return 0;
 }
 
-void Rank::EntityMessenger::suicide()
+void Rank::Endpoint::suicide()
 {
   dout(10) << "suicide " << get_myaddr() << dendl;
   shutdown();
   // hmm, or exit(0)?
 }
 
-void Rank::EntityMessenger::prepare_dest(const entity_inst_t& inst)
+void Rank::Endpoint::prepare_dest(const entity_inst_t& inst)
 {
-  rank.lock.Lock();
+  rank->lock.Lock();
   {
-    if (rank.rank_pipe.count(inst.addr) == 0)
-      rank.connect_rank(inst.addr, rank.policy_map[inst.name.type()]);
+    if (rank->rank_pipe.count(inst.addr) == 0)
+      rank->connect_rank(inst.addr, rank->policy_map[inst.name.type()]);
   }
-  rank.lock.Unlock();
+  rank->lock.Unlock();
 }
 
-int Rank::EntityMessenger::send_message(Message *m, entity_inst_t dest)
+int Rank::Endpoint::send_message(Message *m, entity_inst_t dest)
 {
   // set envelope
   m->set_source_inst(_myinst);
@@ -795,12 +763,12 @@ int Rank::EntityMessenger::send_message(Message *m, entity_inst_t dest)
          << " " << m 
          << dendl;
 
-  rank.submit_message(m, dest.addr);
+  rank->submit_message(m, dest.addr);
 
   return 0;
 }
 
-int Rank::EntityMessenger::forward_message(Message *m, entity_inst_t dest)
+int Rank::Endpoint::forward_message(Message *m, entity_inst_t dest)
 {
   // set envelope
   m->set_source_inst(_myinst);
@@ -819,14 +787,14 @@ int Rank::EntityMessenger::forward_message(Message *m, entity_inst_t dest)
          << " " << m 
           << dendl;
 
-  rank.submit_message(m, dest.addr);
+  rank->submit_message(m, dest.addr);
 
   return 0;
 }
 
 
 
-int Rank::EntityMessenger::lazy_send_message(Message *m, entity_inst_t dest)
+int Rank::Endpoint::lazy_send_message(Message *m, entity_inst_t dest)
 {
   // set envelope
   m->set_source_inst(_myinst);
@@ -846,14 +814,14 @@ int Rank::EntityMessenger::lazy_send_message(Message *m, entity_inst_t dest)
          << " " << m 
           << dendl;
 
-  rank.submit_message(m, dest.addr, true);
+  rank->submit_message(m, dest.addr, true);
 
   return 0;
 }
 
 
 
-void Rank::EntityMessenger::reset_myname(entity_name_t newname)
+void Rank::Endpoint::reset_myname(entity_name_t newname)
 {
   entity_name_t oldname = get_myname();
   dout(10) << "reset_myname " << oldname << " to " << newname << dendl;
@@ -861,9 +829,9 @@ void Rank::EntityMessenger::reset_myname(entity_name_t newname)
 }
 
 
-void Rank::EntityMessenger::mark_down(entity_addr_t a)
+void Rank::Endpoint::mark_down(entity_addr_t a)
 {
-  rank.mark_down(a);
+  rank->mark_down(a);
 }
 
 void Rank::mark_down(entity_addr_t addr)
@@ -894,7 +862,7 @@ void Rank::mark_down(entity_addr_t addr)
 #define dout_prefix _pipe_prefix()
 ostream& Rank::Pipe::_pipe_prefix() {
   return *_dout << dbeginl << pthread_self()
-               << " -- " << rank.rank_addr << " >> " << peer_addr << " pipe(" << this
+               << " -- " << rank->rank_addr << " >> " << peer_addr << " pipe(" << this
                << " sd=" << sd
                << " pgs=" << peer_global_seq
                << " cs=" << connect_seq
@@ -917,7 +885,7 @@ int Rank::Pipe::accept()
   }
 
   // and my addr
-  rc = tcp_write(sd, (char*)&rank.rank_addr, sizeof(rank.rank_addr));
+  rc = tcp_write(sd, (char*)&rank->rank_addr, sizeof(rank->rank_addr));
   if (rc < 0) {
     dout(10) << "accept couldn't write my addr" << dendl;
     state = STATE_CLOSED;
@@ -977,17 +945,17 @@ int Rank::Pipe::accept()
             << " global_seq " << connect.global_seq
             << dendl;
     
-    rank.lock.Lock();
+    rank->lock.Lock();
 
     // note peer's type, flags
-    policy = rank.policy_map[connect.host_type];  /* apply policy */
+    policy = rank->policy_map[connect.host_type];  /* apply policy */
     lossy_rx = connect.flags & CEPH_MSG_CONNECT_LOSSY;
 
     memset(&reply, 0, sizeof(reply));
 
     // existing?
-    if (rank.rank_pipe.count(peer_addr)) {
-      existing = rank.rank_pipe[peer_addr];
+    if (rank->rank_pipe.count(peer_addr)) {
+      existing = rank->rank_pipe[peer_addr];
       existing->lock.Lock();
 
       if (connect.global_seq < existing->peer_global_seq) {
@@ -996,7 +964,7 @@ int Rank::Pipe::accept()
        reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
        reply.global_seq = existing->peer_global_seq;  // so we can send it below..
        existing->lock.Unlock();
-       rank.lock.Unlock();
+       rank->lock.Unlock();
        goto reply;
       } else {
        dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
@@ -1038,14 +1006,14 @@ int Rank::Pipe::accept()
          reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
          reply.connect_seq = existing->connect_seq;  // so we can send it below..
          existing->lock.Unlock();
-         rank.lock.Unlock();
+         rank->lock.Unlock();
          goto reply;
        }
       }
 
       if (connect.connect_seq == existing->connect_seq) {
        // connection race?
-       if (peer_addr < rank.rank_addr) {
+       if (peer_addr < rank->rank_addr) {
          // incoming wins
          dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
                   << " == " << connect.connect_seq << ", replacing my attempt" << dendl;
@@ -1057,11 +1025,11 @@ int Rank::Pipe::accept()
          // our existing outgoing wins
          dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
                   << " == " << connect.connect_seq << ", sending WAIT" << dendl;
-         assert(peer_addr > rank.rank_addr);
+         assert(peer_addr > rank->rank_addr);
          assert(existing->state == STATE_CONNECTING); // this will win
          reply.tag = CEPH_MSGR_TAG_WAIT;
          existing->lock.Unlock();
-         rank.lock.Unlock();
+         rank->lock.Unlock();
          goto reply;
        }
       }
@@ -1073,7 +1041,7 @@ int Rank::Pipe::accept()
                 << ", " << existing << ".cseq = " << existing->connect_seq
                 << "), sending RESETSESSION" << dendl;
        reply.tag = CEPH_MSGR_TAG_RESETSESSION;
-       rank.lock.Unlock();
+       rank->lock.Unlock();
        existing->lock.Unlock();
        goto reply;
       }
@@ -1086,7 +1054,7 @@ int Rank::Pipe::accept()
     else if (connect.connect_seq > 0) {
       // we reset, and they are opening a new session
       dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl;
-      rank.lock.Unlock();
+      rank->lock.Unlock();
       reply.tag = CEPH_MSGR_TAG_RESETSESSION;
       goto reply;
     } else {
@@ -1129,7 +1097,7 @@ int Rank::Pipe::accept()
 
   // send READY reply
   reply.tag = CEPH_MSGR_TAG_READY;
-  reply.global_seq = rank.get_global_seq();
+  reply.global_seq = rank->get_global_seq();
   reply.connect_seq = connect_seq;
   reply.flags = 0;
   if (policy.lossy_tx)
@@ -1137,7 +1105,7 @@ int Rank::Pipe::accept()
 
   // ok!
   register_pipe();
-  rank.lock.Unlock();
+  rank->lock.Unlock();
 
   rc = tcp_write(sd, (char*)&reply, sizeof(reply));
   if (rc < 0)
@@ -1154,7 +1122,7 @@ int Rank::Pipe::accept()
 
 
  fail:
-  rank.lock.Unlock();
+  rank->lock.Unlock();
  fail_unlocked:
   lock.Lock();
   state = STATE_CLOSED;
@@ -1174,7 +1142,7 @@ int Rank::Pipe::connect()
     closed_socket();
   }
   __u32 cseq = connect_seq;
-  __u32 gseq = rank.get_global_seq();
+  __u32 gseq = rank->get_global_seq();
 
   // stop reader thrad
   join_reader();
@@ -1273,8 +1241,8 @@ int Rank::Pipe::connect()
   
   // identify myself
   memset(&msg, 0, sizeof(msg));
-  msgvec[0].iov_base = (char*)&rank.rank_addr;
-  msgvec[0].iov_len = sizeof(rank.rank_addr);
+  msgvec[0].iov_base = (char*)&rank->rank_addr;
+  msgvec[0].iov_len = sizeof(rank->rank_addr);
   msg.msg_iov = msgvec;
   msg.msg_iovlen = 1;
   msglen = msgvec[0].iov_len;
@@ -1282,11 +1250,11 @@ int Rank::Pipe::connect()
     dout(2) << "connect couldn't write my addr, " << strerror(errno) << dendl;
     goto fail;
   }
-  dout(10) << "connect sent my addr " << rank.rank_addr << dendl;
+  dout(10) << "connect sent my addr " << rank->rank_addr << dendl;
 
   while (1) {
     ceph_msg_connect connect;
-    connect.host_type = rank.my_type;
+    connect.host_type = rank->my_type;
     connect.global_seq = gseq;
     connect.connect_seq = cseq;
     connect.flags = 0;
@@ -1331,7 +1299,7 @@ int Rank::Pipe::connect()
       continue;
     }
     if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
-      gseq = rank.get_global_seq(reply.global_seq);
+      gseq = rank->get_global_seq(reply.global_seq);
       dout(10) << "connect got RETRY_GLOBAL " << reply.global_seq
               << " chose new " << gseq << dendl;
       lock.Unlock();
@@ -1389,18 +1357,18 @@ int Rank::Pipe::connect()
 void Rank::Pipe::register_pipe()
 {
   dout(10) << "register_pipe" << dendl;
-  assert(rank.lock.is_locked());
-  assert(rank.rank_pipe.count(peer_addr) == 0);
-  rank.rank_pipe[peer_addr] = this;
+  assert(rank->lock.is_locked());
+  assert(rank->rank_pipe.count(peer_addr) == 0);
+  rank->rank_pipe[peer_addr] = this;
 }
 
 void Rank::Pipe::unregister_pipe()
 {
-  assert(rank.lock.is_locked());
-  if (rank.rank_pipe.count(peer_addr) &&
-      rank.rank_pipe[peer_addr] == this) {
+  assert(rank->lock.is_locked());
+  if (rank->rank_pipe.count(peer_addr) &&
+      rank->rank_pipe[peer_addr] == this) {
     dout(10) << "unregister_pipe" << dendl;
-    rank.rank_pipe.erase(peer_addr);
+    rank->rank_pipe.erase(peer_addr);
   } else {
     dout(10) << "unregister_pipe - not registered" << dendl;
   }
@@ -1519,15 +1487,15 @@ void Rank::Pipe::fail()
   stop();
   report_failures();
 
-  for (unsigned i=0; i<rank.local.size(); i++) 
-    if (rank.local[i] && rank.local[i]->get_dispatcher())
-      rank.local[i]->queue_reset(peer_addr, last_dest_name);
+  for (unsigned i=0; i<rank->local.size(); i++) 
+    if (rank->local[i] && rank->local[i]->get_dispatcher())
+      rank->local[i]->queue_reset(peer_addr, last_dest_name);
 
   // unregister
   lock.Unlock();
-  rank.lock.Lock();
+  rank->lock.Lock();
   unregister_pipe();
-  rank.lock.Unlock();
+  rank->lock.Unlock();
   lock.Lock();
 }
 
@@ -1537,9 +1505,9 @@ void Rank::Pipe::was_session_reset()
 
   dout(10) << "was_session_reset" << dendl;
   report_failures();
-  for (unsigned i=0; i<rank.local.size(); i++) 
-    if (rank.local[i] && rank.local[i]->get_dispatcher())
-      rank.local[i]->queue_remote_reset(peer_addr, last_dest_name);
+  for (unsigned i=0; i<rank->local.size(); i++) 
+    if (rank->local[i] && rank->local[i]->get_dispatcher())
+      rank->local[i]->queue_remote_reset(peer_addr, last_dest_name);
 
   out_seq = 0;
   in_seq = 0;
@@ -1557,13 +1525,13 @@ void Rank::Pipe::report_failures()
 
     if (policy.drop_msg_callback) {
       unsigned srcrank = m->get_source_inst().addr.erank;
-      if (srcrank >= rank.max_local || rank.local[srcrank] == 0) {
+      if (srcrank >= rank->max_local || rank->local[srcrank] == 0) {
        dout(1) << "fail on " << *m << ", srcrank " << srcrank << " dne, dropping" << dendl;
-      } else if (rank.local[srcrank]->is_stopped()) {
+      } else if (rank->local[srcrank]->is_stopped()) {
        dout(1) << "fail on " << *m << ", dispatcher stopping, ignoring." << dendl;
       } else {
        dout(10) << "fail on " << *m << dendl;
-       rank.local[srcrank]->queue_failure(m, m->get_dest_inst());
+       rank->local[srcrank]->queue_failure(m, m->get_dest_inst());
       }
     }
     m->put();
@@ -1685,14 +1653,14 @@ void Rank::Pipe::reader()
               << " for " << m->get_dest() << dendl;
       
       // deliver
-      EntityMessenger *entity = 0;
+      Endpoint *entity = 0;
       
-      rank.lock.Lock();
+      rank->lock.Lock();
       {
        unsigned erank = m->get_dest_inst().addr.erank;
-       if (erank < rank.max_local && rank.local[erank]) {
+       if (erank < rank->max_local && rank->local[erank]) {
          // find entity
-         entity = rank.local[erank];
+         entity = rank->local[erank];
          entity->get();
 
          // first message?
@@ -1702,18 +1670,18 @@ void Rank::Pipe::reader()
            entity->need_addr = false;
          }
 
-         if (rank.need_addr) {
-           rank.rank_addr = m->get_dest_inst().addr;
-           rank.rank_addr.erank = 0;
-           dout(2) << "reader rank_addr is " << rank.rank_addr << dendl;
-           rank.need_addr = false;
+         if (rank->need_addr) {
+           rank->rank_addr = m->get_dest_inst().addr;
+           rank->rank_addr.erank = 0;
+           dout(2) << "reader rank_addr is " << rank->rank_addr << dendl;
+           rank->need_addr = false;
          }
 
        } else {
          derr(0) << "reader got message " << *m << " for " << m->get_dest() << ", which isn't local" << dendl;
        }
       }
-      rank.lock.Unlock();
+      rank->lock.Unlock();
       
       if (entity) {
        entity->queue_message(m);        // queue
@@ -1756,12 +1724,12 @@ void Rank::Pipe::reader()
       sd = -1;
       closed_socket();
     }
-    rank.lock.Lock();
+    rank->lock.Lock();
     {
-      rank.pipe_reap_queue.push_back(this);
-      rank.wait_cond.Signal();
+      rank->pipe_reap_queue.push_back(this);
+      rank->wait_cond.Signal();
     }
-    rank.lock.Unlock();
+    rank->lock.Unlock();
   }
 
   dout(10) << "reader done" << dendl;
@@ -1875,12 +1843,12 @@ void Rank::Pipe::writer()
       sd = -1;
       closed_socket();
     }
-    rank.lock.Lock();
+    rank->lock.Lock();
     {
-      rank.pipe_reap_queue.push_back(this);
-      rank.wait_cond.Signal();
+      rank->pipe_reap_queue.push_back(this);
+      rank->wait_cond.Signal();
     }
-    rank.lock.Unlock();
+    rank->lock.Unlock();
   }
 
   dout(10) << "writer done" << dendl;
index 1e4a4ef00684de6b592f8eca370f92d2a6bca81d..98f808c7cebc2edbaf027a308704db7f4b738ff0 100644 (file)
@@ -82,16 +82,17 @@ public:
   void sigint();
 
 private:
-  class EntityMessenger;
+  class Endpoint;
   class Pipe;
 
   // incoming
   class Accepter : public Thread {
   public:
+    Rank *rank;
     bool done;
     int listen_sd;
     
-    Accepter() : done(false), listen_sd(-1) {}
+    Accepter(Rank *r) : rank(r), done(false), listen_sd(-1) {}
     
     void *entry();
     void stop();
@@ -104,6 +105,7 @@ private:
   // pipe
   class Pipe {
   public:
+    Rank *rank;
     ostream& _pipe_prefix();
 
     enum {
@@ -177,7 +179,8 @@ private:
     friend class Writer;
     
   public:
-    Pipe(int st) : 
+    Pipe(Rank *r, int st) : 
+      rank(r),
       sd(-1),
       lock("Rank::Pipe::lock"),
       state(st), 
@@ -260,7 +263,8 @@ private:
 
 
   // messenger interface
-  class EntityMessenger : public Messenger {
+  class Endpoint : public Messenger {
+    Rank *rank;
     Mutex lock;
     Cond cond;
     map<int, list<Message*> > dispatch_queue;
@@ -272,9 +276,9 @@ private:
 
   private:
     class DispatchThread : public Thread {
-      EntityMessenger *m;
+      Endpoint *m;
     public:
-      DispatchThread(EntityMessenger *_m) : m(_m) {}
+      DispatchThread(Endpoint *_m) : m(_m) {}
       void *entry() {
         m->dispatch_entry();
         return 0;
@@ -327,15 +331,16 @@ private:
     }
 
   public:
-    EntityMessenger(entity_name_t name, int r) : 
+    Endpoint(Rank *r, entity_name_t name, int rn) : 
       Messenger(name),
-      lock("Rank::EntityMessenger::lock"),
+      rank(r),
+      lock("Rank::Endpoint::lock"),
       stop(false),
       qlen(0),
-      my_rank(r),
+      my_rank(rn),
       need_addr(false),
       dispatch_thread(this) { }
-    ~EntityMessenger() { }
+    ~Endpoint() { }
 
     void destroy() {
       // join dispatch thread
@@ -380,7 +385,7 @@ private:
   
   // local
   unsigned max_local, num_local;
-  vector<EntityMessenger*> local;
+  vector<Endpoint*> local;
   vector<bool>             stopped;
   
   // remote
@@ -404,7 +409,8 @@ private:
   void reaper();
 
 public:
-  Rank() : lock("Rank::lock"), started(false), need_addr(true),
+  Rank() : accepter(this),
+          lock("Rank::lock"), started(false), need_addr(true),
           max_local(0), num_local(0),
           my_type(-1),
           global_seq_lock("Rank::global_seq_lock"), global_seq(0) { }
@@ -423,15 +429,15 @@ public:
     return ++global_seq;
   }
 
-  EntityMessenger *register_entity(entity_name_t addr);
-  void rename_entity(EntityMessenger *ms, entity_name_t newaddr);
-  void unregister_entity(EntityMessenger *ms);
+  Endpoint *register_entity(entity_name_t addr);
+  void rename_entity(Endpoint *ms, entity_name_t newaddr);
+  void unregister_entity(Endpoint *ms);
 
   void submit_message(Message *m, const entity_addr_t& addr, bool lazy=false);  
   void prepare_dest(const entity_inst_t& inst);
 
   // create a new messenger
-  EntityMessenger *new_entity(entity_name_t addr);
+  Endpoint *new_entity(entity_name_t addr);
 
   void set_policy(int type, Policy p) {
     policy_map[type] = p;