}
auto av = _filter_addrs(addrs);
- AsyncConnectionRef conn = _lookup_conn(av);
+ const AsyncConnectionRef& conn = _lookup_conn(av);
submit_message(m, conn, av, type);
return 0;
}
return conn;
}
-void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con,
+void AsyncMessenger::submit_message(Message *m, const AsyncConnectionRef& con,
const entity_addrvec_t& dest_addrs,
int dest_type)
{
} else {
ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addrs
<< ", new connection." << dendl;
- con = create_connect(dest_addrs, dest_type);
- con->send_message(m);
+ auto&& new_con = create_connect(dest_addrs, dest_type);
+ new_con->send_message(m);
}
}
void AsyncMessenger::mark_down_addrs(const entity_addrvec_t& addrs)
{
lock.Lock();
- AsyncConnectionRef p = _lookup_conn(addrs);
- if (p) {
- ldout(cct, 1) << __func__ << " " << addrs << " -- " << p << dendl;
- p->stop(true);
+ const AsyncConnectionRef& conn = _lookup_conn(addrs);
+ if (conn) {
+ ldout(cct, 1) << __func__ << " " << addrs << " -- " << conn << dendl;
+ conn->stop(true);
} else {
ldout(cct, 1) << __func__ << " " << addrs << " -- connection dne" << dendl;
}
return 0;
}
-int AsyncMessenger::accept_conn(AsyncConnectionRef conn)
+int AsyncMessenger::accept_conn(const AsyncConnectionRef& conn)
{
Mutex::Locker l(lock);
auto it = conns.find(*conn->peer_addrs);
if (it != conns.end()) {
- AsyncConnectionRef existing = it->second;
+ auto& existing = it->second;
// lazy delete, see "deleted_conns"
// If conn already in, we will return 0
* @param dest_type The peer type of the address we're sending to
* just drop silently under failure.
*/
- void submit_message(Message *m, AsyncConnectionRef con,
+ void submit_message(Message *m, const AsyncConnectionRef& con,
const entity_addrvec_t& dest_addrs, int dest_type);
void _finish_bind(const entity_addrvec_t& bind_addrs,
Cond stop_cond;
bool stopped;
- AsyncConnectionRef _lookup_conn(const entity_addrvec_t& k) {
+ /* You must hold this->lock for the duration of use! */
+ const auto& _lookup_conn(const entity_addrvec_t& k) {
+ static const AsyncConnectionRef nullref;
ceph_assert(lock.is_locked());
auto p = conns.find(k);
- if (p == conns.end())
- return NULL;
+ if (p == conns.end()) {
+ return nullref;
+ }
// lazy delete, see "deleted_conns"
Mutex::Locker l(deleted_lock);
if (deleted_conns.erase(p->second)) {
conns.erase(p);
- return NULL;
+ return nullref;
}
return p->second;
*/
AsyncConnectionRef lookup_conn(const entity_addrvec_t& k) {
Mutex::Locker l(lock);
- return _lookup_conn(k);
+ return _lookup_conn(k); /* make new ref! */
}
- int accept_conn(AsyncConnectionRef conn);
+ int accept_conn(const AsyncConnectionRef& conn);
bool learned_addr(const entity_addr_t &peer_addr_for_me);
void add_accept(Worker *w, ConnectedSocket cli_socket,
const entity_addr_t &listen_addr,
*
* See "deleted_conns"
*/
- void unregister_conn(AsyncConnectionRef conn) {
+ void unregister_conn(const AsyncConnectionRef& conn) {
Mutex::Locker l(deleted_lock);
conn->get_perf_counter()->dec(l_msgr_active_connections);
deleted_conns.emplace(std::move(conn));
return CONTINUE(wait_connect_message);
}
-CtPtr ProtocolV1::replace(AsyncConnectionRef existing,
+CtPtr ProtocolV1::replace(const AsyncConnectionRef& existing,
ceph_msg_connect_reply &reply,
bufferlist &authorizer_reply) {
ldout(cct, 10) << __func__ << " accept replacing " << existing << dendl;
CtPtr send_connect_message_reply(char tag, ceph_msg_connect_reply &reply,
bufferlist &authorizer_reply);
CtPtr handle_connect_message_reply_write(int r);
- CtPtr replace(AsyncConnectionRef existing, ceph_msg_connect_reply &reply,
+ CtPtr replace(const AsyncConnectionRef& existing, ceph_msg_connect_reply &reply,
bufferlist &authorizer_reply);
CtPtr open(ceph_msg_connect_reply &reply, bufferlist &authorizer_reply);
CtPtr handle_ready_connect_message_reply_write(int r);
return reuse_connection(existing, exproto);
}
-CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
+CtPtr ProtocolV2::handle_existing_connection(const AsyncConnectionRef& existing) {
ldout(cct, 20) << __func__ << " existing=" << existing << dendl;
std::lock_guard<std::mutex> l(existing->lock);
}
}
-CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
+CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
ProtocolV2 *exproto) {
ldout(cct, 20) << __func__ << " existing=" << existing
<< " reconnect=" << reconnecting << dendl;
Ct<ProtocolV2> *handle_client_ident(ceph::bufferlist &payload);
Ct<ProtocolV2> *handle_ident_missing_features_write(int r);
Ct<ProtocolV2> *handle_reconnect(ceph::bufferlist &payload);
- Ct<ProtocolV2> *handle_existing_connection(AsyncConnectionRef existing);
- Ct<ProtocolV2> *reuse_connection(AsyncConnectionRef existing,
+ Ct<ProtocolV2> *handle_existing_connection(const AsyncConnectionRef& existing);
+ Ct<ProtocolV2> *reuse_connection(const AsyncConnectionRef& existing,
ProtocolV2 *exproto);
Ct<ProtocolV2> *send_server_ident();
Ct<ProtocolV2> *send_reconnect_ok();