]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/common/smp_helpers: generalize crosscore_ordering_t
authorYingxin Cheng <yingxin.cheng@intel.com>
Tue, 24 Oct 2023 07:26:21 +0000 (15:26 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Mon, 20 Nov 2023 02:44:53 +0000 (10:44 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/common/smp_helpers.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
src/crimson/net/io_handler.cc
src/crimson/net/io_handler.h
src/crimson/osd/osd_connection_priv.h
src/crimson/osd/pg_shard_manager.h

index c2b7bd9641a77d33390f50b991c24d536370d38d..fad81552d1ff50454b526448dbd7c8d3fab1219f 100644 (file)
@@ -3,10 +3,16 @@
 
 #pragma once
 
+#include <concepts>
 #include <limits>
+#include <optional>
+#include <type_traits>
+#include <vector>
 
+#include <seastar/core/shared_future.hh>
 #include <seastar/core/smp.hh>
 
+#include "common/likely.h"
 #include "crimson/common/errorator.h"
 #include "crimson/common/utility.h"
 
@@ -89,4 +95,114 @@ auto sharded_map_seq(T &t, F &&f) {
     });
 }
 
-}
+enum class crosscore_type_t {
+  ONE,   // from 1 to 1 core
+  ONE_N, // from 1 to n cores
+};
+
+/**
+ * smp_crosscore_ordering_t
+ *
+ * To preserve the event order from source to target core(s).
+ */
+template <crosscore_type_t CTypeValue>
+class smp_crosscore_ordering_t {
+  static constexpr bool IS_ONE = (CTypeValue == crosscore_type_t::ONE);
+  static constexpr bool IS_ONE_N = (CTypeValue == crosscore_type_t::ONE_N);
+  static_assert(IS_ONE || IS_ONE_N);
+
+public:
+  using seq_t = uint64_t;
+
+  smp_crosscore_ordering_t() requires IS_ONE
+    : out_seqs(0) { }
+
+  smp_crosscore_ordering_t() requires IS_ONE_N
+    : out_seqs(seastar::smp::count, 0),
+      in_controls(seastar::smp::count) {}
+
+  ~smp_crosscore_ordering_t() = default;
+
+  /*
+   * Called by the original core to get the ordering sequence
+   */
+
+  seq_t prepare_submit() requires IS_ONE {
+    return do_prepare_submit(out_seqs);
+  }
+
+  seq_t prepare_submit(core_id_t target_core) requires IS_ONE_N {
+    return do_prepare_submit(out_seqs[target_core]);
+  }
+
+  /*
+   * Called by the target core to preserve the ordering
+   */
+
+  seq_t get_in_seq() const requires IS_ONE {
+    return in_controls.seq;
+  }
+
+  seq_t get_in_seq() const requires IS_ONE_N {
+    return in_controls[seastar::this_shard_id()].seq;
+  }
+
+  bool proceed_or_wait(seq_t seq) requires IS_ONE {
+    return in_controls.proceed_or_wait(seq);
+  }
+
+  bool proceed_or_wait(seq_t seq) requires IS_ONE_N {
+    return in_controls[seastar::this_shard_id()].proceed_or_wait(seq);
+  }
+
+  seastar::future<> wait(seq_t seq) requires IS_ONE {
+    return in_controls.wait(seq);
+  }
+
+  seastar::future<> wait(seq_t seq) requires IS_ONE_N {
+    return in_controls[seastar::this_shard_id()].wait(seq);
+  }
+
+private:
+  struct in_control_t {
+    seq_t seq = 0;
+    std::optional<seastar::shared_promise<>> pr_wait;
+
+    bool proceed_or_wait(seq_t in_seq) {
+      if (in_seq == seq + 1) {
+        ++seq;
+        if (unlikely(pr_wait.has_value())) {
+          pr_wait->set_value();
+          pr_wait = std::nullopt;
+        }
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    seastar::future<> wait(seq_t in_seq) {
+      assert(in_seq != seq + 1);
+      if (!pr_wait.has_value()) {
+        pr_wait = seastar::shared_promise<>();
+      }
+      return pr_wait->get_shared_future();
+    }
+  };
+
+  seq_t do_prepare_submit(seq_t &out_seq) {
+    return ++out_seq;
+  }
+
+  std::conditional_t<
+    IS_ONE,
+    seq_t, std::vector<seq_t>
+  > out_seqs;
+
+  std::conditional_t<
+    IS_ONE,
+    in_control_t, std::vector<in_control_t>
+  > in_controls;
+};
+
+} // namespace crimson
index 55b669384ed3bfd588e97192b00030217c48e4df..d4ef3881c408a89e79b4ebb79fee21b230c89d2c 100644 (file)
@@ -2073,7 +2073,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
 // READY state
 
 seastar::future<> ProtocolV2::notify_out_fault(
-    crosscore_t::seq_t cc_seq,
+    cc_seq_t cc_seq,
     const char *where,
     std::exception_ptr eptr,
     io_handler_state _io_states)
@@ -2121,7 +2121,7 @@ void ProtocolV2::execute_standby()
 }
 
 seastar::future<> ProtocolV2::notify_out(
-    crosscore_t::seq_t cc_seq)
+    cc_seq_t cc_seq)
 {
   assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
   if (!crosscore.proceed_or_wait(cc_seq)) {
@@ -2210,7 +2210,7 @@ void ProtocolV2::execute_server_wait()
 // CLOSING state
 
 seastar::future<> ProtocolV2::notify_mark_down(
-    crosscore_t::seq_t cc_seq)
+    cc_seq_t cc_seq)
 {
   assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
   if (!crosscore.proceed_or_wait(cc_seq)) {
index dd7a1e7039b519e0c06dbd2a9feae13605de5393..168d079c8e6d2ec2cfa4e34c72bb5a617567bb66 100644 (file)
@@ -29,16 +29,16 @@ public:
  */
 private:
   seastar::future<> notify_out(
-      crosscore_t::seq_t cc_seq) final;
+      cc_seq_t cc_seq) final;
 
   seastar::future<> notify_out_fault(
-      crosscore_t::seq_t cc_seq,
+      cc_seq_t cc_seq,
       const char *where,
       std::exception_ptr,
       io_handler_state) final;
 
   seastar::future<> notify_mark_down(
-      crosscore_t::seq_t cc_seq) final;
+      cc_seq_t cc_seq) final;
 
 /*
 * as ProtocolV2 to be called by SocketConnection
@@ -251,7 +251,7 @@ private:
   // asynchronously populated from io_handler
   io_handler_state io_states;
 
-  crosscore_t crosscore;
+  crosscore_ordering_t crosscore;
 
   bool has_socket = false;
 
index c414c48e12f8e89f63d5fcb8a736bc4d86e587f9..e8a868b4d4c71e8b49cf2c0c15407ffd10a19c8c 100644 (file)
@@ -292,7 +292,7 @@ void IOHandler::assign_frame_assembler(FrameAssemblerV2Ref fa)
 
 void IOHandler::do_set_io_state(
     io_state_t new_state,
-    std::optional<crosscore_t::seq_t> cc_seq,
+    std::optional<cc_seq_t> cc_seq,
     FrameAssemblerV2Ref fa,
     bool set_notify_out)
 {
@@ -363,7 +363,7 @@ void IOHandler::do_set_io_state(
 }
 
 seastar::future<> IOHandler::set_io_state(
-    crosscore_t::seq_t cc_seq,
+    cc_seq_t cc_seq,
     io_state_t new_state,
     FrameAssemblerV2Ref fa,
     bool set_notify_out)
@@ -385,7 +385,7 @@ seastar::future<> IOHandler::set_io_state(
 
 seastar::future<IOHandler::exit_dispatching_ret>
 IOHandler::wait_io_exit_dispatching(
-    crosscore_t::seq_t cc_seq)
+    cc_seq_t cc_seq)
 {
   assert(seastar::this_shard_id() == get_shard_id());
   if (!crosscore.proceed_or_wait(cc_seq)) {
@@ -429,7 +429,7 @@ IOHandler::wait_io_exit_dispatching(
 }
 
 seastar::future<> IOHandler::reset_session(
-    crosscore_t::seq_t cc_seq,
+    cc_seq_t cc_seq,
     bool full)
 {
   assert(seastar::this_shard_id() == get_shard_id());
@@ -454,7 +454,7 @@ seastar::future<> IOHandler::reset_session(
 }
 
 seastar::future<> IOHandler::reset_peer_state(
-    crosscore_t::seq_t cc_seq)
+    cc_seq_t cc_seq)
 {
   assert(seastar::this_shard_id() == get_shard_id());
   if (!crosscore.proceed_or_wait(cc_seq)) {
@@ -476,7 +476,7 @@ seastar::future<> IOHandler::reset_peer_state(
 }
 
 seastar::future<> IOHandler::requeue_out_sent(
-    crosscore_t::seq_t cc_seq)
+    cc_seq_t cc_seq)
 {
   assert(seastar::this_shard_id() == get_shard_id());
   if (!crosscore.proceed_or_wait(cc_seq)) {
@@ -517,7 +517,7 @@ void IOHandler::do_requeue_out_sent()
 }
 
 seastar::future<> IOHandler::requeue_out_sent_up_to(
-    crosscore_t::seq_t cc_seq,
+    cc_seq_t cc_seq,
     seq_num_t msg_seq)
 {
   assert(seastar::this_shard_id() == get_shard_id());
@@ -583,7 +583,7 @@ void IOHandler::discard_out_sent()
 
 seastar::future<>
 IOHandler::dispatch_accept(
-    crosscore_t::seq_t cc_seq,
+    cc_seq_t cc_seq,
     seastar::shard_id new_sid,
     ConnectionFRef conn_fref,
     bool is_replace)
@@ -593,7 +593,7 @@ IOHandler::dispatch_accept(
 
 seastar::future<>
 IOHandler::dispatch_connect(
-    crosscore_t::seq_t cc_seq,
+    cc_seq_t cc_seq,
     seastar::shard_id new_sid,
     ConnectionFRef conn_fref)
 {
@@ -620,7 +620,7 @@ IOHandler::cleanup_prv_shard(seastar::shard_id prv_sid)
 
 seastar::future<>
 IOHandler::to_new_sid(
-    crosscore_t::seq_t cc_seq,
+    cc_seq_t cc_seq,
     seastar::shard_id new_sid,
     ConnectionFRef conn_fref,
     std::optional<bool> is_replace)
@@ -735,7 +735,7 @@ IOHandler::to_new_sid(
 }
 
 seastar::future<> IOHandler::set_accepted_sid(
-    crosscore_t::seq_t cc_seq,
+    cc_seq_t cc_seq,
     seastar::shard_id sid,
     ConnectionFRef conn_fref)
 {
@@ -1183,7 +1183,7 @@ void IOHandler::do_in_dispatch()
 
 seastar::future<>
 IOHandler::close_io(
-    crosscore_t::seq_t cc_seq,
+    cc_seq_t cc_seq,
     bool is_dispatch_reset,
     bool is_replace)
 {
index f53c2ba646847e4706a925838322b4627fe7a97d..f0f0ba0ae62e4e82f57194cc5fe7f99acd76c010 100644 (file)
@@ -5,64 +5,16 @@
 
 #include <vector>
 
-#include <seastar/core/shared_future.hh>
 #include <seastar/util/later.hh>
 
 #include "crimson/common/gated.h"
+#include "crimson/common/smp_helpers.h"
 #include "Fwd.h"
 #include "SocketConnection.h"
 #include "FrameAssemblerV2.h"
 
 namespace crimson::net {
 
-/**
- * crosscore_t
- *
- * To preserve the event order across cores.
- */
-class crosscore_t {
-public:
-  using seq_t = uint64_t;
-
-  crosscore_t() = default;
-  ~crosscore_t() = default;
-
-  seq_t get_in_seq() const {
-    return in_seq;
-  }
-
-  seq_t prepare_submit() {
-    ++out_seq;
-    return out_seq;
-  }
-
-  bool proceed_or_wait(seq_t seq) {
-    if (seq == in_seq + 1) {
-      ++in_seq;
-      if (unlikely(in_pr_wait.has_value())) {
-        in_pr_wait->set_value();
-        in_pr_wait = std::nullopt;
-      }
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  seastar::future<> wait(seq_t seq) {
-    assert(seq != in_seq + 1);
-    if (!in_pr_wait.has_value()) {
-      in_pr_wait = seastar::shared_promise<>();
-    }
-    return in_pr_wait->get_shared_future();
-  }
-
-private:
-  seq_t out_seq = 0;
-  seq_t in_seq = 0;
-  std::optional<seastar::shared_promise<>> in_pr_wait;
-};
-
 /**
  * io_handler_state
  *
@@ -118,6 +70,9 @@ struct io_handler_state {
  */
 class HandshakeListener {
 public:
+  using crosscore_ordering_t = smp_crosscore_ordering_t<crosscore_type_t::ONE>;
+  using cc_seq_t = crosscore_ordering_t::seq_t;
+
   virtual ~HandshakeListener() = default;
 
   HandshakeListener(const HandshakeListener&) = delete;
@@ -126,16 +81,16 @@ public:
   HandshakeListener &operator=(HandshakeListener &&) = delete;
 
   virtual seastar::future<> notify_out(
-      crosscore_t::seq_t cc_seq) = 0;
+      cc_seq_t cc_seq) = 0;
 
   virtual seastar::future<> notify_out_fault(
-      crosscore_t::seq_t cc_seq,
+      cc_seq_t cc_seq,
       const char *where,
       std::exception_ptr,
       io_handler_state) = 0;
 
   virtual seastar::future<> notify_mark_down(
-      crosscore_t::seq_t cc_seq) = 0;
+      cc_seq_t cc_seq) = 0;
 
 protected:
   HandshakeListener() = default;
@@ -150,6 +105,9 @@ protected:
  */
 class IOHandler final : public ConnectionHandler {
 public:
+  using crosscore_ordering_t = smp_crosscore_ordering_t<crosscore_type_t::ONE>;
+  using cc_seq_t = crosscore_ordering_t::seq_t;
+
   IOHandler(ChainedDispatchers &,
             SocketConnection &);
 
@@ -221,7 +179,7 @@ public:
   void print_io_stat(std::ostream &out) const;
 
   seastar::future<> set_accepted_sid(
-      crosscore_t::seq_t cc_seq,
+      cc_seq_t cc_seq,
       seastar::shard_id sid,
       ConnectionFRef conn_fref);
 
@@ -230,7 +188,7 @@ public:
    */
 
   seastar::future<> close_io(
-      crosscore_t::seq_t cc_seq,
+      cc_seq_t cc_seq,
       bool is_dispatch_reset,
       bool is_replace);
 
@@ -251,7 +209,7 @@ public:
   friend class fmt::formatter<io_state_t>;
 
   seastar::future<> set_io_state(
-      crosscore_t::seq_t cc_seq,
+      cc_seq_t cc_seq,
       io_state_t new_state,
       FrameAssemblerV2Ref fa,
       bool set_notify_out);
@@ -262,30 +220,30 @@ public:
   };
   seastar::future<exit_dispatching_ret>
   wait_io_exit_dispatching(
-      crosscore_t::seq_t cc_seq);
+      cc_seq_t cc_seq);
 
   seastar::future<> reset_session(
-      crosscore_t::seq_t cc_seq,
+      cc_seq_t cc_seq,
       bool full);
 
   seastar::future<> reset_peer_state(
-      crosscore_t::seq_t cc_seq);
+      cc_seq_t cc_seq);
 
   seastar::future<> requeue_out_sent_up_to(
-      crosscore_t::seq_t cc_seq,
+      cc_seq_t cc_seq,
       seq_num_t msg_seq);
 
   seastar::future<> requeue_out_sent(
-      crosscore_t::seq_t cc_seq);
+      cc_seq_t cc_seq);
 
   seastar::future<> dispatch_accept(
-      crosscore_t::seq_t cc_seq,
+      cc_seq_t cc_seq,
       seastar::shard_id new_sid,
       ConnectionFRef,
       bool is_replace);
 
   seastar::future<> dispatch_connect(
-      crosscore_t::seq_t cc_seq,
+      cc_seq_t cc_seq,
       seastar::shard_id new_sid,
       ConnectionFRef);
 
