From 9cb4832410dde9a14cd2a2d45a6fcabdcf1903ae Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Thu, 12 Mar 2020 15:59:53 +0800 Subject: [PATCH] crimson/net: enforce strict reset/accept order When a new connection tries to replace the old one, the event order should be like: 1. reset(old); 2. accept(new); This means we cannot just reschedule the reset event asynchronously. And we still need to make sure the internal state is integral when reset. Signed-off-by: Yingxin Cheng --- src/crimson/net/Protocol.cc | 31 +++++++++--------- src/crimson/net/Protocol.h | 2 +- src/crimson/net/ProtocolV2.cc | 50 +++++++++++++++++------------- src/crimson/net/ProtocolV2.h | 2 +- src/test/crimson/test_messenger.cc | 2 ++ 5 files changed, 48 insertions(+), 39 deletions(-) diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index 9c51c900342..2183543efb4 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -40,7 +40,8 @@ bool Protocol::is_connected() const return write_state == write_state_t::open; } -void Protocol::close(bool dispatch_reset) +void Protocol::close(bool dispatch_reset, + std::optional> f_accept_new) { if (closed) { // already closing @@ -64,12 +65,25 @@ void Protocol::close(bool dispatch_reset) // atomic operations trigger_close(); + if (f_accept_new) { + (*f_accept_new)(); + } if (socket) { socket->shutdown(); } closed = true; set_write_state(write_state_t::drop); auto gate_closed = pending_dispatch.close(); + auto reset_dispatched = seastar::futurize_apply([this, dispatch_reset] { + if (dispatch_reset) { + return dispatcher.ms_handle_reset( + seastar::static_pointer_cast(conn.shared_from_this())); + } + return seastar::now(); + }).handle_exception([this] (std::exception_ptr eptr) { + logger().error("{} ms_handle_reset caught exception: {}", conn, eptr); + ceph_abort("unexpected exception from ms_handle_reset()"); + }); // asynchronous operations close_ready = seastar::when_all_succeed( @@ -79,20 +93,7 @@ void Protocol::close(bool dispatch_reset) } return seastar::now(); }), - [this, dispatch_reset] { - if (dispatch_reset) { - // force ms_handle_reset() to be an asynchronous task to prevent - // internal state contamination. - return seastar::sleep(0s).then([this] { - return dispatcher.ms_handle_reset( - seastar::static_pointer_cast(conn.shared_from_this())); - }).handle_exception([this] (std::exception_ptr eptr) { - logger().error("{} ms_handle_reset caught exception: {}", conn, eptr); - ceph_abort("unexpected exception from ms_handle_reset()"); - }); - } - return seastar::now(); - } + std::move(reset_dispatched) ).finally(std::move(cleanup)); } diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index df9a12aa45e..bb73746e12b 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -30,7 +30,7 @@ class Protocol { #endif // Reentrant closing - void close(bool dispatch_reset); + void close(bool dispatch_reset, std::optional> f_accept_new=std::nullopt); seastar::future<> close_clean(bool dispatch_reset) { close(dispatch_reset); return close_ready.get_future(); diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 5169f78b50c..c5732554221 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -1158,14 +1158,7 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn) logger().warn("{} server_connect:" " existing connection {} is a lossy channel. Close existing in favor of" " this connection", conn, *existing_conn); - existing_proto->close(true); - - if (unlikely(state != state_t::ACCEPTING)) { - logger().debug("{} triggered {} in execute_accepting()", - conn, get_state_name(state)); - abort_protocol(); - } - execute_establishing(); + execute_establishing(existing_conn, true); return seastar::make_ready_future(next_step_t::ready); } @@ -1289,6 +1282,7 @@ ProtocolV2::server_connect() SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr); + bool dispatch_reset = true; if (existing_conn) { if (existing_conn->protocol->proto_type != proto_t::v2) { logger().warn("{} existing connection {} proto version is {}, close existing", @@ -1296,18 +1290,13 @@ ProtocolV2::server_connect() static_cast(existing_conn->protocol->proto_type)); // should unregister the existing from msgr atomically // NOTE: this is following async messenger logic, but we may miss the reset event. - (void) existing_conn->close(); + dispatch_reset = false; } else { return handle_existing_connection(existing_conn); } } - if (unlikely(state != state_t::ACCEPTING)) { - logger().debug("{} triggered {} in execute_accepting()", - conn, get_state_name(state)); - abort_protocol(); - } - execute_establishing(); + execute_establishing(existing_conn, dispatch_reset); return seastar::make_ready_future(next_step_t::ready); }); } @@ -1602,8 +1591,30 @@ seastar::future<> ProtocolV2::finish_auth() // ESTABLISHING -void ProtocolV2::execute_establishing() { +void ProtocolV2::execute_establishing( + SocketConnectionRef existing_conn, bool dispatch_reset) { + if (unlikely(state != state_t::ACCEPTING)) { + logger().debug("{} triggered {} before execute_establishing()", + conn, get_state_name(state)); + abort_protocol(); + } + + auto accept_me = [this] { + messenger.register_conn( + seastar::static_pointer_cast( + conn.shared_from_this())); + messenger.unaccept_conn( + seastar::static_pointer_cast( + conn.shared_from_this())); + }; + trigger_state(state_t::ESTABLISHING, write_state_t::delay, false); + if (existing_conn) { + existing_conn->protocol->close(dispatch_reset, std::move(accept_me)); + } else { + accept_me(); + } + (void) seastar::with_gate(pending_dispatch, [this] { return dispatcher.ms_handle_accept( seastar::static_pointer_cast(conn.shared_from_this())); @@ -1611,12 +1622,7 @@ void ProtocolV2::execute_establishing() { logger().error("{} ms_handle_accept caught exception: {}", conn, eptr); ceph_abort("unexpected exception from ms_handle_accept()"); }); - messenger.register_conn( - seastar::static_pointer_cast( - conn.shared_from_this())); - messenger.unaccept_conn( - seastar::static_pointer_cast( - conn.shared_from_this())); + execution_done = seastar::with_gate(pending_dispatch, [this] { return seastar::futurize_apply([this] { return send_server_ident(); diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 7a64d410942..4aa7b276070 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -173,7 +173,7 @@ class ProtocolV2 final : public Protocol { seastar::future<> finish_auth(); // ESTABLISHING - void execute_establishing(); + void execute_establishing(SocketConnectionRef existing_conn, bool dispatch_reset); // ESTABLISHING/REPLACING (server) seastar::future<> send_server_ident(); diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index cc8e9685607..1f30f947e33 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -1375,6 +1375,7 @@ class FailoverSuitePeer : public Dispatcher { } seastar::future<> ms_handle_accept(ConnectionRef conn) override { + logger().info("[TestPeer] got accept from Test"); ceph_assert(!tracked_conn || tracked_conn->is_closed() || tracked_conn == conn); @@ -1383,6 +1384,7 @@ class FailoverSuitePeer : public Dispatcher { } seastar::future<> ms_handle_reset(ConnectionRef conn) override { + logger().info("[TestPeer] got reset from Test"); ceph_assert(tracked_conn == conn); tracked_conn = nullptr; return seastar::now(); -- 2.39.5