]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/net: encapsulate protocol implementations with open state
authorYingxin <yingxin.cheng@intel.com>
Wed, 21 Nov 2018 21:24:31 +0000 (05:24 +0800)
committerYingxin <yingxin.cheng@intel.com>
Thu, 20 Dec 2018 19:09:05 +0000 (03:09 +0800)
Signed-off-by: Yingxin <yingxin.cheng@intel.com>
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h

index 1359dc8900848bbc60149948b86a783614f8b49a..23a9e3de25180daac29fa1decaeb59d39a88ffd7 100644 (file)
@@ -809,10 +809,6 @@ SocketConnection::start_connect()
         return repeat_connect();
       });
       // TODO: handle errors for state_t::connecting
-    }).then([this] {
-      state = state_t::open;
-      // start background processing of tags
-      read_tags_until_next_message();
     }).then_wrapped([this] (auto fut) {
       // satisfy the handshake's promise
       fut.forward_to(std::move(h.promise));
@@ -842,8 +838,7 @@ SocketConnection::connect(const entity_addr_t& _peer_addr,
             .finally([this] { close(); });
           // TODO: retry on fault
         }).then([this] {
-          // dispatch replies on this connection
-          dispatch();
+          execute_open();
         });
     });
 }
@@ -874,10 +869,6 @@ SocketConnection::start_accept()
         return repeat_handle_connect();
       });
       // TODO: handle errors for state_t::accepting
-    }).then([this] {
-      state = state_t::open;
-      // start background processing of tags
-      read_tags_until_next_message();
     }).then_wrapped([this] (auto fut) {
       // satisfy the handshake's promise
       fut.forward_to(std::move(h.promise));
@@ -906,36 +897,37 @@ SocketConnection::accept(seastar::connected_socket&& fd,
           return seastar::make_exception_future<>(eptr)
             .finally([this] { close(); });
         }).then([this] {
-          // dispatch messages until the connection closes or the dispatch
-          // queue shuts down
-          dispatch();
+          execute_open();
         });
     });
 }
 
 void
-SocketConnection::dispatch()
+SocketConnection::execute_open()
 {
+  state = state_t::open;
   seastar::with_gate(pending_dispatch, [this] {
-      return seastar::keep_doing([=] {
+      // start background processing of tags
+      read_tags_until_next_message();
+      return seastar::keep_doing([this] {
           return read_message()
-            .then([=] (MessageRef msg) {
+            .then([this] (MessageRef msg) {
               // start dispatch, ignoring exceptions from the application layer
-              seastar::with_gate(messenger.pending_dispatch, [=, msg = std::move(msg)] {
+              seastar::with_gate(messenger.pending_dispatch, [this, msg = std::move(msg)] {
                   return dispatcher.ms_dispatch(this, std::move(msg))
                     .handle_exception([] (std::exception_ptr eptr) {});
                 });
               // return immediately to start on the next message
               return seastar::now();
             });
-        }).handle_exception_type([=] (const std::system_error& e) {
+        }).handle_exception_type([this] (const std::system_error& e) {
           if (e.code() == error::connection_aborted ||
               e.code() == error::connection_reset) {
-            return seastar::with_gate(messenger.pending_dispatch, [=] {
+            return seastar::with_gate(messenger.pending_dispatch, [this] {
                 return dispatcher.ms_handle_reset(this);
               });
           } else if (e.code() == error::read_eof) {
-            return seastar::with_gate(messenger.pending_dispatch, [=] {
+            return seastar::with_gate(messenger.pending_dispatch, [this] {
                 return dispatcher.ms_handle_remote_reset(this);
               });
           } else {
index bf249d8e46aea18f315ed145f4a252d6d9f0c35f..b69d78d3c637955e03f654bb8b5aee76d619c6dd 100644 (file)
@@ -153,7 +153,7 @@ class SocketConnection : public Connection {
 
   seastar::future<> fault();
 
-  void dispatch();
+  void execute_open();
 
   /// start a handshake from the client's perspective,
   /// only call when SocketConnection first construct