From: Yingxin Cheng Date: Fri, 6 Sep 2019 03:31:06 +0000 (+0800) Subject: crimson/net: add ESTABLISHING state X-Git-Tag: v15.1.0~1515^2~14 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=46606152352fa5ad65bab5a14443ff7b06560dce;p=ceph-ci.git crimson/net: add ESTABLISHING state With the new ESTABLISHING state, connection lookup and acceptance can be atomic, solving the issues related to racing connect. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 4c62f818276..537c0af0b99 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -1214,7 +1214,14 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn) " this connection", conn, *existing_conn); existing_proto->dispatch_reset(); existing_proto->close(); - return send_server_ident(); + + if (unlikely(state != state_t::ACCEPTING)) { + logger().debug("{} triggered {} in execute_accepting()", + conn, get_state_name(state)); + abort_protocol(); + } + execute_establishing(); + return seastar::make_ready_future(next_step_t::ready); } if (existing_proto->server_cookie != 0) { @@ -1348,10 +1355,13 @@ ProtocolV2::server_connect() } } - // TODO: atomically register & unaccept the connecton with lookup_conn() - - // if everything is OK reply with server identification - return send_server_ident(); + if (unlikely(state != state_t::ACCEPTING)) { + logger().debug("{} triggered {} in execute_accepting()", + conn, get_state_name(state)); + abort_protocol(); + } + execute_establishing(); + return seastar::make_ready_future(next_step_t::ready); }); } @@ -1540,8 +1550,7 @@ ProtocolV2::server_reconnect() void ProtocolV2::execute_accepting() { - // TODO: change to write_state_t::none - trigger_state(state_t::ACCEPTING, write_state_t::delay, false); + trigger_state(state_t::ACCEPTING, write_state_t::none, false); seastar::with_gate(pending_dispatch, [this] { return seastar::futurize_apply([this] { INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED); @@ -1584,41 +1593,21 @@ void ProtocolV2::execute_accepting() } } }).then([this] (next_step_t next) { - if (unlikely(state != state_t::ACCEPTING)) { - logger().debug("{} triggered {} at the end of execute_accepting()", - conn, get_state_name(state)); - abort_protocol(); - } switch (next) { - case next_step_t::ready: { - seastar::with_gate(pending_dispatch, [this] { - return dispatcher.ms_handle_accept( - seastar::static_pointer_cast(conn.shared_from_this())); - }).handle_exception([this] (std::exception_ptr eptr) { - logger().error("{} ms_handle_accept caught exception: {}", conn, eptr); - ceph_abort("unexpected exception from ms_handle_accept()"); - }); - messenger.register_conn( - seastar::static_pointer_cast( - conn.shared_from_this())); - messenger.unaccept_conn( - seastar::static_pointer_cast( - conn.shared_from_this())); - logger().info("{} accepted: gs={}, pgs={}, cs={}," - " client_cookie={}, server_cookie={}, in_seq={}, out_seq={}", - conn, global_seq, peer_global_seq, connect_seq, - client_cookie, server_cookie, conn.in_seq, conn.out_seq); - execute_ready(); + case next_step_t::ready: + assert(state != state_t::ACCEPTING); break; - } - case next_step_t::wait: { + case next_step_t::wait: + if (unlikely(state != state_t::ACCEPTING)) { + logger().debug("{} triggered {} at the end of execute_accepting()", + conn, get_state_name(state)); + abort_protocol(); + } logger().info("{} execute_accepting(): going to SERVER_WAIT", conn); execute_server_wait(); break; - } - default: { + default: ceph_abort("impossible next step"); - } } }).handle_exception([this] (std::exception_ptr eptr) { logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}", @@ -1663,9 +1652,53 @@ seastar::future<> ProtocolV2::finish_auth() }); } -// ACCEPTING or REPLACING state +// ESTABLISHING -seastar::future +void ProtocolV2::execute_establishing() { + trigger_state(state_t::ESTABLISHING, write_state_t::delay, false); + seastar::with_gate(pending_dispatch, [this] { + return dispatcher.ms_handle_accept( + seastar::static_pointer_cast(conn.shared_from_this())); + }).handle_exception([this] (std::exception_ptr eptr) { + logger().error("{} ms_handle_accept caught exception: {}", conn, eptr); + ceph_abort("unexpected exception from ms_handle_accept()"); + }); + messenger.register_conn( + seastar::static_pointer_cast( + conn.shared_from_this())); + messenger.unaccept_conn( + seastar::static_pointer_cast( + conn.shared_from_this())); + logger().info("{} accepted: gs={}, pgs={}, cs={}," + " client_cookie={}, server_cookie={}, in_seq={}, out_seq={}", + conn, global_seq, peer_global_seq, connect_seq, + client_cookie, server_cookie, conn.in_seq, conn.out_seq); + execution_done = seastar::with_gate(pending_dispatch, [this] { + return seastar::futurize_apply([this] { + return send_server_ident(); + }).then([this] { + if (unlikely(state != state_t::ESTABLISHING)) { + logger().debug("{} triggered {} at the end of execute_establishing()", + conn, get_state_name(state)); + abort_protocol(); + } + execute_ready(); + }).handle_exception([this] (std::exception_ptr eptr) { + if (state != state_t::ESTABLISHING) { + logger().info("{} execute_establishing() protocol aborted at {} -- {}", + conn, get_state_name(state), eptr); + assert(state == state_t::CLOSING || + state == state_t::REPLACING); + return; + } + fault(false, "execute_establishing()", eptr); + }); + }); +} + +// ESTABLISHING or REPLACING state + +seastar::future<> ProtocolV2::send_server_ident() { // send_server_ident() logic @@ -1707,8 +1740,6 @@ ProtocolV2::send_server_ident() conn.set_features(connection_features); return write_frame(server_ident); - }).then([] { - return next_step_t::ready; }); } @@ -1730,6 +1761,15 @@ void ProtocolV2::trigger_replacing(bool reconnect, if (socket) { socket->shutdown(); } + if (!reconnect && new_client_cookie != client_cookie) { + seastar::with_gate(pending_dispatch, [this] { + return dispatcher.ms_handle_accept( + seastar::static_pointer_cast(conn.shared_from_this())); + }).handle_exception([this] (std::exception_ptr eptr) { + logger().error("{} ms_handle_accept caught exception: {}", conn, eptr); + ceph_abort("unexpected exception from ms_handle_accept()"); + }); + } seastar::with_gate(pending_dispatch, [this, reconnect, @@ -1782,9 +1822,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, client_cookie = new_client_cookie; conn.set_peer_name(new_peer_name); connection_features = new_conn_features; - return send_server_ident().then([] (next_step_t next) { - assert(next == next_step_t::ready); - }); + return send_server_ident(); } }).then([this] { if (unlikely(state != state_t::REPLACING)) { @@ -2126,7 +2164,7 @@ void ProtocolV2::trigger_close() messenger.unaccept_conn( seastar::static_pointer_cast( conn.shared_from_this())); - } else if (state >= state_t::CONNECTING && state < state_t::CLOSING) { + } else if (state >= state_t::ESTABLISHING && state < state_t::CLOSING) { messenger.unregister_conn( seastar::static_pointer_cast( conn.shared_from_this())); diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 677008acde3..7bba5d79a68 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -43,6 +43,7 @@ class ProtocolV2 final : public Protocol { NONE = 0, ACCEPTING, SERVER_WAIT, + ESTABLISHING, CONNECTING, READY, STANDBY, @@ -56,6 +57,7 @@ class ProtocolV2 final : public Protocol { const char *const statenames[] = {"NONE", "ACCEPTING", "SERVER_WAIT", + "ESTABLISHING", "CONNECTING", "READY", "STANDBY", @@ -171,8 +173,11 @@ class ProtocolV2 final : public Protocol { // CONNECTING/ACCEPTING seastar::future<> finish_auth(); - // ACCEPTING/REPLACING (server) - seastar::future send_server_ident(); + // ESTABLISHING + void execute_establishing(); + + // ESTABLISHING/REPLACING (server) + seastar::future<> send_server_ident(); // REPLACING (server) void trigger_replacing(bool reconnect,