From d981f681e5fa1c75cf2951ba9971433b4fa09b44 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Mon, 10 Jul 2023 09:51:39 +0800 Subject: [PATCH] crimson/net: adjust dispatcher interface about cross-core notifications Due to that we aren't able to determine cross-core ordering: * Move ms_handle_connect/accept() to be called in the new shard, so it will notify before ms_dispatch() in the same core; * Introduce another ms_handle_shard_change() when the current core is changed; Signed-off-by: Yingxin Cheng (cherry picked from commit 6bab7e698db8b7f3bec240952ed33a2bb9918a20) --- src/crimson/mgr/client.cc | 4 +- src/crimson/net/Dispatcher.h | 15 ++- src/crimson/net/ProtocolV2.cc | 6 + src/crimson/net/chained_dispatchers.cc | 34 +++-- src/crimson/net/chained_dispatchers.h | 11 +- src/crimson/net/io_handler.cc | 155 +++++++++++----------- src/crimson/net/io_handler.h | 5 +- src/crimson/osd/heartbeat.cc | 8 +- src/crimson/tools/perf_crimson_msgr.cc | 4 +- src/test/crimson/test_messenger.cc | 24 ++-- src/test/crimson/test_messenger_thrash.cc | 8 +- 11 files changed, 154 insertions(+), 120 deletions(-) diff --git a/src/crimson/mgr/client.cc b/src/crimson/mgr/client.cc index 81c508c1918e..169915c9eb3b 100644 --- a/src/crimson/mgr/client.cc +++ b/src/crimson/mgr/client.cc @@ -67,9 +67,9 @@ Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) void Client::ms_handle_connect( crimson::net::ConnectionRef c, - seastar::shard_id new_shard) + seastar::shard_id prv_shard) { - ceph_assert_always(new_shard == seastar::this_shard_id()); + ceph_assert_always(prv_shard == seastar::this_shard_id()); gate.dispatch_in_background(__func__, *this, [this, c] { if (conn == c) { // ask for the mgrconfigure message diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h index 11908349e7cd..9eea0a858f06 100644 --- a/src/crimson/net/Dispatcher.h +++ b/src/crimson/net/Dispatcher.h @@ -30,17 +30,24 @@ class Dispatcher { // used to throttle the connection if it's too busy. virtual std::optional> ms_dispatch(ConnectionRef, MessageRef) = 0; + // The connection is moving to the new_shard under accept/connect. + // User should not operate conn in this shard thereafter. + virtual void ms_handle_shard_change( + ConnectionRef conn, + seastar::shard_id new_shard, + bool is_accept_or_connect) {} + // The connection is accepted or recoverred(lossless), all the followup - // events and messages will be dispatched to the new_shard. + // events and messages will be dispatched to this shard. // // is_replace=true means the accepted connection has replaced // another connecting connection with the same peer_addr, which currently only // happens under lossy policy when both sides wish to connect to each other. - virtual void ms_handle_accept(ConnectionRef conn, seastar::shard_id new_shard, bool is_replace) {} + virtual void ms_handle_accept(ConnectionRef conn, seastar::shard_id prv_shard, bool is_replace) {} // The connection is (re)connected, all the followup events and messages will - // be dispatched to the new_shard. - virtual void ms_handle_connect(ConnectionRef conn, seastar::shard_id new_shard) {} + // be dispatched to this shard. + virtual void ms_handle_connect(ConnectionRef conn, seastar::shard_id prv_shard) {} // a reset event is dispatched when the connection is closed unexpectedly. // diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 869c11cf5a55..045022b353cd 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -1003,6 +1003,8 @@ void ProtocolV2::execute_connecting() } auto cc_seq = crosscore.prepare_submit(); + // there are 2 hops with dispatch_connect() + crosscore.prepare_submit(); logger().info("{} connected: gs={}, pgs={}, cs={}, " "client_cookie={}, server_cookie={}, {}, new_sid={}, " "send {} IOHandler::dispatch_connect()", @@ -1797,6 +1799,8 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { // set io_handler to a new shard auto cc_seq = crosscore.prepare_submit(); + // there are 2 hops with dispatch_accept() + crosscore.prepare_submit(); auto new_io_shard = frame_assembler->get_socket_shard_id(); logger().debug("{} send {} IOHandler::dispatch_accept({})", conn, cc_seq, new_io_shard); @@ -1968,6 +1972,8 @@ void ProtocolV2::trigger_replacing(bool reconnect, // set io_handler to a new shard // we should prevent parallel switching core attemps auto cc_seq = crosscore.prepare_submit(); + // there are 2 hops with dispatch_accept() + crosscore.prepare_submit(); logger().debug("{} send {} IOHandler::dispatch_accept({})", conn, cc_seq, new_io_shard); ConnectionFRef conn_fref = seastar::make_foreign( diff --git a/src/crimson/net/chained_dispatchers.cc b/src/crimson/net/chained_dispatchers.cc index dfff6d916fa6..1e4af3baa7df 100644 --- a/src/crimson/net/chained_dispatchers.cc +++ b/src/crimson/net/chained_dispatchers.cc @@ -13,7 +13,7 @@ namespace { namespace crimson::net { seastar::future<> -ChainedDispatchers::ms_dispatch(crimson::net::ConnectionRef conn, +ChainedDispatchers::ms_dispatch(ConnectionRef conn, MessageRef m) { try { for (auto& dispatcher : dispatchers) { @@ -39,13 +39,29 @@ ChainedDispatchers::ms_dispatch(crimson::net::ConnectionRef conn, } void -ChainedDispatchers::ms_handle_accept( - crimson::net::ConnectionRef conn, +ChainedDispatchers::ms_handle_shard_change( + ConnectionRef conn, seastar::shard_id new_shard, + bool ac) { + try { + for (auto& dispatcher : dispatchers) { + dispatcher->ms_handle_shard_change(conn, new_shard, ac); + } + } catch (...) { + logger().error("{} got unexpected exception in ms_handle_shard_change() {}", + *conn, std::current_exception()); + ceph_abort(); + } +} + +void +ChainedDispatchers::ms_handle_accept( + ConnectionRef conn, + seastar::shard_id prv_shard, bool is_replace) { try { for (auto& dispatcher : dispatchers) { - dispatcher->ms_handle_accept(conn, new_shard, is_replace); + dispatcher->ms_handle_accept(conn, prv_shard, is_replace); } } catch (...) { logger().error("{} got unexpected exception in ms_handle_accept() {}", @@ -56,11 +72,11 @@ ChainedDispatchers::ms_handle_accept( void ChainedDispatchers::ms_handle_connect( - crimson::net::ConnectionRef conn, - seastar::shard_id new_shard) { + ConnectionRef conn, + seastar::shard_id prv_shard) { try { for(auto& dispatcher : dispatchers) { - dispatcher->ms_handle_connect(conn, new_shard); + dispatcher->ms_handle_connect(conn, prv_shard); } } catch (...) { logger().error("{} got unexpected exception in ms_handle_connect() {}", @@ -70,7 +86,7 @@ ChainedDispatchers::ms_handle_connect( } void -ChainedDispatchers::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) { +ChainedDispatchers::ms_handle_reset(ConnectionRef conn, bool is_replace) { try { for (auto& dispatcher : dispatchers) { dispatcher->ms_handle_reset(conn, is_replace); @@ -83,7 +99,7 @@ ChainedDispatchers::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_re } void -ChainedDispatchers::ms_handle_remote_reset(crimson::net::ConnectionRef conn) { +ChainedDispatchers::ms_handle_remote_reset(ConnectionRef conn) { try { for (auto& dispatcher : dispatchers) { dispatcher->ms_handle_remote_reset(conn); diff --git a/src/crimson/net/chained_dispatchers.h b/src/crimson/net/chained_dispatchers.h index 5835205119d8..ec085864ffac 100644 --- a/src/crimson/net/chained_dispatchers.h +++ b/src/crimson/net/chained_dispatchers.h @@ -25,11 +25,12 @@ public: bool empty() const { return dispatchers.empty(); } - seastar::future<> ms_dispatch(crimson::net::ConnectionRef, MessageRef); - void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id, bool is_replace); - void ms_handle_connect(crimson::net::ConnectionRef conn, seastar::shard_id); - void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace); - void ms_handle_remote_reset(crimson::net::ConnectionRef conn); + seastar::future<> ms_dispatch(ConnectionRef, MessageRef); + void ms_handle_shard_change(ConnectionRef, seastar::shard_id, bool); + void ms_handle_accept(ConnectionRef conn, seastar::shard_id, bool is_replace); + void ms_handle_connect(ConnectionRef conn, seastar::shard_id); + void ms_handle_reset(ConnectionRef conn, bool is_replace); + void ms_handle_remote_reset(ConnectionRef conn); private: dispatchers_t dispatchers; diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc index abb7f5e46734..576f9ff434c9 100644 --- a/src/crimson/net/io_handler.cc +++ b/src/crimson/net/io_handler.cc @@ -561,36 +561,7 @@ IOHandler::dispatch_accept( ConnectionFRef conn_fref, bool is_replace) { - ceph_assert_always(seastar::this_shard_id() == get_shard_id()); - if (!crosscore.proceed_or_wait(cc_seq)) { - logger().debug("{} got {} dispatch_accept(), wait at {}", - conn, cc_seq, crosscore.get_in_seq()); - return crosscore.wait(cc_seq - ).then([this, cc_seq, new_sid, is_replace, - conn_fref=std::move(conn_fref)]() mutable { - return dispatch_accept(cc_seq, new_sid, std::move(conn_fref), is_replace); - }); - } - - logger().debug("{} got {} dispatch_accept(new_sid={}, replace={}) at {}", - conn, cc_seq, new_sid, is_replace, io_stat_printer{*this}); - if (get_io_state() == io_state_t::drop) { - assert(!protocol_is_connected); - // it is possible that both io_handler and protocolv2 are - // trying to close each other from different cores simultaneously. - return to_new_sid(new_sid, std::move(conn_fref)); - } - // protocol_is_connected can be from true to true here if the replacing is - // happening to a connected connection. - protocol_is_connected = true; - ceph_assert_always(conn_ref); - auto _conn_ref = conn_ref; - auto fut = to_new_sid(new_sid, std::move(conn_fref)); - - dispatchers.ms_handle_accept(_conn_ref, new_sid, is_replace); - // user can make changes - - return fut; + return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace); } seastar::future<> @@ -599,35 +570,7 @@ IOHandler::dispatch_connect( seastar::shard_id new_sid, ConnectionFRef conn_fref) { - ceph_assert_always(seastar::this_shard_id() == get_shard_id()); - if (!crosscore.proceed_or_wait(cc_seq)) { - logger().debug("{} got {} dispatch_connect(), wait at {}", - conn, cc_seq, crosscore.get_in_seq()); - return crosscore.wait(cc_seq - ).then([this, cc_seq, new_sid, - conn_fref=std::move(conn_fref)]() mutable { - return dispatch_connect(cc_seq, new_sid, std::move(conn_fref)); - }); - } - - logger().debug("{} got {} dispatch_connect({}) at {}", - conn, cc_seq, new_sid, io_stat_printer{*this}); - if (get_io_state() == io_state_t::drop) { - assert(!protocol_is_connected); - // it is possible that both io_handler and protocolv2 are - // trying to close each other from different cores simultaneously. - return to_new_sid(new_sid, std::move(conn_fref)); - } - ceph_assert_always(protocol_is_connected == false); - protocol_is_connected = true; - ceph_assert_always(conn_ref); - auto _conn_ref = conn_ref; - auto fut = to_new_sid(new_sid, std::move(conn_fref)); - - dispatchers.ms_handle_connect(_conn_ref, new_sid); - // user can make changes - - return fut; + return to_new_sid(cc_seq, new_sid, std::move(conn_fref), std::nullopt); } seastar::future<> @@ -650,18 +593,56 @@ IOHandler::cleanup_prv_shard(seastar::shard_id prv_sid) seastar::future<> IOHandler::to_new_sid( + crosscore_t::seq_t cc_seq, seastar::shard_id new_sid, - ConnectionFRef conn_fref) + ConnectionFRef conn_fref, + std::optional is_replace) { - /* - * Note: - * - It must be called before user is aware of the new core (through dispatching); - * - Messenger must wait the returned future for futher operations to prevent racing; - * - In general, the below submitted continuation should be the first one from the prv sid - * to the new sid; - */ + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); + if (!crosscore.proceed_or_wait(cc_seq)) { + logger().debug("{} got {} to_new_sid(), wait at {}", + conn, cc_seq, crosscore.get_in_seq()); + return crosscore.wait(cc_seq + ).then([this, cc_seq, new_sid, is_replace, + conn_fref=std::move(conn_fref)]() mutable { + return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace); + }); + } + + bool is_accept_or_connect = is_replace.has_value(); + logger().debug("{} got {} to_new_sid_1(new_sid={}, {}) at {}", + conn, cc_seq, new_sid, + fmt::format("{}", + is_accept_or_connect ? + (*is_replace ? "accept(replace)" : "accept(!replace)") : + "connect"), + io_stat_printer{*this}); + auto next_cc_seq = ++cc_seq; + + if (get_io_state() != io_state_t::drop) { + ceph_assert_always(conn_ref); + if (new_sid != seastar::this_shard_id()) { + dispatchers.ms_handle_shard_change(conn_ref, new_sid, is_accept_or_connect); + // user can make changes + } + } else { + // it is possible that both io_handler and protocolv2 are + // trying to close each other from different cores simultaneously. + assert(!protocol_is_connected); + } + + if (get_io_state() != io_state_t::drop) { + if (is_accept_or_connect) { + // protocol_is_connected can be from true to true here if the replacing is + // happening to a connected connection. + } else { + ceph_assert_always(protocol_is_connected == false); + } + protocol_is_connected = true; + } else { + assert(!protocol_is_connected); + } - assert(seastar::this_shard_id() == get_shard_id()); bool is_dropped = false; if (get_io_state() == io_state_t::drop) { is_dropped = true; @@ -679,30 +660,50 @@ IOHandler::to_new_sid( assert(new_sid == get_shard_id()); return seastar::smp::submit_to(new_sid, - [this, is_dropped, prv_sid, conn_fref=std::move(conn_fref)]() mutable { - logger().debug("{} see new_sid in io_handler(new_sid) from {}, is_dropped={}", - conn, prv_sid, is_dropped); + [this, next_cc_seq, is_dropped, prv_sid, is_replace, conn_fref=std::move(conn_fref)]() mutable { + logger().debug("{} got {} to_new_sid_2(prv_sid={}, is_dropped={}, {}) at {}", + conn, next_cc_seq, prv_sid, is_dropped, + fmt::format("{}", + is_replace.has_value() ? + (*is_replace ? "accept(replace)" : "accept(!replace)") : + "connect"), + io_stat_printer{*this}); ceph_assert_always(seastar::this_shard_id() == get_shard_id()); ceph_assert_always(get_io_state() != io_state_t::open); ceph_assert_always(!maybe_dropped_sid.has_value()); - - ceph_assert_always(!conn_ref); - conn_ref = make_local_shared_foreign(std::move(conn_fref)); + ceph_assert_always(crosscore.proceed_or_wait(next_cc_seq)); if (is_dropped) { - // the follow up cleanups will be done in the prv_sid + ceph_assert_always(get_io_state() == io_state_t::drop); ceph_assert_always(shard_states->assert_closed_and_exit()); maybe_dropped_sid = prv_sid; + // cleanup_prv_shard() will be done in a follow-up close_io() } else { - // may be at io_state_t::drop - // cleanup the prvious shard + // possible at io_state_t::drop + + // previous shard is not cleaned, + // but close_io() is responsible to clean up the current shard, + // so cleanup the previous shard here. shard_states->dispatch_in_background( "cleanup_prv_sid", conn, [this, prv_sid] { return cleanup_prv_shard(prv_sid); }); maybe_notify_out_dispatch(); } + + ceph_assert_always(!conn_ref); + // assign even if already dropping + conn_ref = make_local_shared_foreign(std::move(conn_fref)); + + if (get_io_state() != io_state_t::drop) { + if (is_replace.has_value()) { + dispatchers.ms_handle_accept(conn_ref, prv_sid, *is_replace); + } else { + dispatchers.ms_handle_connect(conn_ref, prv_sid); + } + // user can make changes + } }); } diff --git a/src/crimson/net/io_handler.h b/src/crimson/net/io_handler.h index edb69b3407af..843e565672ab 100644 --- a/src/crimson/net/io_handler.h +++ b/src/crimson/net/io_handler.h @@ -447,7 +447,10 @@ public: seastar::future<> do_send_keepalive(); seastar::future<> to_new_sid( - seastar::shard_id new_sid, ConnectionFRef); + crosscore_t::seq_t cc_seq, + seastar::shard_id new_sid, + ConnectionFRef, + std::optional is_replace); void dispatch_reset(bool is_replace); diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc index cdee52731e64..266e56533c3b 100644 --- a/src/crimson/osd/heartbeat.cc +++ b/src/crimson/osd/heartbeat.cc @@ -238,9 +238,9 @@ void Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replac void Heartbeat::ms_handle_connect( crimson::net::ConnectionRef conn, - seastar::shard_id new_shard) + seastar::shard_id prv_shard) { - ceph_assert_always(seastar::this_shard_id() == new_shard); + ceph_assert_always(seastar::this_shard_id() == prv_shard); auto peer = conn->get_peer_id(); if (conn->get_peer_type() != entity_name_t::TYPE_OSD || peer == entity_name_t::NEW) { @@ -254,10 +254,10 @@ void Heartbeat::ms_handle_connect( void Heartbeat::ms_handle_accept( crimson::net::ConnectionRef conn, - seastar::shard_id new_shard, + seastar::shard_id prv_shard, bool is_replace) { - ceph_assert_always(seastar::this_shard_id() == new_shard); + ceph_assert_always(seastar::this_shard_id() == prv_shard); auto peer = conn->get_peer_id(); if (conn->get_peer_type() != entity_name_t::TYPE_OSD || peer == entity_name_t::NEW) { diff --git a/src/crimson/tools/perf_crimson_msgr.cc b/src/crimson/tools/perf_crimson_msgr.cc index 1e4bde97dac5..e1a2db2302fb 100644 --- a/src/crimson/tools/perf_crimson_msgr.cc +++ b/src/crimson/tools/perf_crimson_msgr.cc @@ -605,8 +605,8 @@ static seastar::future<> run( void ms_handle_connect( crimson::net::ConnectionRef conn, - seastar::shard_id new_shard) override { - ceph_assert_always(new_shard == seastar::this_shard_id()); + seastar::shard_id prv_shard) override { + ceph_assert_always(prv_shard == seastar::this_shard_id()); assert(is_active()); unsigned index = static_cast(conn->get_user_private()).index; auto &conn_state = conn_states[index]; diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index ab98aa586bc4..e597caa2b38a 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -122,10 +122,10 @@ static seastar::future<> test_echo(unsigned rounds, void ms_handle_accept( crimson::net::ConnectionRef conn, - seastar::shard_id new_shard, + seastar::shard_id prv_shard, bool is_replace) override { logger().info("server accepted {}", *conn); - ceph_assert(new_shard == seastar::this_shard_id()); + ceph_assert(prv_shard == seastar::this_shard_id()); ceph_assert(!is_replace); } @@ -196,8 +196,8 @@ static seastar::future<> test_echo(unsigned rounds, void ms_handle_connect( crimson::net::ConnectionRef conn, - seastar::shard_id new_shard) override { - assert(new_shard == seastar::this_shard_id()); + seastar::shard_id prv_shard) override { + assert(prv_shard == seastar::this_shard_id()); auto session = seastar::make_shared(); auto [i, added] = sessions.emplace(conn, session); std::ignore = i; @@ -937,9 +937,9 @@ class FailoverSuite : public Dispatcher { void ms_handle_accept( ConnectionRef conn, - seastar::shard_id new_shard, + seastar::shard_id prv_shard, bool is_replace) override { - assert(new_shard == seastar::this_shard_id()); + assert(prv_shard == seastar::this_shard_id()); auto result = interceptor.find_result(conn); if (result == nullptr) { logger().error("Untracked accepted connection: {}", *conn); @@ -964,8 +964,8 @@ class FailoverSuite : public Dispatcher { void ms_handle_connect( ConnectionRef conn, - seastar::shard_id new_shard) override { - assert(new_shard == seastar::this_shard_id()); + seastar::shard_id prv_shard) override { + assert(prv_shard == seastar::this_shard_id()); auto result = interceptor.find_result(conn); if (result == nullptr) { logger().error("Untracked connected connection: {}", *conn); @@ -1533,9 +1533,9 @@ class FailoverSuitePeer : public Dispatcher { void ms_handle_accept( ConnectionRef conn, - seastar::shard_id new_shard, + seastar::shard_id prv_shard, bool is_replace) override { - assert(new_shard == seastar::this_shard_id()); + assert(prv_shard == seastar::this_shard_id()); logger().info("[TestPeer] got accept from Test"); ceph_assert(!tracked_conn || tracked_conn->is_closed() || @@ -1693,9 +1693,9 @@ class FailoverTestPeer : public Dispatcher { void ms_handle_accept( ConnectionRef conn, - seastar::shard_id new_shard, + seastar::shard_id prv_shard, bool is_replace) override { - assert(new_shard == seastar::this_shard_id()); + assert(prv_shard == seastar::this_shard_id()); cmd_conn = conn; } diff --git a/src/test/crimson/test_messenger_thrash.cc b/src/test/crimson/test_messenger_thrash.cc index 7c26d6ffdec6..22cce30a8094 100644 --- a/src/test/crimson/test_messenger_thrash.cc +++ b/src/test/crimson/test_messenger_thrash.cc @@ -136,17 +136,17 @@ class SyntheticDispatcher final void ms_handle_accept( crimson::net::ConnectionRef conn, - seastar::shard_id new_shard, + seastar::shard_id prv_shard, bool is_replace) final { logger().info("{} - Connection:{}", __func__, *conn); - assert(new_shard == seastar::this_shard_id()); + assert(prv_shard == seastar::this_shard_id()); } void ms_handle_connect( crimson::net::ConnectionRef conn, - seastar::shard_id new_shard) final { + seastar::shard_id prv_shard) final { logger().info("{} - Connection:{}", __func__, *conn); - assert(new_shard == seastar::this_shard_id()); + assert(prv_shard == seastar::this_shard_id()); } void ms_handle_reset(crimson::net::ConnectionRef con, bool is_replace) final; -- 2.47.3