From: Yingxin Cheng Date: Tue, 24 Oct 2023 07:26:21 +0000 (+0800) Subject: crimson/common/smp_helpers: generalize crosscore_ordering_t X-Git-Tag: v19.0.0~15^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=77e66ad09831e617a21bfa15d42c07041e362cb4;p=ceph.git crimson/common/smp_helpers: generalize crosscore_ordering_t Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/common/smp_helpers.h b/src/crimson/common/smp_helpers.h index c2b7bd9641a..fad81552d1f 100644 --- a/src/crimson/common/smp_helpers.h +++ b/src/crimson/common/smp_helpers.h @@ -3,10 +3,16 @@ #pragma once +#include #include +#include +#include +#include +#include #include +#include "common/likely.h" #include "crimson/common/errorator.h" #include "crimson/common/utility.h" @@ -89,4 +95,114 @@ auto sharded_map_seq(T &t, F &&f) { }); } -} +enum class crosscore_type_t { + ONE, // from 1 to 1 core + ONE_N, // from 1 to n cores +}; + +/** + * smp_crosscore_ordering_t + * + * To preserve the event order from source to target core(s). + */ +template +class smp_crosscore_ordering_t { + static constexpr bool IS_ONE = (CTypeValue == crosscore_type_t::ONE); + static constexpr bool IS_ONE_N = (CTypeValue == crosscore_type_t::ONE_N); + static_assert(IS_ONE || IS_ONE_N); + +public: + using seq_t = uint64_t; + + smp_crosscore_ordering_t() requires IS_ONE + : out_seqs(0) { } + + smp_crosscore_ordering_t() requires IS_ONE_N + : out_seqs(seastar::smp::count, 0), + in_controls(seastar::smp::count) {} + + ~smp_crosscore_ordering_t() = default; + + /* + * Called by the original core to get the ordering sequence + */ + + seq_t prepare_submit() requires IS_ONE { + return do_prepare_submit(out_seqs); + } + + seq_t prepare_submit(core_id_t target_core) requires IS_ONE_N { + return do_prepare_submit(out_seqs[target_core]); + } + + /* + * Called by the target core to preserve the ordering + */ + + seq_t get_in_seq() const requires IS_ONE { + return in_controls.seq; + } + + seq_t get_in_seq() const requires IS_ONE_N { + return in_controls[seastar::this_shard_id()].seq; + } + + bool proceed_or_wait(seq_t seq) requires IS_ONE { + return in_controls.proceed_or_wait(seq); + } + + bool proceed_or_wait(seq_t seq) requires IS_ONE_N { + return in_controls[seastar::this_shard_id()].proceed_or_wait(seq); + } + + seastar::future<> wait(seq_t seq) requires IS_ONE { + return in_controls.wait(seq); + } + + seastar::future<> wait(seq_t seq) requires IS_ONE_N { + return in_controls[seastar::this_shard_id()].wait(seq); + } + +private: + struct in_control_t { + seq_t seq = 0; + std::optional> pr_wait; + + bool proceed_or_wait(seq_t in_seq) { + if (in_seq == seq + 1) { + ++seq; + if (unlikely(pr_wait.has_value())) { + pr_wait->set_value(); + pr_wait = std::nullopt; + } + return true; + } else { + return false; + } + } + + seastar::future<> wait(seq_t in_seq) { + assert(in_seq != seq + 1); + if (!pr_wait.has_value()) { + pr_wait = seastar::shared_promise<>(); + } + return pr_wait->get_shared_future(); + } + }; + + seq_t do_prepare_submit(seq_t &out_seq) { + return ++out_seq; + } + + std::conditional_t< + IS_ONE, + seq_t, std::vector + > out_seqs; + + std::conditional_t< + IS_ONE, + in_control_t, std::vector + > in_controls; +}; + +} // namespace crimson diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 55b669384ed..d4ef3881c40 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -2073,7 +2073,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, // READY state seastar::future<> ProtocolV2::notify_out_fault( - crosscore_t::seq_t cc_seq, + cc_seq_t cc_seq, const char *where, std::exception_ptr eptr, io_handler_state _io_states) @@ -2121,7 +2121,7 @@ void ProtocolV2::execute_standby() } seastar::future<> ProtocolV2::notify_out( - crosscore_t::seq_t cc_seq) + cc_seq_t cc_seq) { assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); if (!crosscore.proceed_or_wait(cc_seq)) { @@ -2210,7 +2210,7 @@ void ProtocolV2::execute_server_wait() // CLOSING state seastar::future<> ProtocolV2::notify_mark_down( - crosscore_t::seq_t cc_seq) + cc_seq_t cc_seq) { assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); if (!crosscore.proceed_or_wait(cc_seq)) { diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index dd7a1e7039b..168d079c8e6 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -29,16 +29,16 @@ public: */ private: seastar::future<> notify_out( - crosscore_t::seq_t cc_seq) final; + cc_seq_t cc_seq) final; seastar::future<> notify_out_fault( - crosscore_t::seq_t cc_seq, + cc_seq_t cc_seq, const char *where, std::exception_ptr, io_handler_state) final; seastar::future<> notify_mark_down( - crosscore_t::seq_t cc_seq) final; + cc_seq_t cc_seq) final; /* * as ProtocolV2 to be called by SocketConnection @@ -251,7 +251,7 @@ private: // asynchronously populated from io_handler io_handler_state io_states; - crosscore_t crosscore; + crosscore_ordering_t crosscore; bool has_socket = false; diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc index c414c48e12f..e8a868b4d4c 100644 --- a/src/crimson/net/io_handler.cc +++ b/src/crimson/net/io_handler.cc @@ -292,7 +292,7 @@ void IOHandler::assign_frame_assembler(FrameAssemblerV2Ref fa) void IOHandler::do_set_io_state( io_state_t new_state, - std::optional cc_seq, + std::optional cc_seq, FrameAssemblerV2Ref fa, bool set_notify_out) { @@ -363,7 +363,7 @@ void IOHandler::do_set_io_state( } seastar::future<> IOHandler::set_io_state( - crosscore_t::seq_t cc_seq, + cc_seq_t cc_seq, io_state_t new_state, FrameAssemblerV2Ref fa, bool set_notify_out) @@ -385,7 +385,7 @@ seastar::future<> IOHandler::set_io_state( seastar::future IOHandler::wait_io_exit_dispatching( - crosscore_t::seq_t cc_seq) + cc_seq_t cc_seq) { assert(seastar::this_shard_id() == get_shard_id()); if (!crosscore.proceed_or_wait(cc_seq)) { @@ -429,7 +429,7 @@ IOHandler::wait_io_exit_dispatching( } seastar::future<> IOHandler::reset_session( - crosscore_t::seq_t cc_seq, + cc_seq_t cc_seq, bool full) { assert(seastar::this_shard_id() == get_shard_id()); @@ -454,7 +454,7 @@ seastar::future<> IOHandler::reset_session( } seastar::future<> IOHandler::reset_peer_state( - crosscore_t::seq_t cc_seq) + cc_seq_t cc_seq) { assert(seastar::this_shard_id() == get_shard_id()); if (!crosscore.proceed_or_wait(cc_seq)) { @@ -476,7 +476,7 @@ seastar::future<> IOHandler::reset_peer_state( } seastar::future<> IOHandler::requeue_out_sent( - crosscore_t::seq_t cc_seq) + cc_seq_t cc_seq) { assert(seastar::this_shard_id() == get_shard_id()); if (!crosscore.proceed_or_wait(cc_seq)) { @@ -517,7 +517,7 @@ void IOHandler::do_requeue_out_sent() } seastar::future<> IOHandler::requeue_out_sent_up_to( - crosscore_t::seq_t cc_seq, + cc_seq_t cc_seq, seq_num_t msg_seq) { assert(seastar::this_shard_id() == get_shard_id()); @@ -583,7 +583,7 @@ void IOHandler::discard_out_sent() seastar::future<> IOHandler::dispatch_accept( - crosscore_t::seq_t cc_seq, + cc_seq_t cc_seq, seastar::shard_id new_sid, ConnectionFRef conn_fref, bool is_replace) @@ -593,7 +593,7 @@ IOHandler::dispatch_accept( seastar::future<> IOHandler::dispatch_connect( - crosscore_t::seq_t cc_seq, + cc_seq_t cc_seq, seastar::shard_id new_sid, ConnectionFRef conn_fref) { @@ -620,7 +620,7 @@ IOHandler::cleanup_prv_shard(seastar::shard_id prv_sid) seastar::future<> IOHandler::to_new_sid( - crosscore_t::seq_t cc_seq, + cc_seq_t cc_seq, seastar::shard_id new_sid, ConnectionFRef conn_fref, std::optional is_replace) @@ -735,7 +735,7 @@ IOHandler::to_new_sid( } seastar::future<> IOHandler::set_accepted_sid( - crosscore_t::seq_t cc_seq, + cc_seq_t cc_seq, seastar::shard_id sid, ConnectionFRef conn_fref) { @@ -1183,7 +1183,7 @@ void IOHandler::do_in_dispatch() seastar::future<> IOHandler::close_io( - crosscore_t::seq_t cc_seq, + cc_seq_t cc_seq, bool is_dispatch_reset, bool is_replace) { diff --git a/src/crimson/net/io_handler.h b/src/crimson/net/io_handler.h index f53c2ba6468..f0f0ba0ae62 100644 --- a/src/crimson/net/io_handler.h +++ b/src/crimson/net/io_handler.h @@ -5,64 +5,16 @@ #include -#include #include #include "crimson/common/gated.h" +#include "crimson/common/smp_helpers.h" #include "Fwd.h" #include "SocketConnection.h" #include "FrameAssemblerV2.h" namespace crimson::net { -/** - * crosscore_t - * - * To preserve the event order across cores. - */ -class crosscore_t { -public: - using seq_t = uint64_t; - - crosscore_t() = default; - ~crosscore_t() = default; - - seq_t get_in_seq() const { - return in_seq; - } - - seq_t prepare_submit() { - ++out_seq; - return out_seq; - } - - bool proceed_or_wait(seq_t seq) { - if (seq == in_seq + 1) { - ++in_seq; - if (unlikely(in_pr_wait.has_value())) { - in_pr_wait->set_value(); - in_pr_wait = std::nullopt; - } - return true; - } else { - return false; - } - } - - seastar::future<> wait(seq_t seq) { - assert(seq != in_seq + 1); - if (!in_pr_wait.has_value()) { - in_pr_wait = seastar::shared_promise<>(); - } - return in_pr_wait->get_shared_future(); - } - -private: - seq_t out_seq = 0; - seq_t in_seq = 0; - std::optional> in_pr_wait; -}; - /** * io_handler_state * @@ -118,6 +70,9 @@ struct io_handler_state { */ class HandshakeListener { public: + using crosscore_ordering_t = smp_crosscore_ordering_t; + using cc_seq_t = crosscore_ordering_t::seq_t; + virtual ~HandshakeListener() = default; HandshakeListener(const HandshakeListener&) = delete; @@ -126,16 +81,16 @@ public: HandshakeListener &operator=(HandshakeListener &&) = delete; virtual seastar::future<> notify_out( - crosscore_t::seq_t cc_seq) = 0; + cc_seq_t cc_seq) = 0; virtual seastar::future<> notify_out_fault( - crosscore_t::seq_t cc_seq, + cc_seq_t cc_seq, const char *where, std::exception_ptr, io_handler_state) = 0; virtual seastar::future<> notify_mark_down( - crosscore_t::seq_t cc_seq) = 0; + cc_seq_t cc_seq) = 0; protected: HandshakeListener() = default; @@ -150,6 +105,9 @@ protected: */ class IOHandler final : public ConnectionHandler { public: + using crosscore_ordering_t = smp_crosscore_ordering_t; + using cc_seq_t = crosscore_ordering_t::seq_t; + IOHandler(ChainedDispatchers &, SocketConnection &); @@ -221,7 +179,7 @@ public: void print_io_stat(std::ostream &out) const; seastar::future<> set_accepted_sid( - crosscore_t::seq_t cc_seq, + cc_seq_t cc_seq, seastar::shard_id sid, ConnectionFRef conn_fref); @@ -230,7 +188,7 @@ public: */ seastar::future<> close_io( - crosscore_t::seq_t cc_seq, + cc_seq_t cc_seq, bool is_dispatch_reset, bool is_replace); @@ -251,7 +209,7 @@ public: friend class fmt::formatter; seastar::future<> set_io_state( - crosscore_t::seq_t cc_seq, + cc_seq_t cc_seq, io_state_t new_state, FrameAssemblerV2Ref fa, bool set_notify_out); @@ -262,30 +220,30 @@ public: }; seastar::future wait_io_exit_dispatching( - crosscore_t::seq_t cc_seq); + cc_seq_t cc_seq); seastar::future<> reset_session( - crosscore_t::seq_t cc_seq, + cc_seq_t cc_seq, bool full); seastar::future<> reset_peer_state( - crosscore_t::seq_t cc_seq); + cc_seq_t cc_seq); seastar::future<> requeue_out_sent_up_to( - crosscore_t::seq_t cc_seq, + cc_seq_t cc_seq, seq_num_t msg_seq); seastar::future<> requeue_out_sent( - crosscore_t::seq_t cc_seq); + cc_seq_t cc_seq); seastar::future<> dispatch_accept( - crosscore_t::seq_t cc_seq, + cc_seq_t cc_seq, seastar::shard_id new_sid, ConnectionFRef, bool is_replace); seastar::future<> dispatch_connect( - crosscore_t::seq_t cc_seq, + cc_seq_t cc_seq, seastar::shard_id new_sid, ConnectionFRef); @@ -426,7 +384,7 @@ public: void do_set_io_state( io_state_t new_state, - std::optional cc_seq = std::nullopt, + std::optional cc_seq = std::nullopt, FrameAssemblerV2Ref fa = nullptr, bool set_notify_out = false); @@ -449,7 +407,7 @@ public: seastar::future<> do_send_keepalive(); seastar::future<> to_new_sid( - crosscore_t::seq_t cc_seq, + cc_seq_t cc_seq, seastar::shard_id new_sid, ConnectionFRef, std::optional is_replace); @@ -509,7 +467,7 @@ public: private: shard_states_ref_t shard_states; - crosscore_t crosscore; + crosscore_ordering_t crosscore; // drop was happening in the previous sid std::optional maybe_dropped_sid; diff --git a/src/crimson/osd/osd_connection_priv.h b/src/crimson/osd/osd_connection_priv.h index 2d2a459017b..3c7d085c06e 100644 --- a/src/crimson/osd/osd_connection_priv.h +++ b/src/crimson/osd/osd_connection_priv.h @@ -3,8 +3,7 @@ #pragma once -#include - +#include "crimson/common/smp_helpers.h" #include "crimson/net/Connection.h" #include "crimson/osd/osd_operation.h" #include "crimson/osd/osd_operations/client_request.h" @@ -13,75 +12,9 @@ namespace crimson::osd { -/** - * crosscore_ordering_t - * - * To preserve the event order from 1 source to n target cores. - */ -class crosscore_ordering_t { -public: - using seq_t = uint64_t; - - crosscore_ordering_t() - : out_seqs(seastar::smp::count, 0), - in_controls(seastar::smp::count) {} - - ~crosscore_ordering_t() = default; - - // Called by the original core to get the ordering sequence - seq_t prepare_submit(core_id_t target_core) { - auto &out_seq = out_seqs[target_core]; - ++out_seq; - return out_seq; - } - - /* - * Called by the target core to preserve the ordering - */ - - seq_t get_in_seq() const { - auto core = seastar::this_shard_id(); - return in_controls[core].seq; - } - - bool proceed_or_wait(seq_t seq) { - auto core = seastar::this_shard_id(); - auto &in_control = in_controls[core]; - if (seq == in_control.seq + 1) { - ++in_control.seq; - if (unlikely(in_control.pr_wait.has_value())) { - in_control.pr_wait->set_value(); - in_control.pr_wait = std::nullopt; - } - return true; - } else { - return false; - } - } - - seastar::future<> wait(seq_t seq) { - auto core = seastar::this_shard_id(); - auto &in_control = in_controls[core]; - assert(seq != in_control.seq + 1); - if (!in_control.pr_wait.has_value()) { - in_control.pr_wait = seastar::shared_promise<>(); - } - return in_control.pr_wait->get_shared_future(); - } - -private: - struct in_control_t { - seq_t seq = 0; - std::optional> pr_wait; - }; - - // source-side - std::vector out_seqs; - // target-side - std::vector in_controls; -}; - struct OSDConnectionPriv : public crimson::net::Connection::user_private_t { + using crosscore_ordering_t = smp_crosscore_ordering_t; + ConnectionPipeline client_request_conn_pipeline; ConnectionPipeline peering_request_conn_pipeline; ConnectionPipeline replicated_request_conn_pipeline; diff --git a/src/crimson/osd/pg_shard_manager.h b/src/crimson/osd/pg_shard_manager.h index cf13cb52bbf..74154499c8a 100644 --- a/src/crimson/osd/pg_shard_manager.h +++ b/src/crimson/osd/pg_shard_manager.h @@ -151,7 +151,7 @@ public: template auto process_ordered_op_remotely( - crosscore_ordering_t::seq_t cc_seq, + OSDConnectionPriv::crosscore_ordering_t::seq_t cc_seq, ShardServices &target_shard_services, typename T::IRef &&op, F &&f) {