From: Yingxin Cheng Date: Thu, 12 Mar 2020 04:45:38 +0000 (+0800) Subject: crimson/net: close() with ms_handle_reset() X-Git-Tag: v16.0.0~8^2~9 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=22fe7ccc0b8790bbbf12c6c4d3c9ec5f1b28ebff;p=ceph.git crimson/net: close() with ms_handle_reset() * ms_handle_reset() should not be able to contaminate the internal atomic messenger status, so make it an asynchronous event along with close(); * add is_closed_clean() for messenger unit test, because the reset event now happens after connection closed. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h index d43d61b699e6..3beb8f42bb56 100644 --- a/src/crimson/net/Connection.h +++ b/src/crimson/net/Connection.h @@ -95,6 +95,8 @@ class Connection : public seastar::enable_shared_from_this { #ifdef UNIT_TESTS_BUILT virtual bool is_closed() const = 0; + virtual bool is_closed_clean() const = 0; + virtual bool peer_wins() const = 0; #endif diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index bf2633c1c221..9c51c900342f 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -7,6 +7,7 @@ #include "crimson/common/log.h" #include "crimson/net/Errors.h" +#include "crimson/net/Dispatcher.h" #include "crimson/net/Socket.h" #include "crimson/net/SocketConnection.h" #include "msg/Message.h" @@ -39,37 +40,60 @@ bool Protocol::is_connected() const return write_state == write_state_t::open; } -seastar::future<> Protocol::close() +void Protocol::close(bool dispatch_reset) { if (closed) { // already closing assert(close_ready.valid()); - return close_ready.get_future(); + return; } // unregister_conn() drops a reference, so hold another until completion auto cleanup = [conn_ref = conn.shared_from_this(), this] { logger().debug("{} closed!", conn); +#ifdef UNIT_TESTS_BUILT + is_closed_clean = true; + if (conn.interceptor) { + conn.interceptor->register_conn_closed(conn); + } +#endif }; - trigger_close(); - // close_ready become valid only after state is state_t::closing assert(!close_ready.valid()); + // atomic operations + trigger_close(); if (socket) { socket->shutdown(); - close_ready = pending_dispatch.close().finally([this] { - return socket->close(); - }).finally(std::move(cleanup)); - } else { - close_ready = pending_dispatch.close().finally(std::move(cleanup)); } - closed = true; set_write_state(write_state_t::drop); + auto gate_closed = pending_dispatch.close(); - return close_ready.get_future(); + // asynchronous operations + close_ready = seastar::when_all_succeed( + std::move(gate_closed).finally([this] { + if (socket) { + return socket->close(); + } + return seastar::now(); + }), + [this, dispatch_reset] { + if (dispatch_reset) { + // force ms_handle_reset() to be an asynchronous task to prevent + // internal state contamination. + return seastar::sleep(0s).then([this] { + return dispatcher.ms_handle_reset( + seastar::static_pointer_cast(conn.shared_from_this())); + }).handle_exception([this] (std::exception_ptr eptr) { + logger().error("{} ms_handle_reset caught exception: {}", conn, eptr); + ceph_abort("unexpected exception from ms_handle_reset()"); + }); + } + return seastar::now(); + } + ).finally(std::move(cleanup)); } seastar::future<> Protocol::send(MessageRef msg) diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index 4df2549c37c9..df9a12aa45e7 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -24,10 +24,17 @@ class Protocol { bool is_connected() const; +#ifdef UNIT_TESTS_BUILT + bool is_closed_clean = false; bool is_closed() const { return closed; } +#endif // Reentrant closing - seastar::future<> close(); + void close(bool dispatch_reset); + seastar::future<> close_clean(bool dispatch_reset) { + close(dispatch_reset); + return close_ready.get_future(); + } virtual void start_connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) = 0; diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc index 3eac0d2c9243..becb7d681256 100644 --- a/src/crimson/net/ProtocolV1.cc +++ b/src/crimson/net/ProtocolV1.cc @@ -377,7 +377,7 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr, }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the connecting state logger().warn("{} connecting fault: {}", conn, eptr); - (void) close(); + close(false); }); }); } @@ -663,7 +663,7 @@ void ProtocolV1::start_accept(SocketRef&& sock, }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the accepting state logger().warn("{} accepting fault: {}", conn, eptr); - (void) close(); + close(false); }); }); } @@ -901,16 +901,13 @@ void ProtocolV1::execute_open() logger().warn("{} open fault: {}", conn, e); if (e.code() == error::protocol_aborted || e.code() == std::errc::connection_reset) { - return dispatcher.ms_handle_reset( - seastar::static_pointer_cast(conn.shared_from_this())) - .then([this] { - (void) close(); - }); + close(true); + return seastar::now(); } else if (e.code() == error::read_eof) { return dispatcher.ms_handle_remote_reset( seastar::static_pointer_cast(conn.shared_from_this())) .then([this] { - (void) close(); + close(false); }); } else { throw e; @@ -918,7 +915,7 @@ void ProtocolV1::execute_open() }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the open state logger().warn("{} open fault: {}", conn, eptr); - (void) close(); + close(false); }); }); } diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index befbaa5862e4..8a42e89d01b2 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -62,8 +62,8 @@ void abort_protocol() { throw std::system_error(make_error_code(crimson::net::error::protocol_aborted)); } -void abort_in_close(crimson::net::ProtocolV2& proto) { - (void) proto.close(); +void abort_in_close(crimson::net::ProtocolV2& proto, bool dispatch_reset) { + proto.close(dispatch_reset); abort_protocol(); } @@ -413,8 +413,7 @@ void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr e if (conn.policy.lossy) { logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}", conn, func_name, get_state_name(state), eptr); - dispatch_reset(); - (void) close(); + close(true); } else if (conn.policy.server || (conn.policy.standby && (!is_queued() && conn.sent.empty()))) { @@ -432,17 +431,6 @@ void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr e } } -void ProtocolV2::dispatch_reset() -{ - (void) seastar::with_gate(pending_dispatch, [this] { - return dispatcher.ms_handle_reset( - seastar::static_pointer_cast(conn.shared_from_this())); - }).handle_exception([this] (std::exception_ptr eptr) { - logger().error("{} ms_handle_reset caught exception: {}", conn, eptr); - ceph_abort("unexpected exception from ms_handle_reset()"); - }); -} - void ProtocolV2::reset_session(bool full) { server_cookie = 0; @@ -538,13 +526,13 @@ seastar::future ProtocolV2::banner_exchange() logger().error("{} peer does not support all required features" " required={} peer_supported={}", conn, required_features, peer_supported_features); - abort_in_close(*this); + abort_in_close(*this, false); } if ((supported_features & peer_required_features) != peer_required_features) { logger().error("{} we do not support all peer required features" " peer_required={} supported={}", conn, peer_required_features, supported_features); - abort_in_close(*this); + abort_in_close(*this, false); } this->peer_required_features = peer_required_features; if (this->peer_required_features == 0) { @@ -668,8 +656,7 @@ seastar::future<> ProtocolV2::client_auth(std::vector &allowed_methods }); } catch (const crimson::auth::error& e) { logger().error("{} get_initial_auth_request returned {}", conn, e); - dispatch_reset(); - abort_in_close(*this); + abort_in_close(*this, true); return seastar::now(); } } @@ -915,8 +902,7 @@ void ProtocolV2::execute_connecting() logger().warn("{} connection peer type does not match what peer advertises {} != {}", conn, ceph_entity_type_name(conn.get_peer_type()), ceph_entity_type_name(_peer_type)); - dispatch_reset(); - abort_in_close(*this); + abort_in_close(*this, true); } conn.set_ephemeral_port(_my_addr_from_peer.get_port(), SocketConnection::side_t::connector); @@ -1128,7 +1114,7 @@ ProtocolV2::reuse_connection( // close this connection because all the necessary information is delivered // to the exisiting connection, and jump to error handling code to abort the // current state. - abort_in_close(*this); + abort_in_close(*this, false); return seastar::make_ready_future(next_step_t::none); } @@ -1172,8 +1158,7 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn) logger().warn("{} server_connect:" " existing connection {} is a lossy channel. Close existing in favor of" " this connection", conn, *existing_conn); - existing_proto->dispatch_reset(); - (void) existing_proto->close(); + existing_proto->close(true); if (unlikely(state != state_t::ACCEPTING)) { logger().debug("{} triggered {} in execute_accepting()", @@ -1573,7 +1558,7 @@ void ProtocolV2::execute_accepting() }).handle_exception([this] (std::exception_ptr eptr) { logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}", conn, get_state_name(state), eptr); - (void) close(); + close(false); }); }); } @@ -2116,7 +2101,7 @@ void ProtocolV2::execute_server_wait() }).handle_exception([this] (std::exception_ptr eptr) { logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}", conn, get_state_name(state), eptr); - (void) close(); + close(false); }); }); } @@ -2141,11 +2126,6 @@ void ProtocolV2::trigger_close() protocol_timer.cancel(); trigger_state(state_t::CLOSING, write_state_t::drop, false); -#ifdef UNIT_TESTS_BUILT - if (conn.interceptor) { - conn.interceptor->register_conn_closed(conn); - } -#endif } } // namespace crimson::net diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 0e8f2ff90e82..54db2722d752 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -124,7 +124,6 @@ class ProtocolV2 final : public Protocol { private: void fault(bool backoff, const char* func_name, std::exception_ptr eptr); - void dispatch_reset(); void reset_session(bool full); seastar::future banner_exchange(); diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 4a73034e9226..5bf2c30c4a86 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -63,6 +63,12 @@ bool SocketConnection::is_closed() const return protocol->is_closed(); } +bool SocketConnection::is_closed_clean() const +{ + assert(seastar::engine().cpu_id() == shard_id()); + return protocol->is_closed_clean; +} + #endif bool SocketConnection::peer_wins() const { @@ -84,7 +90,7 @@ seastar::future<> SocketConnection::keepalive() seastar::future<> SocketConnection::close() { assert(seastar::engine().cpu_id() == shard_id()); - return protocol->close(); + return protocol->close_clean(false); } bool SocketConnection::update_rx_seq(seq_num_t seq) diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 503d4e55fb04..de814c9418b4 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -77,6 +77,8 @@ class SocketConnection : public Connection { bool is_connected() const override; #ifdef UNIT_TESTS_BUILT + bool is_closed_clean() const override; + bool is_closed() const override; bool peer_wins() const override; diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 68d6da744dd0..cc8e96856075 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -932,7 +932,7 @@ class FailoverSuite : public Dispatcher { unsigned pending_establish = 0; unsigned replaced_conns = 0; for (auto& result : interceptor.results) { - if (result.conn->is_closed()) { + if (result.conn->is_closed_clean()) { if (result.state == conn_state_t::replaced) { ++replaced_conns; }