From: Yingxin Cheng Date: Sun, 29 Mar 2020 11:17:52 +0000 (+0800) Subject: crimson/net: notify if the connection is to be replaced during reset X-Git-Tag: wip-pdonnell-testing-20200918.022351~1640^2~1 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=f8259a5e90fee7650df52db94ffa336bf94dc1d9;p=ceph-ci.git crimson/net: notify if the connection is to be replaced during reset is_replace=true means the reset connection is going to be replaced by another accepting connection with the same peer_addr, which currently only happens under lossy policy when both sides wish to connect to each other. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/mgr/client.cc b/src/crimson/mgr/client.cc index 9134c260213..b6331191577 100644 --- a/src/crimson/mgr/client.cc +++ b/src/crimson/mgr/client.cc @@ -70,7 +70,7 @@ seastar::future<> Client::ms_handle_connect(crimson::net::ConnectionRef c) } } -seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef c) +seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef c, bool is_replace) { if (conn == c) { report_timer.cancel(); diff --git a/src/crimson/mgr/client.h b/src/crimson/mgr/client.h index 099a504f08f..3873ba6418b 100644 --- a/src/crimson/mgr/client.h +++ b/src/crimson/mgr/client.h @@ -39,7 +39,7 @@ public: private: seastar::future<> ms_dispatch(crimson::net::Connection* conn, Ref m) override; - seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn) final; + seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final; seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) final; seastar::future<> handle_mgr_map(crimson::net::Connection* conn, Ref m); diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc index 00180370137..ba4eccac353 100644 --- a/src/crimson/mon/MonClient.cc +++ b/src/crimson/mon/MonClient.cc @@ -541,7 +541,7 @@ Client::ms_dispatch(crimson::net::Connection* conn, MessageRef m) } } -seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef conn) +seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) { auto found = std::find_if(pending_conns.begin(), pending_conns.end(), [peer_addr = conn->get_peer_addr()](auto& mc) { diff --git a/src/crimson/mon/MonClient.h b/src/crimson/mon/MonClient.h index 1e764c9b98d..d834f159b1c 100644 --- a/src/crimson/mon/MonClient.h +++ b/src/crimson/mon/MonClient.h @@ -141,7 +141,7 @@ private: seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m) override; - seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn) override; + seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override; seastar::future<> handle_monmap(crimson::net::Connection* conn, Ref m); diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h index ac608fc4315..fa7d9f91392 100644 --- a/src/crimson/net/Dispatcher.h +++ b/src/crimson/net/Dispatcher.h @@ -39,7 +39,11 @@ class Dispatcher { return seastar::make_ready_future<>(); } - virtual seastar::future<> ms_handle_reset(ConnectionRef conn) { + // a reset event is dispatched when the connection is closed unexpectedly. + // is_replace=true means the reset connection is going to be replaced by + // another accepting connection with the same peer_addr, which currently only + // happens under lossy policy when both sides wish to connect to each other. + virtual seastar::future<> ms_handle_reset(ConnectionRef conn, bool is_replace) { return seastar::make_ready_future<>(); } diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index 3d01f26e50d..c95cf1d08b5 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -48,9 +48,10 @@ void Protocol::close(bool dispatch_reset, return; } + bool is_replace = f_accept_new ? true : false; logger().info("{} closing: reset {}, replace {}", conn, dispatch_reset ? "yes" : "no", - f_accept_new ? "yes" : "no"); + is_replace ? "yes" : "no"); // unregister_conn() drops a reference, so hold another until completion auto cleanup = [conn_ref = conn.shared_from_this(), this] { @@ -74,10 +75,11 @@ void Protocol::close(bool dispatch_reset, } set_write_state(write_state_t::drop); auto gate_closed = pending_dispatch.close(); - auto reset_dispatched = seastar::futurize_apply([this, dispatch_reset] { + auto reset_dispatched = seastar::futurize_apply([this, dispatch_reset, is_replace] { if (dispatch_reset) { return dispatcher.ms_handle_reset( - seastar::static_pointer_cast(conn.shared_from_this())); + seastar::static_pointer_cast(conn.shared_from_this()), + is_replace); } return seastar::now(); }).handle_exception([this] (std::exception_ptr eptr) { diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 7f5ff573b82..e823ece20e5 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -1311,7 +1311,6 @@ ProtocolV2::server_connect() SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr); - bool dispatch_reset = true; if (existing_conn) { if (existing_conn->protocol->proto_type != proto_t::v2) { logger().warn("{} existing connection {} proto version is {}, close existing", @@ -1319,14 +1318,15 @@ ProtocolV2::server_connect() static_cast(existing_conn->protocol->proto_type)); // should unregister the existing from msgr atomically // NOTE: this is following async messenger logic, but we may miss the reset event. - dispatch_reset = false; + execute_establishing(existing_conn, false); + return seastar::make_ready_future(next_step_t::ready); } else { return handle_existing_connection(existing_conn); } + } else { + execute_establishing(nullptr, true); + return seastar::make_ready_future(next_step_t::ready); } - - execute_establishing(existing_conn, dispatch_reset); - return seastar::make_ready_future(next_step_t::ready); }); } diff --git a/src/crimson/osd/chained_dispatchers.cc b/src/crimson/osd/chained_dispatchers.cc index 1bc40deb2a5..3d6ba846b75 100644 --- a/src/crimson/osd/chained_dispatchers.cc +++ b/src/crimson/osd/chained_dispatchers.cc @@ -26,9 +26,9 @@ ChainedDispatchers::ms_handle_connect(crimson::net::ConnectionRef conn) { } seastar::future<> -ChainedDispatchers::ms_handle_reset(crimson::net::ConnectionRef conn) { - return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) { - return dispatcher->ms_handle_reset(conn); +ChainedDispatchers::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) { + return seastar::do_for_each(dispatchers, [conn, is_replace](Dispatcher* dispatcher) { + return dispatcher->ms_handle_reset(conn, is_replace); }); } diff --git a/src/crimson/osd/chained_dispatchers.h b/src/crimson/osd/chained_dispatchers.h index 1748e03adac..2ea1e517b78 100644 --- a/src/crimson/osd/chained_dispatchers.h +++ b/src/crimson/osd/chained_dispatchers.h @@ -25,6 +25,6 @@ public: seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m) override; seastar::future<> ms_handle_accept(crimson::net::ConnectionRef conn) override; seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) override; - seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn) override; + seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override; seastar::future<> ms_handle_remote_reset(crimson::net::ConnectionRef conn) override; }; diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc index 6f60654854c..67e31267322 100644 --- a/src/crimson/osd/heartbeat.cc +++ b/src/crimson/osd/heartbeat.cc @@ -195,7 +195,7 @@ seastar::future<> Heartbeat::ms_dispatch(crimson::net::Connection* conn, } } -seastar::future<> Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn) +seastar::future<> Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) { auto found = std::find_if(peers.begin(), peers.end(), [conn](const peers_map_t::value_type& peer) { diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h index a0e6146cd47..55571cff2fb 100644 --- a/src/crimson/osd/heartbeat.h +++ b/src/crimson/osd/heartbeat.h @@ -46,7 +46,7 @@ public: // Dispatcher methods seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m) override; - seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn) override; + seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override; private: seastar::future<> handle_osd_ping(crimson::net::Connection* conn, diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 01f93853577..d48d98c2c19 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -614,7 +614,7 @@ seastar::future<> OSD::ms_handle_connect(crimson::net::ConnectionRef conn) } } -seastar::future<> OSD::ms_handle_reset(crimson::net::ConnectionRef conn) +seastar::future<> OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) { // TODO: cleanup the session attached to this connection logger().warn("ms_handle_reset"); diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index be090fb90e1..b6b6c270d31 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -100,7 +100,7 @@ class OSD final : public crimson::net::Dispatcher, // Dispatcher methods seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m) final; seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) final; - seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn) final; + seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final; seastar::future<> ms_handle_remote_reset(crimson::net::ConnectionRef conn) final; // mgr::WithStats methods diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 4a5b3f745d8..06a12642625 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -846,7 +846,7 @@ class FailoverSuite : public Dispatcher { return seastar::now(); } - seastar::future<> ms_handle_reset(ConnectionRef conn) override { + seastar::future<> ms_handle_reset(ConnectionRef conn, bool is_replace) override { auto result = interceptor.find_result(conn); if (result == nullptr) { logger().error("Untracked reset connection: {}", *conn); @@ -1384,7 +1384,7 @@ class FailoverSuitePeer : public Dispatcher { return flush_pending_send(); } - seastar::future<> ms_handle_reset(ConnectionRef conn) override { + seastar::future<> ms_handle_reset(ConnectionRef conn, bool is_replace) override { logger().info("[TestPeer] got reset from Test"); ceph_assert(tracked_conn == conn); tracked_conn = nullptr;