]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: make sure Messenger is always called in the same shard
authorYingxin Cheng <yingxin.cheng@intel.com>
Mon, 13 Mar 2023 08:29:12 +0000 (16:29 +0800)
committerMatan Breizman <mbreizma@redhat.com>
Wed, 11 Oct 2023 11:38:30 +0000 (11:38 +0000)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
(cherry picked from commit ec510b57d37c6971f6bfcae4d009f40047dbc537)

src/crimson/net/SocketConnection.cc
src/crimson/net/SocketMessenger.cc
src/crimson/net/SocketMessenger.h

index 38e2748738f7b2b39980a92f7167c98c8c71bcad..cf7b16f48bd9708e020de26a15747453f788b5cd 100644 (file)
@@ -28,7 +28,7 @@ namespace crimson::net {
 
 SocketConnection::SocketConnection(SocketMessenger& messenger,
                                    ChainedDispatchers& dispatchers)
-  : core(messenger.shard_id()),
+  : core(messenger.get_shard_id()),
     messenger(messenger)
 {
   auto ret = create_handlers(dispatchers, *this);
index 8bc7ebbbedc16cd408747426b118898ede4b4412..b3856e6f9a352968df7a5125bf0ceeb67b48b800 100644 (file)
@@ -35,7 +35,7 @@ namespace crimson::net {
 SocketMessenger::SocketMessenger(const entity_name_t& myname,
                                  const std::string& logic_name,
                                  uint32_t nonce)
-  : master_sid{seastar::this_shard_id()},
+  : sid{seastar::this_shard_id()},
     logic_name{logic_name},
     nonce{nonce},
     my_name{myname}
@@ -44,11 +44,13 @@ SocketMessenger::SocketMessenger(const entity_name_t& myname,
 SocketMessenger::~SocketMessenger()
 {
   logger().debug("~SocketMessenger: {}", logic_name);
+  ceph_assert_always(seastar::this_shard_id() == sid);
   ceph_assert(!listener);
 }
 
 bool SocketMessenger::set_addr_unknowns(const entity_addrvec_t &addrs)
 {
+  assert(seastar::this_shard_id() == sid);
   bool ret = false;
 
   entity_addrvec_t newaddrs = my_addrs;
@@ -76,7 +78,7 @@ bool SocketMessenger::set_addr_unknowns(const entity_addrvec_t &addrs)
 
 void SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
 {
-  assert(seastar::this_shard_id() == master_sid);
+  assert(seastar::this_shard_id() == sid);
   my_addrs = addrs;
   for (auto& addr : my_addrs.v) {
     addr.nonce = nonce;
@@ -86,7 +88,6 @@ void SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
 crimson::net::listen_ertr::future<>
 SocketMessenger::do_listen(const entity_addrvec_t& addrs)
 {
-  assert(seastar::this_shard_id() == master_sid);
   ceph_assert(addrs.front().get_family() == AF_INET);
   set_myaddrs(addrs);
   return seastar::futurize_invoke([this] {
@@ -162,6 +163,7 @@ SocketMessenger::try_bind(const entity_addrvec_t& addrs,
 SocketMessenger::bind_ertr::future<>
 SocketMessenger::bind(const entity_addrvec_t& addrs)
 {
+  assert(seastar::this_shard_id() == sid);
   using crimson::common::local_conf;
   return seastar::do_with(int64_t{local_conf()->ms_bind_retry_count},
                           [this, addrs] (auto& count) {
@@ -207,7 +209,7 @@ SocketMessenger::bind(const entity_addrvec_t& addrs)
 
 seastar::future<> SocketMessenger::start(
     const dispatchers_t& _dispatchers) {
-  assert(seastar::this_shard_id() == master_sid);
+  assert(seastar::this_shard_id() == sid);
 
   dispatchers.assign(_dispatchers);
   if (listener) {
@@ -216,7 +218,7 @@ seastar::future<> SocketMessenger::start(
     ceph_assert(get_myaddr().get_port() > 0);
 
     return listener->accept([this](SocketRef socket, entity_addr_t peer_addr) {
-      assert(seastar::this_shard_id() == master_sid);
+      assert(seastar::this_shard_id() == sid);
       assert(get_myaddr().is_msgr2());
       SocketConnectionRef conn =
         seastar::make_shared<SocketConnection>(*this, dispatchers);
@@ -230,7 +232,7 @@ seastar::future<> SocketMessenger::start(
 crimson::net::ConnectionRef
 SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name)
 {
-  assert(seastar::this_shard_id() == master_sid);
+  assert(seastar::this_shard_id() == sid);
 
   // make sure we connect to a valid peer_addr
   if (!peer_addr.is_msgr2()) {
@@ -250,7 +252,7 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& pe
 
 seastar::future<> SocketMessenger::shutdown()
 {
-  assert(seastar::this_shard_id() == master_sid);
+  assert(seastar::this_shard_id() == sid);
   return seastar::futurize_invoke([this] {
     assert(dispatchers.empty());
     if (listener) {
@@ -307,7 +309,7 @@ void SocketMessenger::learned_addr(
     const entity_addr_t &peer_addr_for_me,
     const SocketConnection& conn)
 {
-  assert(seastar::this_shard_id() == master_sid);
+  assert(seastar::this_shard_id() == sid);
   if (!need_addr) {
     if ((!get_myaddr().is_any() &&
          get_myaddr().get_type() != peer_addr_for_me.get_type()) ||
@@ -364,34 +366,40 @@ void SocketMessenger::learned_addr(
 
 SocketPolicy SocketMessenger::get_policy(entity_type_t peer_type) const
 {
+  assert(seastar::this_shard_id() == sid);
   return policy_set.get(peer_type);
 }
 
 SocketPolicy SocketMessenger::get_default_policy() const
 {
+  assert(seastar::this_shard_id() == sid);
   return policy_set.get_default();
 }
 
 void SocketMessenger::set_default_policy(const SocketPolicy& p)
 {
+  assert(seastar::this_shard_id() == sid);
   policy_set.set_default(p);
 }
 
 void SocketMessenger::set_policy(entity_type_t peer_type,
                                 const SocketPolicy& p)
 {
+  assert(seastar::this_shard_id() == sid);
   policy_set.set(peer_type, p);
 }
 
 void SocketMessenger::set_policy_throttler(entity_type_t peer_type,
                                           Throttle* throttle)
 {
+  assert(seastar::this_shard_id() == sid);
   // only byte throttler is used in OSD
   policy_set.set_throttlers(peer_type, throttle, nullptr);
 }
 
 crimson::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
 {
+  assert(seastar::this_shard_id() == sid);
   if (auto found = connections.find(addr);
       found != connections.end()) {
     return found->second;
@@ -402,16 +410,19 @@ crimson::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr
 
 void SocketMessenger::accept_conn(SocketConnectionRef conn)
 {
+  assert(seastar::this_shard_id() == sid);
   accepting_conns.insert(conn);
 }
 
 void SocketMessenger::unaccept_conn(SocketConnectionRef conn)
 {
+  assert(seastar::this_shard_id() == sid);
   accepting_conns.erase(conn);
 }
 
 void SocketMessenger::register_conn(SocketConnectionRef conn)
 {
+  assert(seastar::this_shard_id() == sid);
   auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
   std::ignore = i;
   ceph_assert(added);
@@ -419,6 +430,7 @@ void SocketMessenger::register_conn(SocketConnectionRef conn)
 
 void SocketMessenger::unregister_conn(SocketConnectionRef conn)
 {
+  assert(seastar::this_shard_id() == sid);
   ceph_assert(conn);
   auto found = connections.find(conn->get_peer_addr());
   ceph_assert(found != connections.end());
@@ -428,11 +440,13 @@ void SocketMessenger::unregister_conn(SocketConnectionRef conn)
 
 void SocketMessenger::closing_conn(SocketConnectionRef conn)
 {
+  assert(seastar::this_shard_id() == sid);
   closing_conns.push_back(conn);
 }
 
 void SocketMessenger::closed_conn(SocketConnectionRef conn)
 {
+  assert(seastar::this_shard_id() == sid);
   for (auto it = closing_conns.begin();
        it != closing_conns.end();) {
     if (*it == conn) {
@@ -445,6 +459,7 @@ void SocketMessenger::closed_conn(SocketConnectionRef conn)
 
 uint32_t SocketMessenger::get_global_seq(uint32_t old)
 {
+  assert(seastar::this_shard_id() == sid);
   if (old > global_seq) {
     global_seq = old;
   }
index 60510666a12cc2e2f54145a8903afc3e4c59b5fd..940894ce1b3d131ff80de60ad7b27199f54ed7de 100644 (file)
@@ -33,38 +33,12 @@ template <bool IS_FIXED_CPU>
 class ShardedServerSocket;
 
 class SocketMessenger final : public Messenger {
-  const seastar::shard_id master_sid;
-  // Distinguish messengers with meaningful names for debugging
-  const std::string logic_name;
-  const uint32_t nonce;
-
-  entity_name_t my_name;
-  entity_addrvec_t my_addrs;
-  crimson::auth::AuthClient* auth_client = nullptr;
-  crimson::auth::AuthServer* auth_server = nullptr;
-
-  ShardedServerSocket<true> *listener = nullptr;
-  ChainedDispatchers dispatchers;
-  std::map<entity_addr_t, SocketConnectionRef> connections;
-  std::set<SocketConnectionRef> accepting_conns;
-  std::vector<SocketConnectionRef> closing_conns;
-  ceph::net::PolicySet<Throttle> policy_set;
-  // specifying we haven't learned our addr; set false when we find it.
-  bool need_addr = true;
-  uint32_t global_seq = 0;
-  bool started = false;
-  seastar::promise<> shutdown_promise;
-
-  listen_ertr::future<> do_listen(const entity_addrvec_t& addr);
-  /// try to bind to the first unused port of given address
-  bind_ertr::future<> try_bind(const entity_addrvec_t& addr,
-                               uint32_t min_port, uint32_t max_port);
-
-
- public:
+// Messenger public interfaces
+public:
   SocketMessenger(const entity_name_t& myname,
                   const std::string& logic_name,
                   uint32_t nonce);
+
   ~SocketMessenger() override;
 
   const entity_name_t &get_myname() const override {
@@ -77,18 +51,18 @@ class SocketMessenger final : public Messenger {
 
   void set_myaddrs(const entity_addrvec_t& addr) override;
 
+  bool set_addr_unknowns(const entity_addrvec_t &addr) override;
+
   void set_auth_client(crimson::auth::AuthClient *ac) override {
+    assert(seastar::this_shard_id() == sid);
     auth_client = ac;
   }
 
   void set_auth_server(crimson::auth::AuthServer *as) override {
+    assert(seastar::this_shard_id() == sid);
     auth_server = as;
   }
 
-
-  bool set_addr_unknowns(const entity_addrvec_t &addr) override;
-  // Messenger interfaces are assumed to be called from its own shard, but its
-  // behavior should be symmetric when called from any shard.
   bind_ertr::future<> bind(const entity_addrvec_t& addr) override;
 
   seastar::future<> start(const dispatchers_t& dispatchers) override;
@@ -97,20 +71,23 @@ class SocketMessenger final : public Messenger {
                         const entity_name_t& peer_name) override;
 
   bool owns_connection(Connection &conn) const override {
+    assert(seastar::this_shard_id() == sid);
     return this == &static_cast<SocketConnection&>(conn).get_messenger();
   }
 
   // can only wait once
   seastar::future<> wait() override {
-    assert(seastar::this_shard_id() == master_sid);
+    assert(seastar::this_shard_id() == sid);
     return shutdown_promise.get_future();
   }
 
   void stop() override {
+    assert(seastar::this_shard_id() == sid);
     dispatchers.clear();
   }
 
   bool is_started() const override {
+    assert(seastar::this_shard_id() == sid);
     return !dispatchers.empty();
   }
 
@@ -132,10 +109,17 @@ class SocketMessenger final : public Messenger {
 
   void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override;
 
- public:
-  crimson::auth::AuthClient* get_auth_client() const { return auth_client; }
+// SocketMessenger public interfaces
+public:
+  crimson::auth::AuthClient* get_auth_client() const {
+    assert(seastar::this_shard_id() == sid);
+    return auth_client;
+  }
 
-  crimson::auth::AuthServer* get_auth_server() const { return auth_server; }
+  crimson::auth::AuthServer* get_auth_server() const {
+    assert(seastar::this_shard_id() == sid);
+    return auth_server;
+  }
 
   uint32_t get_global_seq(uint32_t old=0);
 
@@ -143,16 +127,21 @@ class SocketMessenger final : public Messenger {
                     const SocketConnection& conn);
 
   SocketConnectionRef lookup_conn(const entity_addr_t& addr);
+
   void accept_conn(SocketConnectionRef);
+
   void unaccept_conn(SocketConnectionRef);
+
   void register_conn(SocketConnectionRef);
+
   void unregister_conn(SocketConnectionRef);
+
   void closing_conn(SocketConnectionRef);
+
   void closed_conn(SocketConnectionRef);
 
-  seastar::shard_id shard_id() const {
-    assert(seastar::this_shard_id() == master_sid);
-    return master_sid;
+  seastar::shard_id get_shard_id() const {
+    return sid;
   }
 
 #ifdef UNIT_TESTS_BUILT
@@ -162,6 +151,35 @@ class SocketMessenger final : public Messenger {
 
   Interceptor *interceptor = nullptr;
 #endif
+
+private:
+  listen_ertr::future<> do_listen(const entity_addrvec_t& addr);
+
+  /// try to bind to the first unused port of given address
+  bind_ertr::future<> try_bind(const entity_addrvec_t& addr,
+                               uint32_t min_port, uint32_t max_port);
+
+  const seastar::shard_id sid;
+  // Distinguish messengers with meaningful names for debugging
+  const std::string logic_name;
+  const uint32_t nonce;
+
+  entity_name_t my_name;
+  entity_addrvec_t my_addrs;
+  crimson::auth::AuthClient* auth_client = nullptr;
+  crimson::auth::AuthServer* auth_server = nullptr;
+
+  ShardedServerSocket<true> *listener = nullptr;
+  ChainedDispatchers dispatchers;
+  std::map<entity_addr_t, SocketConnectionRef> connections;
+  std::set<SocketConnectionRef> accepting_conns;
+  std::vector<SocketConnectionRef> closing_conns;
+  ceph::net::PolicySet<Throttle> policy_set;
+  // specifying we haven't learned our addr; set false when we find it.
+  bool need_addr = true;
+  uint32_t global_seq = 0;
+  bool started = false;
+  seastar::promise<> shutdown_promise;
 };
 
 } // namespace crimson::net