From: Yingxin Cheng Date: Thu, 3 Nov 2022 05:45:47 +0000 (+0800) Subject: crimson/net: move close logic from Protocol to ProtocolV2 X-Git-Tag: v18.1.0~375^2~30 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=275214692eaa947f77da703ac782ab46660c10fc;p=ceph-ci.git crimson/net: move close logic from Protocol to ProtocolV2 Protocol class will be removed. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index 27fd98f54ae..e38590a1aec 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -31,61 +31,6 @@ Protocol::~Protocol() assert(!out_exit_dispatching); } -void Protocol::close(bool dispatch_reset, - std::optional> f_accept_new) -{ - if (closed) { - // already closing - return; - } - - bool is_replace = f_accept_new ? true : false; - logger().info("{} closing: reset {}, replace {}", conn, - dispatch_reset ? "yes" : "no", - is_replace ? "yes" : "no"); - - // atomic operations - closed = true; - trigger_close(); - if (f_accept_new) { - (*f_accept_new)(); - } - if (conn.socket) { - conn.socket->shutdown(); - } - set_out_state(out_state_t::drop); - assert(!gate.is_closed()); - auto gate_closed = gate.close(); - - if (dispatch_reset) { - dispatchers.ms_handle_reset( - seastar::static_pointer_cast(conn.shared_from_this()), - is_replace); - } - - // asynchronous operations - assert(!close_ready.valid()); - close_ready = std::move(gate_closed).then([this] { - if (conn.socket) { - return conn.socket->close(); - } else { - return seastar::now(); - } - }).then([this] { - logger().debug("{} closed!", conn); - on_closed(); -#ifdef UNIT_TESTS_BUILT - is_closed_clean = true; - if (conn.interceptor) { - conn.interceptor->register_conn_closed(conn); - } -#endif - }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) { - logger().error("{} closing: close_ready got unexpected exception {}", conn, eptr); - ceph_abort(); - }); -} - ceph::bufferlist Protocol::sweep_out_pending_msgs_to_sent( size_t num_msgs, bool require_keepalive, diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index c71b37f07c4..5260f05bae7 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -15,35 +15,23 @@ namespace crimson::net { class Protocol { +// public to SocketConnection public: Protocol(Protocol&&) = delete; virtual ~Protocol(); virtual bool is_connected() const = 0; + virtual void close() = 0; + + virtual seastar::future<> close_clean_yielded() = 0; + #ifdef UNIT_TESTS_BUILT - bool is_closed_clean = false; - bool is_closed() const { return closed; } -#endif + virtual bool is_closed_clean() const = 0; - // Reentrant closing - void close(bool dispatch_reset, std::optional> f_accept_new=std::nullopt); - seastar::future<> close_clean(bool dispatch_reset) { - // yield() so that close(dispatch_reset) can be called *after* - // close_clean() is applied to all connections in a container using - // seastar::parallel_for_each(). otherwise, we could erase a connection in - // the container when seastar::parallel_for_each() is still iterating in - // it. that'd lead to a segfault. - return seastar::yield( - ).then([this, dispatch_reset, conn_ref = conn.shared_from_this()] { - close(dispatch_reset); - // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset() - // which will otherwise result in deadlock - assert(close_ready.valid()); - return close_ready.get_future(); - }); - } + virtual bool is_closed() const = 0; +#endif virtual void start_connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name) = 0; @@ -56,8 +44,6 @@ class Protocol { Protocol(ChainedDispatchers& dispatchers, SocketConnection& conn); - virtual void trigger_close() = 0; - virtual ceph::bufferlist do_sweep_messages( const std::deque& msgs, size_t num_msgs, @@ -67,13 +53,6 @@ class Protocol { virtual void notify_out() = 0; - virtual void on_closed() = 0; - - private: - bool closed = false; - // become valid only after closed == true - seastar::shared_future<> close_ready; - // the write state-machine public: using clock_t = seastar::lowres_system_clock; diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 0c911afdc27..794698b96e1 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -68,9 +68,9 @@ seastar::logger& logger() { throw std::system_error(make_error_code(crimson::net::error::protocol_aborted)); } -[[noreturn]] void abort_in_close(crimson::net::ProtocolV2& proto, bool dispatch_reset) { - proto.close(dispatch_reset); - abort_protocol(); +#define ABORT_IN_CLOSE(dispatch_reset) { \ + do_close(dispatch_reset); \ + abort_protocol(); \ } inline void expect_tag(const Tag& expected, @@ -377,7 +377,7 @@ void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr e if (conn.policy.lossy) { logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}", conn, func_name, get_state_name(state), eptr); - close(true); + do_close(true); } else if (conn.policy.server || (conn.policy.standby && !is_out_queued_or_sent())) { logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}", @@ -486,13 +486,13 @@ ProtocolV2::banner_exchange(bool is_connect) logger().error("{} peer does not support all required features" " required={} peer_supported={}", conn, required_features, _peer_supported_features); - abort_in_close(*this, is_connect); + ABORT_IN_CLOSE(is_connect); } if ((supported_features & _peer_required_features) != _peer_required_features) { logger().error("{} we do not support all peer required features" " peer_required={} supported={}", conn, _peer_required_features, supported_features); - abort_in_close(*this, is_connect); + ABORT_IN_CLOSE(is_connect); } peer_supported_features = _peer_supported_features; bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1); @@ -617,7 +617,7 @@ seastar::future<> ProtocolV2::client_auth(std::vector &allowed_methods }); } catch (const crimson::auth::error& e) { logger().error("{} get_initial_auth_request returned {}", conn, e.what()); - abort_in_close(*this, true); + ABORT_IN_CLOSE(true); return seastar::now(); } } @@ -715,7 +715,7 @@ ProtocolV2::client_connect() logger().error("{} connection peer id ({}) does not match " "what it should be ({}) during connecting, close", conn, server_ident.gid(), conn.get_peer_id()); - abort_in_close(*this, true); + ABORT_IN_CLOSE(true); } conn.set_peer_id(server_ident.gid()); conn.set_features(server_ident.supported_features() & @@ -865,7 +865,7 @@ void ProtocolV2::execute_connecting() logger().warn("{} connection peer type does not match what peer advertises {} != {}", conn, ceph_entity_type_name(conn.get_peer_type()), ceph_entity_type_name(_peer_type)); - abort_in_close(*this, true); + ABORT_IN_CLOSE(true); } if (unlikely(state != state_t::CONNECTING)) { logger().debug("{} triggered {} during banner_exchange(), abort", @@ -1090,7 +1090,7 @@ ProtocolV2::reuse_connection( // close this connection because all the necessary information is delivered // to the exisiting connection, and jump to error handling code to abort the // current state. - abort_in_close(*this, false); + ABORT_IN_CLOSE(false); return seastar::make_ready_future(next_step_t::none); } @@ -1520,7 +1520,7 @@ void ProtocolV2::execute_accepting() }).handle_exception([this] (std::exception_ptr eptr) { logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}", conn, get_state_name(state), eptr); - close(false); + do_close(false); }); }); } @@ -1580,7 +1580,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { trigger_state(state_t::ESTABLISHING, out_state_t::delay, false); if (existing_conn) { - existing_conn->protocol->close( + static_cast(existing_conn->protocol.get())->do_close( true /* dispatch_reset */, std::move(accept_me)); if (unlikely(state != state_t::ESTABLISHING)) { logger().warn("{} triggered {} during execute_establishing(), " @@ -2083,19 +2083,59 @@ void ProtocolV2::execute_server_wait() }).handle_exception([this] (std::exception_ptr eptr) { logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}", conn, get_state_name(state), eptr); - close(false); + do_close(false); }); }); } // CLOSING state -void ProtocolV2::trigger_close() +void ProtocolV2::close() { + do_close(false); +} + +seastar::future<> ProtocolV2::close_clean_yielded() +{ + // yield() so that do_close() can be called *after* close_clean_yielded() is + // applied to all connections in a container using + // seastar::parallel_for_each(). otherwise, we could erase a connection in + // the container when seastar::parallel_for_each() is still iterating in it. + // that'd lead to a segfault. + return seastar::yield( + ).then([this, conn_ref = conn.shared_from_this()] { + do_close(false); + // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset() + // which will otherwise result in deadlock + assert(closed_clean_fut.valid()); + return closed_clean_fut.get_future(); + }); +} + +void ProtocolV2::do_close( + bool dispatch_reset, + std::optional> f_accept_new) +{ + if (closed) { + // already closing + return; + } + + bool is_replace = f_accept_new ? true : false; + logger().info("{} closing: reset {}, replace {}", conn, + dispatch_reset ? "yes" : "no", + is_replace ? "yes" : "no"); + + /* + * atomic operations + */ + + closed = true; + + // trigger close messenger.closing_conn( seastar::static_pointer_cast( conn.shared_from_this())); - if (state == state_t::ACCEPTING || state == state_t::SERVER_WAIT) { messenger.unaccept_conn( seastar::static_pointer_cast( @@ -2108,16 +2148,48 @@ void ProtocolV2::trigger_close() // cannot happen ceph_assert(false); } - protocol_timer.cancel(); trigger_state(state_t::CLOSING, out_state_t::drop, false); -} -void ProtocolV2::on_closed() -{ - messenger.closed_conn( - seastar::static_pointer_cast( - conn.shared_from_this())); + if (f_accept_new) { + (*f_accept_new)(); + } + if (conn.socket) { + conn.socket->shutdown(); + } + assert(!gate.is_closed()); + auto gate_closed = gate.close(); + + if (dispatch_reset) { + dispatchers.ms_handle_reset( + seastar::static_pointer_cast(conn.shared_from_this()), + is_replace); + } + + // asynchronous operations + assert(!closed_clean_fut.valid()); + closed_clean_fut = std::move(gate_closed).then([this] { + if (conn.socket) { + return conn.socket->close(); + } else { + return seastar::now(); + } + }).then([this] { + logger().debug("{} closed!", conn); + messenger.closed_conn( + seastar::static_pointer_cast( + conn.shared_from_this())); +#ifdef UNIT_TESTS_BUILT + closed_clean = true; + if (conn.interceptor) { + conn.interceptor->register_conn_closed(conn); + } +#endif + }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) { + logger().error("{} closing: closed_clean_fut got unexpected exception {}", + conn, eptr); + ceph_abort(); + }); } void ProtocolV2::print_conn(std::ostream& out) const diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index cfd5781ff04..40c32136b70 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -20,20 +20,34 @@ class ProtocolV2 final : public Protocol { SocketConnection& conn, SocketMessenger& messenger); ~ProtocolV2() override; - void print_conn(std::ostream&) const final; +// public to SocketConnection, but private to the others private: - void on_closed() override; bool is_connected() const override; + void close() override; + + seastar::future<> close_clean_yielded() override; + +#ifdef UNIT_TESTS_BUILT + bool is_closed_clean() const override { + return closed_clean; + } + + bool is_closed() const override { + return closed; + } + +#endif void start_connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name) override; void start_accept(SocketRef&& socket, const entity_addr_t& peer_addr) override; - void trigger_close() override; + void print_conn(std::ostream&) const final; + private: ceph::bufferlist do_sweep_messages( const std::deque& msgs, size_t num_msgs, @@ -48,6 +62,15 @@ class ProtocolV2 final : public Protocol { AuthConnectionMetaRef auth_meta; + bool closed = false; + + // become valid only after closed == true + seastar::shared_future<> closed_clean_fut; + +#ifdef UNIT_TESTS_BUILT + bool closed_clean = false; + +#endif enum class state_t { NONE = 0, ACCEPTING, @@ -229,6 +252,11 @@ class ProtocolV2 final : public Protocol { // SERVER_WAIT void execute_server_wait(); + + // CLOSING + // reentrant + void do_close(bool dispatch_reset, + std::optional> f_accept_new=std::nullopt); }; } // namespace crimson::net diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index bd7259c6e7c..9f989e21c2f 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -57,7 +57,7 @@ bool SocketConnection::is_closed() const bool SocketConnection::is_closed_clean() const { assert(seastar::this_shard_id() == shard_id()); - return protocol->is_closed_clean; + return protocol->is_closed_clean(); } #endif @@ -104,7 +104,7 @@ void SocketConnection::set_last_keepalive_ack(clock_t::time_point when) void SocketConnection::mark_down() { assert(seastar::this_shard_id() == shard_id()); - protocol->close(false); + protocol->close(); } void @@ -122,9 +122,9 @@ SocketConnection::start_accept(SocketRef&& sock, } seastar::future<> -SocketConnection::close_clean(bool dispatch_reset) +SocketConnection::close_clean_yielded() { - return protocol->close_clean(dispatch_reset); + return protocol->close_clean_yielded(); } seastar::shard_id SocketConnection::shard_id() const { diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index ea18407e459..76703c76337 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -120,7 +120,7 @@ class SocketConnection : public Connection { void start_accept(SocketRef&& socket, const entity_addr_t& peer_addr); - seastar::future<> close_clean(bool dispatch_reset); + seastar::future<> close_clean_yielded(); seastar::socket_address get_local_address() const; diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 6bfb0f341de..b5cb03932b8 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -262,16 +262,16 @@ seastar::future<> SocketMessenger::shutdown() // close all connections }).then([this] { return seastar::parallel_for_each(accepting_conns, [] (auto conn) { - return conn->close_clean(false); + return conn->close_clean_yielded(); }); }).then([this] { ceph_assert(accepting_conns.empty()); return seastar::parallel_for_each(connections, [] (auto conn) { - return conn.second->close_clean(false); + return conn.second->close_clean_yielded(); }); }).then([this] { return seastar::parallel_for_each(closing_conns, [] (auto conn) { - return conn->close_clean(false); + return conn->close_clean_yielded(); }); }).then([this] { ceph_assert(connections.empty());