git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: preserve the ordering upon the calls to Connection::send()/keepalive()
author: Yingxin Cheng <yingxin.cheng@intel.com>
Mon, 30 Oct 2023 02:00:57 +0000 (10:00 +0800)
committer: Yingxin Cheng <yingxin.cheng@intel.com>
Mon, 20 Nov 2023 02:44:53 +0000 (10:44 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/common/smp_helpers.h
src/crimson/net/Connection.h
src/crimson/net/ProtocolV2.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/io_handler.cc
src/crimson/net/io_handler.h

index fad81552d1ff50454b526448dbd7c8d3fab1219f..429c938229bcc96c17d6db82719ea7ceb09ca447 100644 (file)
@@ -98,6 +98,7 @@ auto sharded_map_seq(T &t, F &&f) {
 enum class crosscore_type_t {
   ONE,   // from 1 to 1 core
   ONE_N, // from 1 to n cores
+  N_ONE, // from n to 1 core
 };
 
 /**
@@ -109,7 +110,8 @@ template <crosscore_type_t CTypeValue>
 class smp_crosscore_ordering_t {
   static constexpr bool IS_ONE = (CTypeValue == crosscore_type_t::ONE);
   static constexpr bool IS_ONE_N = (CTypeValue == crosscore_type_t::ONE_N);
-  static_assert(IS_ONE || IS_ONE_N);
+  static constexpr bool IS_N_ONE = (CTypeValue == crosscore_type_t::N_ONE);
+  static_assert(IS_ONE || IS_ONE_N || IS_N_ONE);
 
 public:
   using seq_t = uint64_t;
@@ -117,7 +119,7 @@ public:
   smp_crosscore_ordering_t() requires IS_ONE
     : out_seqs(0) { }
 
-  smp_crosscore_ordering_t() requires IS_ONE_N
+  smp_crosscore_ordering_t() requires (!IS_ONE)
     : out_seqs(seastar::smp::count, 0),
       in_controls(seastar::smp::count) {}
 
@@ -135,6 +137,10 @@ public:
     return do_prepare_submit(out_seqs[target_core]);
   }
 
+  seq_t prepare_submit() requires IS_N_ONE {
+    return do_prepare_submit(out_seqs[seastar::this_shard_id()]);
+  }
+
   /*
    * Called by the target core to preserve the ordering
    */
@@ -147,6 +153,10 @@ public:
     return in_controls[seastar::this_shard_id()].seq;
   }
 
+  seq_t get_in_seq(core_id_t source_core) const requires IS_N_ONE {
+    return in_controls[source_core].seq;
+  }
+
   bool proceed_or_wait(seq_t seq) requires IS_ONE {
     return in_controls.proceed_or_wait(seq);
   }
@@ -155,6 +165,10 @@ public:
     return in_controls[seastar::this_shard_id()].proceed_or_wait(seq);
   }
 
+  bool proceed_or_wait(seq_t seq, core_id_t source_core) requires IS_N_ONE {
+    return in_controls[source_core].proceed_or_wait(seq);
+  }
+
   seastar::future<> wait(seq_t seq) requires IS_ONE {
     return in_controls.wait(seq);
   }
@@ -163,6 +177,16 @@ public:
     return in_controls[seastar::this_shard_id()].wait(seq);
   }
 
