]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/Messenger: *myinst -> *myname and *myaddr
authorSage Weil <sage@redhat.com>
Sat, 26 May 2018 13:42:07 +0000 (08:42 -0500)
committerSage Weil <sage@redhat.com>
Thu, 31 May 2018 13:10:23 +0000 (08:10 -0500)
Initial step moving Messenger interface away from entity_inst_t.  This
is mostly s/my_inst.addr/my_addr/ and s/my_inst.name/my_name/.

Also add Connection::get_peer_addrs() and Messenger::get_myaddrs().

Signed-off-by: Sage Weil <sage@redhat.com>
src/msg/Connection.h
src/msg/Messenger.h
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h
src/msg/simple/Accepter.cc
src/msg/simple/Pipe.cc
src/msg/simple/SimpleMessenger.cc
src/osd/OSD.cc
src/test/mon/test-mon-msg.cc
src/test/msgr/test_msgr.cc

index a19092b93eb6f9c734256f67d230467e367d1dea..13d99d7359530ae43b8cd896dc6b68481f511c84 100644 (file)
@@ -158,6 +158,9 @@ public:
   bool peer_is_client() const { return peer_type == CEPH_ENTITY_TYPE_CLIENT; }
 
   const entity_addr_t& get_peer_addr() const { return peer_addr; }
+  entity_addrvec_t get_peer_addrs() const {
+    return entity_addrvec_t(peer_addr);
+  }
   void set_peer_addr(const entity_addr_t& a) { peer_addr = a; }
 
   uint64_t get_features() const { return features; }
index ebb84566ffb578e629d2cdf596252103eda154e6..2c8b94c7d7fc22fd34ed847c947b19d1e8475db7 100644 (file)
@@ -48,7 +48,11 @@ private:
 
 protected:
   /// the "name" of the local daemon. eg client.99
-  entity_inst_t my_inst;
+  entity_name_t my_name;
+
+  /// my addr
+  entity_addr_t my_addr;
+
   int default_send_priority;
   /// set to true once the Messenger has started, and set to false on shutdown
   bool started;
@@ -141,15 +145,13 @@ public:
    */
   Messenger(CephContext *cct_, entity_name_t w)
     : trace_endpoint("0.0.0.0", 0, "Messenger"),
-      my_inst(),
-      default_send_priority(CEPH_MSG_PRIO_DEFAULT), started(false),
+      my_name(w),
+      default_send_priority(CEPH_MSG_PRIO_DEFAULT),
+      started(false),
       magic(0),
       socket_priority(-1),
       cct(cct_),
-      crcflags(get_default_crc_flags(cct->_conf))
-  {
-    my_inst.name = w;
-  }
+      crcflags(get_default_crc_flags(cct->_conf)) {}
   virtual ~Messenger() {}
 
   /**
@@ -192,20 +194,15 @@ public:
    * @defgroup Accessors
    * @{
    */
+  int get_mytype() const { return my_name.type(); }
+
   /**
-   * Retrieve the Messenger's instance.
+   * Retrieve the Messenger's name
    *
-   * @return A const reference to the instance this Messenger
+   * @return A const reference to the name this Messenger
    * currently believes to be its own.
    */
-  const entity_inst_t& get_myinst() { return my_inst; }
-  /**
-   * set messenger's instance
-   */
-  void set_myinst(entity_inst_t i) { my_inst = i; }
-
-  uint32_t get_magic() { return magic; }
-  void set_magic(int _magic) { magic = _magic; }
+  const entity_name_t& get_myname() { return my_name; }
 
   /**
    * Retrieve the Messenger's address.
@@ -213,14 +210,24 @@ public:
    * @return A const reference to the address this Messenger
    * currently believes to be its own.
    */
