]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/net: cleanup dispatches with connection gate
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 12 Mar 2020 15:38:22 +0000 (23:38 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Fri, 20 Mar 2020 08:07:48 +0000 (16:07 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h
src/crimson/net/ProtocolV1.cc
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h

index 2183543efb4756e692dd3f923e7d31d2dc309339..8ae7bb1d3af080f8739d8783f49b3bf47b96b634 100644 (file)
@@ -314,13 +314,8 @@ void Protocol::write_event()
    case write_state_t::open:
      [[fallthrough]];
    case write_state_t::delay:
-    (void) seastar::with_gate(pending_dispatch, [this] {
-      return do_write_dispatch_sweep(
-      ).handle_exception([this] (std::exception_ptr eptr) {
-        logger().error("{} do_write_dispatch_sweep(): unexpected exception {}",
-                       conn, eptr);
-        ceph_abort();
-      });
+    gated_dispatch("do_write_dispatch_sweep", [this] {
+      return do_write_dispatch_sweep();
     });
     return;
    case write_state_t::drop:
index bb73746e12b7e1277dfef5a17de9fdbe1d2f1748..c5f8a5b728627ed42af2a62cc3b004e322028413 100644 (file)
@@ -6,6 +6,7 @@
 #include <seastar/core/gate.hh>
 #include <seastar/core/shared_future.hh>
 
+#include "crimson/common/log.h"
 #include "Fwd.h"
 #include "SocketConnection.h"
 
@@ -62,17 +63,27 @@ class Protocol {
   const proto_t proto_type;
 
  protected:
+  template <typename Func>
+  void gated_dispatch(const char* what, Func&& func) {
+    (void) seastar::with_gate(pending_dispatch, std::forward<Func>(func)
+    ).handle_exception([this, what] (std::exception_ptr eptr) {
+      crimson::get_logger(ceph_subsys_ms).error(
+          "{} gated_dispatch() {} caught exception: {}", conn, what, eptr);
+      ceph_abort("unexpected exception from gated_dispatch()");
+    });
+  }
+
   Dispatcher &dispatcher;
   SocketConnection &conn;
 
   SocketRef socket;
-  seastar::gate pending_dispatch;
   AuthConnectionMetaRef auth_meta;
 
  private:
   bool closed = false;
   // become valid only after closed == true
   seastar::shared_future<> close_ready;
+  seastar::gate pending_dispatch;
 
 // the write state-machine
  public:
index db18a55044520a45911e49b165d14c57de85343c..26344b0fb4fc9c17e822961ff8562fd6d28784a4 100644 (file)
@@ -318,7 +318,7 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
   conn.policy = messenger.get_policy(_peer_type);
   messenger.register_conn(
     seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-  (void) seastar::with_gate(pending_dispatch, [this] {
+  gated_dispatch("start_connect", [this] {
       return Socket::connect(conn.peer_addr)
         .then([this](SocketRef sock) {
           socket = std::move(sock);
@@ -618,7 +618,7 @@ void ProtocolV1::start_accept(SocketRef&& sock,
   socket = std::move(sock);
   messenger.accept_conn(
     seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-  (void) seastar::with_gate(pending_dispatch, [this] {
+  gated_dispatch("start_accept", [this] {
       // stop learning my_addr before sending it out, so it won't change
       return messenger.learned_addr(messenger.get_myaddr(), conn).then([this] {
           // encode/send server's handshake header
@@ -848,15 +848,11 @@ seastar::future<> ProtocolV1::read_message()
       }
 
       // start dispatch, ignoring exceptions from the application layer
-      (void) seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
-          logger().debug("{} <== #{} === {} ({})",
-                         conn, msg->get_seq(), *msg, msg->get_type());
-          return dispatcher.ms_dispatch(&conn, std::move(msg))
-            .handle_exception([this] (std::exception_ptr eptr) {
-              logger().error("{} ms_dispatch caught exception: {}", conn, eptr);
-              ceph_assert(false);
-            });
-        });
+      gated_dispatch("ms_dispatch", [this, msg = std::move(msg_ref)] {
+        logger().debug("{} <== #{} === {} ({})",
+                       conn, msg->get_seq(), *msg, msg->get_type());
+        return dispatcher.ms_dispatch(&conn, std::move(msg));
+      });
     });
 }
 
@@ -895,7 +891,7 @@ void ProtocolV1::execute_open()
   state = state_t::open;
   set_write_state(write_state_t::open);
 
-  (void) seastar::with_gate(pending_dispatch, [this] {
+  gated_dispatch("execute_open", [this] {
       // start background processing of tags
       return handle_tags()
         .handle_exception_type([this] (const std::system_error& e) {
index c5732554221c1174a1a25996d9cdc86bc802aa62..cc752f43a1c3a118ef09065c692ff66a85854098 100644 (file)
@@ -440,12 +440,9 @@ void ProtocolV2::reset_session(bool full)
     client_cookie = generate_client_cookie();
     peer_global_seq = 0;
     reset_write();
-    (void) seastar::with_gate(pending_dispatch, [this] {
+    gated_dispatch("ms_handle_remote_reset", [this] {
       return dispatcher.ms_handle_remote_reset(
           seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-    }).handle_exception([this] (std::exception_ptr eptr) {
-      logger().error("{} ms_handle_remote_reset caught exception: {}", conn, eptr);
-      ceph_abort("unexpected exception from ms_handle_remote_reset()");
     });
   }
 }
@@ -850,7 +847,7 @@ void ProtocolV2::execute_connecting()
   if (socket) {
     socket->shutdown();
   }
-  execution_done = seastar::with_gate(pending_dispatch, [this] {
+  gated_execute("execute_connecting", [this] {
       // we don't know my socket_port yet
       conn.set_ephemeral_port(0, SocketConnection::side_t::none);
       return messenger.get_global_seq().then([this] (auto gs) {
@@ -874,7 +871,8 @@ void ProtocolV2::execute_connecting()
             abort_protocol();
           }
           if (socket) {
-            (void) with_gate(pending_dispatch, [sock = std::move(socket)] () mutable {
+            gated_dispatch("close_sockect_connecting",
+                           [sock = std::move(socket)] () mutable {
               return sock->close().then([sock = std::move(sock)] {});
             });
           }
@@ -932,12 +930,9 @@ void ProtocolV2::execute_connecting()
           }
           switch (next) {
            case next_step_t::ready: {
-            (void) seastar::with_gate(pending_dispatch, [this] {
+            gated_dispatch("ms_handle_connect", [this] {
               return dispatcher.ms_handle_connect(
                   seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-            }).handle_exception([this] (std::exception_ptr eptr) {
-              logger().error("{} ms_handle_connect caught exception: {}", conn, eptr);
-              ceph_abort("unexpected exception from ms_handle_connect()");
             });
             logger().info("{} connected:"
                           " gs={}, pgs={}, cs={}, client_cookie={},"
@@ -1488,7 +1483,7 @@ ProtocolV2::server_reconnect()
 void ProtocolV2::execute_accepting()
 {
   trigger_state(state_t::ACCEPTING, write_state_t::none, false);
-  (void) seastar::with_gate(pending_dispatch, [this] {
+  gated_dispatch("execute_accepting", [this] {
       return seastar::futurize_apply([this] {
           INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
           auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
@@ -1615,15 +1610,12 @@ void ProtocolV2::execute_establishing(
     accept_me();
   }
 
-  (void) seastar::with_gate(pending_dispatch, [this] {
+  gated_dispatch("ms_handle_accept_establishing", [this] {
     return dispatcher.ms_handle_accept(
         seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-  }).handle_exception([this] (std::exception_ptr eptr) {
-    logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
-    ceph_abort("unexpected exception from ms_handle_accept()");
   });
 
-  execution_done = seastar::with_gate(pending_dispatch, [this] {
+  gated_execute("execute_establishing", [this] {
     return seastar::futurize_apply([this] {
       return send_server_ident();
     }).then([this] {
@@ -1716,23 +1708,20 @@ void ProtocolV2::trigger_replacing(bool reconnect,
   if (socket) {
     socket->shutdown();
   }
-  (void) seastar::with_gate(pending_dispatch, [this] {
+  gated_dispatch("ms_handle_accept_replacing", [this] {
     return dispatcher.ms_handle_accept(
         seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-  }).handle_exception([this] (std::exception_ptr eptr) {
-    logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
-    ceph_abort("unexpected exception from ms_handle_accept()");
   });
-  (void) seastar::with_gate(pending_dispatch,
-                            [this,
-                             reconnect,
-                             do_reset,
-                             new_socket = std::move(new_socket),
-                             new_auth_meta = std::move(new_auth_meta),
-                             new_rxtx = std::move(new_rxtx),
-                             new_client_cookie, new_peer_name,
-                             new_conn_features, new_peer_global_seq,
-                             new_connect_seq, new_msg_seq] () mutable {
+  gated_dispatch("trigger_replacing",
+                 [this,
+                  reconnect,
+                  do_reset,
+                  new_socket = std::move(new_socket),
+                  new_auth_meta = std::move(new_auth_meta),
+                  new_rxtx = std::move(new_rxtx),
+                  new_client_cookie, new_peer_name,
+                  new_conn_features, new_peer_global_seq,
+                  new_connect_seq, new_msg_seq] () mutable {
     return wait_write_exit().then([this, do_reset] {
       if (do_reset) {
         reset_session(true);
@@ -1754,7 +1743,8 @@ void ProtocolV2::trigger_replacing(bool reconnect,
       }
 
       if (socket) {
-        (void) with_gate(pending_dispatch, [sock = std::move(socket)] () mutable {
+        gated_dispatch("close_socket_replacing",
+                       [sock = std::move(socket)] () mutable {
           return sock->close().then([sock = std::move(sock)] {});
         });
       }
@@ -1946,11 +1936,8 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
 
     // TODO: change MessageRef with seastar::shared_ptr
     auto msg_ref = MessageRef{message, false};
-    (void) seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
+    gated_dispatch("ms_dispatch", [this, msg = std::move(msg_ref)] {
       return dispatcher.ms_dispatch(&conn, std::move(msg));
-    }).handle_exception([this] (std::exception_ptr eptr) {
-      logger().error("{} ms_dispatch caught exception: {}", conn, eptr);
-      ceph_abort("unexpected exception from ms_dispatch()");
     });
   });
 }
@@ -1964,7 +1951,7 @@ void ProtocolV2::execute_ready()
     conn.interceptor->register_conn_ready(conn);
   }
 #endif
-  execution_done = seastar::with_gate(pending_dispatch, [this] {
+  gated_execute("execute_ready", [this] {
     protocol_timer.cancel();
     return seastar::keep_doing([this] {
       return read_main_preamble()
@@ -2070,8 +2057,7 @@ void ProtocolV2::execute_wait(bool max_backoff)
   if (socket) {
     socket->shutdown();
   }
-  execution_done = seastar::with_gate(pending_dispatch,
-                                      [this, max_backoff] {
+  gated_execute("execute_wait", [this, max_backoff] {
     double backoff = protocol_timer.last_dur();
     if (max_backoff) {
       backoff = conf.ms_max_backoff;
@@ -2102,7 +2088,7 @@ void ProtocolV2::execute_wait(bool max_backoff)
 void ProtocolV2::execute_server_wait()
 {
   trigger_state(state_t::SERVER_WAIT, write_state_t::delay, false);
-  execution_done = seastar::with_gate(pending_dispatch, [this] {
+  gated_execute("execute_server_wait", [this] {
     return read_exactly(1).then([this] (auto bl) {
       logger().warn("{} SERVER_WAIT got read, abort", conn);
       abort_in_fault();
index 4aa7b276070900fc823e291f32dec5a4101e9671..f98bf3d4366f263b3f4dff6ad37413a1ecca4bb4 100644 (file)
@@ -80,6 +80,14 @@ class ProtocolV2 final : public Protocol {
 
   seastar::shared_future<> execution_done = seastar::now();
 
+  template <typename Func>
+  void gated_execute(const char* what, Func&& func) {
+    gated_dispatch(what, [this, &func] {
+      execution_done = seastar::futurize_apply(std::forward<Func>(func));
+      return execution_done.get_future();
+    });
+  }
+
   class Timer {
     double last_dur_ = 0.0;
     const SocketConnection& conn;