+  seastar::future<> wait(seq_t seq, core_id_t source_core) requires IS_N_ONE {
+    return in_controls[source_core].wait(seq);
+  }
+
+  void reset_wait() requires IS_N_ONE {
+    for (auto &in_control : in_controls) {
+      in_control.reset_wait();
+    }
+  }
+
 private:
   struct in_control_t {
     seq_t seq = 0;
@@ -171,10 +195,7 @@ private:
     bool proceed_or_wait(seq_t in_seq) {
       if (in_seq == seq + 1) {
         ++seq;
-        if (unlikely(pr_wait.has_value())) {
-          pr_wait->set_value();
-          pr_wait = std::nullopt;
-        }
+        reset_wait();
         return true;
       } else {
         return false;
@@ -188,6 +209,13 @@ private:
       }
       return pr_wait->get_shared_future();
     }
+
+    void reset_wait() {
+      if (unlikely(pr_wait.has_value())) {
+        pr_wait->set_value();
+        pr_wait = std::nullopt;
+      }
+    }
   };
 
   seq_t do_prepare_submit(seq_t &out_seq) {
index 7141e20f476df7a1461e854d0744142d4a24cb28..41596987b09f545516fd473d6a023f8f17df446b 100644 (file)
@@ -81,8 +81,8 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
    *
    * Send a message over a connection that has completed its handshake.
    *
-   * May be invoked from any core, but that requires to chain the returned
-   * future to preserve ordering.
+   * May be invoked from any core, and the send order will be preserved upon
+   * the call.
    */
   virtual seastar::future<> send(MessageURef msg) = 0;
 
@@ -92,8 +92,8 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
    * Send a keepalive message over a connection that has completed its
    * handshake.
    *
-   * May be invoked from any core, but that requires to chain the returned
-   * future to preserve ordering.
+   * May be invoked from any core, and the send order will be preserved upon
+   * the call.
    */
   virtual seastar::future<> send_keepalive() = 0;
 
index 168d079c8e6d2ec2cfa4e34c72bb5a617567bb66..4262bbbc70cc7b5837d1cddbc596da9bbc9c6f91 100644 (file)
@@ -251,7 +251,7 @@ private:
   // asynchronously populated from io_handler
   io_handler_state io_states;
 
-  crosscore_ordering_t crosscore;
+  proto_crosscore_ordering_t crosscore;
 
   bool has_socket = false;
 
index 57e5c12c1aed433e89df4c3d4b7c15348b226e08..767192682773ebf38e69091dafd42b083103f383 100644 (file)
@@ -79,16 +79,13 @@ bool SocketConnection::peer_wins() const
   return (messenger.get_myaddr() > peer_addr || policy.server);
 }
 
-seastar::future<> SocketConnection::send(MessageURef _msg)
+seastar::future<> SocketConnection::send(MessageURef msg)
 {
-  // may be invoked from any core
-  MessageFRef msg = seastar::make_foreign(std::move(_msg));
   return io_handler->send(std::move(msg));
 }
 
 seastar::future<> SocketConnection::send_keepalive()
 {
-  // may be invoked from any core
   return io_handler->send_keepalive();
 }
 
index 823d6c574dad7b8f7962ffe68cbe500c0b9c443c..7d20f68867e897f2059d3a2c43644b127dd82906 100644 (file)
@@ -54,7 +54,7 @@ public:
 
   virtual bool is_connected() const = 0;
 
-  virtual seastar::future<> send(MessageFRef) = 0;
+  virtual seastar::future<> send(MessageURef) = 0;
 
   virtual seastar::future<> send_keepalive() = 0;
 
index e8a868b4d4c71e8b49cf2c0c15407ffd10a19c8c..b9b0339f9448add80baa11c5d455873e2ea3c97f 100644 (file)
@@ -160,84 +160,132 @@ IOHandler::sweep_out_pending_msgs_to_sent(
 #endif
 }
 
