git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: implement logic to move an io-handler to a new sid
author    Yingxin Cheng <yingxin.cheng@intel.com>
          Mon, 5 Jun 2023 02:22:29 +0000 (10:22 +0800)
committer Yingxin Cheng <yingxin.cheng@intel.com>
          Sun, 25 Jun 2023 03:57:19 +0000 (11:57 +0800)
Note that it is inevitable that the user may mark down the connection
while the protocol is trying to move the connection to another core.

In that case, the implementation must tolerate the race: it still needs
to clean up resources correctly and dispatch reasonable events.
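
For illustration only (this sketch is not part of the patch; the
toy_handler type, its drop_requested flag and the messages are
hypothetical), the general pattern is: remember whether a drop already
happened, migrate to the new shard with seastar::smp::submit_to(), and
let the continuation on the new shard either resume dispatching or only
arrange cleanup of what was left on the previous shard.

    // Minimal Seastar sketch of a race-tolerant shard switch.
    #include <seastar/core/app-template.hh>
    #include <seastar/core/future.hh>
    #include <seastar/core/shared_ptr.hh>
    #include <seastar/core/smp.hh>
    #include <iostream>

    struct toy_handler {
      seastar::shard_id sid = seastar::this_shard_id();
      bool drop_requested = false;  // set by a racing mark_down()

      // Move the handler to new_sid; tolerate a mark_down() that raced
      // with the switch by keeping cleanup on the previous shard.
      seastar::future<> to_new_sid(seastar::shard_id new_sid) {
        const bool was_dropped = drop_requested;
        const seastar::shard_id prv_sid = sid;
        sid = new_sid;
        return seastar::smp::submit_to(new_sid,
            [was_dropped, prv_sid, new_sid] {
          if (was_dropped) {
            // racing mark_down(): do not resume dispatching, only
            // arrange cleanup of the resources left on prv_sid
            std::cout << "dropped before switch, cleanup stays on shard "
                      << prv_sid << "\n";
          } else {
            std::cout << "now dispatching on shard " << new_sid << "\n";
          }
        });
      }
    };

    int main(int argc, char** argv) {
      seastar::app_template app;
      return app.run(argc, argv, [] {
        auto h = seastar::make_lw_shared<toy_handler>();
        // pretend the user marked the connection down concurrently
        h->drop_requested = true;
        const seastar::shard_id target = seastar::smp::count > 1 ? 1 : 0;
        return h->to_new_sid(target).finally([h] {});
      });
    }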

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/FrameAssemblerV2.cc
src/crimson/net/FrameAssemblerV2.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
src/crimson/net/io_handler.cc
src/crimson/net/io_handler.h

diff --git a/src/crimson/net/FrameAssemblerV2.cc b/src/crimson/net/FrameAssemblerV2.cc
index f76f67740546d48e210da5947f92a6376f2fa31d..bb48138a81fb73254a78b592c383628dc2c83388 100644 (file)
@@ -151,6 +151,14 @@ bool FrameAssemblerV2::is_socket_valid() const
   return has_socket() && !is_socket_shutdown;
 }
 
+seastar::shard_id
+FrameAssemblerV2::get_socket_shard_id() const
+{
+  assert(seastar::this_shard_id() == sid);
+  assert(is_socket_valid());
+  return socket->get_shard_id();
+}
+
 SocketFRef FrameAssemblerV2::move_socket()
 {
   assert(has_socket());
diff --git a/src/crimson/net/FrameAssemblerV2.h b/src/crimson/net/FrameAssemblerV2.h
index a4494384ab10793da6b8742c5d0b8a0fc1338adb..e4af653812d7a6bed89afde4f9af075bbd743607 100644 (file)
@@ -26,6 +26,16 @@ public:
 
   FrameAssemblerV2(FrameAssemblerV2 &&) = delete;
 
+  void set_shard_id(seastar::shard_id _sid) {
+    assert(seastar::this_shard_id() == sid);
+    clear();
+    sid = _sid;
+  }
+
+  seastar::shard_id get_shard_id() const {
+    return sid;
+  }
+
   void set_is_rev1(bool is_rev1);
 
   void create_session_stream_handlers(
@@ -67,6 +77,8 @@ public:
   // the socket exists and not shutdown
   bool is_socket_valid() const;
 
+  seastar::shard_id get_socket_shard_id() const;
+
   void set_socket(SocketFRef &&);
 
   void learn_socket_ephemeral_port_as_connector(uint16_t port);
diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc
index b3a25955483e9f6a9f04c4327fd53caf7630350a..6c64db82fd558b018d291d47bf87bbc4c960bb37 100644 (file)
@@ -222,6 +222,9 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state)
   FrameAssemblerV2Ref fa;
   if (new_state == state_t::READY) {
     assert(new_io_state == io_state_t::open);
+    assert(io_handler.get_shard_id() ==
+           frame_assembler->get_socket_shard_id());
+    frame_assembler->set_shard_id(io_handler.get_shard_id());
     fa = std::move(frame_assembler);
   } else {
     assert(new_io_state != io_state_t::open);
@@ -229,11 +232,19 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state)
   io_handler.set_io_state(new_io_state, std::move(fa), need_notify_out);
 
   if (pre_state == state_t::READY) {
+    logger().debug("{} IOHandler::wait_io_exit_dispatching() start ...", conn);
     assert(new_io_state != io_state_t::open);
     gate.dispatch_in_background("exit_io", conn, [this] {
-      return io_handler.wait_io_exit_dispatching(
-      ).then([this](auto ret) {
+      return seastar::smp::submit_to(
+          io_handler.get_shard_id(), [this] {
+        return io_handler.wait_io_exit_dispatching();
+      }).then([this](auto ret) {
+        logger().debug("{} IOHandler::wait_io_exit_dispatching() finish, {}",
+                       conn, ret.io_states);
         frame_assembler = std::move(ret.frame_assembler);
+        assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+        ceph_assert_always(
+            seastar::this_shard_id() == frame_assembler->get_shard_id());
         ceph_assert_always(!frame_assembler->is_socket_valid());
         io_states = ret.io_states;
         pr_exit_io->set_value();
@@ -896,18 +907,43 @@ void ProtocolV2::execute_connecting()
           }
           switch (next) {
            case next_step_t::ready: {
-            logger().info("{} connected: gs={}, pgs={}, cs={}, "
-                          "client_cookie={}, server_cookie={}, {}",
-                          conn, global_seq, peer_global_seq, connect_seq,
-                          client_cookie, server_cookie, io_states);
-            io_handler.dispatch_connect();
             if (unlikely(state != state_t::CONNECTING)) {
-              logger().debug("{} triggered {} after ms_handle_connect(), abort",
+              logger().debug("{} triggered {} before dispatch_connect(), abort",
                              conn, get_state_name(state));
               abort_protocol();
             }
-            execute_ready();
-            break;
+
+            logger().info("{} connected: gs={}, pgs={}, cs={}, "
+                          "client_cookie={}, server_cookie={}, {}, new_sid={}, "
+                          "IOHandler::dispatch_connect()",
+                          conn, global_seq, peer_global_seq, connect_seq,
+                          client_cookie, server_cookie, io_states,
+                          frame_assembler->get_socket_shard_id());
+
+            // set io_handler to a new shard
+            auto new_io_shard = frame_assembler->get_socket_shard_id();
+            ConnectionFRef conn_fref = seastar::make_foreign(
+                conn.shared_from_this());
+            ceph_assert_always(!pr_switch_io_shard.has_value());
+            pr_switch_io_shard = seastar::shared_promise<>();
+            return seastar::smp::submit_to(
+                io_handler.get_shard_id(),
+                [this, new_io_shard, conn_fref=std::move(conn_fref)]() mutable {
+              return io_handler.dispatch_connect(
+                  new_io_shard, std::move(conn_fref));
+            }).then([this, new_io_shard] {
+              ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
+              pr_switch_io_shard->set_value();
+              pr_switch_io_shard = std::nullopt;
+              // user can make changes
+
+              if (unlikely(state != state_t::CONNECTING)) {
+                logger().debug("{} triggered {} after dispatch_connect(), abort",
+                               conn, get_state_name(state));
+                abort_protocol();
+              }
+              execute_ready();
+            });
            }
            case next_step_t::wait: {
             logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
@@ -915,7 +951,7 @@ void ProtocolV2::execute_connecting()
             frame_assembler->shutdown_socket<true>(&gate);
             is_socket_valid = false;
             execute_wait(true);
-            break;
+            return seastar::now();
            }
            default: {
             ceph_abort("impossible next step");
@@ -1634,6 +1670,13 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
   ceph_assert_always(is_socket_valid);
   trigger_state(state_t::ESTABLISHING, io_state_t::delay);
   if (existing_conn) {
+    logger().info("{} start establishing: gs={}, pgs={}, cs={}, "
+                  "client_cookie={}, server_cookie={}, {}, new_sid={}, "
+                  "close existing {}",
+                  conn, global_seq, peer_global_seq, connect_seq,
+                  client_cookie, server_cookie,
+                  io_states, frame_assembler->get_socket_shard_id(),
+                  *existing_conn);
     ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
         existing_conn->protocol.get());
     existing_proto->do_close(
@@ -1646,19 +1689,42 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
       abort_protocol();
     }
   } else {
+    logger().info("{} start establishing: gs={}, pgs={}, cs={}, "
+                  "client_cookie={}, server_cookie={}, {}, new_sid={}, "
+                  "no existing",
+                  conn, global_seq, peer_global_seq, connect_seq,
+                  client_cookie, server_cookie, io_states,
+                  frame_assembler->get_socket_shard_id());
     accept_me();
   }
 
-  io_handler.dispatch_accept();
-  if (unlikely(state != state_t::ESTABLISHING)) {
-    logger().debug("{} triggered {} after ms_handle_accept() during execute_establishing()",
-                   conn, get_state_name(state));
-    abort_protocol();
-  }
-
   gated_execute("execute_establishing", conn, [this] {
     ceph_assert_always(state == state_t::ESTABLISHING);
-    return seastar::futurize_invoke([this] {
+
+    // set io_handler to a new shard
+    auto new_io_shard = frame_assembler->get_socket_shard_id();
+    logger().debug("{} IOHandler::dispatch_accept({})", conn, new_io_shard);
+    ConnectionFRef conn_fref = seastar::make_foreign(
+        conn.shared_from_this());
+    ceph_assert_always(!pr_switch_io_shard.has_value());
+    pr_switch_io_shard = seastar::shared_promise<>();
+    return seastar::smp::submit_to(
+        io_handler.get_shard_id(),
+        [this, new_io_shard, conn_fref=std::move(conn_fref)]() mutable {
+      return io_handler.dispatch_accept(
+          new_io_shard, std::move(conn_fref));
+    }).then([this, new_io_shard] {
+      ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
+      pr_switch_io_shard->set_value();
+      pr_switch_io_shard = std::nullopt;
+      // user can make changes
+
+      if (unlikely(state != state_t::ESTABLISHING)) {
+        logger().debug("{} triggered {} after dispatch_accept() during execute_establishing()",
+                       conn, get_state_name(state));
+        abort_protocol();
+      }
+
       return send_server_ident();
     }).then([this] {
       if (unlikely(state != state_t::ESTABLISHING)) {
@@ -1666,10 +1732,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
                        conn, get_state_name(state));
         abort_protocol();
       }
-      logger().info("{} established: gs={}, pgs={}, cs={}, "
-                    "client_cookie={}, server_cookie={}, {}",
-                    conn, global_seq, peer_global_seq, connect_seq,
-                    client_cookie, server_cookie, io_states);
+      logger().info("{} established, going to ready", conn);
       execute_ready();
     }).handle_exception([this](std::exception_ptr eptr) {
       fault(state_t::ESTABLISHING, "execute_establishing", eptr);
@@ -1741,6 +1804,12 @@ void ProtocolV2::trigger_replacing(bool reconnect,
   ceph_assert_always(state <= state_t::WAIT);
   ceph_assert_always(has_socket || state == state_t::CONNECTING);
   ceph_assert_always(!mover.socket->is_shutdown());
+
+  logger().info("{} start replacing ({}): pgs was {}, cs was {}, "
+                "client_cookie was {}, {}, new_sid={}",
+                conn, reconnect ? "reconnected" : "connected",
+                peer_global_seq, connect_seq, client_cookie,
+                io_states, mover.socket->get_shard_id());
   trigger_state(state_t::REPLACING, io_state_t::delay);
   if (is_socket_valid) {
     frame_assembler->shutdown_socket<true>(&gate);
@@ -1759,15 +1828,46 @@ void ProtocolV2::trigger_replacing(bool reconnect,
        new_peer_global_seq,
        new_connect_seq, new_msg_seq] () mutable {
     ceph_assert_always(state == state_t::REPLACING);
-    io_handler.dispatch_accept();
-    // state may become CLOSING, close mover.socket and abort later
+    auto new_io_shard = mover.socket->get_shard_id();
+    // state may become CLOSING below, but we cannot abort the chain until
+    // mover.socket is correctly handled (closed or replaced).
+
     return wait_exit_io(
     ).then([this] {
+      if (unlikely(state != state_t::REPLACING)) {
+        ceph_assert_always(state == state_t::CLOSING);
+        return seastar::now();
+      }
+
       ceph_assert_always(frame_assembler);
       protocol_timer.cancel();
       auto done = std::move(execution_done);
       execution_done = seastar::now();
       return done;
+    }).then([this, new_io_shard] {
+      if (unlikely(state != state_t::REPLACING)) {
+        ceph_assert_always(state == state_t::CLOSING);
+        return seastar::now();
+      }
+
+      // set io_handler to a new shard
+      // we should prevent parallel core-switching attempts
+      logger().debug("{} IOHandler::dispatch_accept({})", conn, new_io_shard);
+      ConnectionFRef conn_fref = seastar::make_foreign(
+          conn.shared_from_this());
+      ceph_assert_always(!pr_switch_io_shard.has_value());
+      pr_switch_io_shard = seastar::shared_promise<>();
+      return seastar::smp::submit_to(
+          io_handler.get_shard_id(),
+          [this, new_io_shard, conn_fref=std::move(conn_fref)]() mutable {
+        return io_handler.dispatch_accept(
+            new_io_shard, std::move(conn_fref));
+      }).then([this, new_io_shard] {
+        ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
+        pr_switch_io_shard->set_value();
+        pr_switch_io_shard = std::nullopt;
+        // user can make changes
+      });
     }).then([this,
              reconnect,
              do_reset,
diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h
index b1767f7da0108271226a9318cab2c5d5e1ae45ee..d74425344b4c6ddf2ae3ceec13bbcc271939149e 100644 (file)
@@ -227,6 +227,8 @@ private:
 
   FrameAssemblerV2Ref frame_assembler;
 
+  std::optional<seastar::shared_promise<>> pr_switch_io_shard;
+
   std::optional<seastar::shared_promise<>> pr_exit_io;
 
   AuthConnectionMetaRef auth_meta;
diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc
index 54ef67356ab0fdc3be03857fbfee500b1ca1178a..576d91b7a15e4416be1857a68c5603d5dd813280 100644 (file)
@@ -57,7 +57,9 @@ IOHandler::IOHandler(ChainedDispatchers &dispatchers,
 IOHandler::~IOHandler()
 {
   // close_io() must be finished
-  ceph_assert_always(shard_states->assert_closed_and_exit());
+  ceph_assert_always(maybe_prv_shard_states == nullptr);
+  // should be true in the corresponding shard
+  // ceph_assert_always(shard_states->assert_closed_and_exit());
   assert(!conn_ref);
 }
 
@@ -234,6 +236,19 @@ void IOHandler::print_io_stat(std::ostream &out) const
       << ")";
 }
 
+void IOHandler::assign_frame_assembler(FrameAssemblerV2Ref fa)
+{
+  assert(fa != nullptr);
+  ceph_assert_always(frame_assembler == nullptr);
+  frame_assembler = std::move(fa);
+  ceph_assert_always(
+      frame_assembler->get_shard_id() == get_shard_id());
+  // should have been set through dispatch_accept/connect()
+  ceph_assert_always(
+      frame_assembler->get_socket_shard_id() == get_shard_id());
+  ceph_assert_always(frame_assembler->is_socket_valid());
+}
+
 void IOHandler::set_io_state(
     io_state_t new_state,
     FrameAssemblerV2Ref fa,
@@ -241,20 +256,32 @@ void IOHandler::set_io_state(
 {
   ceph_assert_always(seastar::this_shard_id() == get_shard_id());
   auto prv_state = get_io_state();
+  logger().debug("{} got set_io_state(): prv_state={}, new_state={}, "
+                 "fa={}, set_notify_out={}, at {}",
+                 conn, prv_state, new_state,
+                 fa ? "present" : "N/A", set_notify_out,
+                 io_stat_printer{*this});
   ceph_assert_always(!(
     (new_state == io_state_t::none && prv_state != io_state_t::none) ||
-    (new_state == io_state_t::open && prv_state == io_state_t::open) ||
-    (new_state != io_state_t::drop && prv_state == io_state_t::drop)
+    (new_state == io_state_t::open && prv_state == io_state_t::open)
   ));
 
+  if (prv_state == io_state_t::drop) {
+    // only possible due to a racing mark_down() from user
+    if (new_state == io_state_t::open) {
+      assign_frame_assembler(std::move(fa));
+      frame_assembler->shutdown_socket<false>(nullptr);
+    } else {
+      assert(fa == nullptr);
+    }
+    return;
+  }
+
   bool dispatch_in = false;
   if (new_state == io_state_t::open) {
     // to open
     ceph_assert_always(protocol_is_connected == true);
-    assert(fa != nullptr);
-    ceph_assert_always(frame_assembler == nullptr);
-    frame_assembler = std::move(fa);
-    ceph_assert_always(frame_assembler->is_socket_valid());
+    assign_frame_assembler(std::move(fa));
     dispatch_in = true;
 #ifdef UNIT_TESTS_BUILT
     if (conn.interceptor) {
@@ -301,13 +328,31 @@ void IOHandler::set_io_state(
 seastar::future<IOHandler::exit_dispatching_ret>
 IOHandler::wait_io_exit_dispatching()
 {
+  assert(seastar::this_shard_id() == get_shard_id());
+  logger().debug("{} got wait_io_exit_dispatching()", conn);
   ceph_assert_always(get_io_state() != io_state_t::open);
   ceph_assert_always(frame_assembler != nullptr);
   ceph_assert_always(!frame_assembler->is_socket_valid());
-  return shard_states->wait_io_exit_dispatching(
-  ).then([this] {
+  return seastar::futurize_invoke([this] {
+    // cannot be running in parallel with to_new_sid()
+    if (maybe_dropped_sid.has_value()) {
+      ceph_assert_always(get_io_state() == io_state_t::drop);
+      assert(shard_states->assert_closed_and_exit());
+      auto prv_sid = *maybe_dropped_sid;
+      return seastar::smp::submit_to(prv_sid, [this] {
+        logger().debug("{} got wait_io_exit_dispatching from prv_sid", conn);
+        assert(maybe_prv_shard_states != nullptr);
+        return maybe_prv_shard_states->wait_io_exit_dispatching();
+      });
+    } else {
+      return shard_states->wait_io_exit_dispatching();
+    }
+  }).then([this] {
+    logger().debug("{} finish wait_io_exit_dispatching at {}",
+                   conn, io_stat_printer{*this});
     ceph_assert_always(frame_assembler != nullptr);
     ceph_assert_always(!frame_assembler->is_socket_valid());
+    frame_assembler->set_shard_id(conn.get_messenger_shard_id());
     return exit_dispatching_ret{
       std::move(frame_assembler),
       get_states()};
@@ -399,27 +444,127 @@ void IOHandler::discard_out_sent()
   out_sent_msgs.clear();
 }
 
-void IOHandler::dispatch_accept()
+seastar::future<>
+IOHandler::dispatch_accept(
+    seastar::shard_id new_sid,
+    ConnectionFRef conn_fref)
 {
+  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+  logger().debug("{} got dispatch_accept({}) at {}",
+                 conn, new_sid, io_stat_printer{*this});
   if (get_io_state() == io_state_t::drop) {
-    return;
+    assert(!protocol_is_connected);
+    // it is possible that both io_handler and protocolv2 are
+    // trying to close each other from different cores simultaneously.
+    return to_new_sid(new_sid, std::move(conn_fref));
   }
   // protocol_is_connected can be from true to true here if the replacing is
   // happening to a connected connection.
   protocol_is_connected = true;
   ceph_assert_always(conn_ref);
-  dispatchers.ms_handle_accept(conn_ref, get_shard_id());
+  auto _conn_ref = conn_ref;
+  auto fut = to_new_sid(new_sid, std::move(conn_fref));
+  dispatchers.ms_handle_accept(_conn_ref, new_sid);
+  return fut;
 }
 
-void IOHandler::dispatch_connect()
+seastar::future<>
+IOHandler::dispatch_connect(
+    seastar::shard_id new_sid,
+    ConnectionFRef conn_fref)
 {
+  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+  logger().debug("{} got dispatch_connect({}) at {}",
+                 conn, new_sid, io_stat_printer{*this});
   if (get_io_state() == io_state_t::drop) {
-    return;
+    assert(!protocol_is_connected);
+    // it is possible that both io_handler and protocolv2 are
+    // trying to close each other from different cores simultaneously.
+    return to_new_sid(new_sid, std::move(conn_fref));
   }
   ceph_assert_always(protocol_is_connected == false);
   protocol_is_connected = true;
   ceph_assert_always(conn_ref);
-  dispatchers.ms_handle_connect(conn_ref, get_shard_id());
+  auto _conn_ref = conn_ref;
+  auto fut = to_new_sid(new_sid, std::move(conn_fref));
+  dispatchers.ms_handle_connect(_conn_ref, new_sid);
+  return fut;
+}
+
+seastar::future<>
+IOHandler::cleanup_prv_shard(seastar::shard_id prv_sid)
+{
+  assert(seastar::this_shard_id() == get_shard_id());
+  return seastar::smp::submit_to(prv_sid, [this] {
+    logger().debug("{} got cleanup_prv_shard()", conn);
+    assert(maybe_prv_shard_states != nullptr);
+    auto ref_prv_states = std::move(maybe_prv_shard_states);
+    auto &prv_states = *ref_prv_states;
+    return prv_states.close(
+    ).then([ref_prv_states=std::move(ref_prv_states)] {
+      ceph_assert_always(ref_prv_states->assert_closed_and_exit());
+    });
+  }).then([this] {
+    ceph_assert_always(maybe_prv_shard_states == nullptr);
+  });
+}
+
+seastar::future<>
+IOHandler::to_new_sid(
+    seastar::shard_id new_sid,
+    ConnectionFRef conn_fref)
+{
+  /*
+   * Note:
+   * - It must be called before the user is aware of the new core (through dispatching);
+   * - Messenger must wait for the returned future before further operations to prevent racing;
+   * - In general, the continuation submitted below should be the first one from the prv sid
+   *   to the new sid;
+   */
+
+  assert(seastar::this_shard_id() == get_shard_id());
+  bool is_dropped = false;
+  if (get_io_state() == io_state_t::drop) {
+    is_dropped = true;
+  }
+  ceph_assert_always(get_io_state() != io_state_t::open);
+
+  // apply the switching atomically
+  ceph_assert_always(conn_ref);
+  conn_ref.reset();
+  auto prv_sid = get_shard_id();
+  ceph_assert_always(maybe_prv_shard_states == nullptr);
+  maybe_prv_shard_states = std::move(shard_states);
+  shard_states = shard_states_t::create_from_previous(
+      *maybe_prv_shard_states, new_sid);
+  assert(new_sid == get_shard_id());
+
+  return seastar::smp::submit_to(new_sid,
+      [this, is_dropped, prv_sid, conn_fref=std::move(conn_fref)]() mutable {
+    logger().debug("{} see new_sid in io_handler(new_sid) from {}, is_dropped={}",
+                   conn, prv_sid, is_dropped);
+
+    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+    ceph_assert_always(get_io_state() != io_state_t::open);
+    ceph_assert_always(!maybe_dropped_sid.has_value());
+
+    ceph_assert_always(!conn_ref);
+    conn_ref = make_local_shared_foreign(std::move(conn_fref));
+
+    if (is_dropped) {
+      // the follow-up cleanups will be done in the prv_sid
+      ceph_assert_always(shard_states->assert_closed_and_exit());
+      maybe_dropped_sid = prv_sid;
+    } else {
+      // may be at io_state_t::drop
+      // clean up the previous shard
+      shard_states->dispatch_in_background(
+          "cleanup_prv_sid", conn, [this, prv_sid] {
+        return cleanup_prv_shard(prv_sid);
+      });
+      maybe_notify_out_dispatch();
+    }
+  });
 }
 
 void IOHandler::dispatch_reset(bool is_replace)
@@ -806,8 +951,12 @@ void IOHandler::do_in_dispatch()
 seastar::future<>
 IOHandler::close_io(bool is_dispatch_reset, bool is_replace)
 {
+  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
   ceph_assert_always(get_io_state() == io_state_t::drop);
 
+  logger().debug("{} got close_io(reset={}, replace={})",
+                 conn, is_dispatch_reset, is_replace);
+
   if (is_dispatch_reset) {
     dispatch_reset(is_replace);
   }
@@ -815,10 +964,17 @@ IOHandler::close_io(bool is_dispatch_reset, bool is_replace)
   ceph_assert_always(conn_ref);
   conn_ref.reset();
 
-  return shard_states->close(
-  ).then([this] {
+  // cannot be running in parallel with to_new_sid()
+  if (maybe_dropped_sid.has_value()) {
     assert(shard_states->assert_closed_and_exit());
-  });
+    auto prv_sid = *maybe_dropped_sid;
+    return cleanup_prv_shard(prv_sid);
+  } else {
+    return shard_states->close(
+    ).then([this] {
+      assert(shard_states->assert_closed_and_exit());
+    });
+  }
 }
 
 /*
diff --git a/src/crimson/net/io_handler.h b/src/crimson/net/io_handler.h
index 07f4c1cb4262a020a5e8d186ee5213b0fe36978d..acb171bde282efe342601299d41e1cd2d4df55ee 100644 (file)
@@ -108,7 +108,7 @@ public:
 /*
  * as ConnectionHandler
  */
-private:
+public:
   seastar::shard_id get_shard_id() const final {
     return shard_states->get_shard_id();
   }
@@ -196,9 +196,11 @@ public:
 
   void requeue_out_sent();
 
-  void dispatch_accept();
+  seastar::future<> dispatch_accept(
+      seastar::shard_id new_sid, ConnectionFRef);
 
-  void dispatch_connect();
+  seastar::future<> dispatch_connect(
+      seastar::shard_id new_sid, ConnectionFRef);
 
  private:
   class shard_states_t;
@@ -339,6 +341,8 @@ public:
     return shard_states->get_io_state();
   }
 
+  void assign_frame_assembler(FrameAssemblerV2Ref);
+
   seastar::future<> send_redirected(MessageFRef msg);
 
   seastar::future<> do_send(MessageFRef msg);
@@ -347,6 +351,9 @@ public:
 
   seastar::future<> do_send_keepalive();
 
+  seastar::future<> to_new_sid(
+      seastar::shard_id new_sid, ConnectionFRef);
+
   void dispatch_reset(bool is_replace);
 
   void dispatch_remote_reset();
@@ -388,9 +395,17 @@ public:
 
   void do_in_dispatch();
 
+  seastar::future<> cleanup_prv_shard(seastar::shard_id prv_sid);
+
 private:
   shard_states_ref_t shard_states;
 
+  // the drop happened in the previous sid
+  std::optional<seastar::shard_id> maybe_dropped_sid;
+
+  // the remaining states in the previous sid for cleanup, see to_new_sid()
+  shard_states_ref_t maybe_prv_shard_states;
+
   ChainedDispatchers &dispatchers;
 
   SocketConnection &conn;