From d07592eab731c35ae8142bbdd682248bdedd6d4e Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 29 May 2019 17:31:57 -0700 Subject: [PATCH] crimson/mon/MonClient: handle mon client connection race The first to establish a connection and close the others needs to ensure that the others don't mess with broken state. To that end, leave a cleared pending_conns map while the conns are being closed. Also, take care to clean up outstanding promises and fallout from conns disappearing. Signed-off-by: Samuel Just --- src/crimson/mon/MonClient.cc | 212 ++++++++++++++++++++++++----------- src/crimson/mon/MonClient.h | 7 +- 2 files changed, 150 insertions(+), 69 deletions(-) diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc index 683bcea294e..6bc976f5190 100644 --- a/src/crimson/mon/MonClient.cc +++ b/src/crimson/mon/MonClient.cc @@ -53,13 +53,19 @@ public: Connection(const AuthRegistry& auth_registry, ceph::net::ConnectionRef conn, KeyRing* keyring); + enum class AuthResult { + success = 0, + failure, + canceled + }; seastar::future<> handle_auth_reply(Ref m); // v1 - seastar::future<> authenticate_v1(epoch_t epoch, - const EntityName& name, - uint32_t want_keys); + seastar::future authenticate_v1( + epoch_t epoch, + const EntityName& name, + uint32_t want_keys); // v2 - seastar::future<> authenticate_v2(); + seastar::future authenticate_v2(); auth::AuthClient::auth_request_t get_auth_request(const EntityName& name, uint32_t want_keys); @@ -95,7 +101,8 @@ private: rotating, general, }; - seastar::future do_auth(request_t); + seastar::future> do_auth_single(request_t); + seastar::future do_auth(request_t); private: bool closed = false; @@ -105,7 +112,7 @@ private: using clock_t = seastar::lowres_system_clock; clock_t::time_point auth_start; ceph::auth::method_t auth_method = 0; - seastar::promise<> auth_done; + seastar::promise auth_done; // v1 and v2 const AuthRegistry& auth_registry; ceph::net::ConnectionRef conn; @@ -129,16 +136,18 @@ Connection::Connection(const AuthRegistry& auth_registry, seastar::future<> Connection::handle_auth_reply(Ref m) { reply.set_value(m); + reply = {}; return seastar::now(); } seastar::future<> Connection::renew_tickets() { if (auth->need_tickets()) { - return do_auth(request_t::general).then([](bool success) { - if (!success) { - throw std::system_error(make_error_code( - ceph::net::error::negotiation_failure)); + return do_auth(request_t::general).then([](AuthResult r) { + if (r != AuthResult::success) { + throw std::system_error( + make_error_code( + ceph::net::error::negotiation_failure)); } }); } @@ -159,8 +168,8 @@ seastar::future<> Connection::renew_rotating_keyring() return seastar::now(); } last_rotating_renew_sent = now; - return do_auth(request_t::rotating).then([](bool success) { - if (!success) { + return do_auth(request_t::rotating).then([](AuthResult r) { + if (r != AuthResult::success) { throw std::system_error(make_error_code( ceph::net::error::negotiation_failure)); } @@ -219,7 +228,8 @@ Connection::setup_session(epoch_t epoch, return conn->send(m); } -seastar::future Connection::do_auth(Connection::request_t what) +seastar::future> +Connection::do_auth_single(Connection::request_t what) { auto m = make_message(); m->protocol = auth->get_protocol(); @@ -243,22 +253,43 @@ seastar::future Connection::do_auth(Connection::request_t what) logger().info("waiting"); return reply.get_future(); }).then([this] (Ref m) { - logger().info("mon {} => {} returns {}: {}", - conn->get_messenger()->get_myaddr(), - conn->get_peer_addr(), *m, m->result); - reply = {}; + if (!m) { + ceph_assert(closed); + logger().info("do_auth: connection closed"); + return seastar::make_ready_future>( + std::make_optional(AuthResult::canceled)); + } + logger().info( + "do_auth: mon {} => {} returns {}: {}", + conn->get_messenger()->get_myaddr(), + conn->get_peer_addr(), *m, m->result); auto p = m->result_bl.cbegin(); auto ret = auth->handle_response(m->result, p, nullptr, nullptr); if (ret != 0 && ret != -EAGAIN) { - throw std::system_error(make_error_code( - ceph::net::error::negotiation_failure)); + logger().error( + "do_auth: got error {} on mon {}", + ret, + conn->get_peer_addr()); } - return seastar::make_ready_future(ret == 0); + return seastar::make_ready_future>( + ret == -EAGAIN + ? std::nullopt + : std::make_optional(ret == 0 + ? AuthResult::success + : AuthResult::failure + )); }); } -seastar::future<> +seastar::future +Connection::do_auth(Connection::request_t what) { + return seastar::repeat_until_value([this, what]() { + return do_auth_single(what); + }); +} + +seastar::future Connection::authenticate_v1(epoch_t epoch, const EntityName& name, uint32_t want_keys) @@ -268,7 +299,10 @@ Connection::authenticate_v1(epoch_t epoch, }).then([this] { return reply.get_future(); }).then([name, want_keys, this](Ref m) { - reply = {}; + if (!m) { + logger().error("authenticate_v1 canceled on {}", name); + return seastar::make_ready_future(AuthResult::canceled); + } global_id = m->global_id; auth = create_auth(m->protocol, m->global_id, name, want_keys); switch (auto p = m->result_bl.cbegin(); @@ -276,24 +310,20 @@ Connection::authenticate_v1(epoch_t epoch, nullptr, nullptr)) { case 0: // none - return seastar::now(); + return seastar::make_ready_future(AuthResult::success); case -EAGAIN: // cephx - return seastar::repeat([this] { - return do_auth(request_t::general).then([](bool success) { - return seastar::make_ready_future( - success ? - seastar::stop_iteration::yes: - seastar::stop_iteration::no); - }); - }); + return do_auth(request_t::general); default: ceph_assert_always(0); } + }).handle_exception([](auto ep) { + logger().error("authenticate_v1 failed with {}", ep); + return seastar::make_ready_future(AuthResult::canceled); }); } -seastar::future<> Connection::authenticate_v2() +seastar::future Connection::authenticate_v2() { auth_start = seastar::lowres_system_clock::now(); return conn->send(make_message()).then([this] { @@ -369,7 +399,7 @@ Connection::handle_auth_done(uint64_t new_global_id, secret_t connection_secret; int r = auth->handle_response(0, p, &session_key, &connection_secret); conn->set_last_keepalive_ack(auth_start); - auth_done.set_value(); + auth_done.set_value(AuthResult::success); return {session_key, connection_secret, r}; } @@ -401,6 +431,10 @@ int Connection::handle_auth_bad_method(uint32_t old_auth_method, seastar::future<> Connection::close() { + reply.set_value(Ref(nullptr)); + reply = {}; + auth_done.set_value(AuthResult::canceled); + auth_done = {}; if (conn && !std::exchange(closed, true)) { return conn->close(); } else { @@ -410,6 +444,7 @@ seastar::future<> Connection::close() bool Connection::is_my_peer(const entity_addr_t& addr) const { + ceph_assert(conn); return conn->get_peer_addr() == addr; } @@ -514,11 +549,11 @@ seastar::future<> Client::ms_handle_reset(ceph::net::ConnectionRef conn) { auto found = std::find_if(pending_conns.begin(), pending_conns.end(), [peer_addr = conn->get_peer_addr()](auto& mc) { - return mc.is_my_peer(peer_addr); + return mc->is_my_peer(peer_addr); }); if (found != pending_conns.end()) { logger().warn("pending conn reset by {}", conn->get_peer_addr()); - return found->close(); + return (*found)->close(); } else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) { logger().warn("active conn reset {}", conn->get_peer_addr()); active_con.reset(); @@ -621,12 +656,12 @@ Client::get_auth_request(ceph::net::ConnectionRef con, if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) { auto found = std::find_if(pending_conns.begin(), pending_conns.end(), [peer_addr = con->get_peer_addr()](auto& mc) { - return mc.is_my_peer(peer_addr); + return mc->is_my_peer(peer_addr); }); if (found == pending_conns.end()) { throw ceph::auth::error{"unknown connection"}; } - return found->get_auth_request(entity_name, want_keys); + return (*found)->get_auth_request(entity_name, want_keys); } else { // generate authorizer if (!active_con) { @@ -656,14 +691,14 @@ Client::get_auth_request(ceph::net::ConnectionRef con, if (conn->get_peer_type() == CEPH_ENTITY_TYPE_MON) { auto found = std::find_if(pending_conns.begin(), pending_conns.end(), [peer_addr = conn->get_peer_addr()](auto& mc) { - return mc.is_my_peer(peer_addr); + return mc->is_my_peer(peer_addr); }); if (found == pending_conns.end()) { throw ceph::auth::error{"unknown connection"}; } bufferlist reply; tie(auth_meta->session_key, auth_meta->connection_secret, reply) = - found->handle_auth_reply_more(bl); + (*found)->handle_auth_reply_more(bl); return reply; } else { // authorizer challenges @@ -685,14 +720,14 @@ int Client::handle_auth_done(ceph::net::ConnectionRef conn, if (conn->get_peer_type() == CEPH_ENTITY_TYPE_MON) { auto found = std::find_if(pending_conns.begin(), pending_conns.end(), [peer_addr = conn->get_peer_addr()](auto& mc) { - return mc.is_my_peer(peer_addr); + return mc->is_my_peer(peer_addr); }); if (found == pending_conns.end()) { return -ENOENT; } int r = 0; tie(auth_meta->session_key, auth_meta->connection_secret, r) = - found->handle_auth_done(global_id, bl); + (*found)->handle_auth_done(global_id, bl); return r; } else { // verify authorizer reply @@ -717,11 +752,12 @@ int Client::handle_auth_bad_method(ceph::net::ConnectionRef conn, if (conn->get_peer_type() == CEPH_ENTITY_TYPE_MON) { auto found = std::find_if(pending_conns.begin(), pending_conns.end(), [peer_addr = conn->get_peer_addr()](auto& mc) { - return mc.is_my_peer(peer_addr); + return mc->is_my_peer(peer_addr); }); if (found != pending_conns.end()) { - return found->handle_auth_bad_method(old_auth_method, result, - allowed_methods, allowed_modes); + return (*found)->handle_auth_bad_method( + old_auth_method, result, + allowed_methods, allowed_modes); } else { return -ENOENT; } @@ -745,8 +781,12 @@ seastar::future<> Client::handle_monmap(ceph::net::Connection* conn, if (monmap.get_addr_name(peer_addr, cur_mon)) { if (active_con) { - return seastar::when_all_succeed(active_con->renew_tickets(), - active_con->renew_rotating_keyring()); + logger().info("handle_monmap: renewing tickets"); + return seastar::when_all_succeed( + active_con->renew_tickets(), + active_con->renew_rotating_keyring()).then([](){ + logger().info("handle_mon_map: renewed tickets"); + }); } else { return seastar::now(); } @@ -759,15 +799,16 @@ seastar::future<> Client::handle_monmap(ceph::net::Connection* conn, seastar::future<> Client::handle_auth_reply(ceph::net::Connection* conn, Ref m) { - logger().info("mon {} => {} returns {}: {}", - conn->get_messenger()->get_myaddr(), - conn->get_peer_addr(), *m, m->result); + logger().info( + "handle_auth_reply mon {} => {} returns {}: {}", + conn->get_messenger()->get_myaddr(), + conn->get_peer_addr(), *m, m->result); auto found = std::find_if(pending_conns.begin(), pending_conns.end(), [peer_addr = conn->get_peer_addr()](auto& mc) { - return mc.is_my_peer(peer_addr); + return mc->is_my_peer(peer_addr); }); if (found != pending_conns.end()) { - return found->handle_auth_reply(m); + return (*found)->handle_auth_reply(m); } else if (active_con) { return active_con->handle_auth_reply(m); } else { @@ -884,6 +925,7 @@ seastar::future<> Client::stop() seastar::future<> Client::reopen_session(int rank) { + logger().info("{} to mon.{}", __func__, rank); vector mons; if (rank >= 0) { mons.push_back(rank); @@ -897,38 +939,74 @@ seastar::future<> Client::reopen_session(int rank) #warning fixme auto peer = monmap.get_addrs(rank).front(); logger().info("connecting to mon.{}", rank); - return msgr.connect(peer, CEPH_ENTITY_TYPE_MON).then([this] (auto xconn) { + return msgr.connect(peer, CEPH_ENTITY_TYPE_MON).then( + [this] (auto xconn) -> seastar::future { // sharded-messenger compatible mode assumes all connections running // in one shard. ceph_assert((*xconn)->shard_id() == seastar::engine().cpu_id()); ceph::net::ConnectionRef conn = xconn->release(); - auto& mc = pending_conns.emplace_back(auth_registry, conn, &keyring); + auto& mc = pending_conns.emplace_back( + std::make_unique(auth_registry, conn, &keyring)); if (conn->get_peer_addr().is_msgr2()) { - return mc.authenticate_v2(); + return mc->authenticate_v2(); } else { - return mc.authenticate_v1(monmap.get_epoch(), entity_name, want_keys) + return mc->authenticate_v1(monmap.get_epoch(), entity_name, want_keys) .handle_exception([conn](auto ep) { - return conn->close().then([ep = std::move(ep)] { - std::rethrow_exception(ep); + return conn->close().then([ep=std::move(ep)](){ + return seastar::make_exception_future(ep); }); }); } - }).then([peer, this] { + }).then([peer, this](auto result) { + if (result == Connection::AuthResult::canceled) { + return seastar::now(); + } + if (!is_hunting()) { return seastar::now(); } logger().info("found mon.{}", monmap.get_name(peer)); - return seastar::parallel_for_each(pending_conns, [peer, this] (auto& conn) { - if (conn.is_my_peer(peer)) { - active_con.reset(new Connection{std::move(conn)}); - return seastar::now(); - } else { - return conn.close(); - } - }); + + auto found = std::find_if( + pending_conns.begin(), pending_conns.end(), + [peer](auto& conn) { + return conn->is_my_peer(peer); + }); + if (found == pending_conns.end()) { + // Happens if another connection has won the race + ceph_assert(active_con && pending_conns.empty()); + logger().info( + "no pending connection for mon.{}, peer {}", + monmap.get_name(peer), + peer); + return seastar::now(); + } + + ceph_assert(!active_con && !pending_conns.empty()); + active_con = std::move(*found); + found->reset(); + auto ret = seastar::do_with( + std::move(pending_conns), + [this](auto &pending_conns) { + return seastar::parallel_for_each( + pending_conns, + [this] (auto &conn) { + if (!conn) { + return seastar::now(); + } else { + return conn->close(); + } + }); + }); + pending_conns.clear(); + return ret; + }).then([]() { + logger().debug("reopen_session mon connection attempts complete"); + }).handle_exception([](auto ep) { + logger().error("mon connections failed with ep {}", ep); + return seastar::make_exception_future(ep); }); }).then([this] { - pending_conns.clear(); ceph_assert_always(active_con); return active_con->renew_rotating_keyring(); }); diff --git a/src/crimson/mon/MonClient.h b/src/crimson/mon/MonClient.h index 96fa3182c7a..75a00433824 100644 --- a/src/crimson/mon/MonClient.h +++ b/src/crimson/mon/MonClient.h @@ -50,9 +50,8 @@ class Client : public ceph::net::Dispatcher, const uint32_t want_keys; MonMap monmap; - seastar::promise reply; std::unique_ptr active_con; - std::vector pending_conns; + std::vector> pending_conns; seastar::timer timer; seastar::gate tick_gate; @@ -91,6 +90,10 @@ public: bool sub_want_increment(const std::string& what, version_t start, unsigned flags); seastar::future<> renew_subs(); + MonMap &get_monmap_ref() { + return monmap; + } + private: // AuthServer methods std::pair, std::vector> -- 2.39.5