]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/mon/MonClient: handle mon client connection race
authorSamuel Just <sjust@redhat.com>
Thu, 30 May 2019 00:31:57 +0000 (17:31 -0700)
committerSamuel Just <sjust@redhat.com>
Fri, 31 May 2019 21:05:44 +0000 (14:05 -0700)
The first to establish a connection and close the others needs
to ensure that the others don't mess with broken state.  To that
end, leave a cleared pending_conns map while the conns are being
closed.  Also, take care to clean up outstanding promises and
fallout from conns disappearing.

Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/mon/MonClient.cc
src/crimson/mon/MonClient.h

index 683bcea294e8b9be1b47099b5ef25325852b2c1d..6bc976f51909e0fca4bd08e69fe12d76e1c91896 100644 (file)
@@ -53,13 +53,19 @@ public:
   Connection(const AuthRegistry& auth_registry,
              ceph::net::ConnectionRef conn,
              KeyRing* keyring);
+  enum class AuthResult {
+    success = 0,
+    failure,
+    canceled
+  };
   seastar::future<> handle_auth_reply(Ref<MAuthReply> m);
   // v1
-  seastar::future<> authenticate_v1(epoch_t epoch,
-                                    const EntityName& name,
-                                    uint32_t want_keys);
+  seastar::future<AuthResult> authenticate_v1(
+    epoch_t epoch,
+    const EntityName& name,
+    uint32_t want_keys);
   // v2
-  seastar::future<> authenticate_v2();
+  seastar::future<AuthResult> authenticate_v2();
   auth::AuthClient::auth_request_t
   get_auth_request(const EntityName& name,
                    uint32_t want_keys);
@@ -95,7 +101,8 @@ private:
     rotating,
     general,
   };
-  seastar::future<bool> do_auth(request_t);
+  seastar::future<std::optional<AuthResult>> do_auth_single(request_t);
+  seastar::future<AuthResult> do_auth(request_t);
 
 private:
   bool closed = false;
@@ -105,7 +112,7 @@ private:
   using clock_t = seastar::lowres_system_clock;
   clock_t::time_point auth_start;
   ceph::auth::method_t auth_method = 0;
-  seastar::promise<> auth_done;
+  seastar::promise<AuthResult> auth_done;
   // v1 and v2
   const AuthRegistry& auth_registry;
   ceph::net::ConnectionRef conn;
