]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: adjust dispatcher interface about cross-core notifications
authorYingxin Cheng <yingxin.cheng@intel.com>
Mon, 10 Jul 2023 01:51:39 +0000 (09:51 +0800)
committerMatan Breizman <mbreizma@redhat.com>
Thu, 19 Oct 2023 07:15:13 +0000 (07:15 +0000)
Due to that we aren't able to determine cross-core ordering:
* Move ms_handle_connect/accept() to be called in the new shard, so it
  will notify before ms_dispatch() in the same core;
* Introduce another ms_handle_shard_change() when the current core is
  changed;

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
(cherry picked from commit 6bab7e698db8b7f3bec240952ed33a2bb9918a20)

src/crimson/mgr/client.cc
src/crimson/net/Dispatcher.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/chained_dispatchers.cc
src/crimson/net/chained_dispatchers.h
src/crimson/net/io_handler.cc
src/crimson/net/io_handler.h
src/crimson/osd/heartbeat.cc
src/crimson/tools/perf_crimson_msgr.cc
src/test/crimson/test_messenger.cc
src/test/crimson/test_messenger_thrash.cc

index 81c508c1918e413ca5a6af1ce40ce61862b2df6d..169915c9eb3b2777b26610075bada9bdec2f8b00 100644 (file)
@@ -67,9 +67,9 @@ Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
 
 void Client::ms_handle_connect(
     crimson::net::ConnectionRef c,
-    seastar::shard_id new_shard)
+    seastar::shard_id prv_shard)
 {
-  ceph_assert_always(new_shard == seastar::this_shard_id());
+  ceph_assert_always(prv_shard == seastar::this_shard_id());
   gate.dispatch_in_background(__func__, *this, [this, c] {
     if (conn == c) {
       // ask for the mgrconfigure message
index 11908349e7cd1ce19ca8f42706bf1cfa4b44597d..9eea0a858f0645beb1d8284f985a402d7af60239 100644 (file)
@@ -30,17 +30,24 @@ class Dispatcher {
   // used to throttle the connection if it's too busy.
   virtual std::optional<seastar::future<>> ms_dispatch(ConnectionRef, MessageRef) = 0;
 
+  // The connection is moving to the new_shard under accept/connect.
+  // User should not operate conn in this shard thereafter.
+  virtual void ms_handle_shard_change(
+      ConnectionRef conn,
+      seastar::shard_id new_shard,
+      bool is_accept_or_connect) {}
+
   // The connection is accepted or recoverred(lossless), all the followup
-  // events and messages will be dispatched to the new_shard.
+  // events and messages will be dispatched to this shard.
   //
   // is_replace=true means the accepted connection has replaced
   // another connecting connection with the same peer_addr, which currently only
   // happens under lossy policy when both sides wish to connect to each other.
-  virtual void ms_handle_accept(ConnectionRef conn, seastar::shard_id new_shard, bool is_replace) {}
+  virtual void ms_handle_accept(ConnectionRef conn, seastar::shard_id prv_shard, bool is_replace) {}
 
   // The connection is (re)connected, all the followup events and messages will
-  // be dispatched to the new_shard.
-  virtual void ms_handle_connect(ConnectionRef conn, seastar::shard_id new_shard) {}
+  // be dispatched to this shard.
+  virtual void ms_handle_connect(ConnectionRef conn, seastar::shard_id prv_shard) {}
 
   // a reset event is dispatched when the connection is closed unexpectedly.
   //
index 869c11cf5a553d6696f11fdb14f8e65c7265d018..045022b353cd8fcf15ec5252f909cc641266b641 100644 (file)
@@ -1003,6 +1003,8 @@ void ProtocolV2::execute_connecting()
             }
 
             auto cc_seq = crosscore.prepare_submit();
+            // there are 2 hops with dispatch_connect()
+            crosscore.prepare_submit();
             logger().info("{} connected: gs={}, pgs={}, cs={}, "
                           "client_cookie={}, server_cookie={}, {}, new_sid={}, "
                           "send {} IOHandler::dispatch_connect()",
@@ -1797,6 +1799,8 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
 
     // set io_handler to a new shard
     auto cc_seq = crosscore.prepare_submit();
+    // there are 2 hops with dispatch_accept()
+    crosscore.prepare_submit();
     auto new_io_shard = frame_assembler->get_socket_shard_id();
     logger().debug("{} send {} IOHandler::dispatch_accept({})",
                    conn, cc_seq, new_io_shard);
@@ -1968,6 +1972,8 @@ void ProtocolV2::trigger_replacing(bool reconnect,
       // set io_handler to a new shard
       // we should prevent parallel switching core attemps
       auto cc_seq = crosscore.prepare_submit();
+      // there are 2 hops with dispatch_accept()
+      crosscore.prepare_submit();
       logger().debug("{} send {} IOHandler::dispatch_accept({})",
                      conn, cc_seq, new_io_shard);
       ConnectionFRef conn_fref = seastar::make_foreign(
index dfff6d916fa6dacea20daab6ed28694743bbf6a1..1e4af3baa7dfb52f1e3df0091c3ae4fed79bedba 100644 (file)
@@ -13,7 +13,7 @@ namespace {
 namespace crimson::net {
 
 seastar::future<>
-ChainedDispatchers::ms_dispatch(crimson::net::ConnectionRef conn,
+ChainedDispatchers::ms_dispatch(ConnectionRef conn,
                                 MessageRef m) {
   try {
     for (auto& dispatcher : dispatchers) {
@@ -39,13 +39,29 @@ ChainedDispatchers::ms_dispatch(crimson::net::ConnectionRef conn,
 }
 
 void
-ChainedDispatchers::ms_handle_accept(
-    crimson::net::ConnectionRef conn,
+ChainedDispatchers::ms_handle_shard_change(
+    ConnectionRef conn,
     seastar::shard_id new_shard,
+    bool ac) {
+  try {
+    for (auto& dispatcher : dispatchers) {
+      dispatcher->ms_handle_shard_change(conn, new_shard, ac);
+    }
+  } catch (...) {
+    logger().error("{} got unexpected exception in ms_handle_shard_change() {}",
+                   *conn, std::current_exception());
+    ceph_abort();
+  }
+}
+
+void
+ChainedDispatchers::ms_handle_accept(
+    ConnectionRef conn,
+    seastar::shard_id prv_shard,
     bool is_replace) {
   try {
     for (auto& dispatcher : dispatchers) {
-      dispatcher->ms_handle_accept(conn, new_shard, is_replace);
+      dispatcher->ms_handle_accept(conn, prv_shard, is_replace);
     }
   } catch (...) {
     logger().error("{} got unexpected exception in ms_handle_accept() {}",
@@ -56,11 +72,11 @@ ChainedDispatchers::ms_handle_accept(
 
 void
 ChainedDispatchers::ms_handle_connect(
-    crimson::net::ConnectionRef conn,
-    seastar::shard_id new_shard) {
+    ConnectionRef conn,
+    seastar::shard_id prv_shard) {
   try {
     for(auto& dispatcher : dispatchers) {
-      dispatcher->ms_handle_connect(conn, new_shard);
+      dispatcher->ms_handle_connect(conn, prv_shard);
     }
   } catch (...) {
     logger().error("{} got unexpected exception in ms_handle_connect() {}",
@@ -70,7 +86,7 @@ ChainedDispatchers::ms_handle_connect(
 }
 
 void
-ChainedDispatchers::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
+ChainedDispatchers::ms_handle_reset(ConnectionRef conn, bool is_replace) {
   try {
     for (auto& dispatcher : dispatchers) {
       dispatcher->ms_handle_reset(conn, is_replace);
@@ -83,7 +99,7 @@ ChainedDispatchers::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_re
 }
 
 void
-ChainedDispatchers::ms_handle_remote_reset(crimson::net::ConnectionRef conn) {
+ChainedDispatchers::ms_handle_remote_reset(ConnectionRef conn) {
   try {
     for (auto& dispatcher : dispatchers) {
       dispatcher->ms_handle_remote_reset(conn);
index 5835205119d8d24a0fa20efccb9a335fa883e3fa..ec085864ffac03d594265fab8bc46f321e01414b 100644 (file)
@@ -25,11 +25,12 @@ public:
   bool empty() const {
     return dispatchers.empty();
   }
-  seastar::future<> ms_dispatch(crimson::net::ConnectionRef, MessageRef);
-  void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id, bool is_replace);
-  void ms_handle_connect(crimson::net::ConnectionRef conn, seastar::shard_id);
-  void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace);
-  void ms_handle_remote_reset(crimson::net::ConnectionRef conn);
+  seastar::future<> ms_dispatch(ConnectionRef, MessageRef);
+  void ms_handle_shard_change(ConnectionRef, seastar::shard_id, bool);
+  void ms_handle_accept(ConnectionRef conn, seastar::shard_id, bool is_replace);
+  void ms_handle_connect(ConnectionRef conn, seastar::shard_id);
+  void ms_handle_reset(ConnectionRef conn, bool is_replace);
+  void ms_handle_remote_reset(ConnectionRef conn);
 
  private:
   dispatchers_t dispatchers;
index abb7f5e467346b7c509e26677ae16bfc241df58c..576f9ff434c9c3fd3802815bad14decd7ebd5183 100644 (file)
@@ -561,36 +561,7 @@ IOHandler::dispatch_accept(
     ConnectionFRef conn_fref,
     bool is_replace)
 {
-  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
-  if (!crosscore.proceed_or_wait(cc_seq)) {
-    logger().debug("{} got {} dispatch_accept(), wait at {}",
-                   conn, cc_seq, crosscore.get_in_seq());
-    return crosscore.wait(cc_seq
-    ).then([this, cc_seq, new_sid, is_replace,
-            conn_fref=std::move(conn_fref)]() mutable {
-      return dispatch_accept(cc_seq, new_sid, std::move(conn_fref), is_replace);
-    });
-  }
-
-  logger().debug("{} got {} dispatch_accept(new_sid={}, replace={}) at {}",
-                 conn, cc_seq, new_sid, is_replace, io_stat_printer{*this});
-  if (get_io_state() == io_state_t::drop) {
-    assert(!protocol_is_connected);
-    // it is possible that both io_handler and protocolv2 are
-    // trying to close each other from different cores simultaneously.
-    return to_new_sid(new_sid, std::move(conn_fref));
-  }
-  // protocol_is_connected can be from true to true here if the replacing is
-  // happening to a connected connection.
-  protocol_is_connected = true;
-  ceph_assert_always(conn_ref);
-  auto _conn_ref = conn_ref;
-  auto fut = to_new_sid(new_sid, std::move(conn_fref));
-
-  dispatchers.ms_handle_accept(_conn_ref, new_sid, is_replace);
-  // user can make changes
-
-  return fut;
+  return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace);
 }
 
 seastar::future<>
@@ -599,35 +570,7 @@ IOHandler::dispatch_connect(
     seastar::shard_id new_sid,
     ConnectionFRef conn_fref)
 {
-  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
-  if (!crosscore.proceed_or_wait(cc_seq)) {
-    logger().debug("{} got {} dispatch_connect(), wait at {}",
-                   conn, cc_seq, crosscore.get_in_seq());
-    return crosscore.wait(cc_seq
-    ).then([this, cc_seq, new_sid,
-            conn_fref=std::move(conn_fref)]() mutable {
-      return dispatch_connect(cc_seq, new_sid, std::move(conn_fref));
-    });
-  }
-
-  logger().debug("{} got {} dispatch_connect({}) at {}",
-                 conn, cc_seq, new_sid, io_stat_printer{*this});
-  if (get_io_state() == io_state_t::drop) {
-    assert(!protocol_is_connected);
-    // it is possible that both io_handler and protocolv2 are
-    // trying to close each other from different cores simultaneously.
-    return to_new_sid(new_sid, std::move(conn_fref));
-  }
-  ceph_assert_always(protocol_is_connected == false);
-  protocol_is_connected = true;
-  ceph_assert_always(conn_ref);
-  auto _conn_ref = conn_ref;
-  auto fut = to_new_sid(new_sid, std::move(conn_fref));
-
-  dispatchers.ms_handle_connect(_conn_ref, new_sid);
-  // user can make changes
-
-  return fut;
+  return to_new_sid(cc_seq, new_sid, std::move(conn_fref), std::nullopt);
 }
 
 seastar::future<>
@@ -650,18 +593,56 @@ IOHandler::cleanup_prv_shard(seastar::shard_id prv_sid)
 
 seastar::future<>
 IOHandler::to_new_sid(
+    crosscore_t::seq_t cc_seq,
     seastar::shard_id new_sid,
-    ConnectionFRef conn_fref)
+    ConnectionFRef conn_fref,
+    std::optional<bool> is_replace)
 {
-  /*
-   * Note:
-   * - It must be called before user is aware of the new core (through dispatching);
-   * - Messenger must wait the returned future for futher operations to prevent racing;
-   * - In general, the below submitted continuation should be the first one from the prv sid
-   *   to the new sid;
-   */
+  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} to_new_sid(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, new_sid, is_replace,
+            conn_fref=std::move(conn_fref)]() mutable {
+      return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace);
+    });
+  }
+
+  bool is_accept_or_connect = is_replace.has_value();
+  logger().debug("{} got {} to_new_sid_1(new_sid={}, {}) at {}",
+                 conn, cc_seq, new_sid,
+                 fmt::format("{}",
+                   is_accept_or_connect ?
+                   (*is_replace ? "accept(replace)" : "accept(!replace)") :
+                   "connect"),
+                 io_stat_printer{*this});
+  auto next_cc_seq = ++cc_seq;
+
+  if (get_io_state() != io_state_t::drop) {
+    ceph_assert_always(conn_ref);
+    if (new_sid != seastar::this_shard_id()) {
+      dispatchers.ms_handle_shard_change(conn_ref, new_sid, is_accept_or_connect);
+      // user can make changes
+    }
+  } else {
+    // it is possible that both io_handler and protocolv2 are
+    // trying to close each other from different cores simultaneously.
+    assert(!protocol_is_connected);
+  }
+
+  if (get_io_state() != io_state_t::drop) {
+    if (is_accept_or_connect) {
+      // protocol_is_connected can be from true to true here if the replacing is
+      // happening to a connected connection.
+    } else {
+      ceph_assert_always(protocol_is_connected == false);
+    }
+    protocol_is_connected = true;
+  } else {
+    assert(!protocol_is_connected);
+  }
 
-  assert(seastar::this_shard_id() == get_shard_id());
   bool is_dropped = false;
   if (get_io_state() == io_state_t::drop) {
     is_dropped = true;
@@ -679,30 +660,50 @@ IOHandler::to_new_sid(
   assert(new_sid == get_shard_id());
 
   return seastar::smp::submit_to(new_sid,
-      [this, is_dropped, prv_sid, conn_fref=std::move(conn_fref)]() mutable {
-    logger().debug("{} see new_sid in io_handler(new_sid) from {}, is_dropped={}",
-                   conn, prv_sid, is_dropped);
+      [this, next_cc_seq, is_dropped, prv_sid, is_replace, conn_fref=std::move(conn_fref)]() mutable {
+    logger().debug("{} got {} to_new_sid_2(prv_sid={}, is_dropped={}, {}) at {}",
+                   conn, next_cc_seq, prv_sid, is_dropped,
+                   fmt::format("{}",
+                     is_replace.has_value() ?
+                     (*is_replace ? "accept(replace)" : "accept(!replace)") :
+                     "connect"),
+                   io_stat_printer{*this});
 
     ceph_assert_always(seastar::this_shard_id() == get_shard_id());
     ceph_assert_always(get_io_state() != io_state_t::open);
     ceph_assert_always(!maybe_dropped_sid.has_value());
-
-    ceph_assert_always(!conn_ref);
-    conn_ref = make_local_shared_foreign(std::move(conn_fref));
+    ceph_assert_always(crosscore.proceed_or_wait(next_cc_seq));
 
     if (is_dropped) {
-      // the follow up cleanups will be done in the prv_sid
+      ceph_assert_always(get_io_state() == io_state_t::drop);
       ceph_assert_always(shard_states->assert_closed_and_exit());
       maybe_dropped_sid = prv_sid;
+      // cleanup_prv_shard() will be done in a follow-up close_io()
     } else {
-      // may be at io_state_t::drop
-      // cleanup the prvious shard
+      // possible at io_state_t::drop
+
+      // previous shard is not cleaned,
+      // but close_io() is responsible to clean up the current shard,
+      // so cleanup the previous shard here.
       shard_states->dispatch_in_background(
           "cleanup_prv_sid", conn, [this, prv_sid] {
         return cleanup_prv_shard(prv_sid);
       });
       maybe_notify_out_dispatch();
     }
+
+    ceph_assert_always(!conn_ref);
+    // assign even if already dropping
+    conn_ref = make_local_shared_foreign(std::move(conn_fref));
+
+    if (get_io_state() != io_state_t::drop) {
+      if (is_replace.has_value()) {
+        dispatchers.ms_handle_accept(conn_ref, prv_sid, *is_replace);
+      } else {
+        dispatchers.ms_handle_connect(conn_ref, prv_sid);
+      }
+      // user can make changes
+    }
   });
 }
 
index edb69b3407afa46e9ab2506d3d8920dd5759a59c..843e565672abe2632bfda25c5174ca68ec625093 100644 (file)
@@ -447,7 +447,10 @@ public:
   seastar::future<> do_send_keepalive();
 
   seastar::future<> to_new_sid(
-      seastar::shard_id new_sid, ConnectionFRef);
+      crosscore_t::seq_t cc_seq,
+      seastar::shard_id new_sid,
+      ConnectionFRef,
+      std::optional<bool> is_replace);
 
   void dispatch_reset(bool is_replace);
 
index cdee52731e644a3e211554404aff1a7f497e086a..266e56533c3bb1579afdf26fc2db9ce87aa0d42e 100644 (file)
@@ -238,9 +238,9 @@ void Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replac
 
 void Heartbeat::ms_handle_connect(
     crimson::net::ConnectionRef conn,
-    seastar::shard_id new_shard)
+    seastar::shard_id prv_shard)
 {
-  ceph_assert_always(seastar::this_shard_id() == new_shard);
+  ceph_assert_always(seastar::this_shard_id() == prv_shard);
   auto peer = conn->get_peer_id();
   if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
       peer == entity_name_t::NEW) {
@@ -254,10 +254,10 @@ void Heartbeat::ms_handle_connect(
 
 void Heartbeat::ms_handle_accept(
     crimson::net::ConnectionRef conn,
-    seastar::shard_id new_shard,
+    seastar::shard_id prv_shard,
     bool is_replace)
 {
-  ceph_assert_always(seastar::this_shard_id() == new_shard);
+  ceph_assert_always(seastar::this_shard_id() == prv_shard);
   auto peer = conn->get_peer_id();
   if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
       peer == entity_name_t::NEW) {
index 1e4bde97dac5f1152763b91f93876332a103d222..e1a2db2302fb381a508a289bd0be6667d96f24e3 100644 (file)
@@ -605,8 +605,8 @@ static seastar::future<> run(
 
       void ms_handle_connect(
           crimson::net::ConnectionRef conn,
-          seastar::shard_id new_shard) override {
-        ceph_assert_always(new_shard == seastar::this_shard_id());
+          seastar::shard_id prv_shard) override {
+        ceph_assert_always(prv_shard == seastar::this_shard_id());
         assert(is_active());
         unsigned index = static_cast<ConnectionPriv&>(conn->get_user_private()).index;
         auto &conn_state = conn_states[index];
index ab98aa586bc4f19eb616a89016bd976ae7a9da31..e597caa2b38a8d1e5f0f7df4fe961162b701c391 100644 (file)
@@ -122,10 +122,10 @@ static seastar::future<> test_echo(unsigned rounds,
 
       void ms_handle_accept(
           crimson::net::ConnectionRef conn,
-          seastar::shard_id new_shard,
+          seastar::shard_id prv_shard,
           bool is_replace) override {
         logger().info("server accepted {}", *conn);
-        ceph_assert(new_shard == seastar::this_shard_id());
+        ceph_assert(prv_shard == seastar::this_shard_id());
         ceph_assert(!is_replace);
       }
 
@@ -196,8 +196,8 @@ static seastar::future<> test_echo(unsigned rounds,
 
       void ms_handle_connect(
           crimson::net::ConnectionRef conn,
-          seastar::shard_id new_shard) override {
-        assert(new_shard == seastar::this_shard_id());
+          seastar::shard_id prv_shard) override {
+        assert(prv_shard == seastar::this_shard_id());
         auto session = seastar::make_shared<PingSession>();
         auto [i, added] = sessions.emplace(conn, session);
         std::ignore = i;
@@ -937,9 +937,9 @@ class FailoverSuite : public Dispatcher {
 
   void ms_handle_accept(
       ConnectionRef conn,
-      seastar::shard_id new_shard,
+      seastar::shard_id prv_shard,
       bool is_replace) override {
-    assert(new_shard == seastar::this_shard_id());
+    assert(prv_shard == seastar::this_shard_id());
     auto result = interceptor.find_result(conn);
     if (result == nullptr) {
       logger().error("Untracked accepted connection: {}", *conn);
@@ -964,8 +964,8 @@ class FailoverSuite : public Dispatcher {
 
   void ms_handle_connect(
       ConnectionRef conn,
-      seastar::shard_id new_shard) override {
-    assert(new_shard == seastar::this_shard_id());
+      seastar::shard_id prv_shard) override {
+    assert(prv_shard == seastar::this_shard_id());
     auto result = interceptor.find_result(conn);
     if (result == nullptr) {
       logger().error("Untracked connected connection: {}", *conn);
@@ -1533,9 +1533,9 @@ class FailoverSuitePeer : public Dispatcher {
 
   void ms_handle_accept(
       ConnectionRef conn,
-      seastar::shard_id new_shard,
+      seastar::shard_id prv_shard,
       bool is_replace) override {
-    assert(new_shard == seastar::this_shard_id());
+    assert(prv_shard == seastar::this_shard_id());
     logger().info("[TestPeer] got accept from Test");
     ceph_assert(!tracked_conn ||
                 tracked_conn->is_closed() ||
@@ -1693,9 +1693,9 @@ class FailoverTestPeer : public Dispatcher {
 
   void ms_handle_accept(
       ConnectionRef conn,
-      seastar::shard_id new_shard,
+      seastar::shard_id prv_shard,
       bool is_replace) override {
-    assert(new_shard == seastar::this_shard_id());
+    assert(prv_shard == seastar::this_shard_id());
     cmd_conn = conn;
   }
 
index 7c26d6ffdec672dfc6b31ecf0cb2203cc683db99..22cce30a8094d6e6f64b20d4071a3364214e70e2 100644 (file)
@@ -136,17 +136,17 @@ class SyntheticDispatcher final
 
   void ms_handle_accept(
       crimson::net::ConnectionRef conn,
-      seastar::shard_id new_shard,
+      seastar::shard_id prv_shard,
       bool is_replace) final {
     logger().info("{} - Connection:{}", __func__, *conn);
-    assert(new_shard == seastar::this_shard_id());
+    assert(prv_shard == seastar::this_shard_id());
   }
 
   void ms_handle_connect(
       crimson::net::ConnectionRef conn,
-      seastar::shard_id new_shard) final {
+      seastar::shard_id prv_shard) final {
     logger().info("{} - Connection:{}", __func__, *conn);
-    assert(new_shard == seastar::this_shard_id());
+    assert(prv_shard == seastar::this_shard_id());
   }
 
   void ms_handle_reset(crimson::net::ConnectionRef con, bool is_replace) final;