From: Yingxin Cheng Date: Mon, 30 Oct 2023 02:00:57 +0000 (+0800) Subject: crimson/net: preserve the ordering upon the calls to Connection::send()/keepalive() X-Git-Tag: v19.0.0~15^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=6ebf9cd3671960ceb88d3b95d487c91fd41fc8e6;p=ceph.git crimson/net: preserve the ordering upon the calls to Connection::send()/keepalive() Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/common/smp_helpers.h b/src/crimson/common/smp_helpers.h index fad81552d1ff..429c938229bc 100644 --- a/src/crimson/common/smp_helpers.h +++ b/src/crimson/common/smp_helpers.h @@ -98,6 +98,7 @@ auto sharded_map_seq(T &t, F &&f) { enum class crosscore_type_t { ONE, // from 1 to 1 core ONE_N, // from 1 to n cores + N_ONE, // from n to 1 core }; /** @@ -109,7 +110,8 @@ template class smp_crosscore_ordering_t { static constexpr bool IS_ONE = (CTypeValue == crosscore_type_t::ONE); static constexpr bool IS_ONE_N = (CTypeValue == crosscore_type_t::ONE_N); - static_assert(IS_ONE || IS_ONE_N); + static constexpr bool IS_N_ONE = (CTypeValue == crosscore_type_t::N_ONE); + static_assert(IS_ONE || IS_ONE_N || IS_N_ONE); public: using seq_t = uint64_t; @@ -117,7 +119,7 @@ public: smp_crosscore_ordering_t() requires IS_ONE : out_seqs(0) { } - smp_crosscore_ordering_t() requires IS_ONE_N + smp_crosscore_ordering_t() requires (!IS_ONE) : out_seqs(seastar::smp::count, 0), in_controls(seastar::smp::count) {} @@ -135,6 +137,10 @@ public: return do_prepare_submit(out_seqs[target_core]); } + seq_t prepare_submit() requires IS_N_ONE { + return do_prepare_submit(out_seqs[seastar::this_shard_id()]); + } + /* * Called by the target core to preserve the ordering */ @@ -147,6 +153,10 @@ public: return in_controls[seastar::this_shard_id()].seq; } + seq_t get_in_seq(core_id_t source_core) const requires IS_N_ONE { + return in_controls[source_core].seq; + } + bool proceed_or_wait(seq_t seq) requires IS_ONE { return in_controls.proceed_or_wait(seq); } @@ -155,6 +165,10 @@ public: return in_controls[seastar::this_shard_id()].proceed_or_wait(seq); } + bool proceed_or_wait(seq_t seq, core_id_t source_core) requires IS_N_ONE { + return in_controls[source_core].proceed_or_wait(seq); + } + seastar::future<> wait(seq_t seq) requires IS_ONE { return in_controls.wait(seq); } @@ -163,6 +177,16 @@ public: return in_controls[seastar::this_shard_id()].wait(seq); } + seastar::future<> wait(seq_t seq, core_id_t source_core) requires IS_N_ONE { + return in_controls[source_core].wait(seq); + } + + void reset_wait() requires IS_N_ONE { + for (auto &in_control : in_controls) { + in_control.reset_wait(); + } + } + private: struct in_control_t { seq_t seq = 0; @@ -171,10 +195,7 @@ private: bool proceed_or_wait(seq_t in_seq) { if (in_seq == seq + 1) { ++seq; - if (unlikely(pr_wait.has_value())) { - pr_wait->set_value(); - pr_wait = std::nullopt; - } + reset_wait(); return true; } else { return false; @@ -188,6 +209,13 @@ private: } return pr_wait->get_shared_future(); } + + void reset_wait() { + if (unlikely(pr_wait.has_value())) { + pr_wait->set_value(); + pr_wait = std::nullopt; + } + } }; seq_t do_prepare_submit(seq_t &out_seq) { diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h index 7141e20f476d..41596987b09f 100644 --- a/src/crimson/net/Connection.h +++ b/src/crimson/net/Connection.h @@ -81,8 +81,8 @@ class Connection : public seastar::enable_shared_from_this { * * Send a message over a connection that has completed its handshake. * - * May be invoked from any core, but that requires to chain the returned - * future to preserve ordering. + * May be invoked from any core, and the send order will be preserved upon + * the call. */ virtual seastar::future<> send(MessageURef msg) = 0; @@ -92,8 +92,8 @@ class Connection : public seastar::enable_shared_from_this { * Send a keepalive message over a connection that has completed its * handshake. * - * May be invoked from any core, but that requires to chain the returned - * future to preserve ordering. + * May be invoked from any core, and the send order will be preserved upon + * the call. */ virtual seastar::future<> send_keepalive() = 0; diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 168d079c8e6d..4262bbbc70cc 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -251,7 +251,7 @@ private: // asynchronously populated from io_handler io_handler_state io_states; - crosscore_ordering_t crosscore; + proto_crosscore_ordering_t crosscore; bool has_socket = false; diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 57e5c12c1aed..767192682773 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -79,16 +79,13 @@ bool SocketConnection::peer_wins() const return (messenger.get_myaddr() > peer_addr || policy.server); } -seastar::future<> SocketConnection::send(MessageURef _msg) +seastar::future<> SocketConnection::send(MessageURef msg) { - // may be invoked from any core - MessageFRef msg = seastar::make_foreign(std::move(_msg)); return io_handler->send(std::move(msg)); } seastar::future<> SocketConnection::send_keepalive() { - // may be invoked from any core return io_handler->send_keepalive(); } diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 823d6c574dad..7d20f68867e8 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -54,7 +54,7 @@ public: virtual bool is_connected() const = 0; - virtual seastar::future<> send(MessageFRef) = 0; + virtual seastar::future<> send(MessageURef) = 0; virtual seastar::future<> send_keepalive() = 0; diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc index e8a868b4d4c7..b9b0339f9448 100644 --- a/src/crimson/net/io_handler.cc +++ b/src/crimson/net/io_handler.cc @@ -160,84 +160,132 @@ IOHandler::sweep_out_pending_msgs_to_sent( #endif } -seastar::future<> IOHandler::send(MessageFRef msg) +seastar::future<> IOHandler::send(MessageURef _msg) { + // may be invoked from any core + MessageFRef msg = seastar::make_foreign(std::move(_msg)); + auto cc_seq = io_crosscore.prepare_submit(); + auto source_core = seastar::this_shard_id(); // sid may be changed on-the-fly during the submission - if (seastar::this_shard_id() == get_shard_id()) { - return do_send(std::move(msg)); + if (source_core == get_shard_id()) { + return do_send(cc_seq, source_core, std::move(msg)); } else { - logger().trace("{} send() is directed to {} -- {}", - conn, get_shard_id(), *msg); + logger().trace("{} send() {} is directed to core {} -- {}", + conn, cc_seq, get_shard_id(), *msg); return seastar::smp::submit_to( - get_shard_id(), [this, msg=std::move(msg)]() mutable { - return send_redirected(std::move(msg)); + get_shard_id(), + [this, cc_seq, source_core, msg=std::move(msg)]() mutable { + return send_recheck_shard(cc_seq, source_core, std::move(msg)); }); } } -seastar::future<> IOHandler::send_redirected(MessageFRef msg) +seastar::future<> IOHandler::send_recheck_shard( + cc_seq_t cc_seq, + core_id_t source_core, + MessageFRef msg) { // sid may be changed on-the-fly during the submission if (seastar::this_shard_id() == get_shard_id()) { - return do_send(std::move(msg)); + return do_send(cc_seq, source_core, std::move(msg)); } else { - logger().debug("{} send() is redirected to {} -- {}", - conn, get_shard_id(), *msg); + logger().debug("{} send_recheck_shard() {} " + "is redirected from core {} to {} -- {}", + conn, cc_seq, source_core, get_shard_id(), *msg); return seastar::smp::submit_to( - get_shard_id(), [this, msg=std::move(msg)]() mutable { - return send_redirected(std::move(msg)); + get_shard_id(), + [this, cc_seq, source_core, msg=std::move(msg)]() mutable { + return send_recheck_shard(cc_seq, source_core, std::move(msg)); }); } } -seastar::future<> IOHandler::do_send(MessageFRef msg) +seastar::future<> IOHandler::do_send( + cc_seq_t cc_seq, + core_id_t source_core, + MessageFRef msg) { assert(seastar::this_shard_id() == get_shard_id()); - logger().trace("{} do_send() got message -- {}", conn, *msg); - if (get_io_state() != io_state_t::drop) { - out_pending_msgs.push_back(std::move(msg)); - notify_out_dispatch(); + if (io_crosscore.proceed_or_wait(cc_seq, source_core)) { + logger().trace("{} do_send() got {} from core {}: send message -- {}", + conn, cc_seq, source_core, *msg); + if (get_io_state() != io_state_t::drop) { + out_pending_msgs.push_back(std::move(msg)); + notify_out_dispatch(); + } + return seastar::now(); + } else { + logger().debug("{} do_send() got {} from core {}, wait at {} -- {}", + conn, cc_seq, source_core, + io_crosscore.get_in_seq(source_core), + *msg); + return io_crosscore.wait(cc_seq, source_core + ).then([this, cc_seq, source_core, msg=std::move(msg)]() mutable { + return send_recheck_shard(cc_seq, source_core, std::move(msg)); + }); } - return seastar::now(); } seastar::future<> IOHandler::send_keepalive() { + // may be invoked from any core + auto cc_seq = io_crosscore.prepare_submit(); + auto source_core = seastar::this_shard_id(); // sid may be changed on-the-fly during the submission - if (seastar::this_shard_id() == get_shard_id()) { - return do_send_keepalive(); + if (source_core == get_shard_id()) { + return do_send_keepalive(cc_seq, source_core); } else { - logger().trace("{} send_keepalive() is directed to {}", conn, get_shard_id()); + logger().trace("{} send_keepalive() {} is directed to core {}", + conn, cc_seq, get_shard_id()); return seastar::smp::submit_to( - get_shard_id(), [this] { - return send_keepalive_redirected(); + get_shard_id(), + [this, cc_seq, source_core] { + return send_keepalive_recheck_shard(cc_seq, source_core); }); } } -seastar::future<> IOHandler::send_keepalive_redirected() +seastar::future<> IOHandler::send_keepalive_recheck_shard( + cc_seq_t cc_seq, + core_id_t source_core) { // sid may be changed on-the-fly during the submission if (seastar::this_shard_id() == get_shard_id()) { - return do_send_keepalive(); + return do_send_keepalive(cc_seq, source_core); } else { - logger().debug("{} send_keepalive() is redirected to {}", conn, get_shard_id()); + logger().debug("{} send_keepalive_recheck_shard() {} " + "is redirected from core {} to {}", + conn, cc_seq, source_core, get_shard_id()); return seastar::smp::submit_to( - get_shard_id(), [this] { - return send_keepalive_redirected(); + get_shard_id(), + [this, cc_seq, source_core] { + return send_keepalive_recheck_shard(cc_seq, source_core); }); } } -seastar::future<> IOHandler::do_send_keepalive() +seastar::future<> IOHandler::do_send_keepalive( + cc_seq_t cc_seq, + core_id_t source_core) { assert(seastar::this_shard_id() == get_shard_id()); - logger().trace("{} do_send_keeplive(): need_keepalive={}", conn, need_keepalive); - if (!need_keepalive) { - need_keepalive = true; - notify_out_dispatch(); + if (io_crosscore.proceed_or_wait(cc_seq, source_core)) { + logger().trace("{} do_send_keeplive() got {} from core {}: need_keepalive={}", + conn, cc_seq, source_core, need_keepalive); + if (!need_keepalive) { + need_keepalive = true; + notify_out_dispatch(); + } + return seastar::now(); + } else { + logger().debug("{} do_send_keepalive() got {} from core {}, wait at {}", + conn, cc_seq, source_core, + io_crosscore.get_in_seq(source_core)); + return io_crosscore.wait(cc_seq, source_core + ).then([this, cc_seq, source_core] { + return send_keepalive_recheck_shard(cc_seq, source_core); + }); } - return seastar::now(); } void IOHandler::mark_down() @@ -249,7 +297,7 @@ void IOHandler::mark_down() return; } - auto cc_seq = crosscore.prepare_submit(); + auto cc_seq = proto_crosscore.prepare_submit(); logger().info("{} mark_down() at {}, send {} notify_mark_down()", conn, io_stat_printer{*this}, cc_seq); do_set_io_state(io_state_t::drop); @@ -369,10 +417,10 @@ seastar::future<> IOHandler::set_io_state( bool set_notify_out) { assert(seastar::this_shard_id() == get_shard_id()); - if (!crosscore.proceed_or_wait(cc_seq)) { + if (!proto_crosscore.proceed_or_wait(cc_seq)) { logger().debug("{} got {} set_io_state(), wait at {}", - conn, cc_seq, crosscore.get_in_seq()); - return crosscore.wait(cc_seq + conn, cc_seq, proto_crosscore.get_in_seq()); + return proto_crosscore.wait(cc_seq ).then([this, cc_seq, new_state, fa=std::move(fa), set_notify_out]() mutable { return set_io_state(cc_seq, new_state, std::move(fa), set_notify_out); @@ -388,10 +436,10 @@ IOHandler::wait_io_exit_dispatching( cc_seq_t cc_seq) { assert(seastar::this_shard_id() == get_shard_id()); - if (!crosscore.proceed_or_wait(cc_seq)) { + if (!proto_crosscore.proceed_or_wait(cc_seq)) { logger().debug("{} got {} wait_io_exit_dispatching(), wait at {}", - conn, cc_seq, crosscore.get_in_seq()); - return crosscore.wait(cc_seq + conn, cc_seq, proto_crosscore.get_in_seq()); + return proto_crosscore.wait(cc_seq ).then([this, cc_seq] { return wait_io_exit_dispatching(cc_seq); }); @@ -433,10 +481,10 @@ seastar::future<> IOHandler::reset_session( bool full) { assert(seastar::this_shard_id() == get_shard_id()); - if (!crosscore.proceed_or_wait(cc_seq)) { + if (!proto_crosscore.proceed_or_wait(cc_seq)) { logger().debug("{} got {} reset_session(), wait at {}", - conn, cc_seq, crosscore.get_in_seq()); - return crosscore.wait(cc_seq + conn, cc_seq, proto_crosscore.get_in_seq()); + return proto_crosscore.wait(cc_seq ).then([this, cc_seq, full] { return reset_session(cc_seq, full); }); @@ -457,10 +505,10 @@ seastar::future<> IOHandler::reset_peer_state( cc_seq_t cc_seq) { assert(seastar::this_shard_id() == get_shard_id()); - if (!crosscore.proceed_or_wait(cc_seq)) { + if (!proto_crosscore.proceed_or_wait(cc_seq)) { logger().debug("{} got {} reset_peer_state(), wait at {}", - conn, cc_seq, crosscore.get_in_seq()); - return crosscore.wait(cc_seq + conn, cc_seq, proto_crosscore.get_in_seq()); + return proto_crosscore.wait(cc_seq ).then([this, cc_seq] { return reset_peer_state(cc_seq); }); @@ -479,10 +527,10 @@ seastar::future<> IOHandler::requeue_out_sent( cc_seq_t cc_seq) { assert(seastar::this_shard_id() == get_shard_id()); - if (!crosscore.proceed_or_wait(cc_seq)) { + if (!proto_crosscore.proceed_or_wait(cc_seq)) { logger().debug("{} got {} requeue_out_sent(), wait at {}", - conn, cc_seq, crosscore.get_in_seq()); - return crosscore.wait(cc_seq + conn, cc_seq, proto_crosscore.get_in_seq()); + return proto_crosscore.wait(cc_seq ).then([this, cc_seq] { return requeue_out_sent(cc_seq); }); @@ -521,10 +569,10 @@ seastar::future<> IOHandler::requeue_out_sent_up_to( seq_num_t msg_seq) { assert(seastar::this_shard_id() == get_shard_id()); - if (!crosscore.proceed_or_wait(cc_seq)) { + if (!proto_crosscore.proceed_or_wait(cc_seq)) { logger().debug("{} got {} requeue_out_sent_up_to(), wait at {}", - conn, cc_seq, crosscore.get_in_seq()); - return crosscore.wait(cc_seq + conn, cc_seq, proto_crosscore.get_in_seq()); + return proto_crosscore.wait(cc_seq ).then([this, cc_seq, msg_seq] { return requeue_out_sent_up_to(cc_seq, msg_seq); }); @@ -626,10 +674,10 @@ IOHandler::to_new_sid( std::optional is_replace) { ceph_assert_always(seastar::this_shard_id() == get_shard_id()); - if (!crosscore.proceed_or_wait(cc_seq)) { + if (!proto_crosscore.proceed_or_wait(cc_seq)) { logger().debug("{} got {} to_new_sid(), wait at {}", - conn, cc_seq, crosscore.get_in_seq()); - return crosscore.wait(cc_seq + conn, cc_seq, proto_crosscore.get_in_seq()); + return proto_crosscore.wait(cc_seq ).then([this, cc_seq, new_sid, is_replace, conn_fref=std::move(conn_fref)]() mutable { return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace); @@ -685,6 +733,8 @@ IOHandler::to_new_sid( shard_states = shard_states_t::create_from_previous( *maybe_prv_shard_states, new_sid); assert(new_sid == get_shard_id()); + // broadcast shard change to all the io waiters, atomically. + io_crosscore.reset_wait(); return seastar::smp::submit_to(new_sid, [this, next_cc_seq, is_dropped, prv_sid, is_replace, conn_fref=std::move(conn_fref)]() mutable { @@ -699,7 +749,7 @@ IOHandler::to_new_sid( ceph_assert_always(seastar::this_shard_id() == get_shard_id()); ceph_assert_always(get_io_state() != io_state_t::open); ceph_assert_always(!maybe_dropped_sid.has_value()); - ceph_assert_always(crosscore.proceed_or_wait(next_cc_seq)); + ceph_assert_always(proto_crosscore.proceed_or_wait(next_cc_seq)); if (is_dropped) { ceph_assert_always(get_io_state() == io_state_t::drop); @@ -749,7 +799,7 @@ seastar::future<> IOHandler::set_accepted_sid( return seastar::smp::submit_to(sid, [this, cc_seq, conn_fref=std::move(conn_fref)]() mutable { // must be the first to proceed - ceph_assert_always(crosscore.proceed_or_wait(cc_seq)); + ceph_assert_always(proto_crosscore.proceed_or_wait(cc_seq)); logger().debug("{} set accepted sid", conn); ceph_assert_always(seastar::this_shard_id() == get_shard_id()); @@ -875,7 +925,7 @@ IOHandler::do_out_dispatch(shard_states_t &ctx) } if (io_state == io_state_t::open) { - auto cc_seq = crosscore.prepare_submit(); + auto cc_seq = proto_crosscore.prepare_submit(); logger().info("{} do_out_dispatch(): fault at {}, {}, going to delay -- {}, " "send {} notify_out_fault()", conn, io_state, io_stat_printer{*this}, e.what(), cc_seq); @@ -922,7 +972,7 @@ void IOHandler::notify_out_dispatch() ceph_assert_always(seastar::this_shard_id() == get_shard_id()); assert(is_out_queued()); if (need_notify_out) { - auto cc_seq = crosscore.prepare_submit(); + auto cc_seq = proto_crosscore.prepare_submit(); logger().debug("{} send {} notify_out()", conn, cc_seq); shard_states->dispatch_in_background( @@ -1152,7 +1202,7 @@ void IOHandler::do_in_dispatch() auto io_state = ctx.get_io_state(); if (io_state == io_state_t::open) { - auto cc_seq = crosscore.prepare_submit(); + auto cc_seq = proto_crosscore.prepare_submit(); logger().info("{} do_in_dispatch(): fault at {}, {}, going to delay -- {}, " "send {} notify_out_fault()", conn, io_state, io_stat_printer{*this}, e_what, cc_seq); @@ -1188,10 +1238,10 @@ IOHandler::close_io( bool is_replace) { ceph_assert_always(seastar::this_shard_id() == get_shard_id()); - if (!crosscore.proceed_or_wait(cc_seq)) { + if (!proto_crosscore.proceed_or_wait(cc_seq)) { logger().debug("{} got {} close_io(), wait at {}", - conn, cc_seq, crosscore.get_in_seq()); - return crosscore.wait(cc_seq + conn, cc_seq, proto_crosscore.get_in_seq()); + return proto_crosscore.wait(cc_seq ).then([this, cc_seq, is_dispatch_reset, is_replace] { return close_io(cc_seq, is_dispatch_reset, is_replace); }); diff --git a/src/crimson/net/io_handler.h b/src/crimson/net/io_handler.h index f0f0ba0ae62e..8b88e2f5a254 100644 --- a/src/crimson/net/io_handler.h +++ b/src/crimson/net/io_handler.h @@ -70,8 +70,8 @@ struct io_handler_state { */ class HandshakeListener { public: - using crosscore_ordering_t = smp_crosscore_ordering_t; - using cc_seq_t = crosscore_ordering_t::seq_t; + using proto_crosscore_ordering_t = smp_crosscore_ordering_t; + using cc_seq_t = proto_crosscore_ordering_t::seq_t; virtual ~HandshakeListener() = default; @@ -105,8 +105,9 @@ protected: */ class IOHandler final : public ConnectionHandler { public: - using crosscore_ordering_t = smp_crosscore_ordering_t; - using cc_seq_t = crosscore_ordering_t::seq_t; + using io_crosscore_ordering_t = smp_crosscore_ordering_t; + using proto_crosscore_ordering_t = smp_crosscore_ordering_t; + using cc_seq_t = proto_crosscore_ordering_t::seq_t; IOHandler(ChainedDispatchers &, SocketConnection &); @@ -131,7 +132,7 @@ public: return protocol_is_connected; } - seastar::future<> send(MessageFRef msg) final; + seastar::future<> send(MessageURef msg) final; seastar::future<> send_keepalive() final; @@ -398,13 +399,13 @@ public: void assign_frame_assembler(FrameAssemblerV2Ref); - seastar::future<> send_redirected(MessageFRef msg); + seastar::future<> send_recheck_shard(cc_seq_t, core_id_t, MessageFRef); - seastar::future<> do_send(MessageFRef msg); + seastar::future<> do_send(cc_seq_t, core_id_t, MessageFRef); - seastar::future<> send_keepalive_redirected(); + seastar::future<> send_keepalive_recheck_shard(cc_seq_t, core_id_t); - seastar::future<> do_send_keepalive(); + seastar::future<> do_send_keepalive(cc_seq_t, core_id_t); seastar::future<> to_new_sid( cc_seq_t cc_seq, @@ -467,7 +468,9 @@ public: private: shard_states_ref_t shard_states; - crosscore_ordering_t crosscore; + proto_crosscore_ordering_t proto_crosscore; + + io_crosscore_ordering_t io_crosscore; // drop was happening in the previous sid std::optional maybe_dropped_sid;