]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/net: notify if the connection is to be replaced during reset
authorYingxin Cheng <yingxin.cheng@intel.com>
Sun, 29 Mar 2020 11:17:52 +0000 (19:17 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 1 Apr 2020 03:43:00 +0000 (11:43 +0800)
is_replace=true means the reset connection is going to be replaced by
another accepting connection with the same peer_addr, which currently
only happens under lossy policy when both sides wish to connect to each
other.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
14 files changed:
src/crimson/mgr/client.cc
src/crimson/mgr/client.h
src/crimson/mon/MonClient.cc
src/crimson/mon/MonClient.h
src/crimson/net/Dispatcher.h
src/crimson/net/Protocol.cc
src/crimson/net/ProtocolV2.cc
src/crimson/osd/chained_dispatchers.cc
src/crimson/osd/chained_dispatchers.h
src/crimson/osd/heartbeat.cc
src/crimson/osd/heartbeat.h
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/test/crimson/test_messenger.cc

index 9134c2602132ba557d88351ef8c2151eb7241614..b6331191577102c43131e59168519cda2ff1626e 100644 (file)
@@ -70,7 +70,7 @@ seastar::future<> Client::ms_handle_connect(crimson::net::ConnectionRef c)
   }
 }
 
-seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef c)
+seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef c, bool is_replace)
 {
   if (conn == c) {
     report_timer.cancel();
index 099a504f08f6933b2e1f3cdcc8848b9fef15c45c..3873ba6418b2db7bccdb1c7c3316b8f76fa357d6 100644 (file)
@@ -39,7 +39,7 @@ public:
 private:
   seastar::future<> ms_dispatch(crimson::net::Connection* conn,
                                Ref<Message> m) override;
-  seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn) final;
+  seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
   seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) final;
   seastar::future<> handle_mgr_map(crimson::net::Connection* conn,
                                   Ref<MMgrMap> m);
index 0018037013706a4e8c9c49f57dd0ce58a551956b..ba4eccac353fdf3b0ff88798250cddf3820a7ee1 100644 (file)
@@ -541,7 +541,7 @@ Client::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
   }
 }
 
-seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef conn)
+seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
 {
   auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
                             [peer_addr = conn->get_peer_addr()](auto& mc) {
index 1e764c9b98d7334762ea7213a70d386865fccc1d..d834f159b1c64901288eeb6f1f2e59911e3d0d9d 100644 (file)
@@ -141,7 +141,7 @@ private:
 
   seastar::future<> ms_dispatch(crimson::net::Connection* conn,
                                MessageRef m) override;
-  seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn) override;
+  seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
 
   seastar::future<> handle_monmap(crimson::net::Connection* conn,
                                  Ref<MMonMap> m);
index ac608fc431536bb6338db33612806baa4252ffb1..fa7d9f913921093470b7ea18eb67045efede8953 100644 (file)
@@ -39,7 +39,11 @@ class Dispatcher {
     return seastar::make_ready_future<>();
   }
 
-  virtual seastar::future<> ms_handle_reset(ConnectionRef conn) {
+  // a reset event is dispatched when the connection is closed unexpectedly.
+  // is_replace=true means the reset connection is going to be replaced by
+  // another accepting connection with the same peer_addr, which currently only
+  // happens under lossy policy when both sides wish to connect to each other.
+  virtual seastar::future<> ms_handle_reset(ConnectionRef conn, bool is_replace) {
     return seastar::make_ready_future<>();
   }
 
index 3d01f26e50d425c620ff6ed49f5099ff765070d0..c95cf1d08b54734c9a302ea6684bdd8633866069 100644 (file)
@@ -48,9 +48,10 @@ void Protocol::close(bool dispatch_reset,
     return;
   }
 
+  bool is_replace = f_accept_new ? true : false;
   logger().info("{} closing: reset {}, replace {}", conn,
                 dispatch_reset ? "yes" : "no",
-                f_accept_new ? "yes" : "no");
+                is_replace ? "yes" : "no");
 
   // unregister_conn() drops a reference, so hold another until completion
   auto cleanup = [conn_ref = conn.shared_from_this(), this] {
@@ -74,10 +75,11 @@ void Protocol::close(bool dispatch_reset,
   }
   set_write_state(write_state_t::drop);
   auto gate_closed = pending_dispatch.close();
-  auto reset_dispatched = seastar::futurize_apply([this, dispatch_reset] {
+  auto reset_dispatched = seastar::futurize_apply([this, dispatch_reset, is_replace] {
     if (dispatch_reset) {
       return dispatcher.ms_handle_reset(
-          seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+          seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
+          is_replace);
     }
     return seastar::now();
   }).handle_exception([this] (std::exception_ptr eptr) {
index 7f5ff573b8242833ea3caad8075d78411323269b..e823ece20e512d33cbb25d4fbaeee530d693edea 100644 (file)
@@ -1311,7 +1311,6 @@ ProtocolV2::server_connect()
 
     SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
 
-    bool dispatch_reset = true;
     if (existing_conn) {
       if (existing_conn->protocol->proto_type != proto_t::v2) {
         logger().warn("{} existing connection {} proto version is {}, close existing",
@@ -1319,14 +1318,15 @@ ProtocolV2::server_connect()
                       static_cast<int>(existing_conn->protocol->proto_type));
         // should unregister the existing from msgr atomically
         // NOTE: this is following async messenger logic, but we may miss the reset event.
-        dispatch_reset = false;
+        execute_establishing(existing_conn, false);
+        return seastar::make_ready_future<next_step_t>(next_step_t::ready);
       } else {
         return handle_existing_connection(existing_conn);
       }
+    } else {
+      execute_establishing(nullptr, true);
+      return seastar::make_ready_future<next_step_t>(next_step_t::ready);
     }
-
-    execute_establishing(existing_conn, dispatch_reset);
-    return seastar::make_ready_future<next_step_t>(next_step_t::ready);
   });
 }
 
index 1bc40deb2a5612cbc81691ef5f5de35cd96b02a4..3d6ba846b754d68e8808290a9e313e82f89d7523 100644 (file)
@@ -26,9 +26,9 @@ ChainedDispatchers::ms_handle_connect(crimson::net::ConnectionRef conn) {
 }
 
 seastar::future<>
-ChainedDispatchers::ms_handle_reset(crimson::net::ConnectionRef conn) {
-  return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) {
-    return dispatcher->ms_handle_reset(conn);
+ChainedDispatchers::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
+  return seastar::do_for_each(dispatchers, [conn, is_replace](Dispatcher* dispatcher) {
+    return dispatcher->ms_handle_reset(conn, is_replace);
   });
 }
 
index 1748e03adac259c7fb25b92df231de8fa11f5c66..2ea1e517b7825485104670b7460cc25aaf23509d 100644 (file)
@@ -25,6 +25,6 @@ public:
   seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m) override;
   seastar::future<> ms_handle_accept(crimson::net::ConnectionRef conn) override;
   seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) override;
-  seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn) override;
+  seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
   seastar::future<> ms_handle_remote_reset(crimson::net::ConnectionRef conn) override;
 };
