]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: remove 'dst' from ceph_msg_header
authorSage Weil <sage@newdream.net>
Thu, 27 Aug 2009 20:50:28 +0000 (13:50 -0700)
committerSage Weil <sage@newdream.net>
Thu, 27 Aug 2009 20:50:28 +0000 (13:50 -0700)
Shouldn't be there anyway.  We do still need to keep erank in there
though.

14 files changed:
src/cosd.cc
src/include/msgr.h
src/messages/MClientMountAck.h
src/messages/MClientReply.h
src/messages/MMonMap.h
src/messages/MOSDOpReply.h
src/mon/ClientMonitor.cc
src/mon/MonClient.cc
src/mon/MonClient.h
src/mon/Monitor.cc
src/msg/Dispatcher.h
src/msg/Message.h
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index f96854a6726f2643678156bef1bb8b08db255c2c..f67c54ca024ebd56186956f9684b70011c8a66af 100644 (file)
@@ -125,6 +125,8 @@ int main(int argc, const char **argv)
   SimpleMessenger rank;
   rank.bind();
 
+  rank.set_addr(mc.get_my_addr());
+
   cout << "starting osd" << whoami
        << " at " << rank.get_rank_addr() 
        << " osd_data " << g_conf.osd_data
index 50859cf4f47fdce83d60c4a9f691e8441bf03d81..1764012f870d108022b7b6a550a6da324efa4505 100644 (file)
@@ -21,7 +21,7 @@
  * whenever the wire protocol changes.  try to keep this string length
  * constant.
  */
-#define CEPH_BANNER "ceph 014\n"
+#define CEPH_BANNER "ceph 015\n"
 #define CEPH_BANNER_MAX_LEN 30
 
 
@@ -133,7 +133,8 @@ struct ceph_msg_header {
        __u8 osd_protocol, osdc_protocol;  /* internal and public */
        __u8 mds_protocol, mdsc_protocol;
 
-       struct ceph_entity_inst src, orig_src, dst;
+       struct ceph_entity_inst src, orig_src;
+       __le32 dst_erank;
        __le32 crc;       /* header crc32c */
 } __attribute__ ((packed));
 
