lock.Unlock();
}
-AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type)
+AsyncConnectionRef AsyncMessenger::create_connect(
+ const entity_addrvec_t& addrs, int type)
{
assert(lock.is_locked());
- assert(addr != my_addrs.legacy_addr());
+ assert(addrs != my_addrs);
- ldout(cct, 10) << __func__ << " " << addr
+ ldout(cct, 10) << __func__ << " " << addrs
<< ", creating connection and registering" << dendl;
// create connection
Worker *w = stack->get_worker();
AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
- conn->connect(addr, type);
- assert(!conns.count(addr));
- conns[addr] = conn;
+ conn->connect(addrs, type);
+ assert(!conns.count(addrs));
+ conns[addrs] = conn;
w->get_perf_counter()->inc(l_msgr_active_connections);
return conn;
return local_connection;
}
- AsyncConnectionRef conn = _lookup_conn(dest.addr);
+ AsyncConnectionRef conn = _lookup_conn(entity_addrvec_t(dest.addr));
if (conn) {
ldout(cct, 10) << __func__ << " " << dest << " existing " << conn << dendl;
} else {
- conn = create_connect(dest.addr, dest.name.type());
+ conn = create_connect(entity_addrvec_t(dest.addr), dest.name.type());
ldout(cct, 10) << __func__ << " " << dest << " new " << conn << dendl;
}
return -EINVAL;
}
- AsyncConnectionRef conn = _lookup_conn(dest.addr);
+ AsyncConnectionRef conn = _lookup_conn(entity_addrvec_t(dest.addr));
submit_message(m, conn, dest.addr, dest.name.type());
return 0;
}
m->put();
} else {
ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addr << ", new connection." << dendl;
- con = create_connect(dest_addr, dest_type);
+ con = create_connect(entity_addrvec_t(dest_addr), dest_type);
con->send_message(m);
}
}
accepting_conns.clear();
while (!conns.empty()) {
- ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator it = conns.begin();
+ auto it = conns.begin();
AsyncConnectionRef p = it->second;
ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl;
conns.erase(it);
lock.Unlock();
}
-void AsyncMessenger::mark_down(const entity_addr_t& addr)
+void AsyncMessenger::mark_down_addrs(const entity_addrvec_t& addrs)
{
lock.Lock();
- AsyncConnectionRef p = _lookup_conn(addr);
+ AsyncConnectionRef p = _lookup_conn(addrs);
if (p) {
- ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl;
+ ldout(cct, 1) << __func__ << " " << addrs << " -- " << p << dendl;
p->stop(true);
} else {
- ldout(cct, 1) << __func__ << " " << addr << " -- connection dne" << dendl;
+ ldout(cct, 1) << __func__ << " " << addrs << " -- connection dne" << dendl;
}
lock.Unlock();
}
auto it = deleted_conns.begin();
AsyncConnectionRef p = *it;
ldout(cct, 5) << __func__ << " delete " << p << dendl;
- auto conns_it = conns.find(p->peer_addrs.legacy_addr());
+ auto conns_it = conns.find(p->peer_addrs);
if (conns_it != conns.end() && conns_it->second == p)
conns.erase(conns_it);
accepting_conns.erase(p);
*/
ConnectionRef get_connection(const entity_inst_t& dest) override;
ConnectionRef get_loopback_connection() override;
- void mark_down(const entity_addr_t& addr) override;
+ void mark_down(const entity_addr_t& addr) override {
+ mark_down_addrs(entity_addrvec_t(addr));
+ }
+ void mark_down_addrs(const entity_addrvec_t& addrs) override;
void mark_down_all() override {
shutdown_connections(true);
}
* Initiate the connection. (This function returning does not guarantee
* connection success.)
*
- * @param addr The address of the entity to connect to.
+ * @param addrs The address(es) of the entity to connect to.
* @param type The peer type of the entity at the address.
*
* @return a pointer to the newly-created connection. Caller does not own a
* reference; take one if you need it.
*/
- AsyncConnectionRef create_connect(const entity_addr_t& addr, int type);
+ AsyncConnectionRef create_connect(const entity_addrvec_t& addrs, int type);
/**
* Queue up a Message for delivery to the entity specified
* NOTE: a Asyncconnection* with state CLOSED may still be in the map but is considered
* invalid and can be replaced by anyone holding the msgr lock
*/
- ceph::unordered_map<entity_addr_t, AsyncConnectionRef> conns;
+ ceph::unordered_map<entity_addrvec_t, AsyncConnectionRef> conns;
/**
* list of connection are in teh process of accepting
Cond stop_cond;
bool stopped;
- AsyncConnectionRef _lookup_conn(const entity_addr_t& k) {
+ AsyncConnectionRef _lookup_conn(const entity_addrvec_t& k) {
assert(lock.is_locked());
- ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator p = conns.find(k);
+ auto p = conns.find(k);
if (p == conns.end())
return NULL;
/**
* This wraps _lookup_conn.
*/
- AsyncConnectionRef lookup_conn(const entity_addr_t& k) {
+ AsyncConnectionRef lookup_conn(const entity_addrvec_t& k) {
Mutex::Locker l(lock);
return _lookup_conn(k);
}
int accept_conn(AsyncConnectionRef conn) {
Mutex::Locker l(lock);
- auto it = conns.find(conn->peer_addrs.legacy_addr());
+ auto it = conns.find(conn->peer_addrs);
if (it != conns.end()) {
AsyncConnectionRef existing = it->second;
return -1;
}
}
- conns[conn->peer_addrs.legacy_addr()] = conn;
+ conns[conn->peer_addrs] = conn;
conn->get_perf_counter()->inc(l_msgr_active_connections);
accepting_conns.erase(conn);
return 0;