]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: track connections by addrvec
authorSage Weil <sage@redhat.com>
Tue, 12 Jun 2018 18:42:25 +0000 (13:42 -0500)
committerSage Weil <sage@redhat.com>
Tue, 3 Jul 2018 18:01:24 +0000 (13:01 -0500)
Signed-off-by: Sage Weil <sage@redhat.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h

index 3777aa3b22de65f9139b53cf0f3b8669b6045f49..97d7e4400ef07287e3ce8997ed64c81e2dd66f6b 100644 (file)
@@ -1513,7 +1513,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
   ldout(async_msgr->cct, 10) << __func__ << " accept setting up session_security." << dendl;
 
   // existing?
-  AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addrs.legacy_addr());
+  AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addrs);
 
   inject_delay();
 
@@ -1874,7 +1874,8 @@ void AsyncConnection::_connect()
 
 void AsyncConnection::accept(ConnectedSocket socket, entity_addr_t &addr)
 {
-  ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd() << dendl;
+  ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd()
+                            << " on " << addr << dendl;
   assert(socket.fd() >= 0);
 
   std::lock_guard<std::mutex> l(lock);
index cd5ab7328ef963491252cbef485cd41ce3f4217a..58a5c1c5bd4803ea9e0d0d8adc800b3ca43ccc9b 100644 (file)
@@ -194,9 +194,9 @@ class AsyncConnection : public Connection {
   }
 
   // Only call when AsyncConnection first construct
-  void connect(const entity_addr_t& addr, int type) {
+  void connect(const entity_addrvec_t& addrs, int type) {
     set_peer_type(type);
-    set_peer_addr(addr);
+    set_peer_addrs(addrs);
     policy = msgr->get_policy(type);
     _connect();
   }
index 6e693a767cd49c8ad169f730733a1cac58069375..7c7f8e2ae3d6d4c8358f7a67bd9fa96bcacbe1c6 100644 (file)
@@ -542,20 +542,21 @@ void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_ad
   lock.Unlock();
 }
 
-AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type)
+AsyncConnectionRef AsyncMessenger::create_connect(
+  const entity_addrvec_t& addrs, int type)
 {
   assert(lock.is_locked());
-  assert(addr != my_addrs.legacy_addr());
+  assert(addrs != my_addrs);
 
-  ldout(cct, 10) << __func__ << " " << addr
+  ldout(cct, 10) << __func__ << " " << addrs
       << ", creating connection and registering" << dendl;
 
   // create connection
   Worker *w = stack->get_worker();
   AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
-  conn->connect(addr, type);
-  assert(!conns.count(addr));
-  conns[addr] = conn;
+  conn->connect(addrs, type);
+  assert(!conns.count(addrs));
+  conns[addrs] = conn;
   w->get_perf_counter()->inc(l_msgr_active_connections);
 
   return conn;
@@ -569,11 +570,11 @@ ConnectionRef AsyncMessenger::get_connection(const entity_inst_t& dest)
     return local_connection;
   }
 
-  AsyncConnectionRef conn = _lookup_conn(dest.addr);
+  AsyncConnectionRef conn = _lookup_conn(entity_addrvec_t(dest.addr));
   if (conn) {
     ldout(cct, 10) << __func__ << " " << dest << " existing " << conn << dendl;
   } else {
-    conn = create_connect(dest.addr, dest.name.type());
+    conn = create_connect(entity_addrvec_t(dest.addr), dest.name.type());
     ldout(cct, 10) << __func__ << " " << dest << " new " << conn << dendl;
   }
 
@@ -606,7 +607,7 @@ int AsyncMessenger::_send_message(Message *m, const entity_inst_t& dest)
     return -EINVAL;
   }
 
-  AsyncConnectionRef conn = _lookup_conn(dest.addr);
+  AsyncConnectionRef conn = _lookup_conn(entity_addrvec_t(dest.addr));
   submit_message(m, conn, dest.addr, dest.name.type());
   return 0;
 }
@@ -648,7 +649,7 @@ void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con,
     m->put();
   } else {
     ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addr << ", new connection." << dendl;
-    con = create_connect(dest_addr, dest_type);
+    con = create_connect(entity_addrvec_t(dest_addr), dest_type);
     con->send_message(m);
   }
 }
@@ -708,7 +709,7 @@ void AsyncMessenger::shutdown_connections(bool queue_reset)
   accepting_conns.clear();
 
   while (!conns.empty()) {
-    ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator it = conns.begin();
+    auto it = conns.begin();
     AsyncConnectionRef p = it->second;
     ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl;
     conns.erase(it);
@@ -728,15 +729,15 @@ void AsyncMessenger::shutdown_connections(bool queue_reset)
   lock.Unlock();
 }
 