@@ -129,16 +136,18 @@ Connection::Connection(const AuthRegistry& auth_registry,
 seastar::future<> Connection::handle_auth_reply(Ref<MAuthReply> m)
 {
   reply.set_value(m);
+  reply = {};
   return seastar::now();
 }
 
 seastar::future<> Connection::renew_tickets()
 {
   if (auth->need_tickets()) {
-    return do_auth(request_t::general).then([](bool success) {
-      if (!success)  {
-        throw std::system_error(make_error_code(
-          ceph::net::error::negotiation_failure));
+    return do_auth(request_t::general).then([](AuthResult r) {
+      if (r != AuthResult::success)  {
+        throw std::system_error(
+         make_error_code(
+           ceph::net::error::negotiation_failure));
       }
     });
   }
@@ -159,8 +168,8 @@ seastar::future<> Connection::renew_rotating_keyring()
     return seastar::now();
   }
   last_rotating_renew_sent = now;
-  return do_auth(request_t::rotating).then([](bool success) {
-    if (!success)  {
+  return do_auth(request_t::rotating).then([](AuthResult r) {
+    if (r != AuthResult::success)  {
       throw std::system_error(make_error_code(
         ceph::net::error::negotiation_failure));
     }
@@ -219,7 +228,8 @@ Connection::setup_session(epoch_t epoch,
   return conn->send(m);
 }
 
-seastar::future<bool> Connection::do_auth(Connection::request_t what)
+seastar::future<std::optional<Connection::AuthResult>>
+Connection::do_auth_single(Connection::request_t what)
 {
   auto m = make_message<MAuth>();
   m->protocol = auth->get_protocol();
@@ -243,22 +253,43 @@ seastar::future<bool> Connection::do_auth(Connection::request_t what)
     logger().info("waiting");
     return reply.get_future();
   }).then([this] (Ref<MAuthReply> m) {
-    logger().info("mon {} => {} returns {}: {}",
-                   conn->get_messenger()->get_myaddr(),
-                   conn->get_peer_addr(), *m, m->result);
-    reply = {};
+    if (!m) {
+      ceph_assert(closed);
+      logger().info("do_auth: connection closed");
+      return seastar::make_ready_future<std::optional<Connection::AuthResult>>(
+       std::make_optional(AuthResult::canceled));
+    }
+    logger().info(
+      "do_auth: mon {} => {} returns {}: {}",
+      conn->get_messenger()->get_myaddr(),
+      conn->get_peer_addr(), *m, m->result);
     auto p = m->result_bl.cbegin();
     auto ret = auth->handle_response(m->result, p,
                                     nullptr, nullptr);
     if (ret != 0 && ret != -EAGAIN) {
-      throw std::system_error(make_error_code(
-        ceph::net::error::negotiation_failure));
+      logger().error(
+       "do_auth: got error {} on mon {}",
+       ret,
+       conn->get_peer_addr());
     }
-    return seastar::make_ready_future<bool>(ret == 0);
+    return seastar::make_ready_future<std::optional<Connection::AuthResult>>(
+      ret == -EAGAIN
+      ? std::nullopt
+      : std::make_optional(ret == 0
+        ? AuthResult::success
+        : AuthResult::failure
+      ));
   });
 }
 
-seastar::future<>
+seastar::future<Connection::AuthResult>
+Connection::do_auth(Connection::request_t what) {
+  return seastar::repeat_until_value([this, what]() {
+    return do_auth_single(what);
+  });
+}
+
+seastar::future<Connection::AuthResult>
 Connection::authenticate_v1(epoch_t epoch,
                             const EntityName& name,
                             uint32_t want_keys)
@@ -268,7 +299,10 @@ Connection::authenticate_v1(epoch_t epoch,
   }).then([this] {
     return reply.get_future();
   }).then([name, want_keys, this](Ref<MAuthReply> m) {
-    reply = {};
+    if (!m) {
+      logger().error("authenticate_v1 canceled on {}", name);
+      return seastar::make_ready_future<AuthResult>(AuthResult::canceled);
+    }
     global_id = m->global_id;
     auth = create_auth(m->protocol, m->global_id, name, want_keys);
     switch (auto p = m->result_bl.cbegin();
@@ -276,24 +310,20 @@ Connection::authenticate_v1(epoch_t epoch,
                                  nullptr, nullptr)) {
     case 0:
       // none
-      return seastar::now();
+      return seastar::make_ready_future<AuthResult>(AuthResult::success);
     case -EAGAIN:
       // cephx
-      return seastar::repeat([this] {
-        return do_auth(request_t::general).then([](bool success) {
-          return seastar::make_ready_future<seastar::stop_iteration>(
-            success ?
-            seastar::stop_iteration::yes:
-            seastar::stop_iteration::no);
-          });
-        });
+      return do_auth(request_t::general);
     default:
       ceph_assert_always(0);
     }
+  }).handle_exception([](auto ep) {
+    logger().error("authenticate_v1 failed with {}", ep);
+    return seastar::make_ready_future<AuthResult>(AuthResult::canceled);
   });
 }
 
-seastar::future<> Connection::authenticate_v2()
+seastar::future<Connection::AuthResult> Connection::authenticate_v2()
 {
   auth_start = seastar::lowres_system_clock::now();
   return conn->send(make_message<MMonGetMap>()).then([this] {
@@ -369,7 +399,7 @@ Connection::handle_auth_done(uint64_t new_global_id,
   secret_t connection_secret;
   int r = auth->handle_response(0, p, &session_key, &connection_secret);
   conn->set_last_keepalive_ack(auth_start);
-  auth_done.set_value();
+  auth_done.set_value(AuthResult::success);
   return {session_key, connection_secret, r};
 }
 
@@ -401,6 +431,10 @@ int Connection::handle_auth_bad_method(uint32_t old_auth_method,
 
 seastar::future<> Connection::close()
 {
+  reply.set_value(Ref<MAuthReply>(nullptr));
+  reply = {};
+  auth_done.set_value(AuthResult::canceled);
+  auth_done = {};
   if (conn && !std::exchange(closed, true)) {
     return conn->close();
   } else {
@@ -410,6 +444,7 @@ seastar::future<> Connection::close()
 
 bool Connection::is_my_peer(const entity_addr_t& addr) const
 {
+  ceph_assert(conn);
   return conn->get_peer_addr() == addr;
 }
 
@@ -514,11 +549,11 @@ seastar::future<> Client::ms_handle_reset(ceph::net::ConnectionRef conn)
 {
   auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
                             [peer_addr = conn->get_peer_addr()](auto& mc) {
-                              return mc.is_my_peer(peer_addr);
+                              return mc->is_my_peer(peer_addr);
                             });
   if (found != pending_conns.end()) {
     logger().warn("pending conn reset by {}", conn->get_peer_addr());
-    return found->close();
+    return (*found)->close();
   } else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) {
     logger().warn("active conn reset {}", conn->get_peer_addr());
     active_con.reset();
@@ -621,12 +656,12 @@ Client::get_auth_request(ceph::net::ConnectionRef con,
   if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
     auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
                               [peer_addr = con->get_peer_addr()](auto& mc) {
-                                return mc.is_my_peer(peer_addr);
+                                return mc->is_my_peer(peer_addr);
                               });
     if (found == pending_conns.end()) {
       throw ceph::auth::error{"unknown connection"};
     }
-    return found->get_auth_request(entity_name, want_keys);
+    return (*found)->get_auth_request(entity_name, want_keys);
   } else {
     // generate authorizer
     if (!active_con) {
@@ -656,14 +691,14 @@ Client::get_auth_request(ceph::net::ConnectionRef con,
   if (conn->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
     auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
                               [peer_addr = conn->get_peer_addr()](auto& mc) {
-                                return mc.is_my_peer(peer_addr);
+                                return mc->is_my_peer(peer_addr);
                               });
     if (found == pending_conns.end()) {
       throw ceph::auth::error{"unknown connection"};
     }
     bufferlist reply;
     tie(auth_meta->session_key, auth_meta->connection_secret, reply) =
-      found->handle_auth_reply_more(bl);
+      (*found)->handle_auth_reply_more(bl);
     return reply;
   } else {
     // authorizer challenges
@@ -685,14 +720,14 @@ int Client::handle_auth_done(ceph::net::ConnectionRef conn,
   if (conn->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
     auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
                               [peer_addr = conn->get_peer_addr()](auto& mc) {
-                                return mc.is_my_peer(peer_addr);
+                                return mc->is_my_peer(peer_addr);
                               });
     if (found == pending_conns.end()) {
       return -ENOENT;
     }
     int r = 0;
     tie(auth_meta->session_key, auth_meta->connection_secret, r) =
-      found->handle_auth_done(global_id, bl);
+      (*found)->handle_auth_done(global_id, bl);
     return r;
   } else {
     // verify authorizer reply
@@ -717,11 +752,12 @@ int Client::handle_auth_bad_method(ceph::net::ConnectionRef conn,
   if (conn->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
     auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
                               [peer_addr = conn->get_peer_addr()](auto& mc) {
-                                return mc.is_my_peer(peer_addr);
+                                return mc->is_my_peer(peer_addr);
                               });
     if (found != pending_conns.end()) {
-      return found->handle_auth_bad_method(old_auth_method, result,
-                                           allowed_methods, allowed_modes);
+      return (*found)->handle_auth_bad_method(
+       old_auth_method, result,
+       allowed_methods, allowed_modes);
     } else {
       return -ENOENT;
     }
@@ -745,8 +781,12 @@ seastar::future<> Client::handle_monmap(ceph::net::Connection* conn,
 
   if (monmap.get_addr_name(peer_addr, cur_mon)) {
     if (active_con) {
-      return seastar::when_all_succeed(active_con->renew_tickets(),
-                                       active_con->renew_rotating_keyring());
+      logger().info("handle_monmap: renewing tickets");
+      return seastar::when_all_succeed(
+       active_con->renew_tickets(),
+       active_con->renew_rotating_keyring()).then([](){
+         logger().info("handle_mon_map: renewed tickets");
+       });
     } else {
       return seastar::now();
     }
@@ -759,15 +799,16 @@ seastar::future<> Client::handle_monmap(ceph::net::Connection* conn,
 seastar::future<> Client::handle_auth_reply(ceph::net::Connection* conn,
                                                Ref<MAuthReply> m)
 {
-  logger().info("mon {} => {} returns {}: {}",
-                conn->get_messenger()->get_myaddr(),
-                conn->get_peer_addr(), *m, m->result);
+  logger().info(
+    "handle_auth_reply mon {} => {} returns {}: {}",
+    conn->get_messenger()->get_myaddr(),
+    conn->get_peer_addr(), *m, m->result);
   auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
                             [peer_addr = conn->get_peer_addr()](auto& mc) {
-                              return mc.is_my_peer(peer_addr);
+                              return mc->is_my_peer(peer_addr);
                             });
   if (found != pending_conns.end()) {
-    return found->handle_auth_reply(m);
+    return (*found)->handle_auth_reply(m);
   } else if (active_con) {
     return active_con->handle_auth_reply(m);
   } else {
@@ -884,6 +925,7 @@ seastar::future<> Client::stop()
 
 seastar::future<> Client::reopen_session(int rank)
 {
+  logger().info("{} to mon.{}", __func__, rank);
   vector<unsigned> mons;
   if (rank >= 0) {
     mons.push_back(rank);
@@ -897,38 +939,74 @@ seastar::future<> Client::reopen_session(int rank)
 #warning fixme
     auto peer = monmap.get_addrs(rank).front();
     logger().info("connecting to mon.{}", rank);
-    return msgr.connect(peer, CEPH_ENTITY_TYPE_MON).then([this] (auto xconn) {
+    return msgr.connect(peer, CEPH_ENTITY_TYPE_MON).then(
+      [this] (auto xconn) -> seastar::future<Connection::AuthResult> {
       // sharded-messenger compatible mode assumes all connections running
       // in one shard.
       ceph_assert((*xconn)->shard_id() == seastar::engine().cpu_id());
       ceph::net::ConnectionRef conn = xconn->release();
-      auto& mc = pending_conns.emplace_back(auth_registry, conn, &keyring);
+      auto& mc = pending_conns.emplace_back(
+       std::make_unique<Connection>(auth_registry, conn, &keyring));
       if (conn->get_peer_addr().is_msgr2()) {
-        return mc.authenticate_v2();
+        return mc->authenticate_v2();
       } else {
-        return mc.authenticate_v1(monmap.get_epoch(), entity_name, want_keys)
+        return mc->authenticate_v1(monmap.get_epoch(), entity_name, want_keys)
           .handle_exception([conn](auto ep) {
-            return conn->close().then([ep = std::move(ep)] {
-              std::rethrow_exception(ep);
+            return conn->close().then([ep=std::move(ep)](){
+             return seastar::make_exception_future<Connection::AuthResult>(ep);
             });
           });
       }
-    }).then([peer, this] {
+    }).then([peer, this](auto result) {
+      if (result == Connection::AuthResult::canceled) {
+       return seastar::now();
+      }
+
       if (!is_hunting()) {
         return seastar::now();
       }
       logger().info("found mon.{}", monmap.get_name(peer));
-      return seastar::parallel_for_each(pending_conns, [peer, this] (auto& conn) {
-        if (conn.is_my_peer(peer)) {
-          active_con.reset(new Connection{std::move(conn)});
-          return seastar::now();
-        } else {
-          return conn.close();
-        }
-      });
+
+      auto found = std::find_if(
+       pending_conns.begin(), pending_conns.end(),
+       [peer](auto& conn) {
+         return conn->is_my_peer(peer);
+       });
+      if (found == pending_conns.end()) {
+       // Happens if another connection has won the race
+       ceph_assert(active_con && pending_conns.empty());
+       logger().info(
+         "no pending connection for mon.{}, peer {}",
+         monmap.get_name(peer),
+         peer);
+       return seastar::now();
+      }
+
+      ceph_assert(!active_con && !pending_conns.empty());
+      active_con = std::move(*found);
+      found->reset();
+      auto ret = seastar::do_with(
+       std::move(pending_conns),
+       [this](auto &pending_conns) {
+         return seastar::parallel_for_each(
+           pending_conns,
+           [this] (auto &conn) {
+             if (!conn) {
+               return seastar::now();
+             } else {
+               return conn->close();
+             }
+           });
+       });
+      pending_conns.clear();
+      return ret;
+    }).then([]() {
+      logger().debug("reopen_session mon connection attempts complete");
+    }).handle_exception([](auto ep) {
+      logger().error("mon connections failed with ep {}", ep);
+      return seastar::make_exception_future(ep);
     });
   }).then([this] {
-    pending_conns.clear();
     ceph_assert_always(active_con);
     return active_con->renew_rotating_keyring();
   });
index 96fa3182c7ac52db06bb83f1e3596d8d0a4719b8..75a00433824297ccd5999a4e1b7ead62aa03e636 100644 (file)
@@ -50,9 +50,8 @@ class Client : public ceph::net::Dispatcher,
   const uint32_t want_keys;
 
   MonMap monmap;
-  seastar::promise<MessageRef> reply;
   std::unique_ptr<Connection> active_con;
-  std::vector<Connection> pending_conns;
+  std::vector<std::unique_ptr<Connection>> pending_conns;
   seastar::timer<seastar::lowres_clock> timer;
   seastar::gate tick_gate;
 
@@ -91,6 +90,10 @@ public:
   bool sub_want_increment(const std::string& what, version_t start, unsigned flags);
   seastar::future<> renew_subs();
 
+  MonMap &get_monmap_ref() {
+    return monmap;
+  }
+
 private:
   // AuthServer methods
   std::pair<std::vector<uint32_t>, std::vector<uint32_t>>