From f1c9bd17666cfeadb38ffe7d417039bfbae17c0d Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 7 Jan 2019 20:38:48 -0600 Subject: [PATCH] msg/Connection: protect peer_addrs with safe_item_history<> The peer_addrs can be updated during the initial connection handshake, but we don't want users (e.g., dout()) to race with an update and wander off into bad memory. We use the same strategy for Messenger's my_addrs. Fixes: http://tracker.ceph.com/issues/37807 Signed-off-by: Sage Weil --- src/mds/MDSDaemon.cc | 2 +- src/msg/Connection.h | 7 ++++--- src/msg/async/AsyncConnection.cc | 2 +- src/msg/async/AsyncMessenger.cc | 10 +++++----- src/msg/async/ProtocolV1.cc | 12 ++++++------ src/msg/async/ProtocolV2.cc | 16 ++++++++-------- src/msg/simple/PipeConnection.h | 2 +- 7 files changed, 26 insertions(+), 25 deletions(-) diff --git a/src/mds/MDSDaemon.cc b/src/mds/MDSDaemon.cc index d4295e6c267..f4852315304 100644 --- a/src/mds/MDSDaemon.cc +++ b/src/mds/MDSDaemon.cc @@ -631,7 +631,7 @@ void MDSDaemon::handle_command(const MCommand::const_ref &m) if (!session->auth_caps.allow_all()) { dout(1) << __func__ << ": received command from client without `tell` capability: " - << m->get_connection()->peer_addrs << dendl; + << *m->get_connection()->peer_addrs << dendl; ss << "permission denied"; r = -EPERM; diff --git a/src/msg/Connection.h b/src/msg/Connection.h index 735c2b039f7..90d3459c1da 100644 --- a/src/msg/Connection.h +++ b/src/msg/Connection.h @@ -28,6 +28,7 @@ #include "include/ceph_assert.h" // Because intusive_ptr clobbers our assert... #include "include/buffer.h" #include "include/types.h" +#include "common/item_history.h" #include "msg/MessageRef.h" @@ -42,7 +43,7 @@ struct Connection : public RefCountedObject { Messenger *msgr; RefCountedPtr priv; int peer_type; - entity_addrvec_t peer_addrs; + safe_item_history peer_addrs; utime_t last_keepalive, last_keepalive_ack; private: uint64_t features; @@ -177,10 +178,10 @@ public: virtual entity_addr_t get_peer_socket_addr() const = 0; entity_addr_t get_peer_addr() const { - return peer_addrs.front(); + return peer_addrs->front(); } const entity_addrvec_t& get_peer_addrs() const { - return peer_addrs; + return *peer_addrs; } void set_peer_addr(const entity_addr_t& a) { peer_addrs = entity_addrvec_t(a); diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 87c9cb7c9e5..cdfed844eca 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -37,7 +37,7 @@ #define dout_prefix _conn_prefix(_dout) ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) { return *_dout << "-- " << async_msgr->get_myaddrs() << " >> " - << peer_addrs << " conn(" << this + << *peer_addrs << " conn(" << this << (msgr2 ? " msgr2" : " legacy") << " :" << port << " s=" << get_state_name(state) diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index d310e0cd7cc..852402a9cfd 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -594,7 +594,7 @@ AsyncConnectionRef AsyncMessenger::create_connect( conn->connect(addrs, type, target); ceph_assert(!conns.count(addrs)); ldout(cct, 10) << __func__ << " " << conn << " " << addrs << " " - << conn->peer_addrs << dendl; + << *conn->peer_addrs << dendl; conns[addrs] = conn; w->get_perf_counter()->inc(l_msgr_active_connections); @@ -816,7 +816,7 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect) const int AsyncMessenger::accept_conn(AsyncConnectionRef conn) { Mutex::Locker l(lock); - auto it = conns.find(conn->peer_addrs); + auto it = conns.find(*conn->peer_addrs); if (it != conns.end()) { AsyncConnectionRef existing = it->second; @@ -830,8 +830,8 @@ int AsyncMessenger::accept_conn(AsyncConnectionRef conn) return -1; } } - ldout(cct, 10) << __func__ << " " << conn << " " << conn->peer_addrs << dendl; - conns[conn->peer_addrs] = conn; + ldout(cct, 10) << __func__ << " " << conn << " " << *conn->peer_addrs << dendl; + conns[*conn->peer_addrs] = conn; conn->get_perf_counter()->inc(l_msgr_active_connections); accepting_conns.erase(conn); return 0; @@ -891,7 +891,7 @@ int AsyncMessenger::reap_dead() auto it = deleted_conns.begin(); AsyncConnectionRef p = *it; ldout(cct, 5) << __func__ << " delete " << p << dendl; - auto conns_it = conns.find(p->peer_addrs); + auto conns_it = conns.find(*p->peer_addrs); if (conns_it != conns.end() && conns_it->second == p) conns.erase(conns_it); accepting_conns.erase(p); diff --git a/src/msg/async/ProtocolV1.cc b/src/msg/async/ProtocolV1.cc index ab4b526105f..c29c9e04ed6 100644 --- a/src/msg/async/ProtocolV1.cc +++ b/src/msg/async/ProtocolV1.cc @@ -15,7 +15,7 @@ #define dout_prefix _conn_prefix(_dout) ostream &ProtocolV1::_conn_prefix(std::ostream *_dout) { return *_dout << "--1- " << messenger->get_myaddrs().legacy_addr() << " >> " - << connection->peer_addrs.legacy_addr() << " conn(" + << connection->peer_addrs->legacy_addr() << " conn(" << connection << (connection->msgr2 ? " msgr2" : " legacy") << " :" << connection->port << " s=" << get_state_name(state) << " pgs=" << peer_global_seq << " cs=" << connect_seq @@ -1356,7 +1356,7 @@ CtPtr ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) { ldout(cct, 20) << __func__ << " connect read peer addr " << paddr << " on socket " << connection->cs.fd() << dendl; - entity_addr_t peer_addr = connection->peer_addrs.legacy_addr(); + entity_addr_t peer_addr = connection->peer_addrs->legacy_addr(); if (peer_addr != paddr) { if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() && peer_addr.get_nonce() == paddr.get_nonce()) { @@ -1953,7 +1953,7 @@ CtPtr ProtocolV1::handle_connect_message_2() { ldout(cct, 10) << __func__ << " accept setting up session_security." << dendl; // existing? - AsyncConnectionRef existing = messenger->lookup_conn(connection->peer_addrs); + AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs); connection->inject_delay(); @@ -2082,7 +2082,7 @@ CtPtr ProtocolV1::handle_connect_message_2() { } // connection race? - if (connection->peer_addrs.legacy_addr() < messenger->get_myaddr() || + if (connection->peer_addrs->legacy_addr() < messenger->get_myaddr() || existing->policy.server) { // incoming wins ldout(cct, 10) << __func__ << " accept connection race, existing " @@ -2096,7 +2096,7 @@ CtPtr ProtocolV1::handle_connect_message_2() { << __func__ << " accept connection race, existing " << existing << ".cseq " << exproto->connect_seq << " == " << connect_msg.connect_seq << ", sending WAIT" << dendl; - ceph_assert(connection->peer_addrs.legacy_addr() > + ceph_assert(connection->peer_addrs->legacy_addr() > messenger->get_myaddr()); existing->lock.unlock(); return send_connect_message_reply(CEPH_MSGR_TAG_WAIT, reply, @@ -2375,7 +2375,7 @@ CtPtr ProtocolV1::open(ceph_msg_connect_reply &reply, replacing = false; if (r < 0) { ldout(cct, 1) << __func__ << " existing race replacing process for addr = " - << connection->peer_addrs.legacy_addr() + << connection->peer_addrs->legacy_addr() << " just fail later one(this)" << dendl; ldout(cct, 10) << "accept fault after register" << dendl; connection->inject_delay(); diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 99b424e6dbc..9872ee6b57a 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -15,7 +15,7 @@ #define dout_prefix _conn_prefix(_dout) ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) { return *_dout << "--2- " << messenger->get_myaddrs() << " >> " - << connection->peer_addrs << " conn(" + << *connection->peer_addrs << " conn(" << connection << (connection->msgr2 ? " msgr2" : " legacy") << " :" << connection->port << " s=" << get_state_name(state) << " pgs=" << peer_global_seq << " cs=" << connect_seq @@ -1376,10 +1376,10 @@ CtPtr ProtocolV2::handle_server_addrvec_and_identify(char *buffer, int r) { // may be trying to connect to a v2 addr, and the remote may // identify themselves by several other addrs as well. This happens // with mon discovery. - if (!connection->peer_addrs.contains(peer_addr)) { + if (!connection->peer_addrs->contains(peer_addr)) { ldout(cct, 10) << __func__ << " server claims to be " << peer_addr << " (of " << paddrs << "), but we are trying to reach " - << connection->peer_addrs << dendl; + << *connection->peer_addrs << dendl; return _fault(); } @@ -2000,9 +2000,9 @@ CtPtr ProtocolV2::handle_connect_message_2() { ldout(cct, 10) << __func__ << " accept setting up session_security." << dendl; // existing? - AsyncConnectionRef existing = messenger->lookup_conn(connection->peer_addrs); + AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs); ldout(cct, 10) << __func__ << " existing " << existing - << " on " << connection->peer_addrs << dendl; + << " on " << *connection->peer_addrs << dendl; connection->inject_delay(); connection->lock.lock(); @@ -2130,7 +2130,7 @@ CtPtr ProtocolV2::handle_connect_message_2() { } // connection race? - if (connection->peer_addrs.legacy_addr() < messenger->get_myaddr() || + if (connection->peer_addrs->legacy_addr() < messenger->get_myaddr() || existing->policy.server) { // incoming wins ldout(cct, 10) << __func__ << " accept connection race, existing " @@ -2144,7 +2144,7 @@ CtPtr ProtocolV2::handle_connect_message_2() { << __func__ << " accept connection race, existing " << existing << ".cseq " << exproto->connect_seq << " == " << connect_msg.connect_seq << ", sending WAIT" << dendl; - ceph_assert(connection->peer_addrs.legacy_addr() > + ceph_assert(connection->peer_addrs->legacy_addr() > messenger->get_myaddr()); existing->lock.unlock(); return send_connect_message_reply(CEPH_MSGR_TAG_WAIT, reply, @@ -2414,7 +2414,7 @@ CtPtr ProtocolV2::open(ceph_msg_connect_reply &reply, replacing = false; if (r < 0) { ldout(cct, 1) << __func__ << " existing race replacing process for addr = " - << connection->peer_addrs.legacy_addr() + << connection->peer_addrs->legacy_addr() << " just fail later one(this)" << dendl; ldout(cct, 10) << "accept fault after register" << dendl; connection->inject_delay(); diff --git a/src/msg/simple/PipeConnection.h b/src/msg/simple/PipeConnection.h index 4e5c7c37c71..e54604405cf 100644 --- a/src/msg/simple/PipeConnection.h +++ b/src/msg/simple/PipeConnection.h @@ -49,7 +49,7 @@ public: void mark_disposable() override; entity_addr_t get_peer_socket_addr() const override { - return peer_addrs.front(); + return peer_addrs->front(); } }; /* PipeConnection */ -- 2.39.5