From ecd85f4d935220ebd426b79b3aef49e5069e903f Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 12 Jun 2018 13:42:25 -0500 Subject: [PATCH] msg/async: track connections by addrvec Signed-off-by: Sage Weil --- src/msg/async/AsyncConnection.cc | 5 +++-- src/msg/async/AsyncConnection.h | 4 ++-- src/msg/async/AsyncMessenger.cc | 33 ++++++++++++++++---------------- src/msg/async/AsyncMessenger.h | 21 +++++++++++--------- 4 files changed, 34 insertions(+), 29 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 3777aa3b22de6..97d7e4400ef07 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -1513,7 +1513,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis ldout(async_msgr->cct, 10) << __func__ << " accept setting up session_security." << dendl; // existing? - AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addrs.legacy_addr()); + AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addrs); inject_delay(); @@ -1874,7 +1874,8 @@ void AsyncConnection::_connect() void AsyncConnection::accept(ConnectedSocket socket, entity_addr_t &addr) { - ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd() << dendl; + ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd() + << " on " << addr << dendl; assert(socket.fd() >= 0); std::lock_guard l(lock); diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index cd5ab7328ef96..58a5c1c5bd480 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -194,9 +194,9 @@ class AsyncConnection : public Connection { } // Only call when AsyncConnection first construct - void connect(const entity_addr_t& addr, int type) { + void connect(const entity_addrvec_t& addrs, int type) { set_peer_type(type); - set_peer_addr(addr); + set_peer_addrs(addrs); policy = msgr->get_policy(type); _connect(); } diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 6e693a767cd49..7c7f8e2ae3d6d 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -542,20 +542,21 @@ void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_ad lock.Unlock(); } -AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type) +AsyncConnectionRef AsyncMessenger::create_connect( + const entity_addrvec_t& addrs, int type) { assert(lock.is_locked()); - assert(addr != my_addrs.legacy_addr()); + assert(addrs != my_addrs); - ldout(cct, 10) << __func__ << " " << addr + ldout(cct, 10) << __func__ << " " << addrs << ", creating connection and registering" << dendl; // create connection Worker *w = stack->get_worker(); AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w); - conn->connect(addr, type); - assert(!conns.count(addr)); - conns[addr] = conn; + conn->connect(addrs, type); + assert(!conns.count(addrs)); + conns[addrs] = conn; w->get_perf_counter()->inc(l_msgr_active_connections); return conn; @@ -569,11 +570,11 @@ ConnectionRef AsyncMessenger::get_connection(const entity_inst_t& dest) return local_connection; } - AsyncConnectionRef conn = _lookup_conn(dest.addr); + AsyncConnectionRef conn = _lookup_conn(entity_addrvec_t(dest.addr)); if (conn) { ldout(cct, 10) << __func__ << " " << dest << " existing " << conn << dendl; } else { - conn = create_connect(dest.addr, dest.name.type()); + conn = create_connect(entity_addrvec_t(dest.addr), dest.name.type()); ldout(cct, 10) << __func__ << " " << dest << " new " << conn << dendl; } @@ -606,7 +607,7 @@ int AsyncMessenger::_send_message(Message *m, const entity_inst_t& dest) return -EINVAL; } - AsyncConnectionRef conn = _lookup_conn(dest.addr); + AsyncConnectionRef conn = _lookup_conn(entity_addrvec_t(dest.addr)); submit_message(m, conn, dest.addr, dest.name.type()); return 0; } @@ -648,7 +649,7 @@ void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con, m->put(); } else { ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addr << ", new connection." << dendl; - con = create_connect(dest_addr, dest_type); + con = create_connect(entity_addrvec_t(dest_addr), dest_type); con->send_message(m); } } @@ -708,7 +709,7 @@ void AsyncMessenger::shutdown_connections(bool queue_reset) accepting_conns.clear(); while (!conns.empty()) { - ceph::unordered_map::iterator it = conns.begin(); + auto it = conns.begin(); AsyncConnectionRef p = it->second; ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl; conns.erase(it); @@ -728,15 +729,15 @@ void AsyncMessenger::shutdown_connections(bool queue_reset) lock.Unlock(); } -void AsyncMessenger::mark_down(const entity_addr_t& addr) +void AsyncMessenger::mark_down_addrs(const entity_addrvec_t& addrs) { lock.Lock(); - AsyncConnectionRef p = _lookup_conn(addr); + AsyncConnectionRef p = _lookup_conn(addrs); if (p) { - ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl; + ldout(cct, 1) << __func__ << " " << addrs << " -- " << p << dendl; p->stop(true); } else { - ldout(cct, 1) << __func__ << " " << addr << " -- connection dne" << dendl; + ldout(cct, 1) << __func__ << " " << addrs << " -- connection dne" << dendl; } lock.Unlock(); } @@ -810,7 +811,7 @@ int AsyncMessenger::reap_dead() auto it = deleted_conns.begin(); AsyncConnectionRef p = *it; ldout(cct, 5) << __func__ << " delete " << p << dendl; - auto conns_it = conns.find(p->peer_addrs.legacy_addr()); + auto conns_it = conns.find(p->peer_addrs); if (conns_it != conns.end() && conns_it->second == p) conns.erase(conns_it); accepting_conns.erase(p); diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index ee886a7d5109c..df748fbde713a 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -151,7 +151,10 @@ public: */ ConnectionRef get_connection(const entity_inst_t& dest) override; ConnectionRef get_loopback_connection() override; - void mark_down(const entity_addr_t& addr) override; + void mark_down(const entity_addr_t& addr) override { + mark_down_addrs(entity_addrvec_t(addr)); + } + void mark_down_addrs(const entity_addrvec_t& addrs) override; void mark_down_all() override { shutdown_connections(true); } @@ -189,13 +192,13 @@ private: * Initiate the connection. (This function returning does not guarantee * connection success.) * - * @param addr The address of the entity to connect to. + * @param addrs The address(es) of the entity to connect to. * @param type The peer type of the entity at the address. * * @return a pointer to the newly-created connection. Caller does not own a * reference; take one if you need it. */ - AsyncConnectionRef create_connect(const entity_addr_t& addr, int type); + AsyncConnectionRef create_connect(const entity_addrvec_t& addrs, int type); /** * Queue up a Message for delivery to the entity specified @@ -271,7 +274,7 @@ private: * NOTE: a Asyncconnection* with state CLOSED may still be in the map but is considered * invalid and can be replaced by anyone holding the msgr lock */ - ceph::unordered_map conns; + ceph::unordered_map conns; /** * list of connection are in teh process of accepting @@ -302,9 +305,9 @@ private: Cond stop_cond; bool stopped; - AsyncConnectionRef _lookup_conn(const entity_addr_t& k) { + AsyncConnectionRef _lookup_conn(const entity_addrvec_t& k) { assert(lock.is_locked()); - ceph::unordered_map::iterator p = conns.find(k); + auto p = conns.find(k); if (p == conns.end()) return NULL; @@ -341,14 +344,14 @@ public: /** * This wraps _lookup_conn. */ - AsyncConnectionRef lookup_conn(const entity_addr_t& k) { + AsyncConnectionRef lookup_conn(const entity_addrvec_t& k) { Mutex::Locker l(lock); return _lookup_conn(k); } int accept_conn(AsyncConnectionRef conn) { Mutex::Locker l(lock); - auto it = conns.find(conn->peer_addrs.legacy_addr()); + auto it = conns.find(conn->peer_addrs); if (it != conns.end()) { AsyncConnectionRef existing = it->second; @@ -362,7 +365,7 @@ public: return -1; } } - conns[conn->peer_addrs.legacy_addr()] = conn; + conns[conn->peer_addrs] = conn; conn->get_perf_counter()->inc(l_msgr_active_connections); accepting_conns.erase(conn); return 0; -- 2.39.5