-void AsyncMessenger::mark_down(const entity_addr_t& addr)
+void AsyncMessenger::mark_down_addrs(const entity_addrvec_t& addrs)
 {
   lock.Lock();
-  AsyncConnectionRef p = _lookup_conn(addr);
+  AsyncConnectionRef p = _lookup_conn(addrs);
   if (p) {
-    ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl;
+    ldout(cct, 1) << __func__ << " " << addrs << " -- " << p << dendl;
     p->stop(true);
   } else {
-    ldout(cct, 1) << __func__ << " " << addr << " -- connection dne" << dendl;
+    ldout(cct, 1) << __func__ << " " << addrs << " -- connection dne" << dendl;
   }
   lock.Unlock();
 }
@@ -810,7 +811,7 @@ int AsyncMessenger::reap_dead()
     auto it = deleted_conns.begin();
     AsyncConnectionRef p = *it;
     ldout(cct, 5) << __func__ << " delete " << p << dendl;
-    auto conns_it = conns.find(p->peer_addrs.legacy_addr());
+    auto conns_it = conns.find(p->peer_addrs);
     if (conns_it != conns.end() && conns_it->second == p)
       conns.erase(conns_it);
     accepting_conns.erase(p);
index ee886a7d5109c87fac7c95885942b7a11890638f..df748fbde713aee7a6771e944c2d731a8c24be29 100644 (file)
@@ -151,7 +151,10 @@ public:
    */
   ConnectionRef get_connection(const entity_inst_t& dest) override;
   ConnectionRef get_loopback_connection() override;
-  void mark_down(const entity_addr_t& addr) override;
+  void mark_down(const entity_addr_t& addr) override {
+    mark_down_addrs(entity_addrvec_t(addr));
+  }
+  void mark_down_addrs(const entity_addrvec_t& addrs) override;
   void mark_down_all() override {
     shutdown_connections(true);
   }
@@ -189,13 +192,13 @@ private:
    * Initiate the connection. (This function returning does not guarantee
    * connection success.)
    *
-   * @param addr The address of the entity to connect to.
+   * @param addrs The address(es) of the entity to connect to.
    * @param type The peer type of the entity at the address.
    *
    * @return a pointer to the newly-created connection. Caller does not own a
    * reference; take one if you need it.
    */
-  AsyncConnectionRef create_connect(const entity_addr_t& addr, int type);
+  AsyncConnectionRef create_connect(const entity_addrvec_t& addrs, int type);
 
   /**
    * Queue up a Message for delivery to the entity specified
@@ -271,7 +274,7 @@ private:
    * NOTE: a Asyncconnection* with state CLOSED may still be in the map but is considered
    * invalid and can be replaced by anyone holding the msgr lock
    */
-  ceph::unordered_map<entity_addr_t, AsyncConnectionRef> conns;
+  ceph::unordered_map<entity_addrvec_t, AsyncConnectionRef> conns;
 
   /**
    * list of connection are in teh process of accepting
@@ -302,9 +305,9 @@ private:
   Cond  stop_cond;
   bool stopped;
 
-  AsyncConnectionRef _lookup_conn(const entity_addr_t& k) {
+  AsyncConnectionRef _lookup_conn(const entity_addrvec_t& k) {
     assert(lock.is_locked());
-    ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator p = conns.find(k);
+    auto p = conns.find(k);
     if (p == conns.end())
       return NULL;
 
@@ -341,14 +344,14 @@ public:
   /**
    * This wraps _lookup_conn.
    */
-  AsyncConnectionRef lookup_conn(const entity_addr_t& k) {
+  AsyncConnectionRef lookup_conn(const entity_addrvec_t& k) {
     Mutex::Locker l(lock);
     return _lookup_conn(k);
   }
 
   int accept_conn(AsyncConnectionRef conn) {
     Mutex::Locker l(lock);
-    auto it = conns.find(conn->peer_addrs.legacy_addr());
+    auto it = conns.find(conn->peer_addrs);
     if (it != conns.end()) {
       AsyncConnectionRef existing = it->second;
 
@@ -362,7 +365,7 @@ public:
         return -1;
       }
     }
-    conns[conn->peer_addrs.legacy_addr()] = conn;
+    conns[conn->peer_addrs] = conn;
     conn->get_perf_counter()->inc(l_msgr_active_connections);
     accepting_conns.erase(conn);
     return 0;