index fb5b7f0eaa66c81709cb7093b12696bba391e596..37020df92e200354c60bd74f5f03643c209ef1e4 100644 (file)
 #include "msg/Message.h"
 
 struct MClientMountAck : public Message {
+  __s32 client;
   __s32 result;
   cstring result_msg;
   bufferlist monmap_bl;
   bufferlist signed_ticket;
 
-  MClientMountAck(int r = 0, const char *msg = 0) :
+  MClientMountAck(int c = -1, int r = 0, const char *msg = 0) :
     Message(CEPH_MSG_CLIENT_MOUNT_ACK),
-    result(r),
+    client(c), result(r),
     result_msg(msg) { }
 
   const char *get_type_name() { return "client_mount_ack"; }
   void print(ostream& o) {
-    o << "client_mount_ack(" << result;
+    o << "client_mount_ack(client" << client << " " << result;
     if (result_msg.length()) o << " " << result_msg;
     if (monmap_bl.length()) o << " + monmap";
     if (signed_ticket.length()) o << " + ticket";
@@ -39,12 +40,14 @@ struct MClientMountAck : public Message {
 
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
+    ::decode(client, p);
     ::decode(result, p);
     ::decode(result_msg, p);
     ::decode(monmap_bl, p);
     ::decode(signed_ticket, p);
   }
   void encode_payload() {
+    ::encode(client, payload);
     ::encode(result, payload);
     ::encode(result_msg, payload);
     ::encode(monmap_bl, payload);
index e07977348adfdf610439faa91c9e1062431fe733..899a558593926c61de61f1db088e19669547c127 100644 (file)
@@ -202,7 +202,7 @@ public:
   }
   const char *get_type_name() { return "creply"; }
   void print(ostream& o) {
-    o << "client_reply(" << header.dst.name << ":" << head.tid;
+    o << "client_reply(???:" << head.tid;
     o << " = " << get_result();
     if (get_result() <= 0)
       o << " " << strerror(-get_result());
index e6e3e326d63ec1896c26f85dd40f76cd8ce7899c..b2f5d966fc010c46150f60268a5f7a65171c0c92 100644 (file)
 
 class MMonMap : public Message {
 public:
+  entity_addr_t addr;
   bufferlist monmapbl;
 
   MMonMap() : Message(CEPH_MSG_MON_MAP) { }
-  MMonMap(bufferlist &bl) : Message(CEPH_MSG_MON_MAP) { 
+  MMonMap(entity_addr_t t, bufferlist &bl) : Message(CEPH_MSG_MON_MAP) { 
+    addr = t;
     monmapbl.claim(bl);
   }
 
   const char *get_type_name() { return "mon_map"; }
 
   void encode_payload() { 
-    payload = monmapbl;
+    ::encode(addr, payload);
+    ::encode(monmapbl, payload);
   }
   void decode_payload() { 
-    monmapbl = payload;
+    bufferlist::iterator p = payload.begin();
+    ::decode(addr, p);
+    ::decode(monmapbl, p);
   }
 };
 
index 55fa0e841cfbd457acba0f020e0ef5563b7d4364..f5d15677d094ba3d2924b5424c0708b5bfd35a41 100644 (file)
@@ -55,9 +55,10 @@ class MOSDOpReply : public Message {
   // osdmap
   epoch_t get_map_epoch() { return head.osdmap_epoch; }
 
-  osd_reqid_t get_reqid() { return osd_reqid_t(get_dest(),
+  /*osd_reqid_t get_reqid() { return osd_reqid_t(get_dest(),
                                               head.client_inc,
                                               head.tid); }
+  */
 
 public:
   MOSDOpReply(MOSDOp *req, __s32 result, epoch_t e, int acktype) :
@@ -100,7 +101,7 @@ public:
   const char *get_type_name() { return "osd_op_reply"; }
   
   void print(ostream& out) {
-    out << "osd_op_reply(" << get_reqid()
+    out << "osd_op_reply(" << head.tid
        << " " << oid << " " << ops;
     if (may_write()) {
       if (is_ondisk())
index 3d3642f6d129a4090451169eeecc0152df39b8f3..be9998027e1a2010e5c14d856fcab23943b01693 100644 (file)
@@ -139,7 +139,7 @@ bool ClientMonitor::check_mount(MClientMount *m)
 
       string s;
       getline(ss, s);
-      mon->messenger->send_message(new MClientMountAck(-EPERM, s.c_str()),
+      mon->messenger->send_message(new MClientMountAck(-1, -EPERM, s.c_str()),
                                   m->get_orig_source_inst());
       return true;
     }
index 03fd483f1a1a13b8baea5cfdbf76984a50726ed8..25066ab3c0f1f34c50f15451e1d06ada6414b618 100644 (file)
@@ -174,6 +174,11 @@ void MonClient::handle_monmap(MMonMap *m)
 {
   dout(10) << "handle_monmap " << *m << dendl;
   monc_lock.Lock();
+
+  my_addr = m->addr;
+  messenger->_set_myaddr(m->addr);
+  dout(10) << " i am " << m->addr << dendl;
+
   bufferlist::iterator p = m->monmapbl.begin();
   ::decode(monmap, p);
   map_cond.Signal();
@@ -259,7 +264,7 @@ void MonClient::handle_mount_ack(MClientMountAck* m)
   p = signed_ticket.begin();
   ::decode(ticket, p);
 
-  messenger->reset_myname(m->get_dest());
+  messenger->reset_myname(entity_name_t::CLIENT(m->client));
 
   mount_cond.Signal();
 
index c7e0ebdfb1c255ea808ca8f16bf01b16f697d938..246ef7c36100b742d0fabddea373c0209939c092 100644 (file)
@@ -34,6 +34,8 @@ public:
 private:
   Messenger *messenger;
 
+  entity_addr_t my_addr;
+
   ClientTicket ticket;
   bufferlist signed_ticket;
 
@@ -87,6 +89,8 @@ private:
   }
   void pick_new_mon();
 
+  entity_addr_t get_my_addr() { return my_addr; }
+
   const ceph_fsid_t& get_fsid() {
     return monmap.fsid;
   }
index f672105cea9d443d37b60f637287372b765d3a89..5592cef386b1ed7fb993c30a5acdcf5065ca32e5 100644 (file)
@@ -360,7 +360,7 @@ bool Monitor::dispatch_impl(Message *m)
       ss << "client protocol v " << (int)m->get_header().monc_protocol << " != server v " << CEPH_MONC_PROTOCOL;
       string s;
       getline(ss, s);
-      messenger->send_message(new MClientMountAck(-EINVAL, s.c_str()),
+      messenger->send_message(new MClientMountAck(-1, -EINVAL, s.c_str()),
                              m->get_orig_source_inst());
     }
 
@@ -477,7 +477,7 @@ void Monitor::handle_mon_get_map(MMonGetMap *m)
   dout(10) << "handle_mon_get_map" << dendl;
   bufferlist bl;
   monmap->encode(bl);
-  messenger->send_message(new MMonMap(bl), m->get_orig_source_inst());
+  messenger->send_message(new MMonMap(m->get_orig_source_addr(), bl), m->get_orig_source_inst());
   delete m;
 }
 
index c2e42860e59f763929afddda958b589aa57a7d1a..3f06a54e7afef10915ccc864bad540bb316e1b2a 100644 (file)
@@ -64,18 +64,18 @@ public:
   }
 
   // how i deal with transmission failures.
-  virtual void ms_handle_failure(Message *m, const entity_inst_t& inst) {  }
+  virtual void ms_handle_failure(Message *m, const entity_addr_t& addr) {  }
 
   /*
    * on any connection reset.
    * this indicates that the ordered+reliable delivery semantics have 
    * been violated.  messages may have been lost.
    */
-  virtual void ms_handle_reset(const entity_addr_t& peer, entity_name_t last) { }
+  virtual void ms_handle_reset(const entity_addr_t& peer) { }
 
   // on deliberate reset of connection by remote
   //  implies incoming messages dropped; possibly/probably some of our previous outgoing too.
-  virtual void ms_handle_remote_reset(const entity_addr_t& peer, entity_name_t last) { }
+  virtual void ms_handle_remote_reset(const entity_addr_t& peer) { }
 };
 
 #endif
index 386df64b3bfddb26da20e07ea95a82506ee3cd5f..1d89994e102115bf6e94b54db3ded90e14bf547c 100644 (file)
@@ -222,10 +222,6 @@ public:
   void set_priority(__s16 p) { header.priority = p; }
 
   // source/dest
-  entity_inst_t get_dest_inst() { return entity_inst_t(header.dst); }
-  entity_name_t get_dest() { return entity_name_t(header.dst.name); }
-  void set_dest_inst(entity_inst_t& inst) { header.dst = inst; }
-
   entity_inst_t get_source_inst() { return entity_inst_t(header.src); }
   entity_name_t get_source() { return entity_name_t(header.src.name); }
   entity_addr_t get_source_addr() { return entity_addr_t(header.src.addr); }
index 554bc9a1231756fddd40b819159961bd1c674581..23377e758ad539bc95e771ae22c6bc0373526cf8 100644 (file)
@@ -286,29 +286,26 @@ void SimpleMessenger::Endpoint::dispatch_entry()
           ls.pop_front();
          if ((long)m == BAD_REMOTE_RESET) {
            lock.Lock();
-           entity_addr_t a = remote_reset_q.front().first;
-           entity_name_t n = remote_reset_q.front().second;
+           entity_addr_t a = remote_reset_q.front();
            remote_reset_q.pop_front();
            lock.Unlock();
-           get_dispatcher()->ms_handle_remote_reset(a, n);
+           get_dispatcher()->ms_handle_remote_reset(a);
          } else if ((long)m == BAD_RESET) {
            lock.Lock();
-           entity_addr_t a = reset_q.front().first;
-           entity_name_t n = reset_q.front().second;
+           entity_addr_t a = reset_q.front();
            reset_q.pop_front();
            lock.Unlock();
-           get_dispatcher()->ms_handle_reset(a, n);
+           get_dispatcher()->ms_handle_reset(a);
          } else if ((long)m == BAD_FAILED) {
            lock.Lock();
            m = failed_q.front().first;
-           entity_inst_t i = failed_q.front().second;
+           entity_addr_t a = failed_q.front().second;
            failed_q.pop_front();
            lock.Unlock();
-           get_dispatcher()->ms_handle_failure(m, i);
+           get_dispatcher()->ms_handle_failure(m, a);
            m->put();
          } else {
-           dout(1) << m->get_dest() 
-                   << " <== " << m->get_source_inst()
+           dout(1) << "<== " << m->get_source_inst()
                    << " " << m->get_seq()
                    << " ==== " << *m
                    << " ==== " << m->get_payload().length() << "+" << m->get_middle().length()
@@ -384,7 +381,6 @@ int SimpleMessenger::Endpoint::send_message(Message *m, entity_inst_t dest)
   // set envelope
   m->set_source_inst(_myinst);
   m->set_orig_source_inst(_myinst);
-  m->set_dest_inst(dest);
   if (!m->get_priority()) m->set_priority(get_default_send_priority());
  
   dout(1) << m->get_source()
@@ -394,7 +390,7 @@ int SimpleMessenger::Endpoint::send_message(Message *m, entity_inst_t dest)
          << " " << m 
          << dendl;
 
-  rank->submit_message(m, dest.addr);
+  rank->submit_message(m, dest);
 
   return 0;
 }
@@ -403,7 +399,6 @@ int SimpleMessenger::Endpoint::forward_message(Message *m, entity_inst_t dest)
 {
   // set envelope
   m->set_source_inst(_myinst);
-  m->set_dest_inst(dest);
   if (!m->get_priority()) m->set_priority(get_default_send_priority());
  
   dout(1) << m->get_source()
@@ -413,7 +408,7 @@ int SimpleMessenger::Endpoint::forward_message(Message *m, entity_inst_t dest)
          << " " << m 
           << dendl;
 
-  rank->submit_message(m, dest.addr);
+  rank->submit_message(m, dest);
 
   return 0;
 }
@@ -425,7 +420,6 @@ int SimpleMessenger::Endpoint::lazy_send_message(Message *m, entity_inst_t dest)
   // set envelope
   m->set_source_inst(_myinst);
   m->set_orig_source_inst(_myinst);
-  m->set_dest_inst(dest);
   if (!m->get_priority()) m->set_priority(get_default_send_priority());
  
   dout(1) << "lazy " << m->get_source()
@@ -435,7 +429,7 @@ int SimpleMessenger::Endpoint::lazy_send_message(Message *m, entity_inst_t dest)
          << " " << m 
           << dendl;
 
-  rank->submit_message(m, dest.addr, true);
+  rank->submit_message(m, dest, true);
 
   return 0;
 }
@@ -1100,7 +1094,7 @@ void SimpleMessenger::Pipe::fail()
 
   for (unsigned i=0; i<rank->local.size(); i++) 
     if (rank->local[i] && rank->local[i]->get_dispatcher())
-      rank->local[i]->queue_reset(peer_addr, last_dest_name);
+      rank->local[i]->queue_reset(peer_addr);
 
   // unregister
   lock.Unlock();
@@ -1118,7 +1112,7 @@ void SimpleMessenger::Pipe::was_session_reset()
   report_failures();
   for (unsigned i=0; i<rank->local.size(); i++) 
     if (rank->local[i] && rank->local[i]->get_dispatcher())
-      rank->local[i]->queue_remote_reset(peer_addr, last_dest_name);
+      rank->local[i]->queue_remote_reset(peer_addr);
 
   out_seq = 0;
   in_seq = 0;
@@ -1142,7 +1136,7 @@ void SimpleMessenger::Pipe::report_failures()
        dout(1) << "fail on " << *m << ", dispatcher stopping, ignoring." << dendl;
       } else {
        dout(10) << "fail on " << *m << dendl;
-       rank->local[srcrank]->queue_failure(m, m->get_dest_inst());
+       rank->local[srcrank]->queue_failure(m, peer_addr);
       }
     }
     m->put();
@@ -1248,7 +1242,6 @@ void SimpleMessenger::Pipe::reader()
       if (m->get_seq() <= in_seq) {
        dout(-10) << "reader got old message "
                  << m->get_seq() << " <= " << in_seq << " " << m << " " << *m
-                 << " for " << m->get_dest() 
                  << ", discarding" << dendl;
        delete m;
        continue;
@@ -1271,20 +1264,21 @@ void SimpleMessenger::Pipe::reader()
       
       dout(10) << "reader got message "
               << m->get_seq() << " " << m << " " << *m
-              << " for " << m->get_dest() << dendl;
+              << dendl;
       
       // deliver
       Endpoint *entity = 0;
       
       rank->lock.Lock();
       {
-       unsigned erank = m->get_dest_inst().addr.erank;
+       unsigned erank = m->get_header().dst_erank;
        if (erank < rank->max_local && rank->local[erank]) {
          // find entity
          entity = rank->local[erank];
          entity->get();
 
          // first message?
+         /*
          if (entity->need_addr) {
            entity->_set_myaddr(m->get_dest_inst().addr);
            dout(2) << "reader entity addr is " << entity->get_myaddr() << dendl;
@@ -1297,9 +1291,10 @@ void SimpleMessenger::Pipe::reader()
            dout(2) << "reader rank_addr is " << rank->rank_addr << dendl;
            rank->need_addr = false;
          }
+         */
 
        } else {
-         derr(0) << "reader got message " << *m << " for " << m->get_dest() << ", which isn't local" << dendl;
+         derr(0) << "reader got message " << *m << ", which isn't local" << dendl;
        }
       }
       rank->lock.Unlock();
@@ -1443,7 +1438,7 @@ void SimpleMessenger::Pipe::writer()
 
        lock.Lock();
        if (rc < 0) {
-          derr(1) << "writer error sending " << m << " to " << m->get_header().dst << ", "
+          derr(1) << "writer error sending " << m << ", "
                  << errno << ": " << strerror(errno) << dendl;
          fault();
         }
@@ -1497,7 +1492,7 @@ Message *SimpleMessenger::Pipe::read_message()
     return 0;
   
   dout(20) << "reader got envelope type=" << header.type
-           << " src " << header.src << " dst " << header.dst
+           << " src " << header.src
            << " front=" << header.front_len
           << " data=" << header.data_len
           << " off " << header.data_off
@@ -1687,7 +1682,7 @@ int SimpleMessenger::Pipe::write_message(Message *m)
   blist.append(m->get_middle());
   blist.append(m->get_data());
   
-  dout(20)  << "write_message " << m << " to " << header.dst << dendl;
+  dout(20)  << "write_message " << m << dendl;
   
   // set up msghdr and iovecs
   struct msghdr msg;
@@ -2011,9 +2006,10 @@ void SimpleMessenger::unregister_entity(Endpoint *msgr)
 }
 
 
-void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr, bool lazy)
+void SimpleMessenger::submit_message(Message *m, const entity_inst_t& dest, bool lazy)
 {
-  const entity_name_t dest = m->get_dest();
+  const entity_addr_t& dest_addr = dest.addr;
+  m->get_header().dst_erank = dest_addr.erank;
 
   assert(m->nref.test() == 0);
 
@@ -2034,10 +2030,10 @@ void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr,
     if (rank_addr.is_local_to(dest_addr)) {
       if (dest_addr.erank < max_local && local[dest_addr.erank]) {
         // local
-        dout(20) << "submit_message " << *m << " dest " << dest << " local" << dendl;
+        dout(20) << "submit_message " << *m << " local" << dendl;
        local[dest_addr.erank]->queue_message(m);
       } else {
-        derr(0) << "submit_message " << *m << " dest " << dest << " " << dest_addr << " local but not in local map?  dropping." << dendl;
+        derr(0) << "submit_message " << *m << " " << dest_addr << " local but not in local map?  dropping." << dendl;
         //assert(0);  // hmpf, this is probably mds->mon beacon from newsyn.
        delete m;
       }
@@ -2050,19 +2046,12 @@ void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr,
         pipe = rank_pipe[ dest_proc_addr ];
        pipe->lock.Lock();
        if (pipe->state == Pipe::STATE_CLOSED) {
-         dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl;
+         dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl;
          pipe->unregister_pipe();
          pipe->lock.Unlock();
          pipe = 0;
        } else {
-         dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", have pipe." << dendl;
-
-         /*
-         // if this pipe was created by an incoming connection, but we haven't received
-         // a message yet, then it won't have the policy set.
-         if (pipe->get_out_seq() == 0)
-           pipe->policy = policy_map[m->get_dest().type()];
-         */
+         dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", have pipe." << dendl;
 
          pipe->_send(m);
          pipe->lock.Unlock();
@@ -2070,12 +2059,12 @@ void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr,
       }
       if (!pipe) {
        if (lazy) {
-         dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", lazy, dropping." << dendl;
+         dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", lazy, dropping." << dendl;
          delete m;
        } else {
-         dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", new pipe." << dendl;
+         dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", new pipe." << dendl;
          // not connected.
-         pipe = connect_rank(dest_proc_addr, policy_map[m->get_dest().type()]);
+         pipe = connect_rank(dest_proc_addr, policy_map[dest.name.type()]);
          pipe->send(m);
        }
       }
index 6af3fc9ade87e64f7a79b0acc7f874498eb404b7..dda1be3057ccabb793a2e8402081ba0c17d24c4a 100644 (file)
@@ -129,7 +129,6 @@ private:
     int sd;
     int new_sd;
     entity_addr_t peer_addr;
-    entity_name_t last_dest_name;
     Policy policy;
     bool lossy_rx;
     
@@ -244,7 +243,6 @@ private:
     void _send(Message *m) {
       m->get();
       q[m->get_priority()].push_back(m);
-      last_dest_name = m->get_dest();
       cond.Signal();
     }
     Message *_get_next_outgoing() {
@@ -311,28 +309,28 @@ private:
     }
 
     enum { BAD_REMOTE_RESET, BAD_RESET, BAD_FAILED };
-    list<pair<entity_addr_t,entity_name_t> > remote_reset_q;
-    list<pair<entity_addr_t,entity_name_t> > reset_q;
-    list<pair<Message*,entity_inst_t> > failed_q;
+    list<entity_addr_t> remote_reset_q;
+    list<entity_addr_t> reset_q;
+    list<pair<Message*,entity_addr_t> > failed_q;
 
-    void queue_remote_reset(entity_addr_t a, entity_name_t n) {
+    void queue_remote_reset(entity_addr_t a) {
       lock.Lock();
-      remote_reset_q.push_back(pair<entity_addr_t,entity_name_t>(a,n));
+      remote_reset_q.push_back(a);
       dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_REMOTE_RESET);
       cond.Signal();
       lock.Unlock();
     }
-    void queue_reset(entity_addr_t a, entity_name_t n) {
+    void queue_reset(entity_addr_t a) {
       lock.Lock();
-      reset_q.push_back(pair<entity_addr_t,entity_name_t>(a,n));
+      reset_q.push_back(a);
       dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_RESET);
       cond.Signal();
       lock.Unlock();
     }
-    void queue_failure(Message *m, entity_inst_t i) {
+    void queue_failure(Message *m, entity_addr_t a) {
       lock.Lock();
       m->get();
-      failed_q.push_back(pair<Message*,entity_inst_t>(m,i));
+      failed_q.push_back(pair<Message*,entity_addr_t>(m, a));
       dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_FAILED);
       cond.Signal();
       lock.Unlock();
@@ -438,11 +436,16 @@ public:
     return ++global_seq;
   }
 
+  void set_addr(entity_addr_t a) {
+    rank_addr = a;
+    need_addr = false;
+  }
+
   Endpoint *register_entity(entity_name_t addr);
   void rename_entity(Endpoint *ms, entity_name_t newaddr);
   void unregister_entity(Endpoint *ms);
 
-  void submit_message(Message *m, const entity_addr_t& addr, bool lazy=false);  
+  void submit_message(Message *m, const entity_inst_t& addr, bool lazy=false);  
   void prepare_dest(const entity_inst_t& inst);
 
   // create a new messenger