]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/Connection: protect peer_addrs with safe_item_history<>
authorSage Weil <sage@redhat.com>
Tue, 8 Jan 2019 02:38:48 +0000 (20:38 -0600)
committerSage Weil <sage@redhat.com>
Tue, 8 Jan 2019 14:46:25 +0000 (08:46 -0600)
The peer_addrs can be updated during the initial connection handshake,
but we don't want users (e.g., dout()) to race with an update and
wander off into bad memory.

We use the same strategy for Messenger's my_addrs.

Fixes: http://tracker.ceph.com/issues/37807
Signed-off-by: Sage Weil <sage@redhat.com>
src/mds/MDSDaemon.cc
src/msg/Connection.h
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncMessenger.cc
src/msg/async/ProtocolV1.cc
src/msg/async/ProtocolV2.cc
src/msg/simple/PipeConnection.h

index d4295e6c2671aed0be0218c5b312184d8d9ea6bb..f48523153047ae0c7ad9a6b353f6969d02dbdf33 100644 (file)
@@ -631,7 +631,7 @@ void MDSDaemon::handle_command(const MCommand::const_ref &m)
   if (!session->auth_caps.allow_all()) {
     dout(1) << __func__
       << ": received command from client without `tell` capability: "
-      << m->get_connection()->peer_addrs << dendl;
+      << *m->get_connection()->peer_addrs << dendl;
 
     ss << "permission denied";
     r = -EPERM;
index 735c2b039f74ea3dd1d916fe2cf1cb398b64ce07..90d3459c1dac7e79ba05907ed5d2d00e0c45dd66 100644 (file)
@@ -28,6 +28,7 @@
 #include "include/ceph_assert.h" // Because intusive_ptr clobbers our assert...
 #include "include/buffer.h"
 #include "include/types.h"
+#include "common/item_history.h"
 #include "msg/MessageRef.h"
 
 
@@ -42,7 +43,7 @@ struct Connection : public RefCountedObject {
   Messenger *msgr;
   RefCountedPtr priv;
   int peer_type;
-  entity_addrvec_t peer_addrs;
+  safe_item_history<entity_addrvec_t> peer_addrs;
   utime_t last_keepalive, last_keepalive_ack;
 private:
   uint64_t features;
@@ -177,10 +178,10 @@ public:
   virtual entity_addr_t get_peer_socket_addr() const = 0;
 
   entity_addr_t get_peer_addr() const {
-    return peer_addrs.front();
+    return peer_addrs->front();
   }
   const entity_addrvec_t& get_peer_addrs() const {
-    return peer_addrs;
+    return *peer_addrs;
   }
   void set_peer_addr(const entity_addr_t& a) {
     peer_addrs = entity_addrvec_t(a);
index 87c9cb7c9e5076b8468857fb3b78b826f5fd1451..cdfed844eca584677325f28716fdf90f5d7412ac 100644 (file)
@@ -37,7 +37,7 @@
 #define dout_prefix _conn_prefix(_dout)
 ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
   return *_dout << "-- " << async_msgr->get_myaddrs() << " >> "
-               << peer_addrs << " conn(" << this
+               << *peer_addrs << " conn(" << this
                << (msgr2 ? " msgr2" : " legacy")
                 << " :" << port
                 << " s=" << get_state_name(state)
index d310e0cd7cca2a9b3ff412698bdd4f758493d868..852402a9cfd29ee2404e788974ecc78013c8df79 100644 (file)
@@ -594,7 +594,7 @@ AsyncConnectionRef AsyncMessenger::create_connect(
   conn->connect(addrs, type, target);
   ceph_assert(!conns.count(addrs));
   ldout(cct, 10) << __func__ << " " << conn << " " << addrs << " "
-                << conn->peer_addrs << dendl;
+                << *conn->peer_addrs << dendl;
   conns[addrs] = conn;
   w->get_perf_counter()->inc(l_msgr_active_connections);
 
@@ -816,7 +816,7 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect) const
 int AsyncMessenger::accept_conn(AsyncConnectionRef conn)
 {
   Mutex::Locker l(lock);
-  auto it = conns.find(conn->peer_addrs);
+  auto it = conns.find(*conn->peer_addrs);
   if (it != conns.end()) {
     AsyncConnectionRef existing = it->second;
 
@@ -830,8 +830,8 @@ int AsyncMessenger::accept_conn(AsyncConnectionRef conn)
       return -1;
     }
   }
-  ldout(cct, 10) << __func__ << " " << conn << " " << conn->peer_addrs << dendl;
-  conns[conn->peer_addrs] = conn;
+  ldout(cct, 10) << __func__ << " " << conn << " " << *conn->peer_addrs << dendl;
+  conns[*conn->peer_addrs] = conn;
   conn->get_perf_counter()->inc(l_msgr_active_connections);
   accepting_conns.erase(conn);
   return 0;
@@ -891,7 +891,7 @@ int AsyncMessenger::reap_dead()
     auto it = deleted_conns.begin();
     AsyncConnectionRef p = *it;
     ldout(cct, 5) << __func__ << " delete " << p << dendl;
-    auto conns_it = conns.find(p->peer_addrs);
+    auto conns_it = conns.find(*p->peer_addrs);
     if (conns_it != conns.end() && conns_it->second == p)
       conns.erase(conns_it);
     accepting_conns.erase(p);
index ab4b526105f1ca5d82a285adc34e2e28e747d8c4..c29c9e04ed6d3563eb7d1812cbc1ecb290b4ba68 100644 (file)
@@ -15,7 +15,7 @@
 #define dout_prefix _conn_prefix(_dout)
 ostream &ProtocolV1::_conn_prefix(std::ostream *_dout) {
   return *_dout << "--1- " << messenger->get_myaddrs().legacy_addr() << " >> "
-                << connection->peer_addrs.legacy_addr() << " conn("
+                << connection->peer_addrs->legacy_addr() << " conn("
                 << connection << (connection->msgr2 ? " msgr2" : " legacy")
                 << " :" << connection->port << " s=" << get_state_name(state)
                 << " pgs=" << peer_global_seq << " cs=" << connect_seq
@@ -1356,7 +1356,7 @@ CtPtr ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " connect read peer addr " << paddr
                  << " on socket " << connection->cs.fd() << dendl;
 
-  entity_addr_t peer_addr = connection->peer_addrs.legacy_addr();
+  entity_addr_t peer_addr = connection->peer_addrs->legacy_addr();
   if (peer_addr != paddr) {
     if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() &&
         peer_addr.get_nonce() == paddr.get_nonce()) {
@@ -1953,7 +1953,7 @@ CtPtr ProtocolV1::handle_connect_message_2() {
   ldout(cct, 10) << __func__ << " accept setting up session_security." << dendl;
 
   // existing?
-  AsyncConnectionRef existing = messenger->lookup_conn(connection->peer_addrs);
+  AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
 
   connection->inject_delay();
 
@@ -2082,7 +2082,7 @@ CtPtr ProtocolV1::handle_connect_message_2() {
       }
 
       // connection race?
-      if (connection->peer_addrs.legacy_addr() < messenger->get_myaddr() ||
+      if (connection->peer_addrs->legacy_addr() < messenger->get_myaddr() ||
           existing->policy.server) {
         // incoming wins
         ldout(cct, 10) << __func__ << " accept connection race, existing "
@@ -2096,7 +2096,7 @@ CtPtr ProtocolV1::handle_connect_message_2() {
             << __func__ << " accept connection race, existing " << existing
             << ".cseq " << exproto->connect_seq
             << " == " << connect_msg.connect_seq << ", sending WAIT" << dendl;
-        ceph_assert(connection->peer_addrs.legacy_addr() >
+        ceph_assert(connection->peer_addrs->legacy_addr() >
                     messenger->get_myaddr());
         existing->lock.unlock();
         return send_connect_message_reply(CEPH_MSGR_TAG_WAIT, reply,
@@ -2375,7 +2375,7 @@ CtPtr ProtocolV1::open(ceph_msg_connect_reply &reply,
   replacing = false;
   if (r < 0) {
     ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
-                  << connection->peer_addrs.legacy_addr()
+                  << connection->peer_addrs->legacy_addr()
                   << " just fail later one(this)" << dendl;
     ldout(cct, 10) << "accept fault after register" << dendl;
     connection->inject_delay();
index 99b424e6dbc3c1aea9a0d4704b64d8aa695966e3..9872ee6b57a5ac3e9bdca2183a48de104085670c 100644 (file)
@@ -15,7 +15,7 @@
 #define dout_prefix _conn_prefix(_dout)
 ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
   return *_dout << "--2- " << messenger->get_myaddrs() << " >> "
-                << connection->peer_addrs << " conn("
+                << *connection->peer_addrs << " conn("
                 << connection << (connection->msgr2 ? " msgr2" : " legacy")
                 << " :" << connection->port << " s=" << get_state_name(state)
                 << " pgs=" << peer_global_seq << " cs=" << connect_seq
@@ -1376,10 +1376,10 @@ CtPtr ProtocolV2::handle_server_addrvec_and_identify(char *buffer, int r) {
   // may be trying to connect to a v2 addr, and the remote may
   // identify themselves by several other addrs as well.  This happens
   // with mon discovery.
-  if (!connection->peer_addrs.contains(peer_addr)) {
+  if (!connection->peer_addrs->contains(peer_addr)) {
     ldout(cct, 10) << __func__ << " server claims to be " << peer_addr
                   << " (of " << paddrs << "), but we are trying to reach "
-                   << connection->peer_addrs << dendl;
+                   << *connection->peer_addrs << dendl;
     return _fault();
   }
 
@@ -2000,9 +2000,9 @@ CtPtr ProtocolV2::handle_connect_message_2() {
   ldout(cct, 10) << __func__ << " accept setting up session_security." << dendl;
 
   // existing?
-  AsyncConnectionRef existing = messenger->lookup_conn(connection->peer_addrs);
+  AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
   ldout(cct, 10) << __func__ << " existing " << existing
-                << " on " << connection->peer_addrs << dendl;
+                << " on " << *connection->peer_addrs << dendl;
   connection->inject_delay();
 
   connection->lock.lock();
@@ -2130,7 +2130,7 @@ CtPtr ProtocolV2::handle_connect_message_2() {
       }
 
       // connection race?
-      if (connection->peer_addrs.legacy_addr() < messenger->get_myaddr() ||
+      if (connection->peer_addrs->legacy_addr() < messenger->get_myaddr() ||
           existing->policy.server) {
         // incoming wins
         ldout(cct, 10) << __func__ << " accept connection race, existing "
@@ -2144,7 +2144,7 @@ CtPtr ProtocolV2::handle_connect_message_2() {
             << __func__ << " accept connection race, existing " << existing
             << ".cseq " << exproto->connect_seq
             << " == " << connect_msg.connect_seq << ", sending WAIT" << dendl;
-        ceph_assert(connection->peer_addrs.legacy_addr() >
+        ceph_assert(connection->peer_addrs->legacy_addr() >
                     messenger->get_myaddr());
         existing->lock.unlock();
         return send_connect_message_reply(CEPH_MSGR_TAG_WAIT, reply,
@@ -2414,7 +2414,7 @@ CtPtr ProtocolV2::open(ceph_msg_connect_reply &reply,
   replacing = false;
   if (r < 0) {
     ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
-                  << connection->peer_addrs.legacy_addr()
+                  << connection->peer_addrs->legacy_addr()
                   << " just fail later one(this)" << dendl;
     ldout(cct, 10) << "accept fault after register" << dendl;
     connection->inject_delay();
index 4e5c7c37c7157bad5444383fc1accad65ec7962d..e54604405cf89d6526394f9b9b5fcc5d284d9f90 100644 (file)
@@ -49,7 +49,7 @@ public:
   void mark_disposable() override;
 
   entity_addr_t get_peer_socket_addr() const override {
-    return peer_addrs.front();
+    return peer_addrs->front();
   }
 
 }; /* PipeConnection */