]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: encapsulate protocol implementations with accept/connect
authorYingxin <yingxin.cheng@intel.com>
Wed, 21 Nov 2018 22:00:08 +0000 (06:00 +0800)
committerYingxin <yingxin.cheng@intel.com>
Thu, 20 Dec 2018 19:10:09 +0000 (03:10 +0800)
Signed-off-by: Yingxin <yingxin.cheng@intel.com>
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/SocketMessenger.cc

index e0d4c36b9156bf3c73f81feba575ebc70af20bc1..e09d2b408e5a4d1d713fba4694c5ce1d18e4a84b 100644 (file)
@@ -769,52 +769,9 @@ SocketConnection::repeat_connect()
     });
 }
 
-seastar::future<>
-SocketConnection::start_connect()
-{
-  return seastar::connect(peer_addr.in4_addr())
-    .then([this](seastar::connected_socket fd) {
-      if (state == state_t::closing) {
-        fd.shutdown_input();
-        fd.shutdown_output();
-        throw std::system_error(make_error_code(error::connection_aborted));
-      }
-      socket.emplace(std::move(fd));
-      // read server's handshake header
-      return socket->read(server_header_size);
-    }).then([this] (bufferlist headerbl) {
-      auto p = headerbl.cbegin();
-      validate_banner(p);
-      entity_addr_t saddr, caddr;
-      ::decode(saddr, p);
-      ::decode(caddr, p);
-      ceph_assert(p.end());
-      validate_peer_addr(saddr, peer_addr);
-
-      if (my_addr != caddr) {
-        // take peer's address for me, but preserve my nonce
-        caddr.nonce = my_addr.nonce;
-        my_addr = caddr;
-      }
-      // encode/send client's handshake header
-      bufferlist bl;
-      bl.append(buffer::create_static(banner_size, banner));
-      ::encode(my_addr, bl, 0);
-      h.global_seq = messenger.get_global_seq();
-      return socket->write_flush(std::move(bl));
-    }).then([=] {
-      return seastar::repeat([this] {
-        return repeat_connect();
-      });
-    }).then_wrapped([this] (auto fut) {
-      // satisfy the handshake's promise
-      fut.forward_to(std::move(h.promise));
-    });
-}
-
 void
