]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: introduce IOHandler::sid and the related assertions
authorYingxin Cheng <yingxin.cheng@intel.com>
Mon, 27 Mar 2023 09:00:13 +0000 (17:00 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Sun, 25 Jun 2023 03:57:19 +0000 (11:57 +0800)
Note that Connection and Messenger may be destructed in any order, so
keep a copy of msgr_sid in SocketConnection.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/io_handler.cc
src/crimson/net/io_handler.h

index 88a57c4ab541a73e5ab8c3dcac258ed8065779b4..f0eccfe2d43e2fdd48814daab20f46a5324e2bd1 100644 (file)
@@ -28,8 +28,7 @@ namespace crimson::net {
 
 SocketConnection::SocketConnection(SocketMessenger& messenger,
                                    ChainedDispatchers& dispatchers)
-  : core(messenger.get_shard_id()),
-    messenger(messenger)
+  : msgr_sid{messenger.get_shard_id()}, messenger(messenger)
 {
   auto ret = create_handlers(dispatchers, *this);
   io_handler = std::move(ret.io_handler);
@@ -46,33 +45,33 @@ SocketConnection::~SocketConnection() {}
 
 bool SocketConnection::is_connected() const
 {
-  assert(seastar::this_shard_id() == shard_id());
   return io_handler->is_connected();
 }
 
 #ifdef UNIT_TESTS_BUILT
 bool SocketConnection::is_closed() const
 {
-  assert(seastar::this_shard_id() == shard_id());
+  assert(seastar::this_shard_id() == msgr_sid);
   return protocol->is_closed();
 }
 
 bool SocketConnection::is_closed_clean() const
 {
-  assert(seastar::this_shard_id() == shard_id());
+  assert(seastar::this_shard_id() == msgr_sid);
   return protocol->is_closed_clean();
 }
 
 #endif
 bool SocketConnection::peer_wins() const
 {
+  assert(seastar::this_shard_id() == msgr_sid);
   return (messenger.get_myaddr() > peer_addr || policy.server);
 }
 
 seastar::future<> SocketConnection::send(MessageURef msg)
 {
   return seastar::smp::submit_to(
-    shard_id(),
+    io_handler->get_shard_id(),
     [this, msg=std::move(msg)]() mutable {
       return io_handler->send(std::move(msg));
     });
@@ -81,7 +80,7 @@ seastar::future<> SocketConnection::send(MessageURef msg)
 seastar::future<> SocketConnection::send_keepalive()
 {
   return seastar::smp::submit_to(
-    shard_id(),
+    io_handler->get_shard_id(),
     [this] {
       return io_handler->send_keepalive();
     });
@@ -106,7 +105,6 @@ void SocketConnection::set_last_keepalive_ack(clock_t::time_point when)
 
 void SocketConnection::mark_down()
 {
-  assert(seastar::this_shard_id() == shard_id());
   io_handler->mark_down();
 }
 
@@ -114,6 +112,7 @@ void
 SocketConnection::start_connect(const entity_addr_t& _peer_addr,
                                 const entity_name_t& _peer_name)
 {
+  assert(seastar::this_shard_id() == msgr_sid);
   protocol->start_connect(_peer_addr, _peer_name);
 }
 
@@ -121,47 +120,89 @@ void
 SocketConnection::start_accept(SocketRef&& sock,
                                const entity_addr_t& _peer_addr)
 {
+  assert(seastar::this_shard_id() == msgr_sid);
   protocol->start_accept(std::move(sock), _peer_addr);
 }
 
 seastar::future<>
 SocketConnection::close_clean_yielded()
 {
+  assert(seastar::this_shard_id() == msgr_sid);
   return protocol->close_clean_yielded();
 }
 
-seastar::shard_id SocketConnection::shard_id() const {
-  return core;
-}
-
 seastar::socket_address SocketConnection::get_local_address() const {
+  assert(seastar::this_shard_id() == msgr_sid);
   return socket->get_local_address();
 }
 
 ConnectionRef
 SocketConnection::get_local_shared_foreign_from_this()
 {
-  assert(seastar::this_shard_id() == shard_id());
+  assert(seastar::this_shard_id() == msgr_sid);
   return make_local_shared_foreign(
       seastar::make_foreign(shared_from_this()));
 }
 
+SocketMessenger &
+SocketConnection::get_messenger() const
+{
+  assert(seastar::this_shard_id() == msgr_sid);
+  return messenger;
+}
+
+void SocketConnection::set_peer_type(entity_type_t peer_type) {
+  assert(seastar::this_shard_id() == msgr_sid);
+  // it is not allowed to assign an unknown value when the current
+  // value is known
+  assert(!(peer_type == 0 &&
+           peer_name.type() != 0));
+  // it is not allowed to assign a different known value when the
+  // current value is also known.
+  assert(!(peer_type != 0 &&
+           peer_name.type() != 0 &&
+           peer_type != peer_name.type()));
+  peer_name._type = peer_type;
+}
+
+void SocketConnection::set_peer_id(int64_t peer_id) {
+  assert(seastar::this_shard_id() == msgr_sid);
+  // it is not allowed to assign an unknown value when the current
+  // value is known
+  assert(!(peer_id == entity_name_t::NEW &&
+           peer_name.num() != entity_name_t::NEW));
+  // it is not allowed to assign a different known value when the
+  // current value is also known.
+  assert(!(peer_id != entity_name_t::NEW &&
+           peer_name.num() != entity_name_t::NEW &&
+           peer_id != peer_name.num()));
+  peer_name._num = peer_id;
+}
+
+void SocketConnection::set_features(uint64_t f) {
+  assert(seastar::this_shard_id() == msgr_sid);
+  features = f;
+}
+
 void SocketConnection::set_socket(Socket *s) {
+  assert(seastar::this_shard_id() == msgr_sid);
   socket = s;
 }
 
 void SocketConnection::print(ostream& out) const {
-    out << (void*)this << " ";
-    messenger.print(out);
-    if (!socket) {
-      out << " >> " << get_peer_name() << " " << peer_addr;
-    } else if (socket->get_side() == Socket::side_t::acceptor) {
-      out << " >> " << get_peer_name() << " " << peer_addr
-          << "@" << socket->get_ephemeral_port();
-    } else { // socket->get_side() == Socket::side_t::connector
-      out << "@" << socket->get_ephemeral_port()
-          << " >> " << get_peer_name() << " " << peer_addr;
-    }
+  out << (void*)this << " ";
+  messenger.print(out);
+  if (seastar::this_shard_id() != msgr_sid) {
+    out << " >> " << get_peer_name() << " " << peer_addr;
+  } else if (!socket) {
+    out << " >> " << get_peer_name() << " " << peer_addr;
+  } else if (socket->get_side() == Socket::side_t::acceptor) {
+    out << " >> " << get_peer_name() << " " << peer_addr
+        << "@" << socket->get_ephemeral_port();
+  } else { // socket->get_side() == Socket::side_t::connector
+    out << "@" << socket->get_ephemeral_port()
+        << " >> " << get_peer_name() << " " << peer_addr;
+  }
 }
 
 } // namespace crimson::net
index 32624ccccce754ab08d7f2421579ea93b24c8a4a..718d1da318a0b82cf4c888e4dfbc00c1044e587d 100644 (file)
@@ -47,6 +47,8 @@ public:
   ConnectionHandler &operator=(const ConnectionHandler &) = delete;
   ConnectionHandler &operator=(ConnectionHandler &&) = delete;
 
+  virtual seastar::shard_id get_shard_id() const = 0;
+
   virtual bool is_connected() const = 0;
 
   virtual seastar::future<> send(MessageURef) = 0;
@@ -66,32 +68,6 @@ protected:
 };
 
 class SocketConnection : public Connection {
-  const seastar::shard_id core;
-
-  SocketMessenger& messenger;
-
-  std::unique_ptr<ConnectionHandler> io_handler;
-
-  std::unique_ptr<ProtocolV2> protocol;
-
-  Socket *socket = nullptr;
-
-  entity_name_t peer_name = {0, entity_name_t::NEW};
-
-  entity_addr_t peer_addr;
-
-  // which of the peer_addrs we're connecting to (as client)
-  // or should reconnect to (as peer)
-  entity_addr_t target_addr;
-
-  uint64_t features = 0;
-
-  ceph::net::Policy<crimson::common::Throttle> policy;
-
-  uint64_t peer_global_id = 0;
-
-  std::unique_ptr<user_private_t> user_private;
-
  // Connection interfaces, public to users
  public:
   SocketConnection(SocketMessenger& messenger,
@@ -145,7 +121,10 @@ class SocketConnection : public Connection {
 
   void print(std::ostream& out) const override;
 
- // public to SocketMessenger
+ /*
+  * Public to SocketMessenger
+  * Working in SocketMessenger::get_shard_id();
+  */
  public:
   /// start a handshake from the client's perspective,
   /// only call when SocketConnection first construct
@@ -161,49 +140,21 @@ class SocketConnection : public Connection {
 
   seastar::socket_address get_local_address() const;
 
-  SocketMessenger &get_messenger() const {
-    return messenger;
-  }
+  SocketMessenger &get_messenger() const;
 
   ConnectionRef get_local_shared_foreign_from_this();
 
 private:
-  seastar::shard_id shard_id() const;
-
-  void set_peer_type(entity_type_t peer_type) {
-    // it is not allowed to assign an unknown value when the current
-    // value is known
-    assert(!(peer_type == 0 &&
-             peer_name.type() != 0));
-    // it is not allowed to assign a different known value when the
-    // current value is also known.
-    assert(!(peer_type != 0 &&
-             peer_name.type() != 0 &&
-             peer_type != peer_name.type()));
-    peer_name._type = peer_type;
-  }
+  void set_peer_type(entity_type_t peer_type);
 
-  void set_peer_id(int64_t peer_id) {
-    // it is not allowed to assign an unknown value when the current
-    // value is known
-    assert(!(peer_id == entity_name_t::NEW &&
-             peer_name.num() != entity_name_t::NEW));
-    // it is not allowed to assign a different known value when the
-    // current value is also known.
-    assert(!(peer_id != entity_name_t::NEW &&
-             peer_name.num() != entity_name_t::NEW &&
-             peer_id != peer_name.num()));
-    peer_name._num = peer_id;
-  }
+  void set_peer_id(int64_t peer_id);
 
   void set_peer_name(entity_name_t name) {
     set_peer_type(name.type());
     set_peer_id(name.num());
   }
 
-  void set_features(uint64_t f) {
-    features = f;
-  }
+  void set_features(uint64_t f);
 
   void set_socket(Socket *s);
 
@@ -221,6 +172,42 @@ private:
   bool peer_wins() const;
 #endif
 
+private:
+  const seastar::shard_id msgr_sid;
+
+  /*
+   * Core owner is messenger core, may allow to access from the I/O core.
+   */
+  SocketMessenger& messenger;
+
+  std::unique_ptr<ProtocolV2> protocol;
+
+  Socket *socket = nullptr;
+
+  entity_name_t peer_name = {0, entity_name_t::NEW};
+
+  entity_addr_t peer_addr;
+
+  // which of the peer_addrs we're connecting to (as client)
+  // or should reconnect to (as peer)
+  entity_addr_t target_addr;
+
+  uint64_t features = 0;
+
+  ceph::net::Policy<crimson::common::Throttle> policy;
+
+  uint64_t peer_global_id = 0;
+
+  /*
+   * Core owner is I/O core (mutable).
+   */
+  std::unique_ptr<ConnectionHandler> io_handler;
+
+  /*
+   * Core owner is up to the connection user.
+   */
+  std::unique_ptr<user_private_t> user_private;
+
   friend class IOHandler;
   friend class ProtocolV2;
   friend class FrameAssemblerV2;
index 80d578363282e8c4b364cfd7d694328e5be5c10c..328ca724eb7aaf48da2aca18de7a13463c333c0a 100644 (file)
@@ -47,7 +47,8 @@ namespace crimson::net {
 
 IOHandler::IOHandler(ChainedDispatchers &dispatchers,
                      SocketConnection &conn)
-  : dispatchers(dispatchers),
+  : sid(seastar::this_shard_id()),
+    dispatchers(dispatchers),
     conn(conn),
     conn_ref(conn.get_local_shared_foreign_from_this())
 {}
@@ -123,6 +124,7 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
 
 seastar::future<> IOHandler::send(MessageURef msg)
 {
+  ceph_assert_always(seastar::this_shard_id() == sid);
   if (io_state != io_state_t::drop) {
     out_pending_msgs.push_back(std::move(msg));
     notify_out_dispatch();
@@ -132,6 +134,7 @@ seastar::future<> IOHandler::send(MessageURef msg)
 
 seastar::future<> IOHandler::send_keepalive()
 {
+  ceph_assert_always(seastar::this_shard_id() == sid);
   if (!need_keepalive) {
     need_keepalive = true;
     notify_out_dispatch();
@@ -141,6 +144,7 @@ seastar::future<> IOHandler::send_keepalive()
 
 void IOHandler::mark_down()
 {
+  ceph_assert_always(seastar::this_shard_id() == sid);
   ceph_assert_always(io_state != io_state_t::none);
   need_dispatch_reset = false;
   if (io_state == io_state_t::drop) {
index e04b6356e8674ea9d95854616ae99426d8276e80..3f2d6f9a453a6f9385124d1d9daf65ae787917f6 100644 (file)
@@ -61,7 +61,12 @@ public:
  * as ConnectionHandler
  */
 private:
+  seastar::shard_id get_shard_id() const final {
+    return sid;
+  }
+
   bool is_connected() const final {
+    ceph_assert_always(seastar::this_shard_id() == sid);
     return protocol_is_connected;
   }
 
@@ -70,14 +75,17 @@ private:
   seastar::future<> send_keepalive() final;
 
   clock_t::time_point get_last_keepalive() const final {
+    ceph_assert_always(seastar::this_shard_id() == sid);
     return last_keepalive;
   }
 
   clock_t::time_point get_last_keepalive_ack() const final {
+    ceph_assert_always(seastar::this_shard_id() == sid);
     return last_keepalive_ack;
   }
 
   void set_last_keepalive_ack(clock_t::time_point when) final {
+    ceph_assert_always(seastar::this_shard_id() == sid);
     last_keepalive_ack = when;
   }
 
@@ -183,6 +191,8 @@ public:
   void do_in_dispatch();
 
 private:
+  seastar::shard_id sid;
+
   ChainedDispatchers &dispatchers;
 
   SocketConnection &conn;