From 2c9a617081de190bb9f33c624041275688aa919e Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Tue, 31 Mar 2020 16:07:13 +0800 Subject: [PATCH] crimson/net: fix is_connected() to identify if handshake has completed Signed-off-by: Yingxin Cheng --- src/crimson/net/Protocol.cc | 5 ----- src/crimson/net/Protocol.h | 2 +- src/crimson/net/ProtocolV1.cc | 32 ++++++++++++++++++++------------ src/crimson/net/ProtocolV1.h | 9 ++++++++- src/crimson/net/ProtocolV2.cc | 24 ++++++++++++++++-------- src/crimson/net/ProtocolV2.h | 4 +++- 6 files changed, 48 insertions(+), 28 deletions(-) diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index c95cf1d08b5..0cbbe4a49db 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -35,11 +35,6 @@ Protocol::~Protocol() assert(!exit_open); } -bool Protocol::is_connected() const -{ - return write_state == write_state_t::open; -} - void Protocol::close(bool dispatch_reset, std::optional> f_accept_new) { diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index a6c922c33f1..290cb3fb843 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -23,7 +23,7 @@ class Protocol { Protocol(Protocol&&) = delete; virtual ~Protocol(); - bool is_connected() const; + virtual bool is_connected() const = 0; #ifdef UNIT_TESTS_BUILT bool is_closed_clean = false; diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc index a5a813a81a6..d0c677df9a5 100644 --- a/src/crimson/net/ProtocolV1.cc +++ b/src/crimson/net/ProtocolV1.cc @@ -131,6 +131,11 @@ ProtocolV1::ProtocolV1(Dispatcher& dispatcher, ProtocolV1::~ProtocolV1() {} +bool ProtocolV1::is_connected() const +{ + return state == state_t::open; +} + // connecting state void ProtocolV1::reset_session() @@ -368,12 +373,7 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr, return repeat_connect(); }); }).then([this] { - // notify the dispatcher and allow them to reject the connection - return dispatcher.ms_handle_connect( - seastar::static_pointer_cast( - conn.shared_from_this())); - }).then([this] { - execute_open(); + execute_open(open_t::connected); }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the connecting state logger().warn("{} connecting fault: {}", conn, eptr); @@ -656,16 +656,12 @@ void ProtocolV1::start_accept(SocketRef&& sock, return seastar::repeat([this] { return repeat_handle_connect(); }); - }).then([this] { - // notify the dispatcher and allow them to reject the connection - return dispatcher.ms_handle_accept( - seastar::static_pointer_cast(conn.shared_from_this())); }).then([this] { messenger.register_conn( seastar::static_pointer_cast(conn.shared_from_this())); messenger.unaccept_conn( seastar::static_pointer_cast(conn.shared_from_this())); - execute_open(); + execute_open(open_t::accepted); }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the accepting state logger().warn("{} accepting fault: {}", conn, eptr); @@ -890,12 +886,24 @@ seastar::future<> ProtocolV1::handle_tags() }); } -void ProtocolV1::execute_open() +void ProtocolV1::execute_open(open_t type) { logger().trace("{} trigger open, was {}", conn, static_cast(state)); state = state_t::open; set_write_state(write_state_t::open); + if (type == open_t::connected) { + gated_dispatch("ms_handle_connect", [this] { + return dispatcher.ms_handle_connect( + seastar::static_pointer_cast(conn.shared_from_this())); + }); + } else { // type == open_t::accepted + gated_dispatch("ms_handle_accept", [this] { + return dispatcher.ms_handle_accept( + seastar::static_pointer_cast(conn.shared_from_this())); + }); + } + gated_dispatch("execute_open", [this] { // start background processing of tags return handle_tags() diff --git a/src/crimson/net/ProtocolV1.h b/src/crimson/net/ProtocolV1.h index 31a6ddc2eae..8278230f779 100644 --- a/src/crimson/net/ProtocolV1.h +++ b/src/crimson/net/ProtocolV1.h @@ -18,6 +18,8 @@ class ProtocolV1 final : public Protocol { ~ProtocolV1() override; private: + bool is_connected() const override; + void start_connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name) override; @@ -110,7 +112,12 @@ class ProtocolV1 final : public Protocol { seastar::future<> maybe_throttle(); seastar::future<> read_message(); seastar::future<> handle_tags(); - void execute_open(); + + enum class open_t { + connected, + accepted + }; + void execute_open(open_t type); // replacing // the number of connections initiated in this session, increment when a diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index e823ece20e5..3f590e66b0d 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -153,6 +153,12 @@ ProtocolV2::ProtocolV2(Dispatcher& dispatcher, ProtocolV2::~ProtocolV2() {} +bool ProtocolV2::is_connected() const { + return state == state_t::READY || + state == state_t::ESTABLISHING || + state == state_t::REPLACING; +} + void ProtocolV2::start_connect(const entity_addr_t& _peer_addr, const entity_name_t& _peer_name) { @@ -937,17 +943,13 @@ void ProtocolV2::execute_connecting() } switch (next) { case next_step_t::ready: { - gated_dispatch("ms_handle_connect", [this] { - return dispatcher.ms_handle_connect( - seastar::static_pointer_cast(conn.shared_from_this())); - }); logger().info("{} connected:" " gs={}, pgs={}, cs={}, client_cookie={}," " server_cookie={}, in_seq={}, out_seq={}, out_q={}", conn, global_seq, peer_global_seq, connect_seq, client_cookie, server_cookie, conn.in_seq, conn.out_seq, conn.out_q.size()); - execute_ready(); + execute_ready(true); break; } case next_step_t::wait: { @@ -1669,7 +1671,7 @@ void ProtocolV2::execute_establishing( conn, global_seq, peer_global_seq, connect_seq, client_cookie, server_cookie, conn.in_seq, conn.out_seq, conn.out_q.size()); - execute_ready(); + execute_ready(false); }).handle_exception([this] (std::exception_ptr eptr) { if (state != state_t::ESTABLISHING) { logger().info("{} execute_establishing() protocol aborted at {} -- {}", @@ -1822,7 +1824,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, conn, reconnect ? "reconnected" : "connected", global_seq, peer_global_seq, connect_seq, client_cookie, server_cookie, conn.in_seq, conn.out_seq, conn.out_q.size()); - execute_ready(); + execute_ready(false); }).handle_exception([this] (std::exception_ptr eptr) { if (state != state_t::REPLACING) { logger().info("{} trigger_replacing(): protocol aborted at {} -- {}", @@ -1985,10 +1987,16 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) }); } -void ProtocolV2::execute_ready() +void ProtocolV2::execute_ready(bool dispatch_connect) { assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0)); trigger_state(state_t::READY, write_state_t::open, false); + if (dispatch_connect) { + gated_dispatch("ms_handle_connect", [this] { + return dispatcher.ms_handle_connect( + seastar::static_pointer_cast(conn.shared_from_this())); + }); + } #ifdef UNIT_TESTS_BUILT if (conn.interceptor) { conn.interceptor->register_conn_ready(conn); diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 5d3a6742181..53d27a3603f 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -19,6 +19,8 @@ class ProtocolV2 final : public Protocol { ~ProtocolV2() override; private: + bool is_connected() const override; + void start_connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name) override; @@ -204,7 +206,7 @@ class ProtocolV2 final : public Protocol { // READY seastar::future<> read_message(utime_t throttle_stamp); - void execute_ready(); + void execute_ready(bool dispatch_connect); // STANDBY void execute_standby(); -- 2.39.5