-SocketConnection::connect(const entity_addr_t& _peer_addr,
-                          const entity_type_t& _peer_type)
+SocketConnection::start_connect(const entity_addr_t& _peer_addr,
+                                const entity_type_t& _peer_type)
 {
   ceph_assert(state == state_t::none);
   ceph_assert(!socket);
@@ -823,8 +780,46 @@ SocketConnection::connect(const entity_addr_t& _peer_addr,
   messenger.register_conn(this);
   state = state_t::connecting;
   seastar::with_gate(pending_dispatch, [this] {
-      return start_connect()
-        .then([this] {
+      return seastar::connect(peer_addr.in4_addr())
+        .then([this](seastar::connected_socket fd) {
+          if (state == state_t::closing) {
+            fd.shutdown_input();
+            fd.shutdown_output();
+            throw std::system_error(make_error_code(error::connection_aborted));
+          }
+          socket.emplace(std::move(fd));
+          // read server's handshake header
+          return socket->read(server_header_size);
+        }).then([this] (bufferlist headerbl) {
+          auto p = headerbl.cbegin();
+          validate_banner(p);
+          entity_addr_t saddr, caddr;
+          ::decode(saddr, p);
+          ::decode(caddr, p);
+          ceph_assert(p.end());
+          validate_peer_addr(saddr, peer_addr);
+
+          if (my_addr != caddr) {
+            // take peer's address for me, but preserve my nonce
+            caddr.nonce = my_addr.nonce;
+            my_addr = caddr;
+          }
+          // encode/send client's handshake header
+          bufferlist bl;
+          bl.append(buffer::create_static(banner_size, banner));
+          ::encode(my_addr, bl, 0);
+          h.global_seq = messenger.get_global_seq();
+          return socket->write_flush(std::move(bl));
+        }).then([=] {
+          return seastar::repeat([this] {
+            return repeat_connect();
+          });
+        }).then_wrapped([this] (auto fut) {
+          // TODO: do not forward the exception
+          //       and let the reconnect happen transparently inside connection
+          // satisfy the handshake's promise
+          fut.forward_to(std::move(h.promise));
+        }).then([this] {
           // notify the dispatcher and allow them to reject the connection
           return seastar::with_gate(messenger.pending_dispatch, [this] {
             return dispatcher.ms_handle_connect(this);
@@ -838,40 +833,9 @@ SocketConnection::connect(const entity_addr_t& _peer_addr,
     });
 }
 
-seastar::future<>
-SocketConnection::start_accept()
-{
-  // encode/send server's handshake header
-  bufferlist bl;
-  bl.append(buffer::create_static(banner_size, banner));
-  ::encode(my_addr, bl, 0);
-  ::encode(peer_addr, bl, 0);
-  return socket->write_flush(std::move(bl))
-    .then([this] {
-      // read client's handshake header and connect request
-      return socket->read(client_header_size);
-    }).then([this] (bufferlist bl) {
-      auto p = bl.cbegin();
-      validate_banner(p);
-      entity_addr_t addr;
-      ::decode(addr, p);
-      ceph_assert(p.end());
-      if (!addr.is_blank_ip()) {
-        peer_addr = addr;
-      }
-    }).then([this] {
-      return seastar::repeat([this] {
-        return repeat_handle_connect();
-      });
-    }).then_wrapped([this] (auto fut) {
-      // satisfy the handshake's promise
-      fut.forward_to(std::move(h.promise));
-    });
-}
-
 void
-SocketConnection::accept(seastar::connected_socket&& fd,
-                         const entity_addr_t& _peer_addr)
+SocketConnection::start_accept(seastar::connected_socket&& fd,
+                               const entity_addr_t& _peer_addr)
 {
   ceph_assert(state == state_t::none);
   ceph_assert(!socket);
@@ -880,8 +844,34 @@ SocketConnection::accept(seastar::connected_socket&& fd,
   messenger.accept_conn(this);
   state = state_t::accepting;
   seastar::with_gate(pending_dispatch, [this] {
-      return start_accept()
+      // encode/send server's handshake header
+      bufferlist bl;
+      bl.append(buffer::create_static(banner_size, banner));
+      ::encode(my_addr, bl, 0);
+      ::encode(peer_addr, bl, 0);
+      return socket->write_flush(std::move(bl))
         .then([this] {
+          // read client's handshake header and connect request
+          return socket->read(client_header_size);
+        }).then([this] (bufferlist bl) {
+          auto p = bl.cbegin();
+          validate_banner(p);
+          entity_addr_t addr;
+          ::decode(addr, p);
+          ceph_assert(p.end());
+          if (!addr.is_blank_ip()) {
+            peer_addr = addr;
+          }
+        }).then([this] {
+          return seastar::repeat([this] {
+            return repeat_handle_connect();
+          });
+        }).then_wrapped([this] (auto fut) {
+          // TODO: do not forward the exception
+          //       and let the reconnect happen transparently inside connection
+          // satisfy the handshake's promise
+          fut.forward_to(std::move(h.promise));
+        }).then([this] {
           // notify the dispatcher and allow them to reject the connection
           return seastar::with_gate(messenger.pending_dispatch, [=] {
               return dispatcher.ms_handle_accept(this);
index b69d78d3c637955e03f654bb8b5aee76d619c6dd..effb594c14fa87a250b4046bed0527b4ffb78001 100644 (file)
@@ -155,13 +155,6 @@ class SocketConnection : public Connection {
 
   void execute_open();
 
-  /// start a handshake from the client's perspective,
-  /// only call when SocketConnection first construct
-  seastar::future<> start_connect();
-  /// start a handshake from the server's perspective,
-  /// only call when SocketConnection first construct
-  seastar::future<> start_accept();
-
  public:
   SocketConnection(SocketMessenger& messenger,
                    const entity_addr_t& my_addr,
@@ -183,10 +176,14 @@ class SocketConnection : public Connection {
   seastar::future<> close() override;
 
  public:
-  void connect(const entity_addr_t& peer_addr,
-               const entity_type_t& peer_type);
-  void accept(seastar::connected_socket&& socket,
-              const entity_addr_t& peer_addr);
+  /// start a handshake from the client's perspective,
+  /// only call when SocketConnection first construct
+  void start_connect(const entity_addr_t& peer_addr,
+                     const entity_type_t& peer_type);
+  /// start a handshake from the server's perspective,
+  /// only call when SocketConnection first construct
+  void start_accept(seastar::connected_socket&& socket,
+                    const entity_addr_t& peer_addr);
 
   /// read a message from a connection that has completed its handshake
   seastar::future<MessageRef> read_message();
index 6ecf9d3ddeff4ba95d75de06653d09e09fc18005..8ec2db7b293d503b54c0426f72f1f361fa49176a 100644 (file)
@@ -56,7 +56,7 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp)
             peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
             SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher);
             // don't wait before accepting another
-            conn->accept(std::move(socket), peer_addr);
+            conn->start_accept(std::move(socket), peer_addr);
           });
       }).handle_exception_type([this] (const std::system_error& e) {
         // stop gracefully on connection_aborted
@@ -76,7 +76,7 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& pe
     return found;
   }
   SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher);
-  conn->connect(peer_addr, peer_type);
+  conn->start_connect(peer_addr, peer_type);
   return conn;
 }