From 18d2c30e675a7445888570658fdca9aeb6299b2d Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Fri, 2 Jun 2023 17:22:53 +0800 Subject: [PATCH] crimson/net: notify the new connection shard upon accept/connect Users may need to know the new connection shard prior to message dispatching. Otherwise, there will be no chance for user to do any related preparations. This is still a placeholder before multi-core messegner is enabled. Signed-off-by: Yingxin Cheng --- src/crimson/mgr/client.cc | 5 ++++- src/crimson/mgr/client.h | 2 +- src/crimson/net/Dispatcher.h | 8 ++++++-- src/crimson/net/chained_dispatchers.cc | 12 +++++++---- src/crimson/net/chained_dispatchers.h | 6 ++++-- src/crimson/net/io_handler.cc | 4 ++-- src/crimson/osd/heartbeat.cc | 10 +++++++-- src/crimson/osd/heartbeat.h | 4 ++-- src/crimson/tools/perf_crimson_msgr.cc | 5 ++++- src/test/crimson/test_messenger.cc | 25 ++++++++++++++++++----- src/test/crimson/test_messenger_thrash.cc | 10 +++++++-- 11 files changed, 67 insertions(+), 24 deletions(-) diff --git a/src/crimson/mgr/client.cc b/src/crimson/mgr/client.cc index 6e3d7cdd848..b81f2048718 100644 --- a/src/crimson/mgr/client.cc +++ b/src/crimson/mgr/client.cc @@ -65,8 +65,11 @@ Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) return (dispatched ? std::make_optional(seastar::now()) : std::nullopt); } -void Client::ms_handle_connect(crimson::net::ConnectionRef c) +void Client::ms_handle_connect( + crimson::net::ConnectionRef c, + seastar::shard_id new_shard) { + ceph_assert_always(new_shard == seastar::this_shard_id()); gate.dispatch_in_background(__func__, *this, [this, c] { if (conn == c) { // ask for the mgrconfigure message diff --git a/src/crimson/mgr/client.h b/src/crimson/mgr/client.h index e8457543305..feceae55dbb 100644 --- a/src/crimson/mgr/client.h +++ b/src/crimson/mgr/client.h @@ -40,7 +40,7 @@ private: std::optional> ms_dispatch( crimson::net::ConnectionRef conn, Ref m) override; void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final; - void ms_handle_connect(crimson::net::ConnectionRef conn) final; + void ms_handle_connect(crimson::net::ConnectionRef conn, seastar::shard_id) final; seastar::future<> handle_mgr_map(crimson::net::ConnectionRef conn, Ref m); seastar::future<> handle_mgr_conf(crimson::net::ConnectionRef conn, diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h index cc6fd4574c7..c563b5e266f 100644 --- a/src/crimson/net/Dispatcher.h +++ b/src/crimson/net/Dispatcher.h @@ -30,9 +30,13 @@ class Dispatcher { // used to throttle the connection if it's too busy. virtual std::optional> ms_dispatch(ConnectionRef, MessageRef) = 0; - virtual void ms_handle_accept(ConnectionRef conn) {} + // The connection is accepted or recoverred(lossless), all the followup + // events and messages will be dispatched to the new_shard. + virtual void ms_handle_accept(ConnectionRef conn, seastar::shard_id new_shard) {} - virtual void ms_handle_connect(ConnectionRef conn) {} + // The connection is (re)connected, all the followup events and messages will + // be dispatched to the new_shard. + virtual void ms_handle_connect(ConnectionRef conn, seastar::shard_id new_shard) {} // a reset event is dispatched when the connection is closed unexpectedly. // is_replace=true means the reset connection is going to be replaced by diff --git a/src/crimson/net/chained_dispatchers.cc b/src/crimson/net/chained_dispatchers.cc index b13d40c8f73..2656c0e5749 100644 --- a/src/crimson/net/chained_dispatchers.cc +++ b/src/crimson/net/chained_dispatchers.cc @@ -39,10 +39,12 @@ ChainedDispatchers::ms_dispatch(crimson::net::ConnectionRef conn, } void -ChainedDispatchers::ms_handle_accept(crimson::net::ConnectionRef conn) { +ChainedDispatchers::ms_handle_accept( + crimson::net::ConnectionRef conn, + seastar::shard_id new_shard) { try { for (auto& dispatcher : dispatchers) { - dispatcher->ms_handle_accept(conn); + dispatcher->ms_handle_accept(conn, new_shard); } } catch (...) { logger().error("{} got unexpected exception in ms_handle_accept() {}", @@ -52,10 +54,12 @@ ChainedDispatchers::ms_handle_accept(crimson::net::ConnectionRef conn) { } void -ChainedDispatchers::ms_handle_connect(crimson::net::ConnectionRef conn) { +ChainedDispatchers::ms_handle_connect( + crimson::net::ConnectionRef conn, + seastar::shard_id new_shard) { try { for(auto& dispatcher : dispatchers) { - dispatcher->ms_handle_connect(conn); + dispatcher->ms_handle_connect(conn, new_shard); } } catch (...) { logger().error("{} got unexpected exception in ms_handle_connect() {}", diff --git a/src/crimson/net/chained_dispatchers.h b/src/crimson/net/chained_dispatchers.h index 712b0894b9f..40356e9d473 100644 --- a/src/crimson/net/chained_dispatchers.h +++ b/src/crimson/net/chained_dispatchers.h @@ -3,6 +3,8 @@ #pragma once +#include + #include "Fwd.h" #include "crimson/common/log.h" @@ -24,8 +26,8 @@ public: return dispatchers.empty(); } seastar::future<> ms_dispatch(crimson::net::ConnectionRef, MessageRef); - void ms_handle_accept(crimson::net::ConnectionRef conn); - void ms_handle_connect(crimson::net::ConnectionRef conn); + void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id); + void ms_handle_connect(crimson::net::ConnectionRef conn, seastar::shard_id); void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace); void ms_handle_remote_reset(crimson::net::ConnectionRef conn); diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc index 3e156ca6730..54ef67356ab 100644 --- a/src/crimson/net/io_handler.cc +++ b/src/crimson/net/io_handler.cc @@ -408,7 +408,7 @@ void IOHandler::dispatch_accept() // happening to a connected connection. protocol_is_connected = true; ceph_assert_always(conn_ref); - dispatchers.ms_handle_accept(conn_ref); + dispatchers.ms_handle_accept(conn_ref, get_shard_id()); } void IOHandler::dispatch_connect() @@ -419,7 +419,7 @@ void IOHandler::dispatch_connect() ceph_assert_always(protocol_is_connected == false); protocol_is_connected = true; ceph_assert_always(conn_ref); - dispatchers.ms_handle_connect(conn_ref); + dispatchers.ms_handle_connect(conn_ref, get_shard_id()); } void IOHandler::dispatch_reset(bool is_replace) diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc index be6f6000851..30de528291a 100644 --- a/src/crimson/osd/heartbeat.cc +++ b/src/crimson/osd/heartbeat.cc @@ -236,8 +236,11 @@ void Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replac } } -void Heartbeat::ms_handle_connect(crimson::net::ConnectionRef conn) +void Heartbeat::ms_handle_connect( + crimson::net::ConnectionRef conn, + seastar::shard_id new_shard) { + ceph_assert_always(seastar::this_shard_id() == new_shard); auto peer = conn->get_peer_id(); if (conn->get_peer_type() != entity_name_t::TYPE_OSD || peer == entity_name_t::NEW) { @@ -249,8 +252,11 @@ void Heartbeat::ms_handle_connect(crimson::net::ConnectionRef conn) } } -void Heartbeat::ms_handle_accept(crimson::net::ConnectionRef conn) +void Heartbeat::ms_handle_accept( + crimson::net::ConnectionRef conn, + seastar::shard_id new_shard) { + ceph_assert_always(seastar::this_shard_id() == new_shard); auto peer = conn->get_peer_id(); if (conn->get_peer_type() != entity_name_t::TYPE_OSD || peer == entity_name_t::NEW) { diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h index c483a689395..c5bf8f0ded2 100644 --- a/src/crimson/osd/heartbeat.h +++ b/src/crimson/osd/heartbeat.h @@ -52,8 +52,8 @@ public: std::optional> ms_dispatch( crimson::net::ConnectionRef conn, MessageRef m) override; void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override; - void ms_handle_connect(crimson::net::ConnectionRef conn) override; - void ms_handle_accept(crimson::net::ConnectionRef conn) override; + void ms_handle_connect(crimson::net::ConnectionRef conn, seastar::shard_id) override; + void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id) override; void print(std::ostream&) const; private: diff --git a/src/crimson/tools/perf_crimson_msgr.cc b/src/crimson/tools/perf_crimson_msgr.cc index ef5602b0f27..2d45dfaaff2 100644 --- a/src/crimson/tools/perf_crimson_msgr.cc +++ b/src/crimson/tools/perf_crimson_msgr.cc @@ -301,7 +301,10 @@ static seastar::future<> run( return nr_depth - depth.current(); } - void ms_handle_connect(crimson::net::ConnectionRef conn) override { + void ms_handle_connect( + crimson::net::ConnectionRef conn, + seastar::shard_id new_shard) override { + ceph_assert_always(new_shard == seastar::this_shard_id()); conn_stats.connected_time = mono_clock::now(); } std::optional> ms_dispatch( diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 0107fa6ccc5..8af521d48c8 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -123,7 +123,10 @@ static seastar::future<> test_echo(unsigned rounds, return found->second; } - void ms_handle_connect(crimson::net::ConnectionRef conn) override { + void ms_handle_connect( + crimson::net::ConnectionRef conn, + seastar::shard_id new_shard) override { + assert(new_shard == seastar::this_shard_id()); auto session = seastar::make_shared(); auto [i, added] = sessions.emplace(conn, session); std::ignore = i; @@ -851,7 +854,10 @@ class FailoverSuite : public Dispatcher { return {seastar::now()}; } - void ms_handle_accept(ConnectionRef conn) override { + void ms_handle_accept( + ConnectionRef conn, + seastar::shard_id new_shard) override { + assert(new_shard == seastar::this_shard_id()); auto result = interceptor.find_result(conn); if (result == nullptr) { logger().error("Untracked accepted connection: {}", *conn); @@ -874,7 +880,10 @@ class FailoverSuite : public Dispatcher { std::ignore = flush_pending_send(); } - void ms_handle_connect(ConnectionRef conn) override { + void ms_handle_connect( + ConnectionRef conn, + seastar::shard_id new_shard) override { + assert(new_shard == seastar::this_shard_id()); auto result = interceptor.find_result(conn); if (result == nullptr) { logger().error("Untracked connected connection: {}", *conn); @@ -1432,7 +1441,10 @@ class FailoverSuitePeer : public Dispatcher { return {seastar::now()}; } - void ms_handle_accept(ConnectionRef conn) override { + void ms_handle_accept( + ConnectionRef conn, + seastar::shard_id new_shard) override { + assert(new_shard == seastar::this_shard_id()); logger().info("[TestPeer] got accept from Test"); ceph_assert(!tracked_conn || tracked_conn->is_closed() || @@ -1587,7 +1599,10 @@ class FailoverTestPeer : public Dispatcher { return {seastar::now()}; } - void ms_handle_accept(ConnectionRef conn) override { + void ms_handle_accept( + ConnectionRef conn, + seastar::shard_id new_shard) override { + assert(new_shard == seastar::this_shard_id()); cmd_conn = conn; } diff --git a/src/test/crimson/test_messenger_thrash.cc b/src/test/crimson/test_messenger_thrash.cc index b3a1d910b5d..7be81f831c8 100644 --- a/src/test/crimson/test_messenger_thrash.cc +++ b/src/test/crimson/test_messenger_thrash.cc @@ -134,12 +134,18 @@ class SyntheticDispatcher final } } - void ms_handle_accept(crimson::net::ConnectionRef conn) { + void ms_handle_accept( + crimson::net::ConnectionRef conn, + seastar::shard_id new_shard) { logger().info("{} - Connection:{}", __func__, *conn); + assert(new_shard == seastar::this_shard_id()); } - void ms_handle_connect(crimson::net::ConnectionRef conn) { + void ms_handle_connect( + crimson::net::ConnectionRef conn, + seastar::shard_id new_shard) { logger().info("{} - Connection:{}", __func__, *conn); + assert(new_shard == seastar::this_shard_id()); } void ms_handle_reset(crimson::net::ConnectionRef con, bool is_replace); -- 2.39.5