-  const entity_addr_t& get_myaddr() { return my_inst.addr; }
+  const entity_addr_t& get_myaddr() { return my_addr; }
+  entity_addrvec_t get_myaddrs() {
+    return entity_addrvec_t(my_addr);
+  }
+
+  /**
+   * set messenger's instance
+   */
+  uint32_t get_magic() { return magic; }
+  void set_magic(int _magic) { magic = _magic; }
+
 protected:
   /**
    * set messenger's address
    */
   virtual void set_myaddr(const entity_addr_t& a) {
-    my_inst.addr = a;
-    set_endpoint_addr(a, my_inst.name);
+    my_addr = a;
+    set_endpoint_addr(a, my_name);
   }
 public:
   /**
@@ -230,13 +237,6 @@ public:
     return &trace_endpoint;
   }
 
-  /**
-   * Retrieve the Messenger's name.
-   *
-   * @return A const reference to the name this Messenger
-   * currently believes to be its own.
-   */
-  const entity_name_t& get_myname() { return my_inst.name; }
   /**
    * Set the name of the local entity. The name is reported to others and
    * can be changed while the system is running, but doing so at incorrect
@@ -244,7 +244,8 @@ public:
    *
    * @param m The name to set.
    */
-  void set_myname(const entity_name_t& m) { my_inst.name = m; }
+  void set_myname(const entity_name_t& m) { my_name = m; }
+
   /**
    * Set the unknown address components for this Messenger.
    * This is useful if the Messenger doesn't know its full address just by
index b4569d0215e33021d3b7a6e578ab0545d0fe8fc8..27b7094f0b6a22b97d2ad677f5ba24f5a4e90de0 100644 (file)
@@ -33,7 +33,7 @@
 #undef dout_prefix
 #define dout_prefix _conn_prefix(_dout)
 ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
-  return *_dout << "-- " << async_msgr->get_myinst().addr << " >> " << peer_addr << " conn(" << this
+  return *_dout << "-- " << async_msgr->get_myaddr() << " >> " << peer_addr << " conn(" << this
                 << " :" << port
                 << " s=" << get_state_name(state)
                 << " pgs=" << peer_global_seq
@@ -1017,7 +1017,7 @@ ssize_t AsyncConnection::_process_connection()
         bufferlist bl;
 
         connect_msg.features = policy.features_supported;
-        connect_msg.host_type = async_msgr->get_myinst().name.type();
+        connect_msg.host_type = async_msgr->get_myname().type();
         connect_msg.global_seq = global_seq;
         connect_msg.connect_seq = connect_seq;
         connect_msg.protocol_version = async_msgr->get_proto_version(peer_type, true);
index 032d7314c8650a375e559bb95b71c7a3cda1c6a8..dfa0bee4bed166b15ae76776df1c6881f7b93815 100644 (file)
@@ -386,7 +386,7 @@ int AsyncMessenger::rebind(const set<int>& avoid_ports)
   // adjust the nonce; we want our entity_addr_t to be truly unique.
   nonce += 1000000;
   ldout(cct, 10) << __func__ << " new nonce " << nonce
-                << " and inst " << get_myinst() << dendl;
+                << " and addr " << get_myaddr() << dendl;
 
   entity_addr_t bound_addr;
   entity_addr_t bind_addr = get_myaddr();
@@ -417,7 +417,7 @@ int AsyncMessenger::client_bind(const entity_addr_t &bind_addr)
     return 0;
   Mutex::Locker l(lock);
   if (did_bind) {
-    assert(my_inst.addr == bind_addr);
+    assert(my_addr == bind_addr);
     return 0;
   }
   if (started) {
@@ -446,7 +446,7 @@ void AsyncMessenger::_finish_bind(const entity_addr_t& bind_addr,
 
   init_local_connection();
 
-  ldout(cct,1) << __func__ << " bind my_inst.addr is " << get_myaddr() << dendl;
+  ldout(cct,1) << __func__ << " bind my_addr is " << get_myaddr() << dendl;
   did_bind = true;
 }
 
@@ -456,14 +456,14 @@ int AsyncMessenger::start()
   ldout(cct,1) << __func__ << " start" << dendl;
 
   // register at least one entity, first!
-  assert(my_inst.name.type() >= 0);
+  assert(my_name.type() >= 0);
 
   assert(!started);
   started = true;
   stopped = false;
 
   if (!did_bind) {
-    my_inst.addr.nonce = nonce;
+    my_addr.nonce = nonce;
     _init_local_connection();
   }
 
@@ -512,7 +512,7 @@ void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_ad
 AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type)
 {
   assert(lock.is_locked());
-  assert(addr != my_inst.addr);
+  assert(addr != my_addr);
 
   ldout(cct, 10) << __func__ << " " << addr
       << ", creating connection and registering" << dendl;
@@ -531,7 +531,7 @@ AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int
 ConnectionRef AsyncMessenger::get_connection(const entity_inst_t& dest)
 {
   Mutex::Locker l(lock);
-  if (my_inst.addr == dest.addr) {
+  if (my_addr == dest.addr) {
     // local
     return local_connection;
   }
@@ -600,7 +600,7 @@ void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con,
   }
 
   // local?
-  if (my_inst.addr == dest_addr) {
+  if (my_addr == dest_addr) {
     // local
     local_connection->send_message(m);
     return ;
@@ -621,16 +621,16 @@ void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con,
 }
 
 /**
- * If my_inst.addr doesn't have an IP set, this function
+ * If my_addr doesn't have an IP set, this function
  * will fill it in from the passed addr. Otherwise it does nothing and returns.
  */
 void AsyncMessenger::set_addr_unknowns(const entity_addr_t &addr)
 {
   Mutex::Locker l(lock);
-  if (my_inst.addr.is_blank_ip()) {
-    int port = my_inst.addr.get_port();
-    my_inst.addr.u = addr.u;
-    my_inst.addr.set_port(port);
+  if (my_addr.is_blank_ip()) {
+    int port = my_addr.get_port();
+    my_addr.u = addr.u;
+    my_addr.set_port(port);
     _init_local_connection();
   }
 }
@@ -692,7 +692,7 @@ void AsyncMessenger::mark_down(const entity_addr_t& addr)
 
 int AsyncMessenger::get_proto_version(int peer_type, bool connect) const
 {
-  int my_type = my_inst.name.type();
+  int my_type = my_name.type();
 
   // set reply protocol version
   if (peer_type == my_type) {
@@ -712,7 +712,7 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect) const
 void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
 {
   // be careful here: multiple threads may block here, and readers of
-  // my_inst.addr do NOT hold any lock.
+  // my_addr do NOT hold any lock.
 
   // this always goes from true -> false under the protection of the
   // mutex.  if it is already false, we need not retake the mutex at
@@ -723,10 +723,10 @@ void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
   if (need_addr) {
     need_addr = false;
     entity_addr_t t = peer_addr_for_me;
-    t.set_port(my_inst.addr.get_port());
-    t.set_nonce(my_inst.addr.get_nonce());
-    my_inst.addr = t;
-    ldout(cct, 1) << __func__ << " learned my addr " << my_inst.addr << dendl;
+    t.set_port(my_addr.get_port());
+    t.set_nonce(my_addr.get_nonce());
+    my_addr = t;
+    ldout(cct, 1) << __func__ << " learned my addr " << my_addr << dendl;
     _init_local_connection();
   }
   lock.Unlock();
index a07ed4c69c3b26b997bc2cd3a145b0ede3b1d9c4..c517f3fdb4342af02a6d57beb4d208749ce14e69 100644 (file)
@@ -319,8 +319,8 @@ private:
 
   void _init_local_connection() {
     assert(lock.is_locked());
-    local_connection->peer_addr = my_inst.addr;
-    local_connection->peer_type = my_inst.name.type();
+    local_connection->peer_addr = my_addr;
+    local_connection->peer_type = my_name.type();
     local_connection->set_features(CEPH_FEATURES_ALL);
     ms_deliver_handle_fast_connect(local_connection.get());
   }
index a95e046c7ba0870cea53464abe5c66791cd04cd6..9bb3e355ee25300424412ae20f462990cafcd75a 100644 (file)
@@ -270,9 +270,9 @@ int Accepter::rebind(const set<int>& avoid_ports)
 
   // adjust the nonce; we want our entity_addr_t to be truly unique.
   nonce += 1000000;
-  msgr->my_inst.addr.nonce = nonce;
-  ldout(msgr->cct,10) << __func__ << " new nonce " << nonce << " and inst " 
-                       << msgr->my_inst << dendl;
+  msgr->my_addr.nonce = nonce;
+  ldout(msgr->cct,10) << __func__ << " new nonce " << nonce << " and addr "
+                       << msgr->my_addr << dendl;
 
   ldout(msgr->cct,10) << " will try " << addr << " and avoid ports " << new_avoid << dendl;
   int r = bind(addr, new_avoid);
index f452258369cb8080c91cb3c4921c988d04b205af..9f44dd8b7c006cc90c46f8fdf5b2e6e256bc78ea 100644 (file)
@@ -44,7 +44,7 @@
 #undef dout_prefix
 #define dout_prefix *_dout << *this
 ostream& Pipe::_pipe_prefix(std::ostream &out) const {
-  return out << "-- " << msgr->get_myinst().addr << " >> " << peer_addr << " pipe(" << this
+  return out << "-- " << msgr->get_myaddr() << " >> " << peer_addr << " pipe(" << this
             << " sd=" << sd << " :" << port
              << " s=" << state
              << " pgs=" << peer_global_seq
@@ -363,9 +363,9 @@ int Pipe::accept()
   }
 
   // and my addr
-  encode(msgr->my_inst.addr, addrs, 0);  // legacy
+  encode(msgr->my_addr, addrs, 0);  // legacy
 
-  port = msgr->my_inst.addr.get_port();
+  port = msgr->my_addr.get_port();
 
   // and peer's socket addr (they might not know their ip)
   sockaddr_storage ss;
@@ -611,7 +611,7 @@ int Pipe::accept()
        }
 
        // connection race?
-       if (peer_addr < msgr->my_inst.addr ||
+       if (peer_addr < msgr->my_addr ||
            existing->policy.server) {
          // incoming wins
          ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
@@ -630,7 +630,7 @@ int Pipe::accept()
          // our existing outgoing wins
          ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
                   << " == " << connect.connect_seq << ", sending WAIT" << dendl;
-         assert(peer_addr > msgr->my_inst.addr);
+         assert(peer_addr > msgr->my_addr);
          if (!(existing->state == STATE_CONNECTING))
            lderr(msgr->cct) << "accept race bad state, would send wait, existing="
                             << existing->get_state_name()
@@ -1100,7 +1100,7 @@ int Pipe::connect()
 
   msgr->learned_addr(peer_addr_for_me);
 
-  encode(msgr->my_inst.addr, myaddrbl, 0);  // legacy
+  encode(msgr->my_addr, myaddrbl, 0);  // legacy
 
   memset(&msg, 0, sizeof(msg));
   msgvec[0].iov_base = myaddrbl.c_str();
@@ -1113,7 +1113,7 @@ int Pipe::connect()
     ldout(msgr->cct,2) << "connect couldn't write my addr, " << cpp_strerror(rc) << dendl;
     goto fail;
   }
-  ldout(msgr->cct,10) << "connect sent my addr " << msgr->my_inst.addr << dendl;
+  ldout(msgr->cct,10) << "connect sent my addr " << msgr->my_addr << dendl;
 
 
   while (1) {
@@ -1123,7 +1123,7 @@ int Pipe::connect()
 
     ceph_msg_connect connect;
     connect.features = policy.features_supported;
-    connect.host_type = msgr->get_myinst().name.type();
+    connect.host_type = msgr->get_myname().type();
     connect.global_seq = gseq;
     connect.connect_seq = cseq;
     connect.protocol_version = msgr->get_proto_version(peer_type, true);
index 4918c52a7af49d00073154996402e53aa7795bcb..36a0ee371a1c03b2cd09527b9147f784c19a49f6 100644 (file)
@@ -149,10 +149,10 @@ int SimpleMessenger::_send_message(Message *m, Connection *con)
  */
 void SimpleMessenger::set_addr_unknowns(const entity_addr_t &addr)
 {
-  if (my_inst.addr.is_blank_ip()) {
-    int port = my_inst.addr.get_port();
-    my_inst.addr.u = addr.u;
-    my_inst.addr.set_port(port);
+  if (my_addr.is_blank_ip()) {
+    int port = my_addr.get_port();
+    my_addr.u = addr.u;
+    my_addr.set_port(port);
     init_local_connection();
   }
 }
@@ -167,7 +167,7 @@ void SimpleMessenger::set_addr(const entity_addr_t &addr)
 
 int SimpleMessenger::get_proto_version(int peer_type, bool connect)
 {
-  int my_type = my_inst.name.type();
+  int my_type = my_name.type();
 
   // set reply protocol version
   if (peer_type == my_type) {
@@ -319,7 +319,7 @@ int SimpleMessenger::client_bind(const entity_addr_t &bind_addr)
     return 0;
   Mutex::Locker l(lock);
   if (did_bind) {
-    assert(my_inst.addr == bind_addr);
+    assert(my_addr == bind_addr);
     return 0;
   }
   if (started) {
@@ -339,14 +339,14 @@ int SimpleMessenger::start()
   ldout(cct,1) << "messenger.start" << dendl;
 
   // register at least one entity, first!
-  assert(my_inst.name.type() >= 0);
+  assert(my_name.type() >= 0);
 
   assert(!started);
   started = true;
   stopped = false;
 
   if (!did_bind) {
-    my_inst.addr.nonce = nonce;
+    my_addr.nonce = nonce;
     init_local_connection();
   }
 
@@ -380,7 +380,7 @@ Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr,
                                    Message *first)
 {
   assert(lock.is_locked());
-  assert(addr != my_inst.addr);
+  assert(addr != my_addr);
   
   ldout(cct,10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl;
   
@@ -421,7 +421,7 @@ bool SimpleMessenger::verify_authorizer(Connection *con, int peer_type,
 ConnectionRef SimpleMessenger::get_connection(const entity_inst_t& dest)
 {
   Mutex::Locker l(lock);
-  if (my_inst.addr == dest.addr) {
+  if (my_addr == dest.addr) {
     // local
     return local_connection;
   }
@@ -502,7 +502,7 @@ void SimpleMessenger::submit_message(Message *m, PipeConnection *con,
   }
 
   // local?
-  if (my_inst.addr == dest_addr) {
+  if (my_addr == dest_addr) {
     // local
     ldout(cct,20) << "submit_message " << *m << " local" << dendl;
     m->set_connection(local_connection.get());
@@ -723,7 +723,7 @@ void SimpleMessenger::mark_disposable(Connection *con)
 void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
 {
   // be careful here: multiple threads may block here, and readers of
-  // my_inst.addr do NOT hold any lock.
+  // my_addr do NOT hold any lock.
 
   // this always goes from true -> false under the protection of the
   // mutex.  if it is already false, we need not retake the mutex at
@@ -734,12 +734,12 @@ void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
   lock.Lock();
   if (need_addr) {
     entity_addr_t t = peer_addr_for_me;
-    t.set_port(my_inst.addr.get_port());
-    t.set_nonce(my_inst.addr.get_nonce());
-    ANNOTATE_BENIGN_RACE_SIZED(&my_inst.addr, sizeof(my_inst.addr),
+    t.set_port(my_addr.get_port());
+    t.set_nonce(my_addr.get_nonce());
+    ANNOTATE_BENIGN_RACE_SIZED(&my_addr, sizeof(my_addr),
                                "SimpleMessenger learned addr");
-    my_inst.addr = t;
-    ldout(cct,1) << "learned my addr " << my_inst.addr << dendl;
+    my_addr = t;
+    ldout(cct,1) << "learned my addr " << my_addr << dendl;
     need_addr = false;
     init_local_connection();
   }
@@ -748,8 +748,8 @@ void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
 
 void SimpleMessenger::init_local_connection()
 {
-  local_connection->peer_addr = my_inst.addr;
-  local_connection->peer_type = my_inst.name.type();
+  local_connection->peer_addr = my_addr;
+  local_connection->peer_type = my_name.type();
   local_connection->set_features(CEPH_FEATURES_ALL);
   ms_deliver_handle_fast_connect(local_connection.get());
 }
index 8a90d9d0370afd738fd1129a98c399fd047edf9c..f60e0ef7e05136565ba6b54a2b0e8a0fd2d4097c 100644 (file)
@@ -7436,7 +7436,7 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
     service.retrieve_epochs(&boot_epoch, &up_epoch, NULL);
     if (!up_epoch &&
        osdmap->is_up(whoami) &&
-       osdmap->get_inst(whoami) == client_messenger->get_myinst()) {
+       osdmap->get_addr(whoami) == client_messenger->get_myaddr()) {
       up_epoch = osdmap->get_epoch();
       dout(10) << "up_epoch is " << up_epoch << dendl;
       if (!boot_epoch) {
index 7e9832057d06756e47c29962acb49bd1944ae134..bb2c4638a65f6c0e36fc35941dc39f45d0c38ffc 100644 (file)
@@ -301,7 +301,8 @@ TEST_F(MonMsgTest, MRouteTest)
   Message *payload = new MGenericMessage(CEPH_MSG_SHUTDOWN);
   MRoute *m = new MRoute;
   m->msg = payload;
-  m->dest = msg->get_myinst();
+  m->dest.addr = msg->get_myaddr();
+  m->dest.name = entity_name_t(msg->get_mytype(), -1);
   Message *r = send_wait_reply(m, CEPH_MSG_SHUTDOWN);
   // we want an error
   ASSERT_NE(IS_ERR(r), 0);
index 772c5a0200cae0fd97117c5df7d4ba2c8e76a4f8..9a36af697ea54f46e5712d38520eddfe1432bf4d 100644 (file)
@@ -226,7 +226,8 @@ TEST_P(MessengerTest, SimpleTest) {
 
   // 1. simple round trip
   MPing *m = new MPing();
-  ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+  ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                              server_msgr->get_myaddrs());
   {
     ASSERT_EQ(conn->send_message(m), 0);
     Mutex::Locker l(cli_dispatcher.lock);
@@ -240,12 +241,18 @@ TEST_P(MessengerTest, SimpleTest) {
 
   // 2. test rebind port
   set<int> avoid_ports;
-  for (int i = 0; i < 10 ; i++)
-    avoid_ports.insert(server_msgr->get_myaddr().get_port() + i);
+  for (int i = 0; i < 10 ; i++) {
+    for (auto a : server_msgr->get_myaddrs().v) {
+      avoid_ports.insert(a.get_port() + i);
+    }
+  }
   server_msgr->rebind(avoid_ports);
-  ASSERT_TRUE(avoid_ports.count(server_msgr->get_myaddr().get_port()) == 0);
+  for (auto a : server_msgr->get_myaddrs().v) {
+    ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0);
+  }
 
-  conn = client_msgr->get_connection(server_msgr->get_myinst());
+  conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                server_msgr->get_myaddrs());
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
@@ -300,7 +307,8 @@ TEST_P(MessengerTest, NameAddrTest) {
   client_msgr->start();
 
   MPing *m = new MPing();
-  ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+  ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                              server_msgr->get_myaddrs());
   {
     ASSERT_EQ(conn->send_message(m), 0);
     Mutex::Locker l(cli_dispatcher.lock);
@@ -309,8 +317,9 @@ TEST_P(MessengerTest, NameAddrTest) {
     cli_dispatcher.got_new = false;
   }
   ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
-  ASSERT_TRUE(conn->get_peer_addr() == server_msgr->get_myaddr());
-  ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
+  ASSERT_TRUE(conn->get_peer_addrs() == server_msgr->get_myaddrs());
+  ConnectionRef server_conn = server_msgr->connect_to(
+    client_msgr->get_mytype(), client_msgr->get_myaddrs());
   // Make should server_conn is the one we already accepted from client,
   // so it means client_msgr has the same addr when server connection has
   ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
@@ -345,7 +354,8 @@ TEST_P(MessengerTest, FeatureTest) {
   client_msgr->start();
 
   MPing *m = new MPing();
-  ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+  ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                              server_msgr->get_myaddrs());
   conn->send_message(m);
   CHECK_AND_WAIT_TRUE(!conn->is_connected());
   // should failed build a connection
@@ -360,7 +370,8 @@ TEST_P(MessengerTest, FeatureTest) {
   client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
   client_msgr->start();
 
-  conn = client_msgr->get_connection(server_msgr->get_myinst());
+  conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                server_msgr->get_myaddrs());
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
@@ -391,7 +402,8 @@ TEST_P(MessengerTest, TimeoutTest) {
 
   // 1. build the connection
   MPing *m = new MPing();
-  ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+  ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                   server_msgr->get_myaddrs());
   {
     ASSERT_EQ(conn->send_message(m), 0);
     Mutex::Locker l(cli_dispatcher.lock);
@@ -432,7 +444,8 @@ TEST_P(MessengerTest, StatefulTest) {
   client_msgr->start();
 
   // 1. test for server standby
-  ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+  ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                              server_msgr->get_myaddrs());
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
@@ -444,12 +457,14 @@ TEST_P(MessengerTest, StatefulTest) {
   ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   conn->mark_down();
   ASSERT_FALSE(conn->is_connected());
-  ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
+  ConnectionRef server_conn = server_msgr->connect_to(
+    client_msgr->get_mytype(), client_msgr->get_myaddrs());
   // don't lose state
   ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
 
   srv_dispatcher.got_new = false;
-  conn = client_msgr->get_connection(server_msgr->get_myinst());
+  conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                server_msgr->get_myaddrs());
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
@@ -459,7 +474,8 @@ TEST_P(MessengerTest, StatefulTest) {
     cli_dispatcher.got_new = false;
   }
   ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
-  server_conn = server_msgr->get_connection(client_msgr->get_myinst());
+  server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
+                                       client_msgr->get_myaddrs());
   {
     Mutex::Locker l(srv_dispatcher.lock);
     while (!srv_dispatcher.got_remote_reset)
@@ -500,7 +516,8 @@ TEST_P(MessengerTest, StatefulTest) {
   }
   // resetcheck happen
   ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
-  server_conn = server_msgr->get_connection(client_msgr->get_myinst());
+  server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
+                                       client_msgr->get_myaddrs());
   ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
   cli_dispatcher.got_remote_reset = false;
 
@@ -527,7 +544,8 @@ TEST_P(MessengerTest, StatelessTest) {
   client_msgr->start();
 
   // 1. test for server lose state
-  ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+  ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                   server_msgr->get_myaddrs());
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
@@ -541,7 +559,8 @@ TEST_P(MessengerTest, StatelessTest) {
   ASSERT_FALSE(conn->is_connected());
 
   srv_dispatcher.got_new = false;
-  conn = client_msgr->get_connection(server_msgr->get_myinst());
+  conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                     server_msgr->get_myaddrs());
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
@@ -551,7 +570,8 @@ TEST_P(MessengerTest, StatelessTest) {
     cli_dispatcher.got_new = false;
   }
   ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
-  ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
+  ConnectionRef server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
+                                                     client_msgr->get_myaddrs());
   // server lose state
   {
     Mutex::Locker l(srv_dispatcher.lock);
@@ -566,7 +586,8 @@ TEST_P(MessengerTest, StatelessTest) {
   conn->send_keepalive();
   CHECK_AND_WAIT_TRUE(!conn->is_connected());
   ASSERT_FALSE(conn->is_connected());
-  conn = client_msgr->get_connection(server_msgr->get_myinst());
+  conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                   server_msgr->get_myaddrs());
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
@@ -600,7 +621,8 @@ TEST_P(MessengerTest, ClientStandbyTest) {
   client_msgr->start();
 
   // 1. test for client standby, resetcheck
-  ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+  ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                   server_msgr->get_myaddrs());
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
@@ -610,7 +632,9 @@ TEST_P(MessengerTest, ClientStandbyTest) {
     cli_dispatcher.got_new = false;
   }
   ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
-  ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
+  ConnectionRef server_conn = server_msgr->connect_to(
+    client_msgr->get_mytype(),
+    client_msgr->get_myaddrs());
   ASSERT_FALSE(cli_dispatcher.got_remote_reset);
   cli_dispatcher.got_connect = false;
   server_conn->mark_down();
@@ -641,7 +665,8 @@ TEST_P(MessengerTest, ClientStandbyTest) {
     cli_dispatcher.got_new = false;
   }
   ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
-  server_conn = server_msgr->get_connection(client_msgr->get_myinst());
+  server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
+                                       client_msgr->get_myaddrs());
   ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
 
   server_msgr->shutdown();
@@ -666,7 +691,8 @@ TEST_P(MessengerTest, AuthTest) {
 
   // 1. simple auth round trip
   MPing *m = new MPing();
-  ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+  ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                   server_msgr->get_myaddrs());
   {
     ASSERT_EQ(conn->send_message(m), 0);
     Mutex::Locker l(cli_dispatcher.lock);
@@ -683,7 +709,8 @@ TEST_P(MessengerTest, AuthTest) {
   g_ceph_context->_conf->set_val("auth_client_required", "none");
   conn->mark_down();
   ASSERT_FALSE(conn->is_connected());
-  conn = client_msgr->get_connection(server_msgr->get_myinst());
+  conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                   server_msgr->get_myaddrs());
   {
     MPing *m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
@@ -720,7 +747,8 @@ TEST_P(MessengerTest, MessageTest) {
   // 1. A very large "front"(as well as "payload")
   // Because a external message need to invade Messenger::decode_message,
   // here we only use existing message class(MCommand)
-  ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+  ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                   server_msgr->get_myaddrs());
   {
     uuid_d uuid;
     uuid.generate_random();
@@ -1057,14 +1085,16 @@ class SyntheticWorkload {
       if (server->get_default_policy().server) {
         p = make_pair(client, server);
       } else {
-        ConnectionRef conn = client->get_connection(server->get_myinst());
+        ConnectionRef conn = client->connect_to(server->get_mytype(),
+                                                    server->get_myaddrs());
         if (available_connections.count(conn) || choose(rng) % 2)
           p = make_pair(client, server);
         else
           p = make_pair(server, client);
       }
     }
-    ConnectionRef conn = p.first->get_connection(p.second->get_myinst());
+    ConnectionRef conn = p.first->connect_to(p.second->get_mytype(),
+                                                 p.second->get_myaddrs());
     available_connections[conn] = p;
   }
 
@@ -1099,7 +1129,8 @@ class SyntheticWorkload {
     // it's a lossless policy, so we need to mark down each side
     if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) {
       ASSERT_EQ(conn->get_messenger(), p.first);
-      ConnectionRef peer = p.second->get_connection(p.first->get_myinst());
+      ConnectionRef peer = p.second->connect_to(p.first->get_mytype(),
+                                                    p.first->get_myaddrs());
       peer->mark_down();
       dispatcher.clear_pending(peer);
       available_connections.erase(peer);
@@ -1462,8 +1493,10 @@ TEST_P(MessengerTest, MarkdownTest) {
   bool equal = false;
   uint64_t equal_count = 0;
   while (i--) {
-    ConnectionRef conn1 = client_msgr->get_connection(server_msgr->get_myinst());
-    ConnectionRef conn2 = client_msgr->get_connection(server_msgr2->get_myinst());
+    ConnectionRef conn1 = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                      server_msgr->get_myaddrs());
+    ConnectionRef conn2 = client_msgr->connect_to(server_msgr2->get_mytype(),
+                                                      server_msgr2->get_myaddrs());
     MPing *m = new MPing();
     ASSERT_EQ(conn1->send_message(m), 0);
     m = new MPing();