-seastar::future<> IOHandler::send(MessageFRef msg)
+seastar::future<> IOHandler::send(MessageURef _msg)
 {
+  // may be invoked from any core
+  MessageFRef msg = seastar::make_foreign(std::move(_msg));
+  auto cc_seq = io_crosscore.prepare_submit();
+  auto source_core = seastar::this_shard_id();
   // sid may be changed on-the-fly during the submission
-  if (seastar::this_shard_id() == get_shard_id()) {
-    return do_send(std::move(msg));
+  if (source_core == get_shard_id()) {
+    return do_send(cc_seq, source_core, std::move(msg));
   } else {
-    logger().trace("{} send() is directed to {} -- {}",
-                   conn, get_shard_id(), *msg);
+    logger().trace("{} send() {} is directed to core {} -- {}",
+                   conn, cc_seq, get_shard_id(), *msg);
     return seastar::smp::submit_to(
-        get_shard_id(), [this, msg=std::move(msg)]() mutable {
-      return send_redirected(std::move(msg));
+        get_shard_id(),
+        [this, cc_seq, source_core, msg=std::move(msg)]() mutable {
+      return send_recheck_shard(cc_seq, source_core, std::move(msg));
     });
   }
 }
 
-seastar::future<> IOHandler::send_redirected(MessageFRef msg)
+seastar::future<> IOHandler::send_recheck_shard(
+  cc_seq_t cc_seq,
+  core_id_t source_core,
+  MessageFRef msg)
 {
   // sid may be changed on-the-fly during the submission
   if (seastar::this_shard_id() == get_shard_id()) {
-    return do_send(std::move(msg));
+    return do_send(cc_seq, source_core, std::move(msg));
   } else {
-    logger().debug("{} send() is redirected to {} -- {}",
-                   conn, get_shard_id(), *msg);
+    logger().debug("{} send_recheck_shard() {} "
+                   "is redirected from core {} to {} -- {}",
+                   conn, cc_seq, source_core, get_shard_id(), *msg);
     return seastar::smp::submit_to(
-        get_shard_id(), [this, msg=std::move(msg)]() mutable {
-      return send_redirected(std::move(msg));
+        get_shard_id(),
+        [this, cc_seq, source_core, msg=std::move(msg)]() mutable {
+      return send_recheck_shard(cc_seq, source_core, std::move(msg));
     });
   }
 }
 
-seastar::future<> IOHandler::do_send(MessageFRef msg)
+seastar::future<> IOHandler::do_send(
+  cc_seq_t cc_seq,
+  core_id_t source_core,
+  MessageFRef msg)
 {
   assert(seastar::this_shard_id() == get_shard_id());
-  logger().trace("{} do_send() got message -- {}", conn, *msg);
-  if (get_io_state() != io_state_t::drop) {
-    out_pending_msgs.push_back(std::move(msg));
-    notify_out_dispatch();
+  if (io_crosscore.proceed_or_wait(cc_seq, source_core)) {
+    logger().trace("{} do_send() got {} from core {}: send message -- {}",
+                   conn, cc_seq, source_core, *msg);
+    if (get_io_state() != io_state_t::drop) {
+      out_pending_msgs.push_back(std::move(msg));
+      notify_out_dispatch();
+    }
+    return seastar::now();
+  } else {
+    logger().debug("{} do_send() got {} from core {}, wait at {} -- {}",
+                   conn, cc_seq, source_core,
+                   io_crosscore.get_in_seq(source_core),
+                   *msg);
+    return io_crosscore.wait(cc_seq, source_core
+    ).then([this, cc_seq, source_core, msg=std::move(msg)]() mutable {
+      return send_recheck_shard(cc_seq, source_core, std::move(msg));
+    });
   }
-  return seastar::now();
 }
 
 seastar::future<> IOHandler::send_keepalive()
 {
+  // may be invoked from any core
+  auto cc_seq = io_crosscore.prepare_submit();
+  auto source_core = seastar::this_shard_id();
   // sid may be changed on-the-fly during the submission
-  if (seastar::this_shard_id() == get_shard_id()) {
-    return do_send_keepalive();
+  if (source_core == get_shard_id()) {
+    return do_send_keepalive(cc_seq, source_core);
   } else {
-    logger().trace("{} send_keepalive() is directed to {}", conn, get_shard_id());
+    logger().trace("{} send_keepalive() {} is directed to core {}",
+                   conn, cc_seq, get_shard_id());
     return seastar::smp::submit_to(
-        get_shard_id(), [this] {
-      return send_keepalive_redirected();
+        get_shard_id(),
+        [this, cc_seq, source_core] {
+      return send_keepalive_recheck_shard(cc_seq, source_core);
     });
   }
 }
 
-seastar::future<> IOHandler::send_keepalive_redirected()
+seastar::future<> IOHandler::send_keepalive_recheck_shard(
+  cc_seq_t cc_seq,
+  core_id_t source_core)
 {
   // sid may be changed on-the-fly during the submission
   if (seastar::this_shard_id() == get_shard_id()) {
-    return do_send_keepalive();
+    return do_send_keepalive(cc_seq, source_core);
   } else {
-    logger().debug("{} send_keepalive() is redirected to {}", conn, get_shard_id());
+    logger().debug("{} send_keepalive_recheck_shard() {} "
+                   "is redirected from core {} to {}",
+                   conn, cc_seq, source_core, get_shard_id());
     return seastar::smp::submit_to(
-        get_shard_id(), [this] {
-      return send_keepalive_redirected();
+        get_shard_id(),
+        [this, cc_seq, source_core] {
+      return send_keepalive_recheck_shard(cc_seq, source_core);
     });
   }
 }
 
-seastar::future<> IOHandler::do_send_keepalive()
+seastar::future<> IOHandler::do_send_keepalive(
+  cc_seq_t cc_seq,
+  core_id_t source_core)
 {
   assert(seastar::this_shard_id() == get_shard_id());
-  logger().trace("{} do_send_keeplive(): need_keepalive={}", conn, need_keepalive);
-  if (!need_keepalive) {
-    need_keepalive = true;
-    notify_out_dispatch();
+  if (io_crosscore.proceed_or_wait(cc_seq, source_core)) {
+    logger().trace("{} do_send_keeplive() got {} from core {}: need_keepalive={}",
+                   conn, cc_seq, source_core, need_keepalive);
+    if (!need_keepalive) {
+      need_keepalive = true;
+      notify_out_dispatch();
+    }
+    return seastar::now();
+  } else {
+    logger().debug("{} do_send_keepalive() got {} from core {}, wait at {}",
+                   conn, cc_seq, source_core,
+                   io_crosscore.get_in_seq(source_core));
+    return io_crosscore.wait(cc_seq, source_core
+    ).then([this, cc_seq, source_core] {
+      return send_keepalive_recheck_shard(cc_seq, source_core);
+    });
   }
-  return seastar::now();
 }
 
 void IOHandler::mark_down()
@@ -249,7 +297,7 @@ void IOHandler::mark_down()
     return;
   }
 
-  auto cc_seq = crosscore.prepare_submit();
+  auto cc_seq = proto_crosscore.prepare_submit();
   logger().info("{} mark_down() at {}, send {} notify_mark_down()",
                 conn, io_stat_printer{*this}, cc_seq);
   do_set_io_state(io_state_t::drop);
@@ -369,10 +417,10 @@ seastar::future<> IOHandler::set_io_state(
     bool set_notify_out)
 {
   assert(seastar::this_shard_id() == get_shard_id());
-  if (!crosscore.proceed_or_wait(cc_seq)) {
+  if (!proto_crosscore.proceed_or_wait(cc_seq)) {
     logger().debug("{} got {} set_io_state(), wait at {}",
-                   conn, cc_seq, crosscore.get_in_seq());
-    return crosscore.wait(cc_seq
+                   conn, cc_seq, proto_crosscore.get_in_seq());
+    return proto_crosscore.wait(cc_seq
     ).then([this, cc_seq, new_state,
             fa=std::move(fa), set_notify_out]() mutable {
       return set_io_state(cc_seq, new_state, std::move(fa), set_notify_out);
@@ -388,10 +436,10 @@ IOHandler::wait_io_exit_dispatching(
     cc_seq_t cc_seq)
 {
   assert(seastar::this_shard_id() == get_shard_id());
-  if (!crosscore.proceed_or_wait(cc_seq)) {
+  if (!proto_crosscore.proceed_or_wait(cc_seq)) {
     logger().debug("{} got {} wait_io_exit_dispatching(), wait at {}",
-                   conn, cc_seq, crosscore.get_in_seq());
-    return crosscore.wait(cc_seq
+                   conn, cc_seq, proto_crosscore.get_in_seq());
+    return proto_crosscore.wait(cc_seq
     ).then([this, cc_seq] {
       return wait_io_exit_dispatching(cc_seq);
     });
@@ -433,10 +481,10 @@ seastar::future<> IOHandler::reset_session(
     bool full)
 {
   assert(seastar::this_shard_id() == get_shard_id());
-  if (!crosscore.proceed_or_wait(cc_seq)) {
+  if (!proto_crosscore.proceed_or_wait(cc_seq)) {
     logger().debug("{} got {} reset_session(), wait at {}",
-                   conn, cc_seq, crosscore.get_in_seq());
-    return crosscore.wait(cc_seq
+                   conn, cc_seq, proto_crosscore.get_in_seq());
+    return proto_crosscore.wait(cc_seq
     ).then([this, cc_seq, full] {
       return reset_session(cc_seq, full);
     });
@@ -457,10 +505,10 @@ seastar::future<> IOHandler::reset_peer_state(
     cc_seq_t cc_seq)
 {
   assert(seastar::this_shard_id() == get_shard_id());
-  if (!crosscore.proceed_or_wait(cc_seq)) {
+  if (!proto_crosscore.proceed_or_wait(cc_seq)) {
     logger().debug("{} got {} reset_peer_state(), wait at {}",
-                   conn, cc_seq, crosscore.get_in_seq());
-    return crosscore.wait(cc_seq
+                   conn, cc_seq, proto_crosscore.get_in_seq());
+    return proto_crosscore.wait(cc_seq
     ).then([this, cc_seq] {
       return reset_peer_state(cc_seq);
     });
@@ -479,10 +527,10 @@ seastar::future<> IOHandler::requeue_out_sent(
     cc_seq_t cc_seq)
 {
   assert(seastar::this_shard_id() == get_shard_id());
-  if (!crosscore.proceed_or_wait(cc_seq)) {
+  if (!proto_crosscore.proceed_or_wait(cc_seq)) {
     logger().debug("{} got {} requeue_out_sent(), wait at {}",
-                   conn, cc_seq, crosscore.get_in_seq());
-    return crosscore.wait(cc_seq
+                   conn, cc_seq, proto_crosscore.get_in_seq());
+    return proto_crosscore.wait(cc_seq
     ).then([this, cc_seq] {
       return requeue_out_sent(cc_seq);
     });
@@ -521,10 +569,10 @@ seastar::future<> IOHandler::requeue_out_sent_up_to(
     seq_num_t msg_seq)
 {
   assert(seastar::this_shard_id() == get_shard_id());
-  if (!crosscore.proceed_or_wait(cc_seq)) {
+  if (!proto_crosscore.proceed_or_wait(cc_seq)) {
     logger().debug("{} got {} requeue_out_sent_up_to(), wait at {}",
-                   conn, cc_seq, crosscore.get_in_seq());
-    return crosscore.wait(cc_seq
+                   conn, cc_seq, proto_crosscore.get_in_seq());
+    return proto_crosscore.wait(cc_seq
     ).then([this, cc_seq, msg_seq] {
       return requeue_out_sent_up_to(cc_seq, msg_seq);
     });
@@ -626,10 +674,10 @@ IOHandler::to_new_sid(
     std::optional<bool> is_replace)
 {
   ceph_assert_always(seastar::this_shard_id() == get_shard_id());
-  if (!crosscore.proceed_or_wait(cc_seq)) {
+  if (!proto_crosscore.proceed_or_wait(cc_seq)) {
     logger().debug("{} got {} to_new_sid(), wait at {}",
-                   conn, cc_seq, crosscore.get_in_seq());
-    return crosscore.wait(cc_seq
+                   conn, cc_seq, proto_crosscore.get_in_seq());
+    return proto_crosscore.wait(cc_seq
     ).then([this, cc_seq, new_sid, is_replace,
             conn_fref=std::move(conn_fref)]() mutable {
       return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace);
@@ -685,6 +733,8 @@ IOHandler::to_new_sid(
   shard_states = shard_states_t::create_from_previous(
       *maybe_prv_shard_states, new_sid);
   assert(new_sid == get_shard_id());
+  // broadcast shard change to all the io waiters, atomically.
+  io_crosscore.reset_wait();
 
   return seastar::smp::submit_to(new_sid,
       [this, next_cc_seq, is_dropped, prv_sid, is_replace, conn_fref=std::move(conn_fref)]() mutable {
@@ -699,7 +749,7 @@ IOHandler::to_new_sid(
     ceph_assert_always(seastar::this_shard_id() == get_shard_id());
     ceph_assert_always(get_io_state() != io_state_t::open);
     ceph_assert_always(!maybe_dropped_sid.has_value());
-    ceph_assert_always(crosscore.proceed_or_wait(next_cc_seq));
+    ceph_assert_always(proto_crosscore.proceed_or_wait(next_cc_seq));
 
     if (is_dropped) {
       ceph_assert_always(get_io_state() == io_state_t::drop);
@@ -749,7 +799,7 @@ seastar::future<> IOHandler::set_accepted_sid(
   return seastar::smp::submit_to(sid,
       [this, cc_seq, conn_fref=std::move(conn_fref)]() mutable {
     // must be the first to proceed
-    ceph_assert_always(crosscore.proceed_or_wait(cc_seq));
+    ceph_assert_always(proto_crosscore.proceed_or_wait(cc_seq));
 
     logger().debug("{} set accepted sid", conn);
     ceph_assert_always(seastar::this_shard_id() == get_shard_id());
@@ -875,7 +925,7 @@ IOHandler::do_out_dispatch(shard_states_t &ctx)
     }
 
     if (io_state == io_state_t::open) {
-      auto cc_seq = crosscore.prepare_submit();
+      auto cc_seq = proto_crosscore.prepare_submit();
       logger().info("{} do_out_dispatch(): fault at {}, {}, going to delay -- {}, "
                     "send {} notify_out_fault()",
                     conn, io_state, io_stat_printer{*this}, e.what(), cc_seq);
@@ -922,7 +972,7 @@ void IOHandler::notify_out_dispatch()
   ceph_assert_always(seastar::this_shard_id() == get_shard_id());
   assert(is_out_queued());
   if (need_notify_out) {
-    auto cc_seq = crosscore.prepare_submit();
+    auto cc_seq = proto_crosscore.prepare_submit();
     logger().debug("{} send {} notify_out()",
                    conn, cc_seq);
     shard_states->dispatch_in_background(
@@ -1152,7 +1202,7 @@ void IOHandler::do_in_dispatch()
 
       auto io_state = ctx.get_io_state();
       if (io_state == io_state_t::open) {
-        auto cc_seq = crosscore.prepare_submit();
+        auto cc_seq = proto_crosscore.prepare_submit();
         logger().info("{} do_in_dispatch(): fault at {}, {}, going to delay -- {}, "
                       "send {} notify_out_fault()",
                       conn, io_state, io_stat_printer{*this}, e_what, cc_seq);
@@ -1188,10 +1238,10 @@ IOHandler::close_io(
     bool is_replace)
 {
   ceph_assert_always(seastar::this_shard_id() == get_shard_id());
-  if (!crosscore.proceed_or_wait(cc_seq)) {
+  if (!proto_crosscore.proceed_or_wait(cc_seq)) {
     logger().debug("{} got {} close_io(), wait at {}",
-                   conn, cc_seq, crosscore.get_in_seq());
-    return crosscore.wait(cc_seq
+                   conn, cc_seq, proto_crosscore.get_in_seq());
+    return proto_crosscore.wait(cc_seq
     ).then([this, cc_seq, is_dispatch_reset, is_replace] {
       return close_io(cc_seq, is_dispatch_reset, is_replace);
     });
index f0f0ba0ae62e4e82f57194cc5fe7f99acd76c010..8b88e2f5a254a469d4013854a4cf39ad52a932b0 100644 (file)
@@ -70,8 +70,8 @@ struct io_handler_state {
  */
 class HandshakeListener {
 public:
-  using crosscore_ordering_t = smp_crosscore_ordering_t<crosscore_type_t::ONE>;
-  using cc_seq_t = crosscore_ordering_t::seq_t;
+  using proto_crosscore_ordering_t = smp_crosscore_ordering_t<crosscore_type_t::ONE>;
+  using cc_seq_t = proto_crosscore_ordering_t::seq_t;
 
   virtual ~HandshakeListener() = default;
 
@@ -105,8 +105,9 @@ protected:
  */
 class IOHandler final : public ConnectionHandler {
 public:
-  using crosscore_ordering_t = smp_crosscore_ordering_t<crosscore_type_t::ONE>;
-  using cc_seq_t = crosscore_ordering_t::seq_t;
+  using io_crosscore_ordering_t = smp_crosscore_ordering_t<crosscore_type_t::N_ONE>;
+  using proto_crosscore_ordering_t = smp_crosscore_ordering_t<crosscore_type_t::ONE>;
+  using cc_seq_t = proto_crosscore_ordering_t::seq_t;
 
   IOHandler(ChainedDispatchers &,
             SocketConnection &);
@@ -131,7 +132,7 @@ public:
     return protocol_is_connected;
   }
 
-  seastar::future<> send(MessageFRef msg) final;
+  seastar::future<> send(MessageURef msg) final;
 
   seastar::future<> send_keepalive() final;
 
@@ -398,13 +399,13 @@ public:
 
   void assign_frame_assembler(FrameAssemblerV2Ref);
 
-  seastar::future<> send_redirected(MessageFRef msg);
+  seastar::future<> send_recheck_shard(cc_seq_t, core_id_t, MessageFRef);
 
-  seastar::future<> do_send(MessageFRef msg);
+  seastar::future<> do_send(cc_seq_t, core_id_t, MessageFRef);
 
-  seastar::future<> send_keepalive_redirected();
+  seastar::future<> send_keepalive_recheck_shard(cc_seq_t, core_id_t);
 
-  seastar::future<> do_send_keepalive();
+  seastar::future<> do_send_keepalive(cc_seq_t, core_id_t);
 
   seastar::future<> to_new_sid(
       cc_seq_t cc_seq,
@@ -467,7 +468,9 @@ public:
 private:
   shard_states_ref_t shard_states;
 
-  crosscore_ordering_t crosscore;
+  proto_crosscore_ordering_t proto_crosscore;
+
+  io_crosscore_ordering_t io_crosscore;
 
   // drop was happening in the previous sid
   std::optional<seastar::shard_id> maybe_dropped_sid;