From e95543aa70f2edc8717aa856a89b75e17bf546ea Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Tue, 6 Dec 2022 09:15:28 +0800 Subject: [PATCH] crimson/net: move mark_down() from ProtocolV2 to Protocol Process mark_down in Protocol rather than in ProtocolV2 to prevent further event dispatching after mark_down is called by user. Then notify ProtocolV2 as the IO/socket core and handshake core can be different and the notification can be asynchronous. Signed-off-by: Yingxin Cheng --- src/crimson/net/Protocol.cc | 42 +++++++++++++++++++++++++++++ src/crimson/net/Protocol.h | 21 ++++++--------- src/crimson/net/ProtocolV2.cc | 2 +- src/crimson/net/ProtocolV2.h | 4 +-- src/crimson/net/SocketConnection.cc | 2 +- 5 files changed, 54 insertions(+), 17 deletions(-) diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index 12985bc7e00..b38f72539b5 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -138,6 +138,34 @@ seastar::future<> Protocol::send_keepalive() return seastar::now(); } +void Protocol::mark_down() +{ + ceph_assert_always(out_state != out_state_t::none); + need_dispatch_reset = false; + if (out_state == out_state_t::drop) { + return; + } + + logger().info("{} mark_down() with {}", + conn, io_stat_printer{*this}); + set_out_state(out_state_t::drop); + notify_mark_down(); +} + +void Protocol::print_io_stat(std::ostream &out) const +{ + out << "io_stat(" + << "out_state=" << fmt::format("{}", out_state) + << ", in_seq=" << in_seq + << ", out_seq=" << out_seq + << ", out_pending_msgs_size=" << out_pending_msgs.size() + << ", out_sent_msgs_size=" << out_sent_msgs.size() + << ", need_ack=" << (ack_left > 0) + << ", need_keepalive=" << need_keepalive + << ", need_keepalive_ack=" << bool(next_keepalive_ack) + << ")"; +} + void Protocol::set_out_state( const Protocol::out_state_t &new_state, FrameAssemblerV2Ref fa) @@ -274,6 +302,9 @@ void Protocol::reset_out() void Protocol::dispatch_accept() { + if (out_state == out_state_t::drop) { + return; + } // protocol_is_connected can be from true to true here if the replacing is // happening to a connected connection. protocol_is_connected = true; @@ -283,6 +314,9 @@ void Protocol::dispatch_accept() void Protocol::dispatch_connect() { + if (out_state == out_state_t::drop) { + return; + } ceph_assert_always(protocol_is_connected == false); protocol_is_connected = true; dispatchers.ms_handle_connect( @@ -291,6 +325,11 @@ void Protocol::dispatch_connect() void Protocol::dispatch_reset(bool is_replace) { + ceph_assert_always(out_state == out_state_t::drop); + if (!need_dispatch_reset) { + return; + } + need_dispatch_reset = false; dispatchers.ms_handle_reset( seastar::static_pointer_cast(conn.shared_from_this()), is_replace); @@ -298,6 +337,9 @@ void Protocol::dispatch_reset(bool is_replace) void Protocol::dispatch_remote_reset() { + if (out_state == out_state_t::drop) { + return; + } dispatchers.ms_handle_remote_reset( seastar::static_pointer_cast(conn.shared_from_this())); } diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index 4446e1781fb..e6e6c956ea7 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -21,8 +21,6 @@ class Protocol { Protocol(Protocol&&) = delete; virtual ~Protocol(); - virtual void close() = 0; - virtual seastar::future<> close_clean_yielded() = 0; #ifdef UNIT_TESTS_BUILT @@ -47,6 +45,8 @@ class Protocol { virtual void notify_out_fault(const char *where, std::exception_ptr) = 0; + virtual void notify_mark_down() = 0; + // the write state-machine public: using clock_t = seastar::lowres_system_clock; @@ -71,24 +71,17 @@ class Protocol { last_keepalive_ack = when; } + void mark_down(); + struct io_stat_printer { const Protocol &protocol; }; - void print_io_stat(std::ostream &out) const { - out << "io_stat(" - << "in_seq=" << in_seq - << ", out_seq=" << out_seq - << ", out_pending_msgs_size=" << out_pending_msgs.size() - << ", out_sent_msgs_size=" << out_sent_msgs.size() - << ", need_ack=" << (ack_left > 0) - << ", need_keepalive=" << need_keepalive - << ", need_keepalive_ack=" << bool(next_keepalive_ack) - << ")"; - } + void print_io_stat(std::ostream &out) const; // TODO: encapsulate a SessionedSender class protected: seastar::future<> close_out() { + ceph_assert_always(out_state == out_state_t::drop); assert(!gate.is_closed()); return gate.close(); } @@ -172,6 +165,8 @@ class Protocol { bool protocol_is_connected = false; + bool need_dispatch_reset = true; + /* * out states for writing */ diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 76e21de0484..13be01e32de 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -1862,7 +1862,7 @@ void ProtocolV2::execute_server_wait() // CLOSING state -void ProtocolV2::close() +void ProtocolV2::notify_mark_down() { do_close(false); } diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index ec4d2646813..807c63de503 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -19,8 +19,6 @@ class ProtocolV2 final : public Protocol { // public to SocketConnection, but private to the others private: - void close() override; - seastar::future<> close_clean_yielded() override; #ifdef UNIT_TESTS_BUILT @@ -46,6 +44,8 @@ class ProtocolV2 final : public Protocol { void notify_out_fault(const char *, std::exception_ptr) override; + void notify_mark_down() override; + seastar::future<> wait_exit_io() { if (exit_io.has_value()) { return exit_io->get_shared_future(); diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index e4a2d7d789b..5b3d806ed7e 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -104,7 +104,7 @@ void SocketConnection::set_last_keepalive_ack(clock_t::time_point when) void SocketConnection::mark_down() { assert(seastar::this_shard_id() == shard_id()); - protocol->close(); + protocol->mark_down(); } void -- 2.39.5