From 0e7acc96c2a2b02616f1412ff030b33e6c02571e Mon Sep 17 00:00:00 2001 From: Patrick Donnelly Date: Wed, 20 Mar 2019 12:03:14 -0700 Subject: [PATCH] msg/async: avoid creating unnecessary AsyncConnectionRef Signed-off-by: Patrick Donnelly --- src/msg/async/AsyncMessenger.cc | 20 ++++++++++---------- src/msg/async/AsyncMessenger.h | 19 +++++++++++-------- src/msg/async/ProtocolV1.cc | 2 +- src/msg/async/ProtocolV1.h | 2 +- src/msg/async/ProtocolV2.cc | 4 ++-- src/msg/async/ProtocolV2.h | 4 ++-- 6 files changed, 27 insertions(+), 24 deletions(-) diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index aad3ce53f33..cf443eedd65 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -661,7 +661,7 @@ int AsyncMessenger::send_to(Message *m, int type, const entity_addrvec_t& addrs) } auto av = _filter_addrs(addrs); - AsyncConnectionRef conn = _lookup_conn(av); + const AsyncConnectionRef& conn = _lookup_conn(av); submit_message(m, conn, av, type); return 0; } @@ -689,7 +689,7 @@ ConnectionRef AsyncMessenger::connect_to(int type, const entity_addrvec_t& addrs return conn; } -void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con, +void AsyncMessenger::submit_message(Message *m, const AsyncConnectionRef& con, const entity_addrvec_t& dest_addrs, int dest_type) { @@ -730,8 +730,8 @@ void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con, } else { ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addrs << ", new connection." << dendl; - con = create_connect(dest_addrs, dest_type); - con->send_message(m); + auto&& new_con = create_connect(dest_addrs, dest_type); + new_con->send_message(m); } } @@ -816,10 +816,10 @@ void AsyncMessenger::shutdown_connections(bool queue_reset) void AsyncMessenger::mark_down_addrs(const entity_addrvec_t& addrs) { lock.Lock(); - AsyncConnectionRef p = _lookup_conn(addrs); - if (p) { - ldout(cct, 1) << __func__ << " " << addrs << " -- " << p << dendl; - p->stop(true); + const AsyncConnectionRef& conn = _lookup_conn(addrs); + if (conn) { + ldout(cct, 1) << __func__ << " " << addrs << " -- " << conn << dendl; + conn->stop(true); } else { ldout(cct, 1) << __func__ << " " << addrs << " -- connection dne" << dendl; } @@ -845,12 +845,12 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect) const return 0; } -int AsyncMessenger::accept_conn(AsyncConnectionRef conn) +int AsyncMessenger::accept_conn(const AsyncConnectionRef& conn) { Mutex::Locker l(lock); auto it = conns.find(*conn->peer_addrs); if (it != conns.end()) { - AsyncConnectionRef existing = it->second; + auto& existing = it->second; // lazy delete, see "deleted_conns" // If conn already in, we will return 0 diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index ca9e3294a9d..79e82b4264c 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -211,7 +211,7 @@ private: * @param dest_type The peer type of the address we're sending to * just drop silently under failure. */ - void submit_message(Message *m, AsyncConnectionRef con, + void submit_message(Message *m, const AsyncConnectionRef& con, const entity_addrvec_t& dest_addrs, int dest_type); void _finish_bind(const entity_addrvec_t& bind_addrs, @@ -305,17 +305,20 @@ private: Cond stop_cond; bool stopped; - AsyncConnectionRef _lookup_conn(const entity_addrvec_t& k) { + /* You must hold this->lock for the duration of use! */ + const auto& _lookup_conn(const entity_addrvec_t& k) { + static const AsyncConnectionRef nullref; ceph_assert(lock.is_locked()); auto p = conns.find(k); - if (p == conns.end()) - return NULL; + if (p == conns.end()) { + return nullref; + } // lazy delete, see "deleted_conns" Mutex::Locker l(deleted_lock); if (deleted_conns.erase(p->second)) { conns.erase(p); - return NULL; + return nullref; } return p->second; @@ -345,10 +348,10 @@ public: */ AsyncConnectionRef lookup_conn(const entity_addrvec_t& k) { Mutex::Locker l(lock); - return _lookup_conn(k); + return _lookup_conn(k); /* make new ref! */ } - int accept_conn(AsyncConnectionRef conn); + int accept_conn(const AsyncConnectionRef& conn); bool learned_addr(const entity_addr_t &peer_addr_for_me); void add_accept(Worker *w, ConnectedSocket cli_socket, const entity_addr_t &listen_addr, @@ -399,7 +402,7 @@ public: * * See "deleted_conns" */ - void unregister_conn(AsyncConnectionRef conn) { + void unregister_conn(const AsyncConnectionRef& conn) { Mutex::Locker l(deleted_lock); conn->get_perf_counter()->dec(l_msgr_active_connections); deleted_conns.emplace(std::move(conn)); diff --git a/src/msg/async/ProtocolV1.cc b/src/msg/async/ProtocolV1.cc index 0414863d1b6..a14eb337cbb 100644 --- a/src/msg/async/ProtocolV1.cc +++ b/src/msg/async/ProtocolV1.cc @@ -2212,7 +2212,7 @@ CtPtr ProtocolV1::handle_connect_message_reply_write(int r) { return CONTINUE(wait_connect_message); } -CtPtr ProtocolV1::replace(AsyncConnectionRef existing, +CtPtr ProtocolV1::replace(const AsyncConnectionRef& existing, ceph_msg_connect_reply &reply, bufferlist &authorizer_reply) { ldout(cct, 10) << __func__ << " accept replacing " << existing << dendl; diff --git a/src/msg/async/ProtocolV1.h b/src/msg/async/ProtocolV1.h index 6248a6f2e19..837af304208 100644 --- a/src/msg/async/ProtocolV1.h +++ b/src/msg/async/ProtocolV1.h @@ -284,7 +284,7 @@ protected: CtPtr send_connect_message_reply(char tag, ceph_msg_connect_reply &reply, bufferlist &authorizer_reply); CtPtr handle_connect_message_reply_write(int r); - CtPtr replace(AsyncConnectionRef existing, ceph_msg_connect_reply &reply, + CtPtr replace(const AsyncConnectionRef& existing, ceph_msg_connect_reply &reply, bufferlist &authorizer_reply); CtPtr open(ceph_msg_connect_reply &reply, bufferlist &authorizer_reply); CtPtr handle_ready_connect_message_reply_write(int r); diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 373615f881c..22e60620e46 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -2525,7 +2525,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload) return reuse_connection(existing, exproto); } -CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) { +CtPtr ProtocolV2::handle_existing_connection(const AsyncConnectionRef& existing) { ldout(cct, 20) << __func__ << " existing=" << existing << dendl; std::lock_guard l(existing->lock); @@ -2626,7 +2626,7 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) { } } -CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing, +CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing, ProtocolV2 *exproto) { ldout(cct, 20) << __func__ << " existing=" << existing << " reconnect=" << reconnecting << dendl; diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index 9ea40de8459..070f6910b78 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -242,8 +242,8 @@ private: Ct *handle_client_ident(ceph::bufferlist &payload); Ct *handle_ident_missing_features_write(int r); Ct *handle_reconnect(ceph::bufferlist &payload); - Ct *handle_existing_connection(AsyncConnectionRef existing); - Ct *reuse_connection(AsyncConnectionRef existing, + Ct *handle_existing_connection(const AsyncConnectionRef& existing); + Ct *reuse_connection(const AsyncConnectionRef& existing, ProtocolV2 *exproto); Ct *send_server_ident(); Ct *send_reconnect_ok(); -- 2.39.5