From d20fb8304085299d8ec434145d77d1d43af5aae3 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Wed, 31 May 2023 16:57:41 +0800 Subject: [PATCH] crimson/net: notify and update io_states from io_handler to protocol asynchronously Signed-off-by: Yingxin Cheng --- src/crimson/net/ProtocolV2.cc | 62 +++++++++++++--------- src/crimson/net/ProtocolV2.h | 9 +++- src/crimson/net/io_handler.cc | 49 ++++++++++++++---- src/crimson/net/io_handler.h | 96 ++++++++++++++++++++++++++++++----- 4 files changed, 168 insertions(+), 48 deletions(-) diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 1ba97617f58aa..99a48536de5ab 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -23,8 +23,6 @@ using namespace ceph::msgr::v2; using crimson::common::local_conf; -using io_state_t = crimson::net::IOHandler::io_state_t; -using io_stat_printer = crimson::net::IOHandler::io_stat_printer; namespace { @@ -148,7 +146,9 @@ ProtocolV2::ProtocolV2(SocketConnection& conn, frame_assembler{FrameAssemblerV2::create(conn)}, auth_meta{seastar::make_lw_shared()}, protocol_timer{conn} -{} +{ + io_states = io_handler.get_states(); +} ProtocolV2::~ProtocolV2() {} @@ -207,13 +207,21 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool ceph_assert_always(!exit_io.has_value()); exit_io = seastar::shared_promise<>(); } + + bool need_notify_out; + if (new_state == state_t::STANDBY && !conn.policy.server) { + need_notify_out = true; + } else { + need_notify_out = false; + } + state = new_state; if (new_state == state_t::READY) { // I'm not responsible to shutdown the socket at READY is_socket_valid = false; - io_handler.set_io_state(new_io_state, std::move(frame_assembler)); + io_handler.set_io_state(new_io_state, std::move(frame_assembler), need_notify_out); } else { - io_handler.set_io_state(new_io_state, nullptr); + io_handler.set_io_state(new_io_state, nullptr, need_notify_out); } /* @@ -223,9 +231,10 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool if (pre_state == state_t::READY) { gate.dispatch_in_background("exit_io", conn, [this] { return io_handler.wait_io_exit_dispatching( - ).then([this](FrameAssemblerV2Ref fa) { - frame_assembler = std::move(fa); + ).then([this](auto ret) { + frame_assembler = std::move(ret.frame_assembler); ceph_assert_always(!frame_assembler->is_socket_valid()); + io_states = ret.io_states; exit_io->set_value(); exit_io = std::nullopt; }); @@ -295,20 +304,20 @@ void ProtocolV2::fault( } if (conn.policy.server || - (conn.policy.standby && !io_handler.is_out_queued_or_sent())) { + (conn.policy.standby && !io_states.is_out_queued_or_sent())) { if (conn.policy.server) { logger().info("{} protocol {} {} fault as server, going to STANDBY {} -- {}", conn, get_state_name(state), where, - io_stat_printer{io_handler}, + io_states, e_what); } else { logger().info("{} protocol {} {} fault with nothing to send, going to STANDBY {} -- {}", conn, get_state_name(state), where, - io_stat_printer{io_handler}, + io_states, e_what); } execute_standby(); @@ -318,7 +327,7 @@ void ProtocolV2::fault( conn, get_state_name(state), where, - io_stat_printer{io_handler}, + io_states, e_what); execute_wait(false); } else { @@ -328,7 +337,7 @@ void ProtocolV2::fault( conn, get_state_name(state), where, - io_stat_printer{io_handler}, + io_states, e_what); execute_connecting(); } @@ -342,6 +351,7 @@ void ProtocolV2::reset_session(bool full) client_cookie = generate_client_cookie(); peer_global_seq = 0; } + io_states.reset_session(full); io_handler.reset_session(full); } @@ -623,6 +633,7 @@ ProtocolV2::client_connect() return frame_assembler->read_frame_payload( ).then([this](auto payload) { // handle_server_ident() logic + io_states.requeue_out_sent(); io_handler.requeue_out_sent(); auto server_ident = ServerIdentFrame::Decode(payload->back()); logger().debug("{} GOT ServerIdentFrame:" @@ -699,12 +710,12 @@ ProtocolV2::client_reconnect() server_cookie, global_seq, connect_seq, - io_handler.get_in_seq()); + io_states.in_seq); logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={}," " server_cookie={}, gs={}, cs={}, in_seq={}", conn, messenger.get_myaddrs(), client_cookie, server_cookie, - global_seq, connect_seq, io_handler.get_in_seq()); + global_seq, connect_seq, io_states.in_seq); return frame_assembler->write_flush_frame(reconnect).then([this] { return frame_assembler->read_main_preamble(); }).then([this](auto ret) { @@ -754,6 +765,7 @@ ProtocolV2::client_reconnect() auto reconnect_ok = ReconnectOkFrame::Decode(payload->back()); logger().debug("{} GOT ReconnectOkFrame: msg_seq={}", conn, reconnect_ok.msg_seq()); + io_states.requeue_out_sent_up_to(); io_handler.requeue_out_sent_up_to(reconnect_ok.msg_seq()); return seastar::make_ready_future(next_step_t::ready); }); @@ -886,8 +898,7 @@ void ProtocolV2::execute_connecting() logger().info("{} connected: gs={}, pgs={}, cs={}, " "client_cookie={}, server_cookie={}, {}", conn, global_seq, peer_global_seq, connect_seq, - client_cookie, server_cookie, - io_stat_printer{io_handler}); + client_cookie, server_cookie, io_states); io_handler.dispatch_connect(); if (unlikely(state != state_t::CONNECTING)) { logger().debug("{} triggered {} after ms_handle_connect(), abort", @@ -1653,8 +1664,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { logger().info("{} established: gs={}, pgs={}, cs={}, " "client_cookie={}, server_cookie={}, {}", conn, global_seq, peer_global_seq, connect_seq, - client_cookie, server_cookie, - io_stat_printer{io_handler}); + client_cookie, server_cookie, io_states); execute_ready(); }).handle_exception([this](std::exception_ptr eptr) { fault(state_t::ESTABLISHING, "execute_establishing", eptr); @@ -1674,6 +1684,7 @@ ProtocolV2::send_server_ident() logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq); // this is required for the case when this connection is being replaced + io_states.reset_peer_state(); io_handler.reset_peer_state(); if (!conn.policy.lossy) { @@ -1783,9 +1794,10 @@ void ProtocolV2::trigger_replacing(bool reconnect, if (reconnect) { connect_seq = new_connect_seq; // send_reconnect_ok() logic + io_states.requeue_out_sent_up_to(); io_handler.requeue_out_sent_up_to(new_msg_seq); - auto reconnect_ok = ReconnectOkFrame::Encode(io_handler.get_in_seq()); - logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_handler.get_in_seq()); + auto reconnect_ok = ReconnectOkFrame::Encode(io_states.in_seq); + logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_states.in_seq); return frame_assembler->write_flush_frame(reconnect_ok); } else { client_cookie = new_client_cookie; @@ -1809,8 +1821,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, "client_cookie={}, server_cookie={}, {}", conn, reconnect ? "reconnected" : "connected", global_seq, peer_global_seq, connect_seq, - client_cookie, server_cookie, - io_stat_printer{io_handler}); + client_cookie, server_cookie, io_states); execute_ready(); }).handle_exception([this](std::exception_ptr eptr) { fault(state_t::REPLACING, "trigger_replacing", eptr); @@ -1820,8 +1831,12 @@ void ProtocolV2::trigger_replacing(bool reconnect, // READY state -void ProtocolV2::notify_out_fault(const char *where, std::exception_ptr eptr) +void ProtocolV2::notify_out_fault( + const char *where, + std::exception_ptr eptr, + io_handler_state _io_states) { + io_states = _io_states; fault(state_t::READY, where, eptr); } @@ -1843,6 +1858,7 @@ void ProtocolV2::execute_standby() void ProtocolV2::notify_out() { + io_states.is_out_queued = true; if (unlikely(state == state_t::STANDBY && !conn.policy.server)) { logger().info("{} notify_out(): at {}, going to CONNECTING", conn, get_state_name(state)); diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index f81ffdbfbc69a..2aa9496ef83e5 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -30,7 +30,7 @@ public: private: void notify_out() final; - void notify_out_fault(const char *, std::exception_ptr) final; + void notify_out_fault(const char *where, std::exception_ptr, io_handler_state) final; void notify_mark_down() final; @@ -57,6 +57,8 @@ public: #endif private: + using io_state_t = IOHandler::io_state_t; + seastar::future<> wait_exit_io() { if (exit_io.has_value()) { return exit_io->get_shared_future(); @@ -92,7 +94,7 @@ private: return statenames[static_cast(state)]; } - void trigger_state(state_t state, IOHandler::io_state_t io_state, bool reentrant); + void trigger_state(state_t state, io_state_t io_state, bool reentrant); template void gated_execute(const char *what, T &who, Func &&func) { @@ -215,6 +217,9 @@ private: IOHandler &io_handler; + // asynchronously populated from io_handler + io_handler_state io_states; + bool has_socket = false; // the socket exists and it is not shutdown diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc index 952158c5d31cd..cbd16013e0708 100644 --- a/src/crimson/net/io_handler.cc +++ b/src/crimson/net/io_handler.cc @@ -78,7 +78,7 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent( } if (require_ack && num_msgs == 0u) { - auto ack_frame = AckFrame::Encode(get_in_seq()); + auto ack_frame = AckFrame::Encode(in_seq); bl.append(frame_assembler->get_buffer(ack_frame)); } @@ -101,7 +101,7 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent( header.type, header.priority, header.version, ceph_le32(0), header.data_off, - ceph_le64(get_in_seq()), + ceph_le64(in_seq), footer.flags, header.compat_version, header.reserved}; @@ -172,8 +172,9 @@ void IOHandler::print_io_stat(std::ostream &out) const } void IOHandler::set_io_state( - const IOHandler::io_state_t &new_state, - FrameAssemblerV2Ref fa) + io_state_t new_state, + FrameAssemblerV2Ref fa, + bool set_notify_out) { ceph_assert_always(!( (new_state == io_state_t::none && io_state != io_state_t::none) || @@ -212,6 +213,16 @@ void IOHandler::set_io_state( assert(fa == nullptr); } + if (new_state == io_state_t::delay) { + need_notify_out = set_notify_out; + if (need_notify_out) { + maybe_notify_out_dispatch(); + } + } else { + assert(set_notify_out == false); + need_notify_out = false; + } + if (io_state != new_state) { io_state = new_state; io_state_changed.set_value(); @@ -227,7 +238,8 @@ void IOHandler::set_io_state( } } -seastar::future IOHandler::wait_io_exit_dispatching() +seastar::future +IOHandler::wait_io_exit_dispatching() { ceph_assert_always(io_state != io_state_t::open); ceph_assert_always(frame_assembler != nullptr); @@ -248,7 +260,9 @@ seastar::future IOHandler::wait_io_exit_dispatching() } }() ).discard_result().then([this] { - return std::move(frame_assembler); + return exit_dispatching_ret{ + std::move(frame_assembler), + get_states()}; }); } @@ -289,7 +303,7 @@ void IOHandler::requeue_out_sent() std::make_move_iterator(out_sent_msgs.begin()), std::make_move_iterator(out_sent_msgs.end())); out_sent_msgs.clear(); - notify_out_dispatch(); + maybe_notify_out_dispatch(); } void IOHandler::requeue_out_sent_up_to(seq_num_t seq) @@ -487,7 +501,9 @@ seastar::future<> IOHandler::do_out_dispatch() eptr = std::current_exception(); } set_io_state(io_state_t::delay); - handshake_listener->notify_out_fault("do_out_dispatch", eptr); + auto states = get_states(); + handshake_listener->notify_out_fault( + "do_out_dispatch", eptr, states); } else { logger().info("{} do_out_dispatch(): fault at {} -- {}", conn, io_state, e.what()); @@ -497,9 +513,18 @@ seastar::future<> IOHandler::do_out_dispatch() }); } +void IOHandler::maybe_notify_out_dispatch() +{ + if (is_out_queued()) { + notify_out_dispatch(); + } +} + void IOHandler::notify_out_dispatch() { - handshake_listener->notify_out(); + if (need_notify_out) { + handshake_listener->notify_out(); + } if (out_dispatching) { // already dispatching return; @@ -587,7 +612,7 @@ IOHandler::read_message(utime_t throttle_stamp, std::size_t msg_size) // client side queueing because messages can't be renumbered, but the (kernel) // client will occasionally pull a message out of the sent queue to send // elsewhere. in that case it doesn't matter if we "got" it or not. - uint64_t cur_seq = get_in_seq(); + uint64_t cur_seq = in_seq; if (message->get_seq() <= cur_seq) { logger().error("{} got old message {} <= {} {}, discarding", conn, message->get_seq(), cur_seq, *message); @@ -726,7 +751,9 @@ void IOHandler::do_in_dispatch() logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}", conn, io_state, e_what); set_io_state(io_state_t::delay); - handshake_listener->notify_out_fault("do_in_dispatch", eptr); + auto states = get_states(); + handshake_listener->notify_out_fault( + "do_in_dispatch", eptr, states); } else { logger().info("{} do_in_dispatch(): fault at {} -- {}", conn, io_state, e_what); diff --git a/src/crimson/net/io_handler.h b/src/crimson/net/io_handler.h index 2b48c8ab17012..db82de5160ec8 100644 --- a/src/crimson/net/io_handler.h +++ b/src/crimson/net/io_handler.h @@ -12,10 +12,55 @@ namespace crimson::net { +/** + * io_handler_state + * + * It is required to populate the states from IOHandler to ProtocolV2 + * asynchronously. + */ +struct io_handler_state { + seq_num_t in_seq; + bool is_out_queued; + bool has_out_sent; + + bool is_out_queued_or_sent() const { + return is_out_queued || has_out_sent; + } + + /* + * should be consistent with the accroding interfaces in IOHandler + */ + + void reset_session(bool full) { + in_seq = 0; + if (full) { + is_out_queued = false; + has_out_sent = false; + } + } + + void reset_peer_state() { + in_seq = 0; + is_out_queued = is_out_queued_or_sent(); + has_out_sent = false; + } + + void requeue_out_sent_up_to() { + // noop since the information is insufficient + } + + void requeue_out_sent() { + if (has_out_sent) { + has_out_sent = false; + is_out_queued = true; + } + } +}; + /** * HandshakeListener * - * The interface class for IOHandler to notify the ProtocolV2 for handshake. + * The interface class for IOHandler to notify the ProtocolV2. * * The notifications may be cross-core and asynchronous. */ @@ -30,7 +75,10 @@ public: virtual void notify_out() = 0; - virtual void notify_out_fault(const char *where, std::exception_ptr) = 0; + virtual void notify_out_fault( + const char *where, + std::exception_ptr, + io_handler_state) = 0; virtual void notify_mark_down() = 0; @@ -102,6 +150,10 @@ public: handshake_listener = &hl; } + io_handler_state get_states() const { + return {in_seq, is_out_queued(), has_out_sent()}; + } + struct io_stat_printer { const IOHandler &io_handler; }; @@ -137,9 +189,16 @@ public: }; friend class fmt::formatter; - void set_io_state(const io_state_t &new_state, FrameAssemblerV2Ref fa=nullptr); + void set_io_state( + io_state_t new_state, + FrameAssemblerV2Ref fa = nullptr, + bool set_notify_out = false); - seastar::future wait_io_exit_dispatching(); + struct exit_dispatching_ret { + FrameAssemblerV2Ref frame_assembler; + io_handler_state io_states; + }; + seastar::future wait_io_exit_dispatching(); void reset_session(bool full); @@ -149,14 +208,6 @@ public: void requeue_out_sent(); - bool is_out_queued_or_sent() const { - return is_out_queued() || !out_sent_msgs.empty(); - } - - seq_num_t get_in_seq() const { - return in_seq; - } - void dispatch_accept(); void dispatch_connect(); @@ -188,6 +239,8 @@ public: std::optional maybe_keepalive_ack, bool require_ack); + void maybe_notify_out_dispatch(); + void notify_out_dispatch(); void ack_out_sent(seq_num_t seq); @@ -244,6 +297,8 @@ private: uint64_t ack_left = 0; + bool need_notify_out = false; + /* * in states for reading */ @@ -266,6 +321,23 @@ inline std::ostream& operator<<( } // namespace crimson::net +template <> +struct fmt::formatter { + constexpr auto parse(format_parse_context& ctx) { + return ctx.begin(); + } + + template + auto format(crimson::net::io_handler_state state, FormatContext& ctx) { + return fmt::format_to( + ctx.out(), + "io(in_seq={}, is_out_queued={}, has_out_sent={})", + state.in_seq, + state.is_out_queued, + state.has_out_sent); + } +}; + template <> struct fmt::formatter : fmt::formatter { -- 2.39.5