From f60fd12151867920545f6034136e1b8cb6310062 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Wed, 7 Dec 2022 10:06:31 +0800 Subject: [PATCH] crimson/net: cleanups to Protocol and ProtocolV2 interfaces Signed-off-by: Yingxin Cheng --- src/crimson/net/Protocol.cc | 10 +++ src/crimson/net/Protocol.h | 21 ++++--- src/crimson/net/ProtocolV2.cc | 13 ++-- src/crimson/net/ProtocolV2.h | 115 +++++++++++++++++----------------- 4 files changed, 85 insertions(+), 74 deletions(-) diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index 01d2245cd2ae8..b470abf5f69fb 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -245,6 +245,16 @@ seastar::future Protocol::wait_io_exit_dispatching() }); } +void Protocol::reset_session(bool full) +{ + // reset in + in_seq = 0; + if (full) { + reset_out(); + dispatch_remote_reset(); + } +} + void Protocol::requeue_out_sent() { assert(io_state != io_state_t::open); diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index 062ffe697e2af..5bfdc71282bfb 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -78,8 +78,14 @@ class Protocol { // TODO: encapsulate a SessionedSender class protected: - seastar::future<> close_io() { + seastar::future<> close_io( + bool is_dispatch_reset, + bool is_replace) { ceph_assert_always(io_state == io_state_t::drop); + + if (is_dispatch_reset) { + dispatch_reset(is_replace); + } assert(!gate.is_closed()); return gate.close(); } @@ -102,16 +108,12 @@ class Protocol { seastar::future wait_io_exit_dispatching(); + void reset_session(bool full); + void requeue_out_sent_up_to(seq_num_t seq); void requeue_out_sent(); - void reset_out(); - - void reset_in() { - in_seq = 0; - } - bool is_out_queued_or_sent() const { return is_out_queued() || !out_sent_msgs.empty(); } @@ -124,11 +126,11 @@ class Protocol { void dispatch_connect(); + private: void dispatch_reset(bool is_replace); void dispatch_remote_reset(); - private: bool is_out_queued() const { return (!out_pending_msgs.empty() || ack_left > 0 || @@ -136,6 +138,8 @@ class Protocol { next_keepalive_ack.has_value()); } + void reset_out(); + seastar::future try_exit_out_dispatch(); seastar::future<> do_out_dispatch(); @@ -153,6 +157,7 @@ class Protocol { void do_in_dispatch(); +private: ChainedDispatchers &dispatchers; SocketConnection &conn; diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 900d4a5ca05d1..b4a5767d7fa26 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -351,13 +351,11 @@ void ProtocolV2::reset_session(bool full) { server_cookie = 0; connect_seq = 0; - reset_in(); if (full) { client_cookie = generate_client_cookie(); peer_global_seq = 0; - reset_out(); - dispatch_remote_reset(); } + do_reset_session(full); } seastar::future> @@ -1636,7 +1634,7 @@ ProtocolV2::send_server_ident() // this is required for the case when this connection is being replaced requeue_out_sent_up_to(0); - reset_in(); + do_reset_session(false); if (!conn.policy.lossy) { server_cookie = ceph::util::generate_random_number(1, -1ll); @@ -1935,11 +1933,8 @@ void ProtocolV2::do_close( } assert(!gate.is_closed()); auto handshake_closed = gate.close(); - auto io_closed = close_io(); - - if (is_dispatch_reset) { - dispatch_reset(is_replace); - } + auto io_closed = close_io( + is_dispatch_reset, is_replace); // asynchronous operations assert(!closed_clean_fut.valid()); diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index cbec7923b3222..820d8e5f0acfd 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -44,6 +44,7 @@ class ProtocolV2 final : public Protocol { void notify_mark_down() override; + private: seastar::future<> wait_exit_io() { if (exit_io.has_value()) { return exit_io->get_shared_future(); @@ -52,33 +53,6 @@ class ProtocolV2 final : public Protocol { } } - private: - SocketConnection &conn; - - SocketMessenger &messenger; - - bool has_socket = false; - - // the socket exists and it is not shutdown - bool is_socket_valid = false; - - FrameAssemblerV2Ref frame_assembler; - - std::optional> exit_io; - - AuthConnectionMetaRef auth_meta; - - crimson::common::Gated gate; - - bool closed = false; - - // become valid only after closed == true - seastar::shared_future<> closed_clean_fut; - -#ifdef UNIT_TESTS_BUILT - bool closed_clean = false; - -#endif enum class state_t { NONE = 0, ACCEPTING, @@ -91,7 +65,6 @@ class ProtocolV2 final : public Protocol { REPLACING, CLOSING }; - state_t state = state_t::NONE; static const char *get_state_name(state_t state) { const char *const statenames[] = {"NONE", @@ -109,16 +82,6 @@ class ProtocolV2 final : public Protocol { void trigger_state(state_t state, io_state_t io_state, bool reentrant); - uint64_t peer_supported_features = 0; - - uint64_t client_cookie = 0; - uint64_t server_cookie = 0; - uint64_t global_seq = 0; - uint64_t peer_global_seq = 0; - uint64_t connect_seq = 0; - - seastar::future<> execution_done = seastar::now(); - template void gated_execute(const char *what, T &who, Func &&func) { gate.dispatch_in_background(what, who, [this, &who, &func] { @@ -141,25 +104,6 @@ class ProtocolV2 final : public Protocol { }); } - class Timer { - double last_dur_ = 0.0; - const SocketConnection& conn; - std::optional as; - public: - Timer(SocketConnection& conn) : conn(conn) {} - double last_dur() const { return last_dur_; } - seastar::future<> backoff(double seconds); - void cancel() { - last_dur_ = 0.0; - if (as) { - as->request_abort(); - as = std::nullopt; - } - } - }; - Timer protocol_timer; - - private: void fault(state_t expected_state, const char *where, std::exception_ptr eptr); @@ -251,6 +195,63 @@ class ProtocolV2 final : public Protocol { // reentrant void do_close(bool is_dispatch_reset, std::optional> f_accept_new=std::nullopt); + + private: + SocketConnection &conn; + + SocketMessenger &messenger; + + bool has_socket = false; + + // the socket exists and it is not shutdown + bool is_socket_valid = false; + + FrameAssemblerV2Ref frame_assembler; + + std::optional> exit_io; + + AuthConnectionMetaRef auth_meta; + + crimson::common::Gated gate; + + bool closed = false; + + // become valid only after closed == true + seastar::shared_future<> closed_clean_fut; + +#ifdef UNIT_TESTS_BUILT + bool closed_clean = false; + +#endif + state_t state = state_t::NONE; + + uint64_t peer_supported_features = 0; + + uint64_t client_cookie = 0; + uint64_t server_cookie = 0; + uint64_t global_seq = 0; + uint64_t peer_global_seq = 0; + uint64_t connect_seq = 0; + + seastar::future<> execution_done = seastar::now(); + + class Timer { + double last_dur_ = 0.0; + const SocketConnection& conn; + std::optional as; + public: + Timer(SocketConnection& conn) : conn(conn) {} + double last_dur() const { return last_dur_; } + seastar::future<> backoff(double seconds); + void cancel() { + last_dur_ = 0.0; + if (as) { + as->request_abort(); + as = std::nullopt; + } + } + }; + Timer protocol_timer; }; } // namespace crimson::net -- 2.39.5