From 7859037142cdb914352ffcd8667fcb4e90bfb471 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Wed, 31 May 2023 14:34:17 +0800 Subject: [PATCH] crimson/net: keep the order of cross-core events in msgr v2 Signed-off-by: Yingxin Cheng (cherry picked from commit bdd89b68eaf7a64b11aeb8c40ad5060698e03541) --- src/crimson/net/ProtocolV2.cc | 167 +++++++++++++++++-------- src/crimson/net/ProtocolV2.h | 14 ++- src/crimson/net/io_handler.cc | 225 +++++++++++++++++++++++++++------- src/crimson/net/io_handler.h | 110 ++++++++++++++--- 4 files changed, 399 insertions(+), 117 deletions(-) diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 15d3d565dd7..5a9e0c21c5a 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -239,34 +239,40 @@ void ProtocolV2::trigger_state_phase2( } else { assert(new_io_state != io_state_t::open); } - logger().debug("{} IOHandler::set_io_state(): new_state={}, new_io_state={}, " + + auto cc_seq = crosscore.prepare_submit(); + logger().debug("{} send {} IOHandler::set_io_state(): new_state={}, new_io_state={}, " "fa={}, set_notify_out={}", - conn, get_state_name(new_state), new_io_state, + conn, cc_seq, get_state_name(new_state), new_io_state, fa ? fmt::format("(sid={})", fa->get_shard_id()) : "N/A", need_notify_out); gate.dispatch_in_background( "set_io_state", conn, - [this, new_io_state, fa=std::move(fa)]() mutable { + [this, cc_seq, new_io_state, fa=std::move(fa)]() mutable { return seastar::smp::submit_to( io_handler.get_shard_id(), - [this, new_io_state, fa=std::move(fa), set_notify_out=need_notify_out]() mutable { - io_handler.set_io_state(new_io_state, std::move(fa), set_notify_out); + [this, cc_seq, new_io_state, + fa=std::move(fa), set_notify_out=need_notify_out]() mutable { + return io_handler.set_io_state( + cc_seq, new_io_state, std::move(fa), set_notify_out); }); }); if (need_exit_io) { // from READY - logger().debug("{} IOHandler::wait_io_exit_dispatching() start ...", conn); + auto cc_seq = crosscore.prepare_submit(); + logger().debug("{} send {} IOHandler::wait_io_exit_dispatching() ...", + conn, cc_seq); assert(pr_exit_io.has_value()); assert(new_io_state != io_state_t::open); need_exit_io = false; - gate.dispatch_in_background("exit_io", conn, [this] { + gate.dispatch_in_background("exit_io", conn, [this, cc_seq] { return seastar::smp::submit_to( - io_handler.get_shard_id(), [this] { - return io_handler.wait_io_exit_dispatching(); - }).then([this](auto ret) { - logger().debug("{} IOHandler::wait_io_exit_dispatching() finish, {}", - conn, ret.io_states); + io_handler.get_shard_id(), [this, cc_seq] { + return io_handler.wait_io_exit_dispatching(cc_seq); + }).then([this, cc_seq](auto ret) { + logger().debug("{} finish {} IOHandler::wait_io_exit_dispatching(), {}", + conn, cc_seq, ret.io_states); frame_assembler = std::move(ret.frame_assembler); assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); ceph_assert_always( @@ -391,13 +397,15 @@ void ProtocolV2::reset_session(bool full) peer_global_seq = 0; } - logger().debug("{} IOHandler::reset_session({})", conn, full); + auto cc_seq = crosscore.prepare_submit(); + logger().debug("{} send {} IOHandler::reset_session({})", + conn, cc_seq, full); io_states.reset_session(full); gate.dispatch_in_background( - "reset_session", conn, [this, full] { + "reset_session", conn, [this, cc_seq, full] { return seastar::smp::submit_to( - io_handler.get_shard_id(), [this, full] { - io_handler.reset_session(full); + io_handler.get_shard_id(), [this, cc_seq, full] { + return io_handler.reset_session(cc_seq, full); }); }); // user can make changes @@ -687,13 +695,15 @@ ProtocolV2::client_connect() } // handle_server_ident() logic - logger().debug("{} IOHandler::requeue_out_sent()", conn); + auto cc_seq = crosscore.prepare_submit(); + logger().debug("{} send {} IOHandler::requeue_out_sent()", + conn, cc_seq); io_states.requeue_out_sent(); gate.dispatch_in_background( - "requeue_out_sent", conn, [this] { + "requeue_out_sent", conn, [this, cc_seq] { return seastar::smp::submit_to( - io_handler.get_shard_id(), [this] { - io_handler.requeue_out_sent(); + io_handler.get_shard_id(), [this, cc_seq] { + return io_handler.requeue_out_sent(cc_seq); }); }); @@ -834,17 +844,18 @@ ProtocolV2::client_reconnect() // handle_reconnect_ok() logic auto reconnect_ok = ReconnectOkFrame::Decode(payload->back()); + auto cc_seq = crosscore.prepare_submit(); logger().debug("{} GOT ReconnectOkFrame: msg_seq={}, " - "IOHandler::requeue_out_sent_up_to()", - conn, reconnect_ok.msg_seq()); + "send {} IOHandler::requeue_out_sent_up_to()", + conn, reconnect_ok.msg_seq(), cc_seq); io_states.requeue_out_sent_up_to(); auto msg_seq = reconnect_ok.msg_seq(); gate.dispatch_in_background( - "requeue_out_reconnecting", conn, [this, msg_seq] { + "requeue_out_reconnecting", conn, [this, cc_seq, msg_seq] { return seastar::smp::submit_to( - io_handler.get_shard_id(), [this, msg_seq] { - io_handler.requeue_out_sent_up_to(msg_seq); + io_handler.get_shard_id(), [this, cc_seq, msg_seq] { + return io_handler.requeue_out_sent_up_to(cc_seq, msg_seq); }); }); @@ -982,12 +993,13 @@ void ProtocolV2::execute_connecting() abort_protocol(); } + auto cc_seq = crosscore.prepare_submit(); logger().info("{} connected: gs={}, pgs={}, cs={}, " "client_cookie={}, server_cookie={}, {}, new_sid={}, " - "IOHandler::dispatch_connect()", + "send {} IOHandler::dispatch_connect()", conn, global_seq, peer_global_seq, connect_seq, client_cookie, server_cookie, io_states, - frame_assembler->get_socket_shard_id()); + frame_assembler->get_socket_shard_id(), cc_seq); // set io_handler to a new shard auto new_io_shard = frame_assembler->get_socket_shard_id(); @@ -997,9 +1009,10 @@ void ProtocolV2::execute_connecting() pr_switch_io_shard = seastar::shared_promise<>(); return seastar::smp::submit_to( io_handler.get_shard_id(), - [this, new_io_shard, conn_fref=std::move(conn_fref)]() mutable { + [this, cc_seq, new_io_shard, + conn_fref=std::move(conn_fref)]() mutable { return io_handler.dispatch_connect( - new_io_shard, std::move(conn_fref)); + cc_seq, new_io_shard, std::move(conn_fref)); }).then([this, new_io_shard] { ceph_assert_always(io_handler.get_shard_id() == new_io_shard); pr_switch_io_shard->set_value(); @@ -1771,17 +1784,20 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { ceph_assert_always(state == state_t::ESTABLISHING); // set io_handler to a new shard + auto cc_seq = crosscore.prepare_submit(); auto new_io_shard = frame_assembler->get_socket_shard_id(); - logger().debug("{} IOHandler::dispatch_accept({})", conn, new_io_shard); + logger().debug("{} send {} IOHandler::dispatch_accept({})", + conn, cc_seq, new_io_shard); ConnectionFRef conn_fref = seastar::make_foreign( conn.shared_from_this()); ceph_assert_always(!pr_switch_io_shard.has_value()); pr_switch_io_shard = seastar::shared_promise<>(); return seastar::smp::submit_to( io_handler.get_shard_id(), - [this, new_io_shard, conn_fref=std::move(conn_fref)]() mutable { + [this, cc_seq, new_io_shard, + conn_fref=std::move(conn_fref)]() mutable { return io_handler.dispatch_accept( - new_io_shard, std::move(conn_fref)); + cc_seq, new_io_shard, std::move(conn_fref)); }).then([this, new_io_shard] { ceph_assert_always(io_handler.get_shard_id() == new_io_shard); pr_switch_io_shard->set_value(); @@ -1820,17 +1836,18 @@ ProtocolV2::send_server_ident() // refered to async-conn v2: not assign gs to global_seq global_seq = messenger.get_global_seq(); + auto cc_seq = crosscore.prepare_submit(); logger().debug("{} UPDATE: gs={} for server ident, " - "IOHandler::reset_peer_state()", - conn, global_seq); + "send {} IOHandler::reset_peer_state()", + conn, global_seq, cc_seq); // this is required for the case when this connection is being replaced io_states.reset_peer_state(); gate.dispatch_in_background( - "reset_peer_state", conn, [this] { + "reset_peer_state", conn, [this, cc_seq] { return seastar::smp::submit_to( - io_handler.get_shard_id(), [this] { - io_handler.reset_peer_state(); + io_handler.get_shard_id(), [this, cc_seq] { + return io_handler.reset_peer_state(cc_seq); }); }); @@ -1938,16 +1955,19 @@ void ProtocolV2::trigger_replacing(bool reconnect, // set io_handler to a new shard // we should prevent parallel switching core attemps - logger().debug("{} IOHandler::dispatch_accept({})", conn, new_io_shard); + auto cc_seq = crosscore.prepare_submit(); + logger().debug("{} send {} IOHandler::dispatch_accept({})", + conn, cc_seq, new_io_shard); ConnectionFRef conn_fref = seastar::make_foreign( conn.shared_from_this()); ceph_assert_always(!pr_switch_io_shard.has_value()); pr_switch_io_shard = seastar::shared_promise<>(); return seastar::smp::submit_to( io_handler.get_shard_id(), - [this, new_io_shard, conn_fref=std::move(conn_fref)]() mutable { + [this, cc_seq, new_io_shard, + conn_fref=std::move(conn_fref)]() mutable { return io_handler.dispatch_accept( - new_io_shard, std::move(conn_fref)); + cc_seq, new_io_shard, std::move(conn_fref)); }).then([this, new_io_shard] { ceph_assert_always(io_handler.get_shard_id() == new_io_shard); pr_switch_io_shard->set_value(); @@ -1994,13 +2014,15 @@ void ProtocolV2::trigger_replacing(bool reconnect, connect_seq = new_connect_seq; // send_reconnect_ok() logic - logger().debug("{} IOHandler::requeue_out_sent_up_to({})", conn, new_msg_seq); + auto cc_seq = crosscore.prepare_submit(); + logger().debug("{} send {} IOHandler::requeue_out_sent_up_to({})", + conn, cc_seq, new_msg_seq); io_states.requeue_out_sent_up_to(); gate.dispatch_in_background( - "requeue_out_replacing", conn, [this, new_msg_seq] { + "requeue_out_replacing", conn, [this, cc_seq, new_msg_seq] { return seastar::smp::submit_to( - io_handler.get_shard_id(), [this, new_msg_seq] { - io_handler.requeue_out_sent_up_to(new_msg_seq); + io_handler.get_shard_id(), [this, cc_seq, new_msg_seq] { + return io_handler.requeue_out_sent_up_to(cc_seq, new_msg_seq); }); }); @@ -2041,15 +2063,27 @@ void ProtocolV2::trigger_replacing(bool reconnect, // READY state -void ProtocolV2::notify_out_fault( +seastar::future<> ProtocolV2::notify_out_fault( + crosscore_t::seq_t cc_seq, const char *where, std::exception_ptr eptr, io_handler_state _io_states) { assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} notify_out_fault(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq, where, eptr, _io_states] { + return notify_out_fault(cc_seq, where, eptr, _io_states); + }); + } + io_states = _io_states; - logger().debug("{} got notify_out_fault(): io_states={}", conn, io_states); + logger().debug("{} got {} notify_out_fault(): io_states={}", + conn, cc_seq, io_states); fault(state_t::READY, where, eptr); + return seastar::now(); } void ProtocolV2::execute_ready() @@ -2070,16 +2104,28 @@ void ProtocolV2::execute_standby() trigger_state(state_t::STANDBY, io_state_t::delay); } -void ProtocolV2::notify_out() +seastar::future<> ProtocolV2::notify_out( + crosscore_t::seq_t cc_seq) { assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); - logger().debug("{} got notify_out(): at {}", conn, get_state_name(state)); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} notify_out(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq] { + return notify_out(cc_seq); + }); + } + + logger().debug("{} got {} notify_out(): at {}", + conn, cc_seq, get_state_name(state)); io_states.is_out_queued = true; if (unlikely(state == state_t::STANDBY && !conn.policy.server)) { logger().info("{} notify_out(): at {}, going to CONNECTING", conn, get_state_name(state)); execute_connecting(); } + return seastar::now(); } // WAIT state @@ -2147,11 +2193,23 @@ void ProtocolV2::execute_server_wait() // CLOSING state -void ProtocolV2::notify_mark_down() +seastar::future<> ProtocolV2::notify_mark_down( + crosscore_t::seq_t cc_seq) { assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); - logger().debug("{} got notify_mark_down()", conn); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} notify_mark_down(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq] { + return notify_mark_down(cc_seq); + }); + } + + logger().debug("{} got {} notify_mark_down()", + conn, cc_seq); do_close(false); + return seastar::now(); } seastar::future<> ProtocolV2::close_clean_yielded() @@ -2226,8 +2284,9 @@ void ProtocolV2::do_close( return wait_switch_io_shard( ).then([this, is_dispatch_reset, is_replace] { trigger_state_phase2(state_t::CLOSING, io_state_t::drop); - logger().debug("{} IOHandler::close_io(reset={}, replace={})", - conn, is_dispatch_reset, is_replace); + auto cc_seq = crosscore.prepare_submit(); + logger().debug("{} send {} IOHandler::close_io(reset={}, replace={})", + conn, cc_seq, is_dispatch_reset, is_replace); std::ignore = gate.close( ).then([this] { @@ -2262,8 +2321,8 @@ void ProtocolV2::do_close( return seastar::smp::submit_to( io_handler.get_shard_id(), - [this, is_dispatch_reset, is_replace] { - return io_handler.close_io(is_dispatch_reset, is_replace); + [this, cc_seq, is_dispatch_reset, is_replace] { + return io_handler.close_io(cc_seq, is_dispatch_reset, is_replace); }); // user can make changes }); diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index f083d1721ac..a9aa4ecdf76 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -28,11 +28,17 @@ public: * as HandshakeListener */ private: - void notify_out() final; + seastar::future<> notify_out( + crosscore_t::seq_t cc_seq) final; - void notify_out_fault(const char *where, std::exception_ptr, io_handler_state) final; + seastar::future<> notify_out_fault( + crosscore_t::seq_t cc_seq, + const char *where, + std::exception_ptr, + io_handler_state) final; - void notify_mark_down() final; + seastar::future<> notify_mark_down( + crosscore_t::seq_t cc_seq) final; /* * as ProtocolV2 to be called by SocketConnection @@ -237,6 +243,8 @@ private: // asynchronously populated from io_handler io_handler_state io_states; + crosscore_t crosscore; + bool has_socket = false; // the socket exists and it is not shutdown diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc index f04ffff3a00..adcb8611148 100644 --- a/src/crimson/net/io_handler.cc +++ b/src/crimson/net/io_handler.cc @@ -215,14 +215,15 @@ void IOHandler::mark_down() return; } - logger().info("{} mark_down() at {}, send notify_mark_down()", - conn, io_stat_printer{*this}); - set_io_state(io_state_t::drop); + auto cc_seq = crosscore.prepare_submit(); + logger().info("{} mark_down() at {}, send {} notify_mark_down()", + conn, io_stat_printer{*this}, cc_seq); + do_set_io_state(io_state_t::drop); shard_states->dispatch_in_background( - "notify_mark_down", conn, [this] { + "notify_mark_down", conn, [this, cc_seq] { return seastar::smp::submit_to( - conn.get_messenger_shard_id(), [this] { - handshake_listener->notify_mark_down(); + conn.get_messenger_shard_id(), [this, cc_seq] { + return handshake_listener->notify_mark_down(cc_seq); }); }); } @@ -255,16 +256,19 @@ void IOHandler::assign_frame_assembler(FrameAssemblerV2Ref fa) ceph_assert_always(frame_assembler->is_socket_valid()); } -void IOHandler::set_io_state( +void IOHandler::do_set_io_state( io_state_t new_state, + std::optional cc_seq, FrameAssemblerV2Ref fa, bool set_notify_out) { ceph_assert_always(seastar::this_shard_id() == get_shard_id()); auto prv_state = get_io_state(); - logger().debug("{} got set_io_state(): prv_state={}, new_state={}, " + logger().debug("{} got {}do_set_io_state(): prv_state={}, new_state={}, " "fa={}, set_notify_out={}, at {}", - conn, prv_state, new_state, + conn, + cc_seq.has_value() ? fmt::format("{} ", *cc_seq) : "", + prv_state, new_state, fa ? "present" : "N/A", set_notify_out, io_stat_printer{*this}); ceph_assert_always(!( @@ -331,11 +335,43 @@ void IOHandler::set_io_state( } } +seastar::future<> IOHandler::set_io_state( + crosscore_t::seq_t cc_seq, + io_state_t new_state, + FrameAssemblerV2Ref fa, + bool set_notify_out) +{ + assert(seastar::this_shard_id() == get_shard_id()); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} set_io_state(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq, new_state, + fa=std::move(fa), set_notify_out]() mutable { + return set_io_state(cc_seq, new_state, std::move(fa), set_notify_out); + }); + } + + do_set_io_state(new_state, cc_seq, std::move(fa), set_notify_out); + return seastar::now(); +} + seastar::future -IOHandler::wait_io_exit_dispatching() +IOHandler::wait_io_exit_dispatching( + crosscore_t::seq_t cc_seq) { assert(seastar::this_shard_id() == get_shard_id()); - logger().debug("{} got wait_io_exit_dispatching()", conn); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} wait_io_exit_dispatching(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq] { + return wait_io_exit_dispatching(cc_seq); + }); + } + + logger().debug("{} got {} wait_io_exit_dispatching()", + conn, cc_seq); ceph_assert_always(get_io_state() != io_state_t::open); ceph_assert_always(frame_assembler != nullptr); ceph_assert_always(!frame_assembler->is_socket_valid()); @@ -365,31 +401,74 @@ IOHandler::wait_io_exit_dispatching() }); } -void IOHandler::reset_session(bool full) +seastar::future<> IOHandler::reset_session( + crosscore_t::seq_t cc_seq, + bool full) { assert(seastar::this_shard_id() == get_shard_id()); - logger().debug("{} got reset_session({})", conn, full); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} reset_session(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq, full] { + return reset_session(cc_seq, full); + }); + } + + logger().debug("{} got {} reset_session({})", + conn, cc_seq, full); assert(get_io_state() != io_state_t::open); reset_in(); if (full) { reset_out(); dispatch_remote_reset(); } + return seastar::now(); } -void IOHandler::reset_peer_state() +seastar::future<> IOHandler::reset_peer_state( + crosscore_t::seq_t cc_seq) { assert(seastar::this_shard_id() == get_shard_id()); - logger().debug("{} got reset_peer_state()", conn); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} reset_peer_state(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq] { + return reset_peer_state(cc_seq); + }); + } + + logger().debug("{} got {} reset_peer_state()", + conn, cc_seq); assert(get_io_state() != io_state_t::open); reset_in(); - requeue_out_sent_up_to(0); + do_requeue_out_sent_up_to(0); discard_out_sent(); + return seastar::now(); } -void IOHandler::requeue_out_sent() +seastar::future<> IOHandler::requeue_out_sent( + crosscore_t::seq_t cc_seq) { assert(seastar::this_shard_id() == get_shard_id()); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} requeue_out_sent(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq] { + return requeue_out_sent(cc_seq); + }); + } + + logger().debug("{} got {} requeue_out_sent()", + conn, cc_seq); + do_requeue_out_sent(); + return seastar::now(); +} + +void IOHandler::do_requeue_out_sent() +{ assert(get_io_state() != io_state_t::open); if (out_sent_msgs.empty()) { return; @@ -410,9 +489,28 @@ void IOHandler::requeue_out_sent() maybe_notify_out_dispatch(); } -void IOHandler::requeue_out_sent_up_to(seq_num_t seq) +seastar::future<> IOHandler::requeue_out_sent_up_to( + crosscore_t::seq_t cc_seq, + seq_num_t msg_seq) { assert(seastar::this_shard_id() == get_shard_id()); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} requeue_out_sent_up_to(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq, msg_seq] { + return requeue_out_sent_up_to(cc_seq, msg_seq); + }); + } + + logger().debug("{} got {} requeue_out_sent_up_to({})", + conn, cc_seq, msg_seq); + do_requeue_out_sent_up_to(msg_seq); + return seastar::now(); +} + +void IOHandler::do_requeue_out_sent_up_to(seq_num_t seq) +{ assert(get_io_state() != io_state_t::open); if (out_sent_msgs.empty() && out_pending_msgs.empty()) { logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}", @@ -430,7 +528,7 @@ void IOHandler::requeue_out_sent_up_to(seq_num_t seq) out_sent_msgs.pop_front(); } } - requeue_out_sent(); + do_requeue_out_sent(); } void IOHandler::reset_in() @@ -458,12 +556,23 @@ void IOHandler::discard_out_sent() seastar::future<> IOHandler::dispatch_accept( + crosscore_t::seq_t cc_seq, seastar::shard_id new_sid, ConnectionFRef conn_fref) { ceph_assert_always(seastar::this_shard_id() == get_shard_id()); - logger().debug("{} got dispatch_accept({}) at {}", - conn, new_sid, io_stat_printer{*this}); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} dispatch_accept(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq, new_sid, + conn_fref=std::move(conn_fref)]() mutable { + return dispatch_accept(cc_seq, new_sid, std::move(conn_fref)); + }); + } + + logger().debug("{} got {} dispatch_accept({}) at {}", + conn, cc_seq, new_sid, io_stat_printer{*this}); if (get_io_state() == io_state_t::drop) { assert(!protocol_is_connected); // it is possible that both io_handler and protocolv2 are @@ -485,12 +594,23 @@ IOHandler::dispatch_accept( seastar::future<> IOHandler::dispatch_connect( + crosscore_t::seq_t cc_seq, seastar::shard_id new_sid, ConnectionFRef conn_fref) { ceph_assert_always(seastar::this_shard_id() == get_shard_id()); - logger().debug("{} got dispatch_connect({}) at {}", - conn, new_sid, io_stat_printer{*this}); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} dispatch_connect(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq, new_sid, + conn_fref=std::move(conn_fref)]() mutable { + return dispatch_connect(cc_seq, new_sid, std::move(conn_fref)); + }); + } + + logger().debug("{} got {} dispatch_connect({}) at {}", + conn, cc_seq, new_sid, io_stat_printer{*this}); if (get_io_state() == io_state_t::drop) { assert(!protocol_is_connected); // it is possible that both io_handler and protocolv2 are @@ -693,23 +813,24 @@ IOHandler::do_out_dispatch(shard_states_t &ctx) } if (io_state == io_state_t::open) { + auto cc_seq = crosscore.prepare_submit(); logger().info("{} do_out_dispatch(): fault at {}, {}, going to delay -- {}, " - "send notify_out_fault()", - conn, io_state, io_stat_printer{*this}, e.what()); + "send {} notify_out_fault()", + conn, io_state, io_stat_printer{*this}, e.what(), cc_seq); std::exception_ptr eptr; try { throw e; } catch(...) { eptr = std::current_exception(); } - set_io_state(io_state_t::delay); + do_set_io_state(io_state_t::delay); shard_states->dispatch_in_background( - "notify_out_fault(out)", conn, [this, eptr] { + "notify_out_fault(out)", conn, [this, cc_seq, eptr] { auto states = get_states(); return seastar::smp::submit_to( - conn.get_messenger_shard_id(), [this, eptr, states] { - handshake_listener->notify_out_fault( - "do_out_dispatch", eptr, states); + conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] { + return handshake_listener->notify_out_fault( + cc_seq, "do_out_dispatch", eptr, states); }); }); } else { @@ -739,12 +860,14 @@ void IOHandler::notify_out_dispatch() ceph_assert_always(seastar::this_shard_id() == get_shard_id()); assert(is_out_queued()); if (need_notify_out) { - logger().debug("{} send notify_out()", conn); + auto cc_seq = crosscore.prepare_submit(); + logger().debug("{} send {} notify_out()", + conn, cc_seq); shard_states->dispatch_in_background( - "notify_out", conn, [this] { + "notify_out", conn, [this, cc_seq] { return seastar::smp::submit_to( - conn.get_messenger_shard_id(), [this] { - handshake_listener->notify_out(); + conn.get_messenger_shard_id(), [this, cc_seq] { + return handshake_listener->notify_out(cc_seq); }); }); } @@ -967,17 +1090,18 @@ void IOHandler::do_in_dispatch() auto io_state = ctx.get_io_state(); if (io_state == io_state_t::open) { + auto cc_seq = crosscore.prepare_submit(); logger().info("{} do_in_dispatch(): fault at {}, {}, going to delay -- {}, " - "send notify_out_fault()", - conn, io_state, io_stat_printer{*this}, e_what); - set_io_state(io_state_t::delay); + "send {} notify_out_fault()", + conn, io_state, io_stat_printer{*this}, e_what, cc_seq); + do_set_io_state(io_state_t::delay); shard_states->dispatch_in_background( - "notify_out_fault(in)", conn, [this, eptr] { + "notify_out_fault(in)", conn, [this, cc_seq, eptr] { auto states = get_states(); return seastar::smp::submit_to( - conn.get_messenger_shard_id(), [this, eptr, states] { - handshake_listener->notify_out_fault( - "do_in_dispatch", eptr, states); + conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] { + return handshake_listener->notify_out_fault( + cc_seq, "do_in_dispatch", eptr, states); }); }); } else { @@ -996,13 +1120,24 @@ void IOHandler::do_in_dispatch() } seastar::future<> -IOHandler::close_io(bool is_dispatch_reset, bool is_replace) +IOHandler::close_io( + crosscore_t::seq_t cc_seq, + bool is_dispatch_reset, + bool is_replace) { ceph_assert_always(seastar::this_shard_id() == get_shard_id()); - ceph_assert_always(get_io_state() == io_state_t::drop); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} close_io(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq, is_dispatch_reset, is_replace] { + return close_io(cc_seq, is_dispatch_reset, is_replace); + }); + } - logger().debug("{} got close_io(reset={}, replace={})", - conn, is_dispatch_reset, is_replace); + logger().debug("{} got {} close_io(reset={}, replace={})", + conn, cc_seq, is_dispatch_reset, is_replace); + ceph_assert_always(get_io_state() == io_state_t::drop); if (is_dispatch_reset) { dispatch_reset(is_replace); diff --git a/src/crimson/net/io_handler.h b/src/crimson/net/io_handler.h index f478f142963..108386fc7ae 100644 --- a/src/crimson/net/io_handler.h +++ b/src/crimson/net/io_handler.h @@ -3,6 +3,7 @@ #pragma once +#include #include #include "crimson/common/gated.h" @@ -12,6 +13,54 @@ namespace crimson::net { +/** + * crosscore_t + * + * To preserve the event order across cores. + */ +class crosscore_t { +public: + using seq_t = uint64_t; + + crosscore_t() = default; + ~crosscore_t() = default; + + seq_t get_in_seq() const { + return in_seq; + } + + seq_t prepare_submit() { + ++out_seq; + return out_seq; + } + + bool proceed_or_wait(seq_t seq) { + if (seq == in_seq + 1) { + ++in_seq; + if (unlikely(in_pr_wait.has_value())) { + in_pr_wait->set_value(); + in_pr_wait = std::nullopt; + } + return true; + } else { + return false; + } + } + + seastar::future<> wait(seq_t seq) { + assert(seq != in_seq + 1); + if (!in_pr_wait.has_value()) { + in_pr_wait = seastar::shared_promise<>(); + } + return in_pr_wait->get_shared_future(); + } + +private: + seq_t out_seq = 0; + seq_t in_seq = 0; + std::optional> in_pr_wait; +}; + /** * io_handler_state * @@ -74,14 +123,17 @@ public: HandshakeListener &operator=(const HandshakeListener &) = delete; HandshakeListener &operator=(HandshakeListener &&) = delete; - virtual void notify_out() = 0; + virtual seastar::future<> notify_out( + crosscore_t::seq_t cc_seq) = 0; - virtual void notify_out_fault( + virtual seastar::future<> notify_out_fault( + crosscore_t::seq_t cc_seq, const char *where, std::exception_ptr, io_handler_state) = 0; - virtual void notify_mark_down() = 0; + virtual seastar::future<> notify_mark_down( + crosscore_t::seq_t cc_seq) = 0; protected: HandshakeListener() = default; @@ -157,7 +209,7 @@ public: } io_handler_state get_states() const { - assert(seastar::this_shard_id() == get_shard_id()); + // might be called from prv_sid during wait_io_exit_dispatching() return {in_seq, is_out_queued(), has_out_sent()}; } @@ -170,7 +222,10 @@ public: * may be called cross-core */ - seastar::future<> close_io(bool is_dispatch_reset, bool is_replace); + seastar::future<> close_io( + crosscore_t::seq_t cc_seq, + bool is_dispatch_reset, + bool is_replace); /** * io_state_t @@ -188,30 +243,43 @@ public: }; friend class fmt::formatter; - void set_io_state( + seastar::future<> set_io_state( + crosscore_t::seq_t cc_seq, io_state_t new_state, - FrameAssemblerV2Ref fa = nullptr, - bool set_notify_out = false); + FrameAssemblerV2Ref fa, + bool set_notify_out); struct exit_dispatching_ret { FrameAssemblerV2Ref frame_assembler; io_handler_state io_states; }; - seastar::future wait_io_exit_dispatching(); + seastar::future + wait_io_exit_dispatching( + crosscore_t::seq_t cc_seq); - void reset_session(bool full); + seastar::future<> reset_session( + crosscore_t::seq_t cc_seq, + bool full); - void reset_peer_state(); + seastar::future<> reset_peer_state( + crosscore_t::seq_t cc_seq); - void requeue_out_sent_up_to(seq_num_t seq); + seastar::future<> requeue_out_sent_up_to( + crosscore_t::seq_t cc_seq, + seq_num_t msg_seq); - void requeue_out_sent(); + seastar::future<> requeue_out_sent( + crosscore_t::seq_t cc_seq); seastar::future<> dispatch_accept( - seastar::shard_id new_sid, ConnectionFRef); + crosscore_t::seq_t cc_seq, + seastar::shard_id new_sid, + ConnectionFRef); seastar::future<> dispatch_connect( - seastar::shard_id new_sid, ConnectionFRef); + crosscore_t::seq_t cc_seq, + seastar::shard_id new_sid, + ConnectionFRef); private: class shard_states_t; @@ -348,10 +416,20 @@ public: std::optional> in_exit_dispatching; }; + void do_set_io_state( + io_state_t new_state, + std::optional cc_seq = std::nullopt, + FrameAssemblerV2Ref fa = nullptr, + bool set_notify_out = false); + io_state_t get_io_state() const { return shard_states->get_io_state(); } + void do_requeue_out_sent(); + + void do_requeue_out_sent_up_to(seq_num_t seq); + void assign_frame_assembler(FrameAssemblerV2Ref); seastar::future<> send_redirected(MessageFRef msg); @@ -411,6 +489,8 @@ public: private: shard_states_ref_t shard_states; + crosscore_t crosscore; + // drop was happening in the previous sid std::optional maybe_dropped_sid; -- 2.39.5