]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: implement factory method of Socket
authorYingxin Cheng <yingxincheng@gmail.com>
Fri, 15 Feb 2019 03:34:16 +0000 (11:34 +0800)
committerKefu Chai <kchai@redhat.com>
Fri, 5 Apr 2019 03:21:18 +0000 (11:21 +0800)
Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
src/crimson/net/ProtocolV1.cc
src/crimson/net/Socket.h
src/crimson/net/SocketMessenger.cc

index 672e04c0f14e9192653645cf2eff40bfe45b69fd..03fa8b7ef5b14bdb901b8636fe18910855cb9313 100644 (file)
@@ -296,14 +296,16 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
   messenger.register_conn(
     seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
   seastar::with_gate(pending_dispatch, [this] {
-      return seastar::connect(conn.peer_addr.in4_addr())
-        .then([this](seastar::connected_socket fd) {
+      return Socket::connect(conn.peer_addr)
+        .then([this](SocketFRef sock) {
+          socket = std::move(sock);
           if (state == state_t::closing) {
-            fd.shutdown_input();
-            fd.shutdown_output();
-            throw std::system_error(make_error_code(error::connection_aborted));
+            return socket->close().then([] {
+              throw std::system_error(make_error_code(error::connection_aborted));
+            });
           }
-          socket = seastar::make_foreign(std::make_unique<Socket>(std::move(fd)));
+          return seastar::now();
+        }).then([this] {
           // read server's handshake header
           return socket->read(server_header_size);
         }).then([this] (bufferlist headerbl) {
index 95fc78fbebef29ee77ffaba73ef92329d31f96a1..5191a2b5207174ef3a70c4544dbee82d6e9ce226 100644 (file)
@@ -8,6 +8,7 @@
 #include <seastar/net/packet.hh>
 
 #include "include/buffer.h"
+#include "msg/msg_types.h"
 
 namespace ceph::net {
 
@@ -27,14 +28,39 @@ class Socket
     size_t remaining;
   } r;
 
+  struct construct_tag {};
+
  public:
-  explicit Socket(seastar::connected_socket&& _socket)
+  Socket(seastar::connected_socket&& _socket, construct_tag)
     : sid{seastar::engine().cpu_id()},
       socket(std::move(_socket)),
       in(socket.input()),
       out(socket.output()) {}
+
   Socket(Socket&& o) = delete;
 
+  static seastar::future<SocketFRef>
+  connect(const entity_addr_t& peer_addr) {
+    return seastar::connect(peer_addr.in4_addr())
+      .then([] (seastar::connected_socket socket) {
+        return seastar::make_foreign(std::make_unique<Socket>(std::move(socket),
+                                                             construct_tag{}));
+      });
+  }
+
+  static seastar::future<SocketFRef, entity_addr_t>
+  accept(seastar::server_socket& listener) {
+    return listener.accept().then([] (seastar::connected_socket socket,
+                                     seastar::socket_address paddr) {
+        entity_addr_t peer_addr;
+        peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
+        return seastar::make_ready_future<SocketFRef, entity_addr_t>(
+          seastar::make_foreign(std::make_unique<Socket>(std::move(socket),
+                                                        construct_tag{})),
+         peer_addr);
+      });
+  }
+
   /// read the requested number of bytes into a bufferlist
   seastar::future<bufferlist> read(size_t bytes);
   using tmp_buf = seastar::temporary_buffer<char>;
index 13aa5c56337163b2baac480fd920d0642aeccd3b..6b33e01ffd384cfe20040bba699068f9bb6ce244 100644 (file)
@@ -146,18 +146,14 @@ seastar::future<> SocketMessenger::do_start(Dispatcher *disp)
   // start listening if bind() was called
   if (listener) {
     seastar::keep_doing([this] {
-        return listener->accept()
-          .then([this] (seastar::connected_socket socket,
-                        seastar::socket_address paddr) {
-            // allocate the connection
-            entity_addr_t peer_addr;
-            peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
+        return Socket::accept(*listener)
+          .then([this] (SocketFRef socket,
+                        entity_addr_t peer_addr) {
             auto shard = locate_shard(peer_addr);
+            // don't wait before accepting another
 #warning fixme
             // we currently do dangerous i/o from a Connection core, different from the Socket core.
-            auto sock = seastar::make_foreign(std::make_unique<Socket>(std::move(socket)));
-            // don't wait before accepting another
-            container().invoke_on(shard, [sock = std::move(sock), peer_addr, this](auto& msgr) mutable {
+            container().invoke_on(shard, [sock = std::move(socket), peer_addr, this](auto& msgr) mutable {
                 SocketConnectionRef conn = seastar::make_shared<SocketConnection>(msgr, *msgr.dispatcher);
                 conn->start_accept(std::move(sock), peer_addr);
               });