From: Yingxin Cheng Date: Fri, 25 Nov 2022 01:51:50 +0000 (+0800) Subject: crimson/net: add aborts when the state is inconsistent X-Git-Tag: v18.1.0~375^2~26 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=c8d5a14c2248081319808550643ea6ef03a954f6;p=ceph-ci.git crimson/net: add aborts when the state is inconsistent To prevent unexpected event dispatching and state transitions. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index b4097c3f844..7028e158c25 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -259,7 +259,12 @@ void ProtocolV2::trigger_state(state_t _state, out_state_t _out_state, bool reen if (!reentrant && _state == state) { logger().error("{} is not allowed to re-trigger state {}", conn, get_state_name(state)); - ceph_assert(false); + ceph_abort(); + } + if (state == state_t::CLOSING) { + logger().error("{} CLOSING is not allowed to trigger state {}", + conn, get_state_name(_state)); + ceph_abort(); } logger().debug("{} TRIGGER {}, was {}", conn, get_state_name(_state), get_state_name(state)); @@ -687,6 +692,11 @@ ProtocolV2::client_reconnect() case Tag::SESSION_RESET: return frame_assembler.read_frame_payload( ).then([this](auto payload) { + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} before reset_session()", + conn, get_state_name(state)); + abort_protocol(); + } // handle_session_reset() logic auto reset = ResetFrame::Decode(payload->back()); logger().warn("{} GOT ResetFrame: full={}", conn, reset.full()); @@ -967,6 +977,12 @@ ProtocolV2::reuse_connection( ProtocolV2* existing_proto, bool do_reset, bool reconnect, uint64_t conn_seq, uint64_t msg_seq) { + if (unlikely(state != state_t::ACCEPTING)) { + logger().debug("{} triggered {} before trigger_replacing()", + conn, get_state_name(state)); + abort_protocol(); + } + existing_proto->trigger_replacing(reconnect, do_reset, frame_assembler.to_replace(), @@ -1036,6 +1052,11 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn) logger().warn("{} server_connect:" " existing connection {} is a lossy channel. Close existing in favor of" " this connection", conn, *existing_conn); + if (unlikely(state != state_t::ACCEPTING)) { + logger().debug("{} triggered {} before execute_establishing()", + conn, get_state_name(state)); + abort_protocol(); + } execute_establishing(existing_conn); return seastar::make_ready_future(next_step_t::ready); } @@ -1166,6 +1187,11 @@ ProtocolV2::server_connect() if (existing_conn) { return handle_existing_connection(existing_conn); } else { + if (unlikely(state != state_t::ACCEPTING)) { + logger().debug("{} triggered {} before execute_establishing()", + conn, get_state_name(state)); + abort_protocol(); + } execute_establishing(nullptr); return seastar::make_ready_future(next_step_t::ready); } @@ -1457,12 +1483,6 @@ seastar::future<> ProtocolV2::finish_auth() // ESTABLISHING void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { - if (unlikely(state != state_t::ACCEPTING)) { - logger().debug("{} triggered {} before execute_establishing()", - conn, get_state_name(state)); - abort_protocol(); - } - auto accept_me = [this] { messenger.register_conn( seastar::static_pointer_cast( @@ -1488,6 +1508,11 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { dispatchers.ms_handle_accept( seastar::static_pointer_cast(conn.shared_from_this())); + if (unlikely(state != state_t::ESTABLISHING)) { + logger().debug("{} triggered {} after ms_handle_accept() during execute_establishing()", + conn, get_state_name(state)); + abort_protocol(); + } gated_execute("execute_establishing", [this] { return seastar::futurize_invoke([this] { @@ -1726,6 +1751,12 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp, std::size_t m { return frame_assembler.read_frame_payload( ).then([this, throttle_stamp, msg_size](auto payload) { + if (unlikely(state != state_t::READY)) { + logger().debug("{} triggered {} during read_message()", + conn, get_state_name(state)); + abort_protocol(); + } + utime_t recv_stamp{seastar::lowres_system_clock::now()}; // we need to get the size before std::moving segments data @@ -1803,6 +1834,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp, std::size_t m // TODO: change MessageRef with seastar::shared_ptr auto msg_ref = MessageRef{message, false}; + assert(state == state_t::READY); // throttle the reading process by the returned future return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref)); }); @@ -1814,7 +1846,12 @@ void ProtocolV2::execute_ready(bool dispatch_connect) trigger_state(state_t::READY, out_state_t::open, false); if (dispatch_connect) { dispatchers.ms_handle_connect( - seastar::static_pointer_cast(conn.shared_from_this())); + seastar::static_pointer_cast(conn.shared_from_this())); + if (unlikely(state != state_t::READY)) { + logger().debug("{} triggered {} after ms_handle_connect() during execute_ready()", + conn, get_state_name(state)); + abort_protocol(); + } } #ifdef UNIT_TESTS_BUILT if (conn.interceptor) { @@ -2000,6 +2037,7 @@ void ProtocolV2::do_close( { if (closed) { // already closing + assert(state == state_t::CLOSING); return; }