From 09f0e75c9714f065ec050dabafd7ac3673a2e743 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Mon, 13 Mar 2023 16:29:12 +0800 Subject: [PATCH] crimson/net: make sure Messenger is always called in the same shard Signed-off-by: Yingxin Cheng (cherry picked from commit ec510b57d37c6971f6bfcae4d009f40047dbc537) --- src/crimson/net/SocketConnection.cc | 2 +- src/crimson/net/SocketMessenger.cc | 31 ++++++--- src/crimson/net/SocketMessenger.h | 98 +++++++++++++++++------------ 3 files changed, 82 insertions(+), 49 deletions(-) diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 38e2748738f7b..cf7b16f48bd97 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -28,7 +28,7 @@ namespace crimson::net { SocketConnection::SocketConnection(SocketMessenger& messenger, ChainedDispatchers& dispatchers) - : core(messenger.shard_id()), + : core(messenger.get_shard_id()), messenger(messenger) { auto ret = create_handlers(dispatchers, *this); diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 8bc7ebbbedc16..b3856e6f9a352 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -35,7 +35,7 @@ namespace crimson::net { SocketMessenger::SocketMessenger(const entity_name_t& myname, const std::string& logic_name, uint32_t nonce) - : master_sid{seastar::this_shard_id()}, + : sid{seastar::this_shard_id()}, logic_name{logic_name}, nonce{nonce}, my_name{myname} @@ -44,11 +44,13 @@ SocketMessenger::SocketMessenger(const entity_name_t& myname, SocketMessenger::~SocketMessenger() { logger().debug("~SocketMessenger: {}", logic_name); + ceph_assert_always(seastar::this_shard_id() == sid); ceph_assert(!listener); } bool SocketMessenger::set_addr_unknowns(const entity_addrvec_t &addrs) { + assert(seastar::this_shard_id() == sid); bool ret = false; entity_addrvec_t newaddrs = my_addrs; @@ -76,7 +78,7 @@ bool SocketMessenger::set_addr_unknowns(const entity_addrvec_t &addrs) void SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs) { - assert(seastar::this_shard_id() == master_sid); + assert(seastar::this_shard_id() == sid); my_addrs = addrs; for (auto& addr : my_addrs.v) { addr.nonce = nonce; @@ -86,7 +88,6 @@ void SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs) crimson::net::listen_ertr::future<> SocketMessenger::do_listen(const entity_addrvec_t& addrs) { - assert(seastar::this_shard_id() == master_sid); ceph_assert(addrs.front().get_family() == AF_INET); set_myaddrs(addrs); return seastar::futurize_invoke([this] { @@ -162,6 +163,7 @@ SocketMessenger::try_bind(const entity_addrvec_t& addrs, SocketMessenger::bind_ertr::future<> SocketMessenger::bind(const entity_addrvec_t& addrs) { + assert(seastar::this_shard_id() == sid); using crimson::common::local_conf; return seastar::do_with(int64_t{local_conf()->ms_bind_retry_count}, [this, addrs] (auto& count) { @@ -207,7 +209,7 @@ SocketMessenger::bind(const entity_addrvec_t& addrs) seastar::future<> SocketMessenger::start( const dispatchers_t& _dispatchers) { - assert(seastar::this_shard_id() == master_sid); + assert(seastar::this_shard_id() == sid); dispatchers.assign(_dispatchers); if (listener) { @@ -216,7 +218,7 @@ seastar::future<> SocketMessenger::start( ceph_assert(get_myaddr().get_port() > 0); return listener->accept([this](SocketRef socket, entity_addr_t peer_addr) { - assert(seastar::this_shard_id() == master_sid); + assert(seastar::this_shard_id() == sid); assert(get_myaddr().is_msgr2()); SocketConnectionRef conn = seastar::make_shared(*this, dispatchers); @@ -230,7 +232,7 @@ seastar::future<> SocketMessenger::start( crimson::net::ConnectionRef SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name) { - assert(seastar::this_shard_id() == master_sid); + assert(seastar::this_shard_id() == sid); // make sure we connect to a valid peer_addr if (!peer_addr.is_msgr2()) { @@ -250,7 +252,7 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& pe seastar::future<> SocketMessenger::shutdown() { - assert(seastar::this_shard_id() == master_sid); + assert(seastar::this_shard_id() == sid); return seastar::futurize_invoke([this] { assert(dispatchers.empty()); if (listener) { @@ -307,7 +309,7 @@ void SocketMessenger::learned_addr( const entity_addr_t &peer_addr_for_me, const SocketConnection& conn) { - assert(seastar::this_shard_id() == master_sid); + assert(seastar::this_shard_id() == sid); if (!need_addr) { if ((!get_myaddr().is_any() && get_myaddr().get_type() != peer_addr_for_me.get_type()) || @@ -364,34 +366,40 @@ void SocketMessenger::learned_addr( SocketPolicy SocketMessenger::get_policy(entity_type_t peer_type) const { + assert(seastar::this_shard_id() == sid); return policy_set.get(peer_type); } SocketPolicy SocketMessenger::get_default_policy() const { + assert(seastar::this_shard_id() == sid); return policy_set.get_default(); } void SocketMessenger::set_default_policy(const SocketPolicy& p) { + assert(seastar::this_shard_id() == sid); policy_set.set_default(p); } void SocketMessenger::set_policy(entity_type_t peer_type, const SocketPolicy& p) { + assert(seastar::this_shard_id() == sid); policy_set.set(peer_type, p); } void SocketMessenger::set_policy_throttler(entity_type_t peer_type, Throttle* throttle) { + assert(seastar::this_shard_id() == sid); // only byte throttler is used in OSD policy_set.set_throttlers(peer_type, throttle, nullptr); } crimson::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr) { + assert(seastar::this_shard_id() == sid); if (auto found = connections.find(addr); found != connections.end()) { return found->second; @@ -402,16 +410,19 @@ crimson::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr void SocketMessenger::accept_conn(SocketConnectionRef conn) { + assert(seastar::this_shard_id() == sid); accepting_conns.insert(conn); } void SocketMessenger::unaccept_conn(SocketConnectionRef conn) { + assert(seastar::this_shard_id() == sid); accepting_conns.erase(conn); } void SocketMessenger::register_conn(SocketConnectionRef conn) { + assert(seastar::this_shard_id() == sid); auto [i, added] = connections.emplace(conn->get_peer_addr(), conn); std::ignore = i; ceph_assert(added); @@ -419,6 +430,7 @@ void SocketMessenger::register_conn(SocketConnectionRef conn) void SocketMessenger::unregister_conn(SocketConnectionRef conn) { + assert(seastar::this_shard_id() == sid); ceph_assert(conn); auto found = connections.find(conn->get_peer_addr()); ceph_assert(found != connections.end()); @@ -428,11 +440,13 @@ void SocketMessenger::unregister_conn(SocketConnectionRef conn) void SocketMessenger::closing_conn(SocketConnectionRef conn) { + assert(seastar::this_shard_id() == sid); closing_conns.push_back(conn); } void SocketMessenger::closed_conn(SocketConnectionRef conn) { + assert(seastar::this_shard_id() == sid); for (auto it = closing_conns.begin(); it != closing_conns.end();) { if (*it == conn) { @@ -445,6 +459,7 @@ void SocketMessenger::closed_conn(SocketConnectionRef conn) uint32_t SocketMessenger::get_global_seq(uint32_t old) { + assert(seastar::this_shard_id() == sid); if (old > global_seq) { global_seq = old; } diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index 60510666a12cc..940894ce1b3d1 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -33,38 +33,12 @@ template class ShardedServerSocket; class SocketMessenger final : public Messenger { - const seastar::shard_id master_sid; - // Distinguish messengers with meaningful names for debugging - const std::string logic_name; - const uint32_t nonce; - - entity_name_t my_name; - entity_addrvec_t my_addrs; - crimson::auth::AuthClient* auth_client = nullptr; - crimson::auth::AuthServer* auth_server = nullptr; - - ShardedServerSocket *listener = nullptr; - ChainedDispatchers dispatchers; - std::map connections; - std::set accepting_conns; - std::vector closing_conns; - ceph::net::PolicySet policy_set; - // specifying we haven't learned our addr; set false when we find it. - bool need_addr = true; - uint32_t global_seq = 0; - bool started = false; - seastar::promise<> shutdown_promise; - - listen_ertr::future<> do_listen(const entity_addrvec_t& addr); - /// try to bind to the first unused port of given address - bind_ertr::future<> try_bind(const entity_addrvec_t& addr, - uint32_t min_port, uint32_t max_port); - - - public: +// Messenger public interfaces +public: SocketMessenger(const entity_name_t& myname, const std::string& logic_name, uint32_t nonce); + ~SocketMessenger() override; const entity_name_t &get_myname() const override { @@ -77,18 +51,18 @@ class SocketMessenger final : public Messenger { void set_myaddrs(const entity_addrvec_t& addr) override; + bool set_addr_unknowns(const entity_addrvec_t &addr) override; + void set_auth_client(crimson::auth::AuthClient *ac) override { + assert(seastar::this_shard_id() == sid); auth_client = ac; } void set_auth_server(crimson::auth::AuthServer *as) override { + assert(seastar::this_shard_id() == sid); auth_server = as; } - - bool set_addr_unknowns(const entity_addrvec_t &addr) override; - // Messenger interfaces are assumed to be called from its own shard, but its - // behavior should be symmetric when called from any shard. bind_ertr::future<> bind(const entity_addrvec_t& addr) override; seastar::future<> start(const dispatchers_t& dispatchers) override; @@ -97,20 +71,23 @@ class SocketMessenger final : public Messenger { const entity_name_t& peer_name) override; bool owns_connection(Connection &conn) const override { + assert(seastar::this_shard_id() == sid); return this == &static_cast(conn).get_messenger(); } // can only wait once seastar::future<> wait() override { - assert(seastar::this_shard_id() == master_sid); + assert(seastar::this_shard_id() == sid); return shutdown_promise.get_future(); } void stop() override { + assert(seastar::this_shard_id() == sid); dispatchers.clear(); } bool is_started() const override { + assert(seastar::this_shard_id() == sid); return !dispatchers.empty(); } @@ -132,10 +109,17 @@ class SocketMessenger final : public Messenger { void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override; - public: - crimson::auth::AuthClient* get_auth_client() const { return auth_client; } +// SocketMessenger public interfaces +public: + crimson::auth::AuthClient* get_auth_client() const { + assert(seastar::this_shard_id() == sid); + return auth_client; + } - crimson::auth::AuthServer* get_auth_server() const { return auth_server; } + crimson::auth::AuthServer* get_auth_server() const { + assert(seastar::this_shard_id() == sid); + return auth_server; + } uint32_t get_global_seq(uint32_t old=0); @@ -143,16 +127,21 @@ class SocketMessenger final : public Messenger { const SocketConnection& conn); SocketConnectionRef lookup_conn(const entity_addr_t& addr); + void accept_conn(SocketConnectionRef); + void unaccept_conn(SocketConnectionRef); + void register_conn(SocketConnectionRef); + void unregister_conn(SocketConnectionRef); + void closing_conn(SocketConnectionRef); + void closed_conn(SocketConnectionRef); - seastar::shard_id shard_id() const { - assert(seastar::this_shard_id() == master_sid); - return master_sid; + seastar::shard_id get_shard_id() const { + return sid; } #ifdef UNIT_TESTS_BUILT @@ -162,6 +151,35 @@ class SocketMessenger final : public Messenger { Interceptor *interceptor = nullptr; #endif + +private: + listen_ertr::future<> do_listen(const entity_addrvec_t& addr); + + /// try to bind to the first unused port of given address + bind_ertr::future<> try_bind(const entity_addrvec_t& addr, + uint32_t min_port, uint32_t max_port); + + const seastar::shard_id sid; + // Distinguish messengers with meaningful names for debugging + const std::string logic_name; + const uint32_t nonce; + + entity_name_t my_name; + entity_addrvec_t my_addrs; + crimson::auth::AuthClient* auth_client = nullptr; + crimson::auth::AuthServer* auth_server = nullptr; + + ShardedServerSocket *listener = nullptr; + ChainedDispatchers dispatchers; + std::map connections; + std::set accepting_conns; + std::vector closing_conns; + ceph::net::PolicySet policy_set; + // specifying we haven't learned our addr; set false when we find it. + bool need_addr = true; + uint32_t global_seq = 0; + bool started = false; + seastar::promise<> shutdown_promise; }; } // namespace crimson::net -- 2.39.5