From 6658ba17ef1bcc89b8ff5e74bfc7c7c0b8ef8fac Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Thu, 1 Jun 2023 17:05:42 +0800 Subject: [PATCH] crimson/net: misc cleanups to protocol v2 implementations Signed-off-by: Yingxin Cheng --- src/crimson/net/ProtocolV2.cc | 122 +++++++++++++++++++--------------- src/crimson/net/ProtocolV2.h | 15 ++--- 2 files changed, 74 insertions(+), 63 deletions(-) diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 99a48536de5..b3a25955483 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -187,9 +187,10 @@ void ProtocolV2::start_accept(SocketFRef&& new_socket, execute_accepting(); } -void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool reentrant) +void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state) { - if (!reentrant && new_state == state) { + ceph_assert_always(!gate.is_closed()); + if (new_state == state) { logger().error("{} is not allowed to re-trigger state {}", conn, get_state_name(state)); ceph_abort(); @@ -201,11 +202,11 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool } logger().debug("{} TRIGGER {}, was {}", conn, get_state_name(new_state), get_state_name(state)); - auto pre_state = state; - if (pre_state == state_t::READY) { - assert(!gate.is_closed()); - ceph_assert_always(!exit_io.has_value()); - exit_io = seastar::shared_promise<>(); + + if (state == state_t::READY) { + // from READY + ceph_assert_always(!pr_exit_io.has_value()); + pr_exit_io = seastar::shared_promise<>(); } bool need_notify_out; @@ -215,28 +216,28 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool need_notify_out = false; } + auto pre_state = state; state = new_state; + + FrameAssemblerV2Ref fa; if (new_state == state_t::READY) { - // I'm not responsible to shutdown the socket at READY - is_socket_valid = false; - io_handler.set_io_state(new_io_state, std::move(frame_assembler), need_notify_out); + assert(new_io_state == io_state_t::open); + fa = std::move(frame_assembler); } else { - io_handler.set_io_state(new_io_state, nullptr, need_notify_out); + assert(new_io_state != io_state_t::open); } - - /* - * not atomic below - */ + io_handler.set_io_state(new_io_state, std::move(fa), need_notify_out); if (pre_state == state_t::READY) { + assert(new_io_state != io_state_t::open); gate.dispatch_in_background("exit_io", conn, [this] { return io_handler.wait_io_exit_dispatching( ).then([this](auto ret) { frame_assembler = std::move(ret.frame_assembler); ceph_assert_always(!frame_assembler->is_socket_valid()); io_states = ret.io_states; - exit_io->set_value(); - exit_io = std::nullopt; + pr_exit_io->set_value(); + pr_exit_io = std::nullopt; }); }); } @@ -780,7 +781,7 @@ ProtocolV2::client_reconnect() void ProtocolV2::execute_connecting() { ceph_assert_always(!is_socket_valid); - trigger_state(state_t::CONNECTING, io_state_t::delay, false); + trigger_state(state_t::CONNECTING, io_state_t::delay); gated_execute("execute_connecting", conn, [this] { global_seq = messenger.get_global_seq(); assert(client_cookie != 0); @@ -1497,7 +1498,7 @@ ProtocolV2::server_reconnect() void ProtocolV2::execute_accepting() { assert(is_socket_valid); - trigger_state(state_t::ACCEPTING, io_state_t::none, false); + trigger_state(state_t::ACCEPTING, io_state_t::none); gate.dispatch_in_background("execute_accepting", conn, [this] { return seastar::futurize_invoke([this] { #ifdef UNIT_TESTS_BUILT @@ -1631,10 +1632,13 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { }; ceph_assert_always(is_socket_valid); - trigger_state(state_t::ESTABLISHING, io_state_t::delay, false); + trigger_state(state_t::ESTABLISHING, io_state_t::delay); if (existing_conn) { - static_cast(existing_conn->protocol.get())->do_close( - true /* is_dispatch_reset */, std::move(accept_me)); + ProtocolV2 *existing_proto = dynamic_cast( + existing_conn->protocol.get()); + existing_proto->do_close( + true, // is_dispatch_reset + std::move(accept_me)); if (unlikely(state != state_t::ESTABLISHING)) { logger().warn("{} triggered {} during execute_establishing(), " "the accept event will not be delivered!", @@ -1653,6 +1657,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { } gated_execute("execute_establishing", conn, [this] { + ceph_assert_always(state == state_t::ESTABLISHING); return seastar::futurize_invoke([this] { return send_server_ident(); }).then([this] { @@ -1677,6 +1682,8 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { seastar::future<> ProtocolV2::send_server_ident() { + ceph_assert_always(state == state_t::ESTABLISHING || + state == state_t::REPLACING); // send_server_ident() logic // refered to async-conn v2: not assign gs to global_seq @@ -1730,9 +1737,11 @@ void ProtocolV2::trigger_replacing(bool reconnect, uint64_t new_connect_seq, uint64_t new_msg_seq) { + ceph_assert_always(state >= state_t::ESTABLISHING); + ceph_assert_always(state <= state_t::WAIT); ceph_assert_always(has_socket || state == state_t::CONNECTING); ceph_assert_always(!mover.socket->is_shutdown()); - trigger_state(state_t::REPLACING, io_state_t::delay, false); + trigger_state(state_t::REPLACING, io_state_t::delay); if (is_socket_valid) { frame_assembler->shutdown_socket(&gate); is_socket_valid = false; @@ -1773,6 +1782,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, } if (unlikely(state != state_t::REPLACING)) { + ceph_assert_always(state == state_t::CLOSING); return mover.socket->close( ).then([sock = std::move(mover.socket)] { abort_protocol(); @@ -1813,11 +1823,13 @@ void ProtocolV2::trigger_replacing(bool reconnect, } }).then([this, reconnect] { if (unlikely(state != state_t::REPLACING)) { - logger().debug("{} triggered {} at the end of trigger_replacing()", + logger().debug("{} triggered {} at the end of trigger_replacing(), abort", conn, get_state_name(state)); + ceph_assert_always(state == state_t::CLOSING); abort_protocol(); } - logger().info("{} replaced ({}): gs={}, pgs={}, cs={}, " + logger().info("{} replaced ({}), going to ready: " + "gs={}, pgs={}, cs={}, " "client_cookie={}, server_cookie={}, {}", conn, reconnect ? "reconnected" : "connected", global_seq, peer_global_seq, connect_seq, @@ -1845,7 +1857,9 @@ void ProtocolV2::execute_ready() assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0)); protocol_timer.cancel(); ceph_assert_always(is_socket_valid); - trigger_state(state_t::READY, io_state_t::open, false); + // I'm not responsible to shutdown the socket at READY + is_socket_valid = false; + trigger_state(state_t::READY, io_state_t::open); } // STANDBY state @@ -1853,7 +1867,7 @@ void ProtocolV2::execute_ready() void ProtocolV2::execute_standby() { ceph_assert_always(!is_socket_valid); - trigger_state(state_t::STANDBY, io_state_t::delay, false); + trigger_state(state_t::STANDBY, io_state_t::delay); } void ProtocolV2::notify_out() @@ -1871,7 +1885,7 @@ void ProtocolV2::notify_out() void ProtocolV2::execute_wait(bool max_backoff) { ceph_assert_always(!is_socket_valid); - trigger_state(state_t::WAIT, io_state_t::delay, false); + trigger_state(state_t::WAIT, io_state_t::delay); gated_execute("execute_wait", conn, [this, max_backoff] { double backoff = protocol_timer.last_dur(); if (max_backoff) { @@ -1909,7 +1923,7 @@ void ProtocolV2::execute_wait(bool max_backoff) void ProtocolV2::execute_server_wait() { ceph_assert_always(is_socket_valid); - trigger_state(state_t::SERVER_WAIT, io_state_t::none, false); + trigger_state(state_t::SERVER_WAIT, io_state_t::none); gated_execute("execute_server_wait", conn, [this] { return frame_assembler->read_exactly(1 ).then([this](auto bptr) { @@ -1944,22 +1958,21 @@ seastar::future<> ProtocolV2::close_clean_yielded() // the container when seastar::parallel_for_each() is still iterating in it. // that'd lead to a segfault. return seastar::yield( - ).then([this, conn_ref = conn.shared_from_this()] { + ).then([this] { do_close(false); - // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset() - // which will otherwise result in deadlock - assert(closed_clean_fut.valid()); - return closed_clean_fut.get_future(); - }); + return pr_closed_clean.get_shared_future(); + + // connection may be unreferenced from the messenger, + // so need to hold the additional reference. + }).finally([conn_ref = conn.shared_from_this()] {});; } void ProtocolV2::do_close( bool is_dispatch_reset, std::optional> f_accept_new) { - if (closed) { + if (state == state_t::CLOSING) { // already closing - assert(state == state_t::CLOSING); return; } @@ -1972,9 +1985,9 @@ void ProtocolV2::do_close( * atomic operations */ - closed = true; + ceph_assert_always(!gate.is_closed()); - // trigger close + // messenger registrations, must before user events messenger.closing_conn( seastar::static_pointer_cast( conn.shared_from_this())); @@ -1990,27 +2003,27 @@ void ProtocolV2::do_close( // cannot happen ceph_assert(false); } - protocol_timer.cancel(); - trigger_state(state_t::CLOSING, io_state_t::drop, false); - if (f_accept_new) { + // the replacing connection must be registerred after the replaced + // connection is unreigsterred. (*f_accept_new)(); } + + protocol_timer.cancel(); if (is_socket_valid) { frame_assembler->shutdown_socket(&gate); is_socket_valid = false; } - assert(!gate.is_closed()); - auto handshake_closed = gate.close(); - auto io_closed = io_handler.close_io( - is_dispatch_reset, is_replace); - - // asynchronous operations - assert(!closed_clean_fut.valid()); - closed_clean_fut = seastar::when_all( - std::move(handshake_closed), std::move(io_closed) - ).discard_result().then([this] { - ceph_assert_always(!exit_io.has_value()); + + trigger_state(state_t::CLOSING, io_state_t::drop); + gate.dispatch_in_background( + "close_io", conn, [this, is_dispatch_reset, is_replace] { + return io_handler.close_io(is_dispatch_reset, is_replace); + }); + + std::ignore = gate.close( + ).then([this] { + ceph_assert_always(!pr_exit_io.has_value()); if (has_socket) { ceph_assert_always(frame_assembler); return frame_assembler->close_shutdown_socket(); @@ -2022,6 +2035,7 @@ void ProtocolV2::do_close( messenger.closed_conn( seastar::static_pointer_cast( conn.shared_from_this())); + pr_closed_clean.set_value(); #ifdef UNIT_TESTS_BUILT closed_clean = true; if (conn.interceptor) { @@ -2030,7 +2044,7 @@ void ProtocolV2::do_close( } #endif }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) { - logger().error("{} closing: closed_clean_fut got unexpected exception {}", + logger().error("{} closing got unexpected exception {}", conn, eptr); ceph_abort(); }); diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 2aa9496ef83..b1767f7da01 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -52,7 +52,7 @@ public: } bool is_closed() const { - return closed; + return state == state_t::CLOSING; } #endif @@ -60,8 +60,8 @@ private: using io_state_t = IOHandler::io_state_t; seastar::future<> wait_exit_io() { - if (exit_io.has_value()) { - return exit_io->get_shared_future(); + if (pr_exit_io.has_value()) { + return pr_exit_io->get_shared_future(); } else { return seastar::now(); } @@ -94,7 +94,7 @@ private: return statenames[static_cast(state)]; } - void trigger_state(state_t state, io_state_t io_state, bool reentrant); + void trigger_state(state_t new_state, io_state_t new_io_state); template void gated_execute(const char *what, T &who, Func &&func) { @@ -227,16 +227,13 @@ private: FrameAssemblerV2Ref frame_assembler; - std::optional> exit_io; + std::optional> pr_exit_io; AuthConnectionMetaRef auth_meta; crimson::common::Gated gate; - bool closed = false; - - // become valid only after closed == true - seastar::shared_future<> closed_clean_fut; + seastar::shared_promise<> pr_closed_clean; #ifdef UNIT_TESTS_BUILT bool closed_clean = false; -- 2.39.5