From: Yingxin Cheng Date: Mon, 5 Jun 2023 02:56:27 +0000 (+0800) Subject: crimson/net: convert all interactions between protocol and io-handler to be cross... X-Git-Tag: v19.0.0~951^2~6 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=0b60209df64804d2a2f4121216cbaf058b38515b;p=ceph.git crimson/net: convert all interactions between protocol and io-handler to be cross-core Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index f7f2bccfe41e4..e1ffef2908894 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -237,7 +237,20 @@ void ProtocolV2::trigger_state_phase2( } else { assert(new_io_state != io_state_t::open); } - io_handler.set_io_state(new_io_state, std::move(fa), need_notify_out); + logger().debug("{} IOHandler::set_io_state(): new_state={}, new_io_state={}, " + "fa={}, set_notify_out={}", + conn, get_state_name(new_state), new_io_state, + fa ? fmt::format("(sid={})", fa->get_shard_id()) : "N/A", + need_notify_out); + gate.dispatch_in_background( + "set_io_state", conn, + [this, new_io_state, fa=std::move(fa)]() mutable { + return seastar::smp::submit_to( + io_handler.get_shard_id(), + [this, new_io_state, fa=std::move(fa), set_notify_out=need_notify_out]() mutable { + io_handler.set_io_state(new_io_state, std::move(fa), set_notify_out); + }); + }); if (need_exit_io) { // from READY @@ -375,8 +388,17 @@ void ProtocolV2::reset_session(bool full) client_cookie = generate_client_cookie(); peer_global_seq = 0; } + + logger().debug("{} IOHandler::reset_session({})", conn, full); io_states.reset_session(full); - io_handler.reset_session(full); + gate.dispatch_in_background( + "reset_session", conn, [this, full] { + return seastar::smp::submit_to( + io_handler.get_shard_id(), [this, full] { + io_handler.reset_session(full); + }); + }); + // user can make changes } seastar::future> @@ -656,9 +678,23 @@ ProtocolV2::client_connect() case Tag::SERVER_IDENT: return frame_assembler->read_frame_payload( ).then([this](auto payload) { + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} at receiving SERVER_IDENT", + conn, get_state_name(state)); + abort_protocol(); + } + // handle_server_ident() logic + logger().debug("{} IOHandler::requeue_out_sent()", conn); io_states.requeue_out_sent(); - io_handler.requeue_out_sent(); + gate.dispatch_in_background( + "requeue_out_sent", conn, [this] { + return seastar::smp::submit_to( + io_handler.get_shard_id(), [this] { + io_handler.requeue_out_sent(); + }); + }); + auto server_ident = ServerIdentFrame::Decode(payload->back()); logger().debug("{} GOT ServerIdentFrame:" " addrs={}, gid={}, gs={}," @@ -785,12 +821,28 @@ ProtocolV2::client_reconnect() case Tag::SESSION_RECONNECT_OK: return frame_assembler->read_frame_payload( ).then([this](auto payload) { + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} at receiving RECONNECT_OK", + conn, get_state_name(state)); + abort_protocol(); + } + // handle_reconnect_ok() logic auto reconnect_ok = ReconnectOkFrame::Decode(payload->back()); - logger().debug("{} GOT ReconnectOkFrame: msg_seq={}", + logger().debug("{} GOT ReconnectOkFrame: msg_seq={}, " + "IOHandler::requeue_out_sent_up_to()", conn, reconnect_ok.msg_seq()); + io_states.requeue_out_sent_up_to(); - io_handler.requeue_out_sent_up_to(reconnect_ok.msg_seq()); + auto msg_seq = reconnect_ok.msg_seq(); + gate.dispatch_in_background( + "requeue_out_reconnecting", conn, [this, msg_seq] { + return seastar::smp::submit_to( + io_handler.get_shard_id(), [this, msg_seq] { + io_handler.requeue_out_sent_up_to(msg_seq); + }); + }); + return seastar::make_ready_future(next_step_t::ready); }); default: { @@ -1763,11 +1815,19 @@ ProtocolV2::send_server_ident() // refered to async-conn v2: not assign gs to global_seq global_seq = messenger.get_global_seq(); - logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq); + logger().debug("{} UPDATE: gs={} for server ident, " + "IOHandler::reset_peer_state()", + conn, global_seq); // this is required for the case when this connection is being replaced io_states.reset_peer_state(); - io_handler.reset_peer_state(); + gate.dispatch_in_background( + "reset_peer_state", conn, [this] { + return seastar::smp::submit_to( + io_handler.get_shard_id(), [this] { + io_handler.reset_peer_state(); + }); + }); if (!conn.policy.lossy) { server_cookie = ceph::util::generate_random_number(1, -1ll); @@ -1925,8 +1985,17 @@ void ProtocolV2::trigger_replacing(bool reconnect, if (reconnect) { connect_seq = new_connect_seq; // send_reconnect_ok() logic + + logger().debug("{} IOHandler::requeue_out_sent_up_to({})", conn, new_msg_seq); io_states.requeue_out_sent_up_to(); - io_handler.requeue_out_sent_up_to(new_msg_seq); + gate.dispatch_in_background( + "requeue_out_replacing", conn, [this, new_msg_seq] { + return seastar::smp::submit_to( + io_handler.get_shard_id(), [this, new_msg_seq] { + io_handler.requeue_out_sent_up_to(new_msg_seq); + }); + }); + auto reconnect_ok = ReconnectOkFrame::Encode(io_states.in_seq); logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_states.in_seq); return frame_assembler->write_flush_frame(reconnect_ok); diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc index 576d91b7a15e4..7072c4d0b2ef8 100644 --- a/src/crimson/net/io_handler.cc +++ b/src/crimson/net/io_handler.cc @@ -215,10 +215,16 @@ void IOHandler::mark_down() return; } - logger().info("{} mark_down() with {}", + logger().info("{} mark_down() at {}, send notify_mark_down()", conn, io_stat_printer{*this}); set_io_state(io_state_t::drop); - handshake_listener->notify_mark_down(); + shard_states->dispatch_in_background( + "notify_mark_down", conn, [this] { + return seastar::smp::submit_to( + conn.get_messenger_shard_id(), [this] { + handshake_listener->notify_mark_down(); + }); + }); } void IOHandler::print_io_stat(std::ostream &out) const @@ -671,8 +677,9 @@ IOHandler::do_out_dispatch(shard_states_t &ctx) } if (io_state == io_state_t::open) { - logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}", - conn, io_state, e.what()); + logger().info("{} do_out_dispatch(): fault at {}, {}, going to delay -- {}, " + "send notify_out_fault()", + conn, io_state, io_stat_printer{*this}, e.what()); std::exception_ptr eptr; try { throw e; @@ -680,9 +687,15 @@ IOHandler::do_out_dispatch(shard_states_t &ctx) eptr = std::current_exception(); } set_io_state(io_state_t::delay); - auto states = get_states(); - handshake_listener->notify_out_fault( - "do_out_dispatch", eptr, states); + shard_states->dispatch_in_background( + "notify_out_fault(out)", conn, [this, eptr] { + auto states = get_states(); + return seastar::smp::submit_to( + conn.get_messenger_shard_id(), [this, eptr, states] { + handshake_listener->notify_out_fault( + "do_out_dispatch", eptr, states); + }); + }); } else { if (io_state != io_state_t::switched) { logger().info("{} do_out_dispatch(): fault at {}, {} -- {}", @@ -708,7 +721,14 @@ void IOHandler::notify_out_dispatch() { assert(is_out_queued()); if (need_notify_out) { - handshake_listener->notify_out(); + logger().debug("{} send notify_out()", conn); + shard_states->dispatch_in_background( + "notify_out", conn, [this] { + return seastar::smp::submit_to( + conn.get_messenger_shard_id(), [this] { + handshake_listener->notify_out(); + }); + }); } if (shard_states->try_enter_out_dispatching()) { shard_states->dispatch_in_background( @@ -927,12 +947,19 @@ void IOHandler::do_in_dispatch() auto io_state = ctx.get_io_state(); if (io_state == io_state_t::open) { - logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}", - conn, io_state, e_what); + logger().info("{} do_in_dispatch(): fault at {}, {}, going to delay -- {}, " + "send notify_out_fault()", + conn, io_state, io_stat_printer{*this}, e_what); set_io_state(io_state_t::delay); - auto states = get_states(); - handshake_listener->notify_out_fault( - "do_in_dispatch", eptr, states); + shard_states->dispatch_in_background( + "notify_out_fault(in)", conn, [this, eptr] { + auto states = get_states(); + return seastar::smp::submit_to( + conn.get_messenger_shard_id(), [this, eptr, states] { + handshake_listener->notify_out_fault( + "do_in_dispatch", eptr, states); + }); + }); } else { if (io_state != io_state_t::switched) { logger().info("{} do_in_dispatch(): fault at {}, {} -- {}", diff --git a/src/crimson/net/io_handler.h b/src/crimson/net/io_handler.h index acb171bde282e..ecdd02f157335 100644 --- a/src/crimson/net/io_handler.h +++ b/src/crimson/net/io_handler.h @@ -62,7 +62,8 @@ struct io_handler_state { * * The interface class for IOHandler to notify the ProtocolV2. * - * The notifications may be cross-core and asynchronous. + * The notifications may be cross-core and must be sent to + * SocketConnection::get_messenger_shard_id() */ class HandshakeListener { public: @@ -145,6 +146,10 @@ public: * The calls may be cross-core and asynchronous */ public: + /* + * should not be called cross-core + */ + void set_handshake_listener(HandshakeListener &hl) { ceph_assert_always(handshake_listener == nullptr); handshake_listener = &hl; @@ -159,6 +164,10 @@ public: }; void print_io_stat(std::ostream &out) const; + /* + * may be called cross-core + */ + seastar::future<> close_io(bool is_dispatch_reset, bool is_replace); /**