]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: remove unecessary future dependencies for accept/dispatch
authorYingxin <yingxin.cheng@intel.com>
Wed, 21 Nov 2018 21:09:20 +0000 (05:09 +0800)
committerYingxin <yingxin.cheng@intel.com>
Thu, 20 Dec 2018 19:09:05 +0000 (03:09 +0800)
Signed-off-by: Yingxin <yingxin.cheng@intel.com>
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/SocketMessenger.cc

index 480e416525af4482ce72d2245bf4610f47f3d8b7..1359dc8900848bbc60149948b86a783614f8b49a 100644 (file)
@@ -843,8 +843,7 @@ SocketConnection::connect(const entity_addr_t& _peer_addr,
           // TODO: retry on fault
         }).then([this] {
           // dispatch replies on this connection
-          dispatch()
-            .handle_exception([] (std::exception_ptr eptr) {});
+          dispatch();
         });
     });
 }
@@ -885,7 +884,7 @@ SocketConnection::start_accept()
     });
 }
 
-seastar::future<>
+void
 SocketConnection::accept(seastar::connected_socket&& fd,
                          const entity_addr_t& _peer_addr)
 {
@@ -895,7 +894,7 @@ SocketConnection::accept(seastar::connected_socket&& fd,
   socket.emplace(std::move(fd));
   messenger.accept_conn(this);
   state = state_t::accepting;
-  return seastar::with_gate(pending_dispatch, [this] {
+  seastar::with_gate(pending_dispatch, [this] {
       return start_accept()
         .then([this] {
           // notify the dispatcher and allow them to reject the connection
@@ -909,15 +908,15 @@ SocketConnection::accept(seastar::connected_socket&& fd,
         }).then([this] {
           // dispatch messages until the connection closes or the dispatch
           // queue shuts down
-          return dispatch();
+          dispatch();
         });
     });
 }
 
-seastar::future<>
+void
 SocketConnection::dispatch()
 {
-  return seastar::with_gate(pending_dispatch, [this] {
+  seastar::with_gate(pending_dispatch, [this] {
       return seastar::keep_doing([=] {
           return read_message()
             .then([=] (MessageRef msg) {
@@ -942,6 +941,8 @@ SocketConnection::dispatch()
           } else {
             throw e;
           }
+        }).handle_exception([] (std::exception_ptr eptr) {
+          // TODO: handle fault in the open state
         });
     });
 }
index 3413b4d054e1babf42125c05e8fb6e4328f4d464..bf249d8e46aea18f315ed145f4a252d6d9f0c35f 100644 (file)
@@ -153,7 +153,7 @@ class SocketConnection : public Connection {
 
   seastar::future<> fault();
 
-  seastar::future<> dispatch();
+  void dispatch();
 
   /// start a handshake from the client's perspective,
   /// only call when SocketConnection first construct
@@ -185,8 +185,8 @@ class SocketConnection : public Connection {
  public:
   void connect(const entity_addr_t& peer_addr,
                const entity_type_t& peer_type);
-  seastar::future<> accept(seastar::connected_socket&& socket,
-                           const entity_addr_t& peer_addr);
+  void accept(seastar::connected_socket&& socket,
+              const entity_addr_t& peer_addr);
 
   /// read a message from a connection that has completed its handshake
   seastar::future<MessageRef> read_message();
index 779f5fe2e5f17a95a2ba521a0a83e79316a98a48..6ecf9d3ddeff4ba95d75de06653d09e09fc18005 100644 (file)
@@ -40,18 +40,6 @@ void SocketMessenger::bind(const entity_addr_t& addr)
   listener = seastar::listen(address, lo);
 }
 
-seastar::future<> SocketMessenger::accept(seastar::connected_socket socket,
-                                          seastar::socket_address paddr)
-{
-  // allocate the connection
-  entity_addr_t peer_addr;
-  peer_addr.set_type(entity_addr_t::TYPE_DEFAULT);
-  peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
-  SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher);
-  // initiate the handshake
-  return conn->accept(std::move(socket), peer_addr);
-}
-
 seastar::future<> SocketMessenger::start(Dispatcher *disp)
 {
   dispatcher = disp;
@@ -62,10 +50,13 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp)
         return listener->accept()
           .then([this] (seastar::connected_socket socket,
                         seastar::socket_address paddr) {
-            // start processing the connection
-            accept(std::move(socket), paddr)
-              .handle_exception([] (std::exception_ptr eptr) {});
+            // allocate the connection
+            entity_addr_t peer_addr;
+            peer_addr.set_type(entity_addr_t::TYPE_DEFAULT);
+            peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
+            SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher);
             // don't wait before accepting another
+            conn->accept(std::move(socket), peer_addr);
           });
       }).handle_exception_type([this] (const std::system_error& e) {
         // stop gracefully on connection_aborted