@@ -426,7 +384,7 @@ public:
 
   void do_set_io_state(
       io_state_t new_state,
-      std::optional<crosscore_t::seq_t> cc_seq = std::nullopt,
+      std::optional<cc_seq_t> cc_seq = std::nullopt,
       FrameAssemblerV2Ref fa = nullptr,
       bool set_notify_out = false);
 
@@ -449,7 +407,7 @@ public:
   seastar::future<> do_send_keepalive();
 
   seastar::future<> to_new_sid(
-      crosscore_t::seq_t cc_seq,
+      cc_seq_t cc_seq,
       seastar::shard_id new_sid,
       ConnectionFRef,
       std::optional<bool> is_replace);
@@ -509,7 +467,7 @@ public:
 private:
   shard_states_ref_t shard_states;
 
-  crosscore_t crosscore;
+  crosscore_ordering_t crosscore;
 
   // drop was happening in the previous sid
   std::optional<seastar::shard_id> maybe_dropped_sid;
index 2d2a459017bb7901c2588d15a650416542e5c899..3c7d085c06e97fcdc72c8d8774ddc265eb1f8d16 100644 (file)
@@ -3,8 +3,7 @@
 
 #pragma once
 
-#include <seastar/core/smp.hh>
-
+#include "crimson/common/smp_helpers.h"
 #include "crimson/net/Connection.h"
 #include "crimson/osd/osd_operation.h"
 #include "crimson/osd/osd_operations/client_request.h"
 
 namespace crimson::osd {
 
-/**
- * crosscore_ordering_t
- *
- * To preserve the event order from 1 source to n target cores.
- */
-class crosscore_ordering_t {
-public:
-  using seq_t = uint64_t;
-
-  crosscore_ordering_t()
-    : out_seqs(seastar::smp::count, 0),
-      in_controls(seastar::smp::count) {}
-
-  ~crosscore_ordering_t() = default;
-
-  // Called by the original core to get the ordering sequence
-  seq_t prepare_submit(core_id_t target_core) {
-    auto &out_seq = out_seqs[target_core];
-    ++out_seq;
-    return out_seq;
-  }
-
-  /*
-   * Called by the target core to preserve the ordering
-   */
-
-  seq_t get_in_seq() const {
-    auto core = seastar::this_shard_id();
-    return in_controls[core].seq;
-  }
-
-  bool proceed_or_wait(seq_t seq) {
-    auto core = seastar::this_shard_id();
-    auto &in_control = in_controls[core];
-    if (seq == in_control.seq + 1) {
-      ++in_control.seq;
-      if (unlikely(in_control.pr_wait.has_value())) {
-        in_control.pr_wait->set_value();
-        in_control.pr_wait = std::nullopt;
-      }
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  seastar::future<> wait(seq_t seq) {
-    auto core = seastar::this_shard_id();
-    auto &in_control = in_controls[core];
-    assert(seq != in_control.seq + 1);
-    if (!in_control.pr_wait.has_value()) {
-      in_control.pr_wait = seastar::shared_promise<>();
-    }
-    return in_control.pr_wait->get_shared_future();
-  }
-
-private:
-  struct in_control_t {
-    seq_t seq = 0;
-    std::optional<seastar::shared_promise<>> pr_wait;
-  };
-
-  // source-side
-  std::vector<seq_t> out_seqs;
-  // target-side
-  std::vector<in_control_t> in_controls;
-};
-
 struct OSDConnectionPriv : public crimson::net::Connection::user_private_t {
+  using crosscore_ordering_t = smp_crosscore_ordering_t<crosscore_type_t::ONE_N>;
+
   ConnectionPipeline client_request_conn_pipeline;
   ConnectionPipeline peering_request_conn_pipeline;
   ConnectionPipeline replicated_request_conn_pipeline;
index cf13cb52bbf70c400ebbbad59e6641f6c4135216..74154499c8a54c7b70ad26f083ef7e36d90e3b3c 100644 (file)
@@ -151,7 +151,7 @@ public:
 
   template <typename T, typename F>
   auto process_ordered_op_remotely(
-      crosscore_ordering_t::seq_t cc_seq,
+      OSDConnectionPriv::crosscore_ordering_t::seq_t cc_seq,
       ShardServices &target_shard_services,
       typename T::IRef &&op,
       F &&f) {