]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: use SocketFRef
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 26 Apr 2023 09:06:06 +0000 (17:06 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Sun, 25 Jun 2023 03:57:19 +0000 (11:57 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/FrameAssemblerV2.cc
src/crimson/net/FrameAssemblerV2.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
src/crimson/net/Socket.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/SocketMessenger.cc
src/crimson/net/SocketMessenger.h
src/test/crimson/test_socket.cc

index c952bb1bd56938d1df68918f0c4879eb8f4dd305..b621279353e5a700623c571a2726c20fb598b890 100644 (file)
@@ -117,7 +117,7 @@ FrameAssemblerV2::stop_recording()
 bool FrameAssemblerV2::has_socket() const
 {
   assert((socket && conn.socket) || (!socket && !conn.socket));
-  return socket != nullptr;
+  return bool(socket);
 }
 
 bool FrameAssemblerV2::is_socket_valid() const
@@ -125,16 +125,17 @@ bool FrameAssemblerV2::is_socket_valid() const
   return has_socket() && !socket->is_shutdown();
 }
 
-SocketRef FrameAssemblerV2::move_socket()
+SocketFRef FrameAssemblerV2::move_socket()
 {
   assert(has_socket());
   conn.set_socket(nullptr);
   return std::move(socket);
 }
 
-void FrameAssemblerV2::set_socket(SocketRef &&new_socket)
+void FrameAssemblerV2::set_socket(SocketFRef &&new_socket)
 {
   assert(!has_socket());
+  assert(new_socket);
   socket = std::move(new_socket);
   conn.set_socket(socket.get());
   assert(is_socket_valid());
@@ -152,7 +153,7 @@ void FrameAssemblerV2::shutdown_socket()
   socket->shutdown();
 }
 
-seastar::future<> FrameAssemblerV2::replace_shutdown_socket(SocketRef &&new_socket)
+seastar::future<> FrameAssemblerV2::replace_shutdown_socket(SocketFRef &&new_socket)
 {
   assert(has_socket());
   assert(socket->is_shutdown());
index a99b5fce14b5086058ca3ac67bbf90979fcaf45c..0ba5f53cc9f84c067564bd61d414a441a7e00ae6 100644 (file)
@@ -38,7 +38,7 @@ public:
    */
 
   struct mover_t {
-    SocketRef socket;
+    SocketFRef socket;
     ceph::crypto::onwire::rxtx_t session_stream_handlers;
     ceph::compression::onwire::rxtx_t session_comp_handlers;
   };
@@ -66,13 +66,13 @@ public:
   // the socket exists and not shutdown
   bool is_socket_valid() const;
 
-  void set_socket(SocketRef &&);
+  void set_socket(SocketFRef &&);
 
   void learn_socket_ephemeral_port_as_connector(uint16_t port);
 
   void shutdown_socket();
 
-  seastar::future<> replace_shutdown_socket(SocketRef &&);
+  seastar::future<> replace_shutdown_socket(SocketFRef &&);
 
   seastar::future<> close_shutdown_socket();
 
@@ -127,7 +127,7 @@ public:
 private:
   bool has_socket() const;
 
-  SocketRef move_socket();
+  SocketFRef move_socket();
 
   void log_main_preamble(const ceph::bufferlist &bl);
 
@@ -137,7 +137,7 @@ private:
 
   SocketConnection &conn;
 
-  SocketRef socket;
+  SocketFRef socket;
 
   /*
    * auth signature
index 9d1956c3c111725eafc74b2c0d72ea4dc430430f..ee8f1a99906f0bc0c565ce741a8e30690212f806 100644 (file)
@@ -172,7 +172,7 @@ void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
   execute_connecting();
 }
 
-void ProtocolV2::start_accept(SocketRef&& new_socket,
+void ProtocolV2::start_accept(SocketFRef&& new_socket,
                               const entity_addr_t& _peer_addr)
 {
   ceph_assert(state == state_t::NONE);
@@ -813,15 +813,16 @@ void ProtocolV2::execute_connecting()
             abort_protocol();
           }
           return Socket::connect(conn.peer_addr);
-        }).then([this](SocketRef new_socket) {
+        }).then([this](SocketRef _new_socket) {
           logger().debug("{} socket connected", conn);
           if (unlikely(state != state_t::CONNECTING)) {
             logger().debug("{} triggered {} during Socket::connect()",
                            conn, get_state_name(state));
-            return new_socket->close().then([sock=std::move(new_socket)] {
+            return _new_socket->close().then([sock=std::move(_new_socket)] {
               abort_protocol();
             });
           }
+          SocketFRef new_socket = seastar::make_foreign(std::move(_new_socket));
           if (!has_socket) {
             frame_assembler->set_socket(std::move(new_socket));
             has_socket = true;
index b6f67b566510e53d1369eb346b56700efae35a62..f81ffdbfbc69a81c1eeac59630294dfb7ed31a20 100644 (file)
@@ -41,7 +41,7 @@ public:
   void start_connect(const entity_addr_t& peer_addr,
                      const entity_name_t& peer_name);
 
-  void start_accept(SocketRef&& socket,
+  void start_accept(SocketFRef&& socket,
                     const entity_addr_t& peer_addr);
 
   seastar::future<> close_clean_yielded();
index 789a83a6c03a83d1b7733f96b118b96a3f178715..3d42abdb44b89adc2971ea9f5d8d77bedf7394ef 100644 (file)
@@ -21,6 +21,7 @@ namespace crimson::net {
 
 class Socket;
 using SocketRef = std::unique_ptr<Socket>;
+using SocketFRef = seastar::foreign_ptr<SocketRef>;
 
 class Socket {
   struct construct_tag {};
index 70abd6fc62fe42f33f63517f88a805576f4ce62a..9f3740a5c77701059368ad1ef7500db392d7c96d 100644 (file)
@@ -118,7 +118,7 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr,
 }
 
 void
-SocketConnection::start_accept(SocketRef&& sock,
+SocketConnection::start_accept(SocketFRef&& sock,
                                const entity_addr_t& _peer_addr)
 {
   assert(seastar::this_shard_id() == msgr_sid);
index d1d4ae8a71201a028b60ac5332a4b9f6f9da468a..f54ac7fd9a47464390a3d3022505a2297d0a8307 100644 (file)
@@ -134,7 +134,7 @@ class SocketConnection : public Connection {
 
   /// start a handshake from the server's perspective,
   /// only call when SocketConnection first construct
-  void start_accept(SocketRef&& socket,
+  void start_accept(SocketFRef&& socket,
                     const entity_addr_t& peer_addr);
 
   seastar::future<> close_clean_yielded();
index 97795652388f95b38738a4979550e649b271c104..11cec9b974fff4ea0d7c5608efbc75e9d0f4f739 100644 (file)
@@ -207,6 +207,16 @@ SocketMessenger::bind(const entity_addrvec_t& addrs)
   });
 }
 
+seastar::future<> SocketMessenger::accept(
+    SocketFRef &&socket, const entity_addr_t &peer_addr)
+{
+  assert(seastar::this_shard_id() == sid);
+  SocketConnectionRef conn =
+    seastar::make_shared<SocketConnection>(*this, dispatchers);
+  conn->start_accept(std::move(socket), peer_addr);
+  return seastar::now();
+}
+
 seastar::future<> SocketMessenger::start(
     const dispatchers_t& _dispatchers) {
   assert(seastar::this_shard_id() == sid);
@@ -217,14 +227,17 @@ seastar::future<> SocketMessenger::start(
     ceph_assert(get_myaddr().is_msgr2());
     ceph_assert(get_myaddr().get_port() > 0);
 
-    return listener->accept([this](SocketRef socket, entity_addr_t peer_addr) {
-      assert(listener->is_fixed());
-      assert(seastar::this_shard_id() == sid);
+    return listener->accept([this](SocketRef _socket, entity_addr_t peer_addr) {
       assert(get_myaddr().is_msgr2());
-      SocketConnectionRef conn =
-        seastar::make_shared<SocketConnection>(*this, dispatchers);
-      conn->start_accept(std::move(socket), peer_addr);
-      return seastar::now();
+      SocketFRef socket = seastar::make_foreign(std::move(_socket));
+      if (listener->is_fixed()) {
+        return accept(std::move(socket), peer_addr);
+      } else {
+        return seastar::smp::submit_to(sid,
+            [this, peer_addr, socket = std::move(socket)]() mutable {
+          return accept(std::move(socket), peer_addr);
+        });
+      }
     });
   }
   return seastar::now();
index 6e749abac76ac52dd6433ae9e210cef5ea0a66e6..36d814379a611d0a6cb9f2528e94d48399840ee6 100644 (file)
@@ -152,6 +152,8 @@ public:
 #endif
 
 private:
+  seastar::future<> accept(SocketFRef &&, const entity_addr_t &);
+
   listen_ertr::future<> do_listen(const entity_addrvec_t& addr);
 
   /// try to bind to the first unused port of given address
index 8df0f1be747ea1770209f879654497ab1ab844da..1fa029c2ca2b44a748a7c0c74eeb2183f0fc78ba 100644 (file)
@@ -165,7 +165,7 @@ class SocketFactory {
   ShardedServerSocket *pss = nullptr;
 
   seastar::shard_id server_socket_CPU;
-  SocketRef server_socket;
+  SocketFRef server_socket;
 
  public:
   template <typename FuncC, typename FuncS>
@@ -198,13 +198,14 @@ class SocketFactory {
           });
         }),
         seastar::smp::submit_to(SERVER_CPU, [psf] {
-          return psf->pss->accept([psf](auto socket, auto paddr) {
+          return psf->pss->accept([psf](auto _socket, auto paddr) {
             logger().info("dispatch_sockets(): accepted at shard {}",
                           seastar::this_shard_id());
             psf->server_socket_CPU = seastar::this_shard_id();
             if (psf->pss->is_fixed()) {
               ceph_assert_always(SERVER_CPU == seastar::this_shard_id());
             }
+            SocketFRef socket = seastar::make_foreign(std::move(_socket));
             psf->server_socket = std::move(socket);
             return seastar::smp::submit_to(CLIENT_CPU, [psf] {
               psf->server_connected.set_value();