index 6f60654854caeb25e8555ff1f0902baabfb9659c..67e31267322e807609bfdaa3c57a6b5f60b16023 100644 (file)
@@ -195,7 +195,7 @@ seastar::future<> Heartbeat::ms_dispatch(crimson::net::Connection* conn,
   }
 }
 
-seastar::future<> Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn)
+seastar::future<> Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
 {
   auto found = std::find_if(peers.begin(), peers.end(),
                             [conn](const peers_map_t::value_type& peer) {
index a0e6146cd47bbc5cbe5c0dbfb9f79946cb6a1d85..55571cff2fbb8afa957e29fd84041137d4ba25b2 100644 (file)
@@ -46,7 +46,7 @@ public:
   // Dispatcher methods
   seastar::future<> ms_dispatch(crimson::net::Connection* conn,
                                MessageRef m) override;
-  seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn) override;
+  seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
 
 private:
   seastar::future<> handle_osd_ping(crimson::net::Connection* conn,
index 01f938535774100ca66fd3c5de07515796c2d2c2..d48d98c2c198c800ff6690ca8e29869d65304c96 100644 (file)
@@ -614,7 +614,7 @@ seastar::future<> OSD::ms_handle_connect(crimson::net::ConnectionRef conn)
   }
 }
 
-seastar::future<> OSD::ms_handle_reset(crimson::net::ConnectionRef conn)
+seastar::future<> OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
 {
   // TODO: cleanup the session attached to this connection
   logger().warn("ms_handle_reset");
index be090fb90e1cd708eab94578a4eb5302509f49f7..b6b6c270d31267b564c56434f8cf7fa9cbd9e64a 100644 (file)
@@ -100,7 +100,7 @@ class OSD final : public crimson::net::Dispatcher,
   // Dispatcher methods
   seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m) final;
   seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) final;
-  seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn) final;
+  seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
   seastar::future<> ms_handle_remote_reset(crimson::net::ConnectionRef conn) final;
 
   // mgr::WithStats methods
index 4a5b3f745d88ba47b5da854d7cc77fd485b455f1..06a126426256a8357a0f7d91cf4560cc8a4abdbe 100644 (file)
@@ -846,7 +846,7 @@ class FailoverSuite : public Dispatcher {
     return seastar::now();
   }
 
-  seastar::future<> ms_handle_reset(ConnectionRef conn) override {
+  seastar::future<> ms_handle_reset(ConnectionRef conn, bool is_replace) override {
     auto result = interceptor.find_result(conn);
     if (result == nullptr) {
       logger().error("Untracked reset connection: {}", *conn);
@@ -1384,7 +1384,7 @@ class FailoverSuitePeer : public Dispatcher {
     return flush_pending_send();
   }
 
-  seastar::future<> ms_handle_reset(ConnectionRef conn) override {
+  seastar::future<> ms_handle_reset(ConnectionRef conn, bool is_replace) override {
     logger().info("[TestPeer] got reset from Test");
     ceph_assert(tracked_conn == conn);
     tracked_